Module riak_pipe_fitting

The process that holds the details for the fitting.

Behaviours: gen_fsm.

Description

The process that holds the details for the fitting. This process also manages end-of-inputs synchronization for this stage of the pipeline.

Data Types

details()

details() = #fitting_details{
    fitting = undefined | #fitting{
        pid = undefined | pid(),
        ref = undefined | reference(),
        chashfun = undefined | riak_pipe_vnode:chashfun(),
        nval = undefined | riak_pipe_vnode:nval()},
    name = undefined | term(),
    module = undefined | atom(),
    arg = undefined | term(),
    output = undefined | #fitting{
        pid = undefined | pid(),
        ref = undefined | reference(),
        chashfun = undefined | riak_pipe_vnode:chashfun(),
        nval = undefined | riak_pipe_vnode:nval()},
    options = undefined | riak_pipe:exec_opts(),
    q_limit = undefined | pos_integer()}

state()

abstract datatype: state()

Function Index

code_change/4: Unused.
eoi/1: Send an end-of-inputs message to the specified fitting process.
format_name/1: Coerce a fitting name into a printable string.
get_details/2: Request the details about this fitting.
handle_event/3: Unused.
handle_info/3: The non-gen_fsm message that this process expects is 'DOWN'.
handle_sync_event/4: The only sync event handled in all states is workers, which retrieves a list of ring partition indexes that have requested this fitting's details (i.e. that are doing work for this fitting).
init/1: Initialize the fitting process.
start_link/4: Start the fitting, according to the Spec given.
terminate/3: Unused.
validate_fitting/1: Ensure that a fitting specification is valid.
wait_upstream_eoi/2: The fitting is just hanging out, serving details and waiting for end-of-inputs.
wait_upstream_eoi/3: The fitting is just hanging out, serving details and waiting for end-of-inputs.
wait_workers_done/3: The fitting has forwarded the end-of-inputs signal to all of the vnodes working for it, and is waiting for done responses.
worker_done/1: Tell the fitting that this worker is done.
workers/1: Get the list of ring partition indexes (vnodes) that are doing work for this fitting.

Function Details

code_change/4

code_change(OldVsn::term(), StateName::atom(), State::state(), Extra::term()) -> {ok, atom(), state()}

Unused.

eoi/1

eoi(Fitting::riak_pipe:fitting()) -> ok

Send an end-of-inputs message to the specified fitting process.

format_name/1

format_name(Name::term()) -> iolist()

Coerce a fitting name into a printable string.
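
As an illustration of what "coerce into a printable string" can mean, here is a minimal sketch. It is not the module's actual implementation: the module name format_name_sketch and the choice of io_lib:printable_list/1 as the printability test are assumptions. Binaries and printable lists pass through unchanged, and any other term is rendered with ~p.

```erlang
-module(format_name_sketch).
-export([format_name/1]).

%% Hypothetical re-implementation for illustration only; the real
%% riak_pipe_fitting:format_name/1 may differ in detail.
-spec format_name(term()) -> iodata().
format_name(Name) when is_binary(Name) ->
    %% Binaries are already printable as iodata.
    Name;
format_name(Name) when is_list(Name) ->
    case io_lib:printable_list(Name) of
        true  -> Name;                          % a plain string
        false -> io_lib:format("~p", [Name])    % some other list
    end;
format_name(Name) ->
    %% Atoms, tuples, numbers, and so on.
    io_lib:format("~p", [Name]).
```

Because the result may be a deep list, callers that need a flat string can wrap it in lists:flatten/1 or pass it directly to io:format/2 with ~s.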

get_details/2

get_details(Fitting::riak_pipe:fitting(), Partition::riak_pipe_vnode:partition()) -> {ok, details()} | gone

Request the details about this fitting. The ring partition index of the vnode requesting the details is included so that the fitting can inform the vnode of end-of-inputs later. This function assumes that it is being called from the vnode process, so that self() can be used to give the fitting a pid to monitor.
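
The two return values can be handled as in the following sketch. The module get_details_sketch, the function fetch_details, and the result atoms work and drop_inputs are hypothetical names chosen for illustration. The three-argument form takes the lookup as a fun so that the branching can be exercised without a running pipeline.

```erlang
-module(get_details_sketch).
-export([fetch_details/2, fetch_details/3]).

%% Calls the documented riak_pipe_fitting:get_details/2. This must run
%% in the vnode process, because the fitting monitors the caller.
fetch_details(Fitting, Partition) ->
    fetch_details(fun riak_pipe_fitting:get_details/2, Fitting, Partition).

%% Same logic with the lookup injected (hypothetical helper).
fetch_details(GetDetails, Fitting, Partition) ->
    case GetDetails(Fitting, Partition) of
        {ok, Details} ->
            %% The fitting has recorded Partition and can inform this
            %% vnode of end-of-inputs later.
            {work, Details};
        gone ->
            %% The fitting is no longer available.
            drop_inputs
    end.
```

What a vnode should actually do with inputs for a fitting that is gone is outside the scope of this page; drop_inputs is only a placeholder.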

handle_event/3

handle_event(Event::term(), StateName::atom(), State::state()) -> {next_state, atom(), state()}

Unused.

handle_info/3

handle_info(Info::{'DOWN', reference(), term(), term(), term()}, StateName::atom(), State::state()) -> {next_state, atom(), state()} | {stop, normal, state()}

The non-gen_fsm message that this process expects is 'DOWN'.

'DOWN' messages are received when monitored vnodes exit. In that case, the vnode is removed from the worker list. If it was also the last vnode from which the fitting was awaiting a done message, the fitting forwards eoi and shuts down.
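
The decision described above can be modelled as a pure function. This is a sketch under assumptions: the module name down_sketch and the representation of the worker list as {Partition, MonitorRef} pairs are hypothetical, and the real state() is opaque.

```erlang
-module(down_sketch).
-export([on_down/3]).

%% Workers is assumed to be a list of {Partition, MonitorRef} pairs.
%% Returns a gen_fsm-style result with the updated worker list.
on_down(Ref, StateName, Workers) ->
    %% Drop the worker whose monitor fired.
    Rest = lists:keydelete(Ref, 2, Workers),
    case {StateName, Rest} of
        {wait_workers_done, []} ->
            %% That was the last worker still owing a done message;
            %% the real fitting would forward eoi here, then stop.
            {stop, normal, Rest};
        _ ->
            {next_state, StateName, Rest}
    end.
```

In the wait_upstream_eoi state the fitting keeps running even when its worker list becomes empty, since more vnodes may still request details.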

handle_sync_event/4

handle_sync_event(Event::workers, From::term(), StateName::atom(), State::state()) -> {reply, [riak_pipe_vnode:partition()], atom(), state()}

The only sync event handled in all states is workers, which retrieves a list of ring partition indexes that have requested this fitting's details (i.e. that are doing work for this fitting).

init/1

init(X1::[pid() | riak_pipe:fitting_spec() | riak_pipe:fitting() | riak_pipe:exec_opts()]) -> {ok, wait_upstream_eoi, state()}

Initialize the fitting process. This function monitors the builder process, so the fitting will tear down if the builder exits.

start_link/4

start_link(Builder::pid(), Spec::riak_pipe:fitting_spec(), Output::riak_pipe:fitting(), Options::riak_pipe:exec_opts()) -> {ok, pid(), riak_pipe:fitting()} | ignore | {error, term()}

Start the fitting, according to the Spec given. The fitting will register with Builder and will request its outputs to be processed under the Output fitting.

terminate/3

terminate(Reason::term(), StateName::atom(), State::state()) -> ok

Unused.

validate_fitting/1

validate_fitting(Fitting_spec::riak_pipe:fitting_spec()) -> ok

Ensure that a fitting specification is valid. This function will check that the module is an atom that names a valid module (see riak_pipe_v:validate_module/2), that the arg is valid for the module (see validate_argument/2), and that the partition function is of the proper form (see validate_chashfun/1). It also checks that nval is undefined or a positive integer.

If all components are valid, the atom ok is returned. If any piece is invalid, {badarg, #fitting_spec.name, ErrorMsg} is thrown.
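
Since failure is signalled with throw rather than a return value, callers need a try ... catch. A minimal sketch follows; the module validate_sketch, the function check_spec, and the {error, {invalid_fitting, ...}} shape are hypothetical. The validator is passed in as a fun so that the example can be tried without riak_pipe loaded.

```erlang
-module(validate_sketch).
-export([check_spec/1, check_spec/2]).

%% Validate with the documented riak_pipe_fitting:validate_fitting/1.
check_spec(Spec) ->
    check_spec(fun riak_pipe_fitting:validate_fitting/1, Spec).

%% Turn the documented throw into an ordinary return value.
check_spec(Validate, Spec) ->
    try Validate(Spec) of
        ok -> ok
    catch
        %% Name is the fitting's name; ErrorMsg describes the problem.
        throw:{badarg, Name, ErrorMsg} ->
            {error, {invalid_fitting, Name, ErrorMsg}}
    end.
```

Only the documented throw is caught here; any other exception propagates to the caller.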

wait_upstream_eoi/2

wait_upstream_eoi(X1::eoi, State::state()) -> {stop, normal, state()} | {next_state, wait_workers_done, state()}

The fitting is just hanging out, serving details and waiting for end-of-inputs.

When it gets eoi, it forwards the signal to its workers, and then begins waiting for them to respond done. If it has no workers when it receives end-of-inputs, the fitting stops immediately.

wait_upstream_eoi/3

wait_upstream_eoi(M::{get_details, riak_pipe_vnode:partition(), pid()}, From::term(), State::state()) -> {reply, {ok, details()}, wait_upstream_eoi, state()}

The fitting is just hanging out, serving details and waiting for end-of-inputs.

When it gets a request for the fitting's details, it sets up a monitor for the working vnode, and responds with details.

The fitting may receive a done message from a vnode before eoi has been sent, if handoff causes the worker to relocate. In this case, the fitting simply demonitors the vnode, and removes it from its worker list.

wait_workers_done/3

wait_workers_done(M::{get_details, riak_pipe_vnode:partition(), pid()}, From::term(), State::state()) -> {reply, {ok, details()}, wait_workers_done, state()}

The fitting has forwarded the end-of-inputs signal to all of the vnodes working for it, and is waiting for done responses.

When the fitting receives a done response, it demonitors the vnode that sent it, and removes it from its worker list. If there are no more responses to wait for, the fitting forwards the end-of-inputs signal to the fitting that follows, and then shuts down normally.

If the fitting receives a request for details from a vnode while in this state, it responds with the details as usual, but also immediately sends end-of-inputs to that vnode.
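
The end-of-inputs protocol across the two states can be summarized in a small pure model. This is an illustrative sketch, not the gen_fsm implementation: the module fitting_model, the {StateName, Workers} state shape, and the action atoms eoi_to_worker and eoi_downstream are all hypothetical. It also assumes that a fitting with no workers still forwards eoi downstream before stopping, which the text above does not state explicitly.

```erlang
-module(fitting_model).
-export([new/0, get_details/2, eoi/1, done/2]).

%% Model state: {StateName, Workers}, Workers being a list of
%% partitions. Every function returns {Actions, NewState}.
new() -> {wait_upstream_eoi, []}.

%% A vnode requests details and thereby becomes a worker. After eoi
%% has been forwarded, a late-arriving vnode is sent eoi right away.
get_details(P, {wait_upstream_eoi, Ws}) ->
    {[], {wait_upstream_eoi, add(P, Ws)}};
get_details(P, {wait_workers_done, Ws}) ->
    {[{eoi_to_worker, P}], {wait_workers_done, add(P, Ws)}}.

%% Upstream end-of-inputs arrives.
eoi({wait_upstream_eoi, []}) ->
    %% No workers: stop immediately (forwarding eoi is an assumption).
    {[eoi_downstream], stopped};
eoi({wait_upstream_eoi, Ws}) ->
    {[{eoi_to_worker, P} || P <- Ws], {wait_workers_done, Ws}}.

%% A worker reports done.
done(P, {wait_upstream_eoi, Ws}) ->
    %% Early done, e.g. after handoff: just forget the worker.
    {[], {wait_upstream_eoi, lists:delete(P, Ws)}};
done(P, {wait_workers_done, Ws}) ->
    case lists:delete(P, Ws) of
        []   -> {[eoi_downstream], stopped};
        Rest -> {[], {wait_workers_done, Rest}}
    end.

%% Keep the worker list free of duplicates.
add(P, Ws) -> lists:usort([P | Ws]).
```

The model leaves out monitors and replies; it only tracks which workers are still owed a done message and when eoi moves on to the next fitting.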

worker_done/1

worker_done(Fitting::riak_pipe:fitting()) -> ok | gone

Tell the fitting that this worker is done. This function assumes that it is being called from the vnode process, so that self() can be used to inform the fitting of which worker is done.

workers/1

workers(Fitting::pid()) -> {ok, [riak_pipe_vnode:partition()]} | gone

Get the list of ring partition indexes (vnodes) that are doing work for this fitting.


Generated by EDoc, Nov 19 2012, 04:53:19.