Behaviours: gen_fsm.
This module defines the riak_pipe_vnode_worker behaviour.
Required callback functions: init/2, process/3, done/1.
Basic worker process implementation, to be parameterized with fitting implementation module.
Modules that implement this behavior need to export at least three functions:
init(Partition :: riak_pipe_vnode:partition(), FittingDetails :: riak_pipe_fitting:details()) -> {ok, ModuleState :: term()}
The init/2 function is called when the worker starts. The module should do whatever it needs to get ready before processing inputs. The module will probably also want to store the Partition and FittingDetails arguments in its state, as it will need them to send outputs later. The ModuleState returned from this function will be passed to the process/3 function later.
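A minimal sketch of an init/2 callback, assuming a hypothetical module named cache_fitting. It keeps both arguments in its state, as recommended above, and does some per-worker setup (a private ETS table) before any input arrives. Only init/2 is shown, so the module omits the behaviour declaration; the record layout is an illustrative assumption.

```erlang
%% Hypothetical fitting module; its name and state layout are illustrative.
-module(cache_fitting).

-export([init/2]).

%% Keep Partition and FittingDetails: process/3 and done/1 will need
%% them later to call riak_pipe_vnode_worker:send_output/3.
-record(state, {partition, details, cache}).

init(Partition, FittingDetails) ->
    %% Per-worker setup before any input arrives: a private ETS table,
    %% owned by the calling (worker) process, used as a scratch cache.
    Cache = ets:new(?MODULE, [set, private]),
    {ok, #state{partition = Partition,
                details = FittingDetails,
                cache = Cache}}.
```

Because the table is owned by the worker process, it disappears automatically when the worker terminates; a done/1 callback could still delete it explicitly.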
process(Input :: term(), LastInPreflist :: boolean(), ModuleState :: term()) -> {ok, NewModuleState :: term()} | forward_preflist.
The process/3 function is called once for each input delivered to the worker. The module should do whatever processing is needed, sending outputs if appropriate. The NewModuleState returned from process/3 will be passed back in on the next input.
LastInPreflist indicates whether this worker is the last in the partition preflist for this input. When this parameter is false, the function may return the atom forward_preflist to have the input sent to the next vnode in the preference list. When this parameter is true, returning forward_preflist will cause an error trace message to be generated, with the reason preflist_exhausted.
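To illustrate the possible outcomes of process/3, here is a sketch that assumes a fitting which looks each input key up in data local to its partition. The module name lookup_fitting and the local_lookup/2 stub are hypothetical; only riak_pipe_vnode_worker:send_output/3 is taken from this page. When the key is missing and other vnodes remain in the preflist, the input is forwarded; on the last vnode it is skipped rather than triggering preflist_exhausted.

```erlang
%% Hypothetical fitting: look each input key up in this partition's
%% local data, and try the next vnode in the preflist when it is missing.
-module(lookup_fitting).
-behaviour(riak_pipe_vnode_worker).

-export([init/2, process/3, done/1]).

-record(state, {partition, details}).

init(Partition, FittingDetails) ->
    {ok, #state{partition = Partition, details = FittingDetails}}.

process(Key, LastInPreflist, #state{partition = P, details = D} = State) ->
    case local_lookup(P, Key) of
        {ok, Value} ->
            %% Found locally: pass the result to the next fitting.
            ok = riak_pipe_vnode_worker:send_output({Key, Value}, P, D),
            {ok, State};
        not_found when not LastInPreflist ->
            %% Another vnode in the preflist may have the key.
            forward_preflist;
        not_found ->
            %% Last in the preflist: forward_preflist would only produce a
            %% preflist_exhausted error trace, so skip the input instead.
            {ok, State}
    end.

done(_State) ->
    ok.

%% Hypothetical stand-in for reading this partition's local data;
%% it always misses so that the example stays self-contained.
local_lookup(_Partition, _Key) ->
    not_found.
```

Whether to skip, emit a "not found" marker, or let the error trace fire on the last vnode is a per-fitting design choice; skipping is just the simplest option for this sketch.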
done(ModuleState :: term()) -> ok.
The done/1 function is called when all inputs have been processed, and the end-of-inputs flag has been received from the fitting. The module should clean up any resources it needs to clean up, and send any outputs it still needs to send. When done/1 returns, the worker will terminate.
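A sketch of a fitting that defers all of its output to done/1, assuming a hypothetical counting module named count_fitting. process/3 only updates the state; done/1 sends the accumulated count as the worker's single output, since it is the last chance to emit anything before the worker terminates.

```erlang
%% Hypothetical counting fitting: process/3 only accumulates, and done/1
%% sends the single result before the worker terminates.
-module(count_fitting).
-behaviour(riak_pipe_vnode_worker).

-export([init/2, process/3, done/1]).

-record(state, {partition, details, count = 0}).

init(Partition, FittingDetails) ->
    {ok, #state{partition = Partition, details = FittingDetails}}.

process(_Input, _LastInPreflist, #state{count = N} = State) ->
    {ok, State#state{count = N + 1}}.

done(#state{partition = P, details = D, count = N}) ->
    %% Once done/1 returns the worker terminates, so flush the result now.
    ok = riak_pipe_vnode_worker:send_output(N, P, D).
```

Note that each worker counts only the inputs it received, so a downstream fitting would have to combine the per-worker counts.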
There are also four optional functions that a worker behavior module can export:
validate_arg(Arg :: term()) -> ok | {error, Reason :: iolist()}.
The validate_arg/1 function is called before a pipeline is constructed. If the behavior module exports this function, then it will be evaluated on the value of the arg field of a #fitting_spec{} record that points to this module. If the argument is valid, this function should return the atom ok. If the argument is invalid, the function should return an error tuple, with the Reason being a printable iolist.
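A sketch of validate_arg/1 for a hypothetical fitting (batch_fitting) whose arg is assumed to be a positive integer batch size. The error reason is built with io_lib:format/2, which yields a printable character list.

```erlang
%% Hypothetical fitting whose arg must be a positive integer batch size.
%% Only the optional validate_arg/1 callback is shown.
-module(batch_fitting).

-export([validate_arg/1]).

validate_arg(N) when is_integer(N), N > 0 ->
    ok;
validate_arg(Other) ->
    %% The Reason must be printable, so format it as text.
    {error, io_lib:format("~p requires a positive integer arg, got ~p",
                          [?MODULE, Other])}.
```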
archive(ModuleState :: term()) -> {ok, Archive :: term()}.
The archive/1 function is called when the vnode that owns this worker is being handed off to another node. The worker should produce some Erlang term that represents its state. This Archive term will be passed to the handoff/2 function of the module by the worker running on the handoff target.
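A sketch of archive/1, assuming a hypothetical counting fitting whose state carries a running count. Only the part of the state worth transferring goes into the archive: the worker on the handoff target receives it in handoff/2 alongside its own ModuleState, which already holds that worker's own partition and fitting details. The {count, N} archive format is an illustrative assumption.

```erlang
%% Hypothetical counting fitting; only the optional archive/1 is shown.
-module(count_archive).

-export([archive/1]).

-record(state, {partition, details, count = 0}).

%% Archive just the accumulated count; partition and details stay behind.
archive(#state{count = N}) ->
    {ok, {count, N}}.
```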
handoff(Archive :: term(), ModuleState :: term()) -> {ok, NewModuleState :: term()}.
The handoff/2 function is called when a vnode receives a handoff archive from another vnode. The module should "merge" the Archive with its ModuleState (in whatever sense "merge" may mean for this fitting), and return the resulting NewModuleState.
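A matching handoff/2 sketch for the same hypothetical counting fitting, assuming archives of the form {count, N}. For a counter, "merge" is taken to mean summing the archived count into the local one; other fittings may need a very different merge.

```erlang
%% Hypothetical counting fitting; only the optional handoff/2 is shown.
-module(count_handoff).

-export([handoff/2]).

-record(state, {partition, details, count = 0}).

%% Assumes the archive was produced as {count, N} by a matching archive/1.
handoff({count, Archived}, #state{count = Local} = State) ->
    {ok, State#state{count = Local + Archived}}.
```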
no_input_run_reduce_once() -> boolean().
If present and returns true, then in the case that a fitting has no input (as measured by having zero workers), a "fake" worker will be started for the express purpose of running its computation once and sending some output downstream. Right now, the only fitting that needs this feature is riak_kv_w_reduce.erl, which needs the capability to run its reduce function once (with input of an empty list) in order to maintain full compatibility with Riak KV's Map/Reduce.
When this function returns true, the #fitting_details.options property list will contain the property pipe_fitting_no_input to indicate that the fitting has no input.
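A sketch of opting in to the no-input behavior, with a hypothetical helper for detecting the fake worker. The helper takes the options proplist directly because the #fitting_details{} record is defined in riak_pipe's header, which is not included here; in a real fitting it would be called with the options field of the FittingDetails passed to init/2. proplists:get_bool/2 accepts both a bare atom and {Atom, true}, so the check does not depend on which form the property takes.

```erlang
%% Hypothetical reduce-style fitting; only the no-input pieces are shown.
-module(reduce_once_fitting).

-export([no_input_run_reduce_once/0, is_no_input/1]).

%% Ask for a "fake" worker when this fitting receives no input at all.
no_input_run_reduce_once() ->
    true.

%% Hypothetical helper: true when the options proplist (normally taken
%% from #fitting_details.options) marks this as the no-input worker.
is_no_input(Options) ->
    proplists:get_bool(pipe_fitting_no_input, Options).
```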
abstract datatype: state()
behaviour_info/1 | Get information about this behavior. |
code_change/4 | Unused. |
handle_event/3 | Unused. |
handle_info/3 | Unused. |
handle_sync_event/4 | Unused. |
init/1 | Initialize the worker. |
initial_input_request/2 | The worker has just started, and should request its first input from its owning vnode. |
recurse_input/3 | Equivalent to recurse_input(Input, FromPartition, Details, noblock). |
recurse_input/4 | Send a new input from this fitting, to itself. |
send_archive/1 | Ask the worker to archive itself. |
send_handoff/2 | Ask the worker to merge handoff data from an archived worker. |
send_input/2 | Send input to the worker. |
send_output/3 | Equivalent to send_output(Output, FromPartition, Details, infinity). |
send_output/4 | Send output from the given fitting to the next output down the line. |
send_output/5 | Send output from the given fitting to a specific fitting. |
start_link/3 | Start a worker for the specified fitting+vnode. |
terminate/3 | Unused. |
wait_for_input/2 | The worker has requested its next input item, and is waiting for it. |
behaviour_info(Other::atom()) -> undefined | [{atom(), arity()}]
Get information about this behavior.
code_change(OldVsn::term(), StateName::atom(), State::state(), Extra::term()) -> {ok, atom(), state()}
Unused.
handle_event/3
Unused.
handle_info/3
Unused.
handle_sync_event(Event::term(), From::term(), StateName::atom(), State::state()) -> {reply, ok, atom(), state()}
Unused.
init(X1::[riak_pipe_vnode:partition() | pid() | riak_pipe_fitting:details()]) -> {ok, initial_input_request, state(), 0} | {stop, {init_failed, term(), term()}}
Initialize the worker. This function calls the implementing module's init function. If that init function fails, the worker stops with an {init_failed, Type, Error} reason.
initial_input_request/2
The worker has just started, and should request its first input from its owning vnode. This is done after a zero timeout instead of in the init function, to avoid the deadlock that would result from the worker waiting for a message from the vnode while the vnode is waiting for a response from this process.
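The zero-timeout trick can be shown in isolation. This sketch swaps in gen_server for brevity (the real worker is a gen_fsm, which uses the same {ok, ..., 0} convention from init/1), and the "vnode" is simply whichever process calls the hypothetical start_link/1. Sending the request from init/1 and waiting for the answer there would deadlock, because the caller is still blocked in start_link until init/1 returns.

```erlang
%% Minimal sketch of the zero-timeout pattern, using gen_server.
-module(zero_timeout_demo).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link(VnodePid) ->
    gen_server:start_link(?MODULE, VnodePid, []).

init(VnodePid) ->
    %% Returning a timeout of 0 makes gen_server call
    %% handle_info(timeout, State) right after init/1 returns, provided
    %% no other message is already waiting in the mailbox.
    {ok, VnodePid, 0}.

handle_info(timeout, VnodePid) ->
    %% init/1 has returned, so the caller is no longer blocked on us.
    VnodePid ! {request_input, self()},
    {noreply, VnodePid};
handle_info(_Other, State) ->
    {noreply, State}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.
```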
recurse_input(Input, FromPartition, Details) -> any()
Equivalent to recurse_input(Input, FromPartition, Details, noblock).
recurse_input(Input::term(), FromPartition::riak_pipe_vnode:partition(), Details::riak_pipe_fitting:details(), Timeout::riak_pipe_vnode:qtimeout()) -> ok | {error, term()}
Send a new input from this fitting, to itself. This can be used to write fittings that perform recursive calculation, where steps in the recursion might be done in parallel.
For example, when walking intermediate tree nodes, using recurse_input/3 to send children to other vnodes, instead of processing them in the same worker, may be a useful strategy.
WARNING: Using recurse_input with a Timeout of infinity is discouraged, unless you can guarantee that the queues for a fitting will never be full. Otherwise, it's possible to deadlock a fitting by blocking on enqueueing an input for a worker that is blocking on enqueueing an input for the sender (circular blocking). Use noblock and handle timeout failures to prevent deadlock.
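This works because, as long as this worker is alive, the fitting it works for will not have received all of its done messages. So, the vnode that enqueues this input will still be able to ask the fitting for details, and the fitting will know that it has to wait on that vnode.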
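A sketch of the tree-walking strategy described above, following the advice to avoid infinity: it uses recurse_input/3, which this page documents as the noblock variant, and handles any {error, _} result by processing the child in the current worker instead of blocking. The module name and the {leaf, Value} / {node, Children} input shapes are hypothetical.

```erlang
%% Hypothetical tree-walking fitting. Inputs are {leaf, Value} or
%% {node, Children}; leaves are emitted, children are re-enqueued.
-module(tree_walk_fitting).
-behaviour(riak_pipe_vnode_worker).

-export([init/2, process/3, done/1]).

-record(state, {partition, details}).

init(Partition, FittingDetails) ->
    {ok, #state{partition = Partition, details = FittingDetails}}.

process({leaf, Value}, _Last, #state{partition = P, details = D} = State) ->
    ok = riak_pipe_vnode_worker:send_output(Value, P, D),
    {ok, State};
process({node, Children}, Last, #state{partition = P, details = D} = State) ->
    lists:foldl(
      fun(Child, {ok, S}) ->
              %% recurse_input/3 is noblock: a full queue yields an error
              %% instead of a (possibly circular) blocking wait.
              case riak_pipe_vnode_worker:recurse_input(Child, P, D) of
                  ok ->
                      {ok, S};
                  {error, _Reason} ->
                      %% Fall back to handling the child in this worker.
                      process(Child, Last, S)
              end
      end, {ok, State}, Children).

done(_State) ->
    ok.
```

Falling back to local processing keeps the walk making progress when queues are full, at the cost of losing parallelism for that subtree.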
send_archive(WorkerPid::pid()) -> ok
Ask the worker to archive itself. The worker will send the archive data to the owning vnode when it has done so. Once it has sent the archive, the worker shuts down normally.
send_handoff(WorkerPid::pid(), Archive::term()) -> ok
Ask the worker to merge handoff data from an archived worker. Note: this should only be called by the vnode that owns the worker, as the result of the worker asking for its next input when the vnode has received handoff data for the worker's fitting.
send_input(WorkerPid::pid(), Input::done | {term(), riak_core_apl:preflist()}) -> ok
Send input to the worker. Note: this should only be called by the vnode that owns the worker, as the result of the worker asking for its next input.
send_output(Output, FromPartition, Details) -> any()
Equivalent to send_output(Output, FromPartition, Details, infinity).
send_output(Output::term(), FromPartition::riak_pipe_vnode:partition(), Fitting_details::riak_pipe_fitting:details(), Timeout::riak_pipe_vnode:qtimeout()) -> ok | {error, term()}
Send output from the given fitting to the next output down the line. FromPartition is used in the case that the next fitting's partition function is follow.
send_output(Output::term(), FromPartition::riak_pipe_vnode:partition(), Details::riak_pipe_fitting:details(), FittingOverride::riak_pipe:fitting(), Timeout::riak_pipe_vnode:qtimeout()) -> ok | {error, term()}
Send output from the given fitting to a specific fitting. This is most often used to send output to the sink, but also happens to be the internal implementation of send_output/3.
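A sketch of hypothetical helpers around the two explicit send_output arities. emit/3 waits a bounded time for queue space and wraps failures; it assumes riak_pipe_vnode:qtimeout() accepts a millisecond integer as Erlang's timeout() does, and 5000 ms is an arbitrary choice. emit_to/4 sends to a caller-supplied fitting (for example the sink); how that fitting value is obtained is outside the scope of this sketch.

```erlang
%% Hypothetical helpers around riak_pipe_vnode_worker:send_output/4,5.
-module(output_helpers).

-export([emit/3, emit_to/4]).

%% Send to the next fitting, waiting at most 5 s for queue space.
emit(Output, Partition, Details) ->
    case riak_pipe_vnode_worker:send_output(Output, Partition, Details, 5000) of
        ok -> ok;
        {error, Reason} -> {error, {send_output_failed, Reason}}
    end.

%% Send to a specific fitting (e.g. the sink) rather than the next one.
emit_to(Fitting, Output, Partition, Details) ->
    riak_pipe_vnode_worker:send_output(Output, Partition, Details,
                                       Fitting, infinity).
```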
start_link(Partition::riak_pipe_vnode:partition(), VnodePid::pid(), FittingDetails::riak_pipe_fitting:details()) -> {ok, pid()} | ignore | {error, term()}
Start a worker for the specified fitting+vnode.
terminate(Reason::term(), StateName::atom(), State::state()) -> ok
Unused.
wait_for_input(X1::{input, done | {term(), riak_core_apl:preflist()}} | {handoff, term()} | archive, State::state()) -> {next_state, wait_for_input, state()} | {stop, normal, state()}
The worker has requested its next input item, and is waiting for it.
If the input is done, due to end-of-inputs from the fitting, then the implementing module's done function is evaluated, and then the worker terminates normally.
If the input is any regular input, then the implementing module's process function is evaluated. When it finishes, the next input is requested from the vnode.
If the input is a handoff from another vnode, the worker asks the implementing module to merge the archive, if the implementing module exports that functionality.
If the input is a request to archive, the worker asks the implementing module to archive itself, if the implementing module exports that functionality. When the archiving process has finished, the worker terminates normally.
Generated by EDoc, Nov 21 2012, 15:51:29.