Module riak_pipe_vnode

The vnode, where the queues live.

Behaviours: riak_core_vnode.

Data Types

chash()

chash() = chash:index()

chashfun()

chashfun() = {Module::atom(), Function::atom()} | chash() | follow | sink | fun((term()) -> chash())

The fun form is accepted for 1.0.x compatibility.

nval()

nval() = pos_integer() | {Module::atom(), Function::atom()} | fun((term()) -> pos_integer())

The fun form is accepted for 1.0.x compatibility.

partition()

partition() = non_neg_integer()

qerror()

qerror() = worker_limit_reached | worker_startup_failed | timeout | forwarding | preflist_exhausted

qtimeout()

qtimeout() = noblock | infinity

sender()

sender() = {sender_type(), reference(), pid()} | {server, undefined, undefined} | {fsm, undefined, pid()} | ignore

sender_type()

sender_type() = fsm | server | raw

state()

abstract datatype: state()

Function Index

delete/1                  Unused.
encode_handoff_item/2     Produce a binary representing the worker data to hand off.
eoi/2                     Send end-of-inputs for a fitting to a vnode.
handle_command/3          Handle a vnode command.
handle_coverage/4         Coverage requests may be used to enqueue identical work on multiple vnodes.
handle_exit/3             Handle an 'EXIT' message from a worker process.
handle_handoff_command/3  Handle a handoff command.
handle_handoff_data/2     Accept handoff data from some other node.
handle_info/2             Handle a 'DOWN' message from a fitting process.
handoff_cancelled/1       Stop handing off before getting started.
handoff_finished/2        Note that handoff has completed.
handoff_starting/2        Be prepared to hand off.
hash_for_partition/1      Produce a hash value in the range handled by the given partition.
init/1                    Initialize the vnode.
is_empty/1                Determine whether this vnode has any running workers.
next_input/2              Request the next input from the queue for the given fitting from a vnode.
queue_work/2              Equivalent to queue_work(Fitting, Input, infinity).
queue_work/3              Equivalent to queue_work(Fitting, Input, Timeout, []).
queue_work/4              Queue the given Input for processing by the Fitting.
queue_work/5              Queue the given Input for processing by the Fitting on the vnode specified by Hash.
reply_archive/3           Send the result of archiving a worker to the vnode that owns that worker.
start_vnode/1             Start the vnode, if it isn't started already.
status/1                  Get some information about the worker queues on this vnode.
status/2                  Produces the same type of data as status/1, but only includes information for the fittings given.
terminate/2               Unused.

Function Details

delete/1

delete(State::state()) -> {ok, state()}

Unused.

encode_handoff_item/2

encode_handoff_item(Fitting::riak_pipe:fitting(), X2::{queue(), queue(), term()}) -> binary()

Produce a binary representing the worker data to hand off.

eoi/2

eoi(Pid::pid(), Fitting::riak_pipe:fitting()) -> ok

Send end-of-inputs for a fitting to a vnode. Note: this should only be called by riak_pipe_fitting processes. Once the queue is emptied, the vnode shuts down the worker, disposes of the queue, and sends done to the fitting.

handle_command/3

handle_command(Cmd_enqueue::term(), Sender::sender(), State::state()) -> {reply, term(), state()} | {noreply, state()}

Handle a vnode command.

handle_coverage/4

handle_coverage(Request::term(), FilterVNodes::term(), Sender::sender(), State::state()) -> {reply, ok, state()}

Coverage requests may be used to enqueue identical work on multiple vnodes. Input is delivered to the worker as {cover, FilterVNodes, Input}.
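As an illustration of the input shape described above, a worker that can receive both ordinary and coverage inputs might tell them apart by pattern matching. This is a minimal sketch: the module name, the unwrap/1 function, and the plain and covered tags are hypothetical and not part of riak_pipe.

```erlang
-module(cover_input_sketch).
-export([unwrap/1]).

%% Hypothetical helper: separate coverage-delivered inputs from plain ones.
%% Coverage inputs arrive wrapped as {cover, FilterVNodes, Input}.
unwrap({cover, FilterVNodes, Input}) ->
    {covered, FilterVNodes, Input};
unwrap(Input) ->
    %% Anything else is treated as an ordinary input (an assumption of
    %% this sketch; a plain input shaped like a cover tuple would be
    %% misclassified).
    {plain, Input}.
```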

handle_exit/3

handle_exit(Pid::pid(), Reason::term(), State::state()) -> {noreply, state()}

Handle an 'EXIT' message from a worker process.

If the worker died normally after receiving end-of-inputs and emptying its queue, send done to the fitting and remove the worker's entry from the vnode's worker list.

If the worker died abnormally, attempt to restart it.

handle_handoff_command/3

handle_handoff_command(Riak_core_fold_req_v1::term(), Sender::sender(), State::state()) -> {reply, term(), state()} | {noreply, state()} | {forward, state()}

Handle a handoff command.

handle_handoff_data/2

handle_handoff_data(Data::binary(), State::state()) -> {reply, ok | {error, term()}, state()}

Accept handoff data from some other node. Data should be a #worker_handoff{} record encoded with term_to_binary/1. See encode_handoff_item/2.

Ensure that a worker is running for the fitting, merge queues, and prepare to handle archive transfer.

handle_info/2

handle_info(X1::term(), State::state()) -> {ok, state()}

Handle a 'DOWN' message from a fitting process. Kill the worker associated with that fitting and dispose of its queue.

handoff_cancelled/1

handoff_cancelled(State::state()) -> {ok, state()}

Stop handing off before getting started.

handoff_finished/2

handoff_finished(TargetNode::node(), State::state()) -> {ok, state()}

Note that handoff has completed.

handoff_starting/2

handoff_starting(TargetNode::node(), State::state()) -> {true, state()}

Be prepared to hand off.

hash_for_partition/1

hash_for_partition(I::partition()) -> chash()

Produce a hash value in the range handled by the given partition. Used to support the follow chashfun.

init/1

init(X1::[partition()]) -> {ok, state()}

Initialize the vnode. This function validates the limits set in the application environment, and starts the worker supervisor.

Two application environment variables matter to the vnode:
worker_limit
    Positive integer, default 50. The maximum number of workers allowed to operate on this vnode.
worker_queue_limit
    Positive integer, default 4096. The maximum length of each worker's input queue. The actual cap for a fitting's queue is the lesser of this number and the q_limit specified in the startup spec.
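The "lesser of" rule for the queue cap can be sketched as follows. This is an illustration only, not the vnode's actual code: the module and function names are hypothetical, and reading the variable from an application named riak_pipe is an assumption.

```erlang
-module(queue_cap_sketch).
-export([effective_limit/1, effective_limit/2]).

%% Hypothetical helper: look up worker_queue_limit (falling back to the
%% documented default of 4096) and apply it to a fitting's q_limit.
%% The application name riak_pipe is an assumption of this sketch.
-spec effective_limit(pos_integer()) -> pos_integer().
effective_limit(QLimit) ->
    VnodeLimit = application:get_env(riak_pipe, worker_queue_limit, 4096),
    effective_limit(VnodeLimit, QLimit).

%% The cap applied to a fitting's queue is the smaller of the
%% vnode-wide limit and the fitting's own q_limit.
-spec effective_limit(pos_integer(), pos_integer()) -> pos_integer().
effective_limit(WorkerQueueLimit, QLimit) ->
    erlang:min(WorkerQueueLimit, QLimit).
```

For example, a fitting started with a q_limit of 64 would be capped at 64, while a q_limit of 10000 would be capped at the vnode-wide 4096.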

is_empty/1

is_empty(State::state()) -> {boolean(), state()}

Determine whether this vnode has any running workers.

next_input/2

next_input(Pid::pid(), Fitting::riak_pipe:fitting()) -> ok

Request the next input from the queue for the given fitting from a vnode. Note: this should only be called by the worker process for that fitting-vnode pair. This will cause the vnode to send the next input to the worker process for this fitting.

queue_work/2

queue_work(Fitting, Input) -> any()

Equivalent to queue_work(Fitting, Input, infinity).

queue_work/3

queue_work(Fitting, Input, Timeout) -> any()

Equivalent to queue_work(Fitting, Input, Timeout, []).

queue_work/4

queue_work(Fitting::riak_pipe:fitting(), Input::term(), Timeout::qtimeout(), UsedPreflist::riak_core_apl:preflist()) -> ok | {error, [qerror()]}

Queue the given Input for processing by the Fitting. This function handles getting the input to the correct vnode by evaluating the fitting's consistent-hashing function (chashfun) on the input.

queue_work/5

queue_work(Fitting::riak_pipe:fitting(), Input::term(), Timeout::qtimeout(), UsedPreflist::riak_core_apl:preflist(), Hash::chash()) -> ok | {error, [qerror()]}

Queue the given Input for processing by the Fitting on the vnode specified by Hash. This version of the function is used to support the follow chashfun, by allowing a worker to send the input directly to the vnode it works for.

Timeout may be any of the following:
infinity
    Never time out. Wait as long as necessary to get the input in the queue.
noblock
    Time out if the vnode cannot immediately queue the input. noblock will wait as long as necessary for a response from the vnode, but will direct the vnode not to block the request if the queue is full.
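A caller has to handle both ok and {error, [qerror()]}. The sketch below shows one way to turn that result into a decision. The module, the classify/1 function, and its done, retry_later, and give_up results are hypothetical, and treating timeout and worker_limit_reached as retryable is a policy assumed here, not something riak_pipe prescribes.

```erlang
-module(queue_result_sketch).
-export([classify/1]).

%% Map a queue_work result onto a hypothetical caller action.
-spec classify(ok | {error, [atom()]}) -> done | retry_later | give_up.
classify(ok) ->
    done;
classify({error, Reasons}) when is_list(Reasons) ->
    %% Assumption of this sketch: a full queue under noblock shows up as
    %% timeout, and it is worth retrying only if every reported reason
    %% is one of these transient conditions.
    Retryable = [timeout, worker_limit_reached],
    case lists:all(fun(R) -> lists:member(R, Retryable) end, Reasons) of
        true  -> retry_later;
        false -> give_up
    end.
```

A call site might then look like queue_result_sketch:classify(riak_pipe_vnode:queue_work(Fitting, Input, noblock)).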

reply_archive/3

reply_archive(Pid::pid(), Fitting::riak_pipe:fitting(), Archive::term()) -> ok

Send the result of archiving a worker to the vnode that owns that worker. Note: this should only be called by the worker being archived. This will cause the vnode to send that worker's queue and archive to its handoff partner when instructed to do so.

start_vnode/1

start_vnode(I::partition()) -> {ok, pid()}

Start the vnode, if it isn't started already.

status/1

status(Pid::pid()) -> {partition(), [[{atom(), term()}]]}

Get some information about the worker queues on this vnode. The result is a tuple of the form {PartitionNumber, [WorkerProplist]}. Each WorkerProplist contains tagged tuples, such as:

fitting
    The pid of the fitting the worker implements.
name
    The name of the fitting.
module
    The module that implements the fitting.
state
    The state of the worker. One of working, waiting, init.
inputs_done
    Boolean: true if eoi has been delivered for this fitting, false otherwise.
queue_length
    Integer number of items in the worker's queue.
blocking_length
    Integer number of requests blocking on the queue.
started
    An erlang:now/0 tuple, indicating the time that the worker started.
processed
    Integer number of inputs that the worker has processed.
failures
    Integer number of times that the worker has failed (and was restarted).
work_time
    Total time the worker has spent processing inputs (as opposed to waiting idle for them). Given as an integer number of microseconds.
idle_time
    Total time the worker has spent waiting for inputs (as opposed to working on them). Given as an integer number of microseconds. Should be roughly equal to (now()-started)-work_time.
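Because each worker is described by a proplist, the result can be inspected with the standard proplists module. The following is a minimal sketch: the module, its functions, and the sample data are hypothetical and hand-written for illustration, not output captured from a real vnode.

```erlang
-module(status_sketch).
-export([backlog/1, utilization/1, sample/0]).

%% Sum queue_length over all workers in a {Partition, [WorkerProplist]}
%% tuple. Workers without a queue_length entry count as 0.
backlog({_Partition, Workers}) ->
    lists:sum([proplists:get_value(queue_length, W, 0) || W <- Workers]).

%% Fraction of tracked time a worker spent processing inputs, computed
%% from work_time and idle_time (both in microseconds).
utilization(Worker) ->
    Work = proplists:get_value(work_time, Worker, 0),
    Idle = proplists:get_value(idle_time, Worker, 0),
    case Work + Idle of
        0     -> 0.0;
        Total -> Work / Total
    end.

%% Hypothetical status result with two workers, for illustration only.
sample() ->
    {0, [[{name, fetch}, {state, working}, {queue_length, 3},
          {work_time, 1000}, {idle_time, 3000}],
         [{name, reduce}, {state, waiting}, {queue_length, 0},
          {work_time, 0}, {idle_time, 0}]]}.
```

Against a live vnode, the argument to backlog/1 would be the result of riak_pipe_vnode:status(Pid).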

status/2

status(Pid::pid(), Fittings::[#fitting{pid = undefined | pid(), ref = undefined | reference(), chashfun = undefined | riak_pipe_vnode:chashfun(), nval = undefined | riak_pipe_vnode:nval()}] | all) -> {partition(), [[{atom(), term()}]]}

Produces the same type of data as status/1, but only includes information for the fittings given.

terminate/2

terminate(Reason::term(), State::state()) -> ok

Unused.


Generated by EDoc, Nov 19 2012, 04:53:20.