Module riak_pipe

Description

Basic interface to riak_pipe.

Clients of riak_pipe are most likely to be interested in exec/2, queue_work/2, eoi/1, receive_result/1, and collect_results/1.

Basic client usage should go something like this:
  % define the pipeline (the #fitting_spec{} record definition must be
  % in scope, i.e. included from riak_pipe's header file)
  PipelineSpec = [#fitting_spec{name="passer",
                                module=riak_pipe_w_pass}],

  % start things up
  {ok, Pipe} = riak_pipe:exec(PipelineSpec, []),

  % send in some work
  riak_pipe:queue_work(Pipe, "work item 1"),
  riak_pipe:queue_work(Pipe, "work item 2"),
  riak_pipe:queue_work(Pipe, "work item 3"),
  riak_pipe:eoi(Pipe),

  % wait for results (alternatively use receive_result/1 repeatedly);
  % collect_results/1 returns {eoi | timeout, Results, Logs}
  {eoi, Results, _Logs} = riak_pipe:collect_results(Pipe).

Many examples are included in the source code and exported as functions named example*.

The functions result/3, eoi/1, and log/3 are used by workers and fittings to deliver messages to the sink.

Data Types

exec_option()

exec_option() = {sink, fitting()} | {trace, all | list() | set() | ordsets:ordset()} | {log, sink | sasl}

exec_opts()

exec_opts() = [exec_option()]

fitting()

fitting() = #fitting{pid = undefined | pid(), ref = undefined | reference(), chashfun = undefined | riak_pipe_vnode:chashfun(), nval = undefined | riak_pipe_vnode:nval()}

fitting_spec()

fitting_spec() = #fitting_spec{name = undefined | term(), module = undefined | atom(), arg = undefined | term(), chashfun = riak_pipe_vnode:chashfun(), nval = riak_pipe_vnode:nval(), q_limit = pos_integer()}

pipe()

pipe() = #pipe{builder = undefined | pid(), fittings = undefined | [{Name::term(), fitting()}], sink = undefined | fitting()}

stat()

stat() = {atom(), term()}

Function Index

active_pipelines/1: Get all active pipelines hosted on Node.
collect_results/1: Receive all results and log messages, up to end-of-inputs (unless receive_result times out before the eoi arrives).
collect_results/2
destroy/1: Brutally kill a pipeline.
eoi/1: Send an end-of-inputs message to the head of the pipe.
example/0: An example run of a simple pipe.
example_receive/1: An example of receiving data from a pipeline.
example_reduce/0: Another example pipeline use.
example_send/1: An example of sending data into a pipeline.
example_start/0: An example of starting a simple pipe.
example_tick/3
example_tick/4
example_transform/0: Another example pipeline use.
exec/2: Set up a pipeline.
generic_transform/4
queue_work/2: Equivalent to queue_work(Pipe, Input, infinity).
queue_work/3: Send inputs to the head of the pipe.
receive_result/1: Pull the next pipeline result out of the sink's mailbox.
receive_result/2
status/1: Retrieve details about the status of the workers in this pipeline.

Function Details

active_pipelines/1

active_pipelines(Node::node() | global) -> [pipe()] | error | [{node(), [pipe()] | error}]

Get all active pipelines hosted on Node. Pass the atom global instead of a node name to get all pipelines hosted on all nodes.

The return value for a Node is a list of #pipe{} records. When global is used, the return value is a list of {Node, [#pipe{}]} tuples.
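
As a rough illustration of consuming the global form of this return value, the sketch below counts pipelines per node. The module and function names (pipe_census, pipeline_counts/1) are hypothetical; the code relies only on the documented {Node, [#pipe{}] | error} shape, so it can be exercised without a running cluster.

```erlang
-module(pipe_census).
-export([pipeline_counts/1]).

%% Hypothetical helper. Summarize the value returned by
%% riak_pipe:active_pipelines(global), documented as a list of
%% {Node, [#pipe{}] | error} tuples. Each list of pipes is replaced by
%% its length; nodes that answered 'error' keep that value so failures
%% remain visible.
%%
%% Usage on a Riak node:
%%   pipe_census:pipeline_counts(riak_pipe:active_pipelines(global))
pipeline_counts(PerNode) when is_list(PerNode) ->
    [{Node, count(Pipes)} || {Node, Pipes} <- PerNode].

count(error) -> error;
count(Pipes) when is_list(Pipes) -> length(Pipes).
```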

collect_results/1

collect_results(Pipe::pipe()) -> {eoi | timeout, Results::[{From::term(), Result::term()}], Logs::[{From::term(), Message::term()}]}

Receive all results and log messages, up to end-of-inputs (unless receive_result times out before the eoi arrives).

If end-of-inputs was the last message received, the first element of the returned tuple will be the atom eoi. If the receive timed out before receiving end-of-inputs, the first element of the returned tuple will be the atom timeout.

The second element will be a list of all result messages received, while the third element will be a list of all log messages received.

This function assumes that it is called in the sink's process. The Pipe argument is needed only so that its sink reference can be used to weed out misdirected messages from forgotten pipelines. collect_results/1 uses a static inter-message timeout of five seconds, which is hard-coded (TODO); collect_results/2 accepts an explicit Timeout instead.
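
Because each result is a {From, Result} pair, a caller often wants the results grouped per fitting. The following is a minimal sketch under that assumption; the module name pipe_results and the function group_results/1 are hypothetical, and they operate purely on the documented return tuple.

```erlang
-module(pipe_results).
-export([group_results/1]).

%% Hypothetical helper. Group the Results of a collect_results/1,2
%% return value by the name of the fitting that produced them,
%% preserving arrival order within each group. Log messages are
%% dropped; Status is passed through so the caller can still tell
%% eoi from timeout.
%%
%% Usage: pipe_results:group_results(riak_pipe:collect_results(Pipe))
group_results({Status, Results, _Logs})
  when Status =:= eoi; Status =:= timeout ->
    Grouped = lists:foldl(
                fun({From, Result}, Acc) -> orddict:append(From, Result, Acc) end,
                orddict:new(),
                Results),
    {Status, Grouped}.
```

An orddict keeps the fitting names sorted, which makes the output deterministic regardless of the order in which workers delivered their results.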

collect_results/2

collect_results(Pipe::pipe(), Timeout::integer() | infinity) -> {eoi | timeout, Results::[{From::term(), Result::term()}], Logs::[{From::term(), Message::term()}]}

destroy/1

destroy(Pipe::pipe()) -> ok

Brutally kill a pipeline. Use this when it is necessary to stop all parts of a pipeline as quickly as possible, instead of waiting for an eoi to propagate through.
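
One plausible way to combine collect_results/2 with destroy/1 is to kill the pipeline if results stop arriving before end-of-inputs. The sketch below takes the two riak_pipe calls as funs so it can be tested without a cluster; the module name pipe_timeout, the 30000 ms value in the usage comment, and the {error, {timeout, Partial}} return shape are all hypothetical choices.

```erlang
-module(pipe_timeout).
-export([collect_or_destroy/2]).

%% Hypothetical helper. Collect is expected to wrap
%% riak_pipe:collect_results/2 and Destroy to wrap riak_pipe:destroy/1,
%% for example:
%%   pipe_timeout:collect_or_destroy(
%%     fun() -> riak_pipe:collect_results(Pipe, 30000) end,
%%     fun() -> riak_pipe:destroy(Pipe) end)
%% If the receive times out before eoi, the pipeline is killed instead
%% of being left running, and the partial results are returned.
collect_or_destroy(Collect, Destroy)
  when is_function(Collect, 0), is_function(Destroy, 0) ->
    case Collect() of
        {eoi, Results, _Logs} ->
            {ok, Results};
        {timeout, Partial, _Logs} ->
            ok = Destroy(),
            {error, {timeout, Partial}}
    end.
```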

eoi/1

eoi(Pipe::pipe()) -> ok

Send an end-of-inputs message to the head of the pipe.

example/0

example() -> {eoi | timeout, list(), list()}

An example run of a simple pipe. Uses example_start/0, example_send/1, and example_receive/1 to send nonsense through a pipe.

If everything behaves correctly, this function should return
  {eoi, [{empty_pass, "hello"}], _LogMessages}.

example_receive/1

example_receive(Pipe::pipe()) -> {eoi | timeout, list(), list()}

An example of receiving data from a pipeline. Reads all results sent to the sink of the given pipe.

example_reduce/0

example_reduce() -> any()

Another example pipeline use. This one sets up a simple "reduce" fitting, which expects tuples of the form {Key::term(), Value::number()} and produces results of the form {Key, [Sum]}, where Sum is the sum of all of the input values for that key.

If everything behaves correctly, this function should return
  {eoi, [{"sum reduce", {a, [55]}}, {"sum reduce", {b, [155]}}], []}.
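
The reduce contract can be modeled locally to check expectations. This sketch is a stand-in, not the reduce fitting that riak_pipe actually uses; the module name sum_reduce_model is hypothetical. The test inputs (1 to 10 under key a, 11 to 20 under key b) are an assumption chosen because they produce the documented sums of 55 and 155.

```erlang
-module(sum_reduce_model).
-export([sum_by_key/1]).

%% Hypothetical local model of the "sum reduce" contract described
%% above: inputs are {Key, Number} tuples, outputs are {Key, [Sum]}
%% tuples (the sum wrapped in a list, as in the expected result).
%% Keys come out in sorted order; a real pipe may deliver them in a
%% different order.
sum_by_key(Inputs) ->
    Sums = lists:foldl(
             fun({Key, Value}, Acc) when is_number(Value) ->
                     orddict:update_counter(Key, Value, Acc)
             end,
             orddict:new(),
             Inputs),
    [{Key, [Sum]} || {Key, Sum} <- Sums].
```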

example_send/1

example_send(Pipe::pipe()) -> ok

An example of sending data into a pipeline. Queues the string "hello" at the head of the given pipe, then signals end-of-inputs to it.

example_start/0

example_start() -> {ok, Pipe::pipe()}

An example of starting a simple pipe. Starts a pipe with one "pass" fitting. Sink is pointed at the current process. Logging is pointed at the sink. All tracing is enabled.

example_tick/3

example_tick(TickLen, NumTicks, ChainLen) -> any()

example_tick/4

example_tick(TickLen, BatchSize, NumTicks, ChainLen) -> any()

example_transform/0

example_transform() -> {eoi | timeout, list(), list()}

Another example pipeline use. This one sets up a simple "transform" fitting, which expects lists of numbers as input, and produces the sum of that list as output.

If everything behaves correctly, this function should return
  {eoi, [{"sum transform", 55}], []}.
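
A transform fitting applies a function to each input independently. The sketch below models that locally, tagging each output with the fitting name the way results arrive at the sink. The module name transform_model is hypothetical, and the input list 1 to 10 is only an assumed input that happens to sum to the documented 55.

```erlang
-module(transform_model).
-export([run/3]).

%% Hypothetical local model of a transform fitting: apply Fun to every
%% input and tag each output with the fitting name, mimicking the
%% {From, Result} pairs found in collect_results/1 output.
run(FittingName, Fun, Inputs) when is_function(Fun, 1), is_list(Inputs) ->
    [{FittingName, Fun(Input)} || Input <- Inputs].
```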

exec/2

exec(Spec::[fitting_spec()], Options::exec_opts()) -> {ok, Pipe::pipe()}

Set up a pipeline. This function starts fitting and monitoring processes according to the given fitting specs and returns a handle to the pipeline. Inputs may then be sent to vnodes, tagged with the pipeline's head fitting.

The pipeline is specified as an ordered list of #fitting_spec{} records. Each record has the fields:
name
Any term. Will be used in logging, trace, and result messages.
module
Atom. The name of the module implementing the fitting. This module must implement the riak_pipe_vnode_worker behavior.
arg
Any term. Will be available to the fitting-implementation module's initialization function. This is a good way to parameterize general fittings.
chashfun

A function of arity 1. The consistent-hashing function used to determine which vnode should receive an input. This function will be evaluated as Fun(Input). The result of that evaluation should be a binary, 160 bits in length, which will be used to choose the working vnode from a riak_core_ring. (Very similar to the chash_keyfun bucket property used in riak_kv.)

The default is fun chash:key_of/1, which will distribute inputs according to the SHA-1 hash of the input.
nval

Either a positive integer, or a function of arity 1 that returns a positive integer. This field determines the maximum number of vnodes that might be asked to handle the input. If a worker is unable to process an input on a given vnode, it can ask to have the input sent to a different vnode. Up to nval vnodes will be tried in this manner.

If nval is an integer, that static number is used for all inputs. If nval is a function, the function is evaluated as Fun(Input) (much like chashfun), and is expected to return a positive integer.
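
As a sketch of custom chashfun and nval values, the module below assumes only what is stated above: a chashfun maps an input to a 160-bit binary, and nval is either a positive integer or a one-argument function returning one. The routing and retry policies (hash only the key of a {Key, Value} input; allow three vnodes for inputs tagged important) are hypothetical, as are all names here. resolve_nval/2 is a local model of the integer-or-function rule, not riak_pipe's own code.

```erlang
-module(fitting_spec_funs).
-export([chash_by_key/1, nval_by_priority/1, resolve_nval/2]).

%% Hypothetical chashfun: hash only the Key of a {Key, Value} input so
%% that every input sharing a key is routed to the same vnode. Like the
%% default, it returns a 160-bit SHA-1 digest.
chash_by_key({Key, _Value}) ->
    crypto:hash(sha, term_to_binary(Key)).

%% Hypothetical nval function: inputs tagged 'important' may be tried
%% on up to three vnodes, everything else on just one.
nval_by_priority({important, _}) -> 3;
nval_by_priority(_Other) -> 1.

%% Local model of how an nval field is interpreted: an integer is used
%% as-is, a fun is evaluated as Fun(Input).
resolve_nval(N, _Input) when is_integer(N), N > 0 -> N;
resolve_nval(Fun, Input) when is_function(Fun, 1) -> Fun(Input).
```

In a spec, these would be supplied as chashfun=fun fitting_spec_funs:chash_by_key/1 and nval=fun fitting_spec_funs:nval_by_priority/1.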

Defined elements of the Options list are:
{sink, Sink}
If no sink option is provided, one will be created, such that the calling process will receive all messages sent to the sink (all output, logging, and trace messages). If specified, Sink should be a #fitting{} record, filled with the pid of the process prepared to receive these messages.
{trace, TraceMatches}
If no trace option is provided, tracing will be disabled for this pipeline. If specified, TraceMatches should be either the atom all, in which case all trace messages will be delivered, or a list of trace tags to match, in which case only messages with matching tags will be delivered.
{log, LogTarget}
If no log option is provided, logging will be disabled for this pipeline. If specified, LogTarget should be one of the following atoms:
sink
all log (and trace) messages will be delivered to the sink
sasl
all log (and trace) messages will be printed via error_logger to the SASL log
lager
all log (and trace) messages will be printed to the Riak node's log via the lager utility
Other values are allowed, but ignored, in Options. The value of Options is provided to all fitting modules during initialization, so it can be a good vector for global configuration of general fittings.
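
A minimal sketch of an Options list, assuming the semantics described above: no sink option (so the caller becomes the sink), log and trace messages delivered to the sink, all tracing enabled, plus one extra key that riak_pipe ignores but a fitting could read. The key my_app_batch_size, its default of 100, and the module name are hypothetical.

```erlang
-module(exec_opts_example).
-export([options/1, batch_size/1]).

%% Build an Options list for riak_pipe:exec/2. Omitting {sink, ...}
%% makes the calling process the sink. The last entry is a
%% hypothetical application-specific key that riak_pipe itself ignores.
options(BatchSize) when is_integer(BatchSize), BatchSize > 0 ->
    [{log, sink}, {trace, all}, {my_app_batch_size, BatchSize}].

%% A fitting module that is handed the same Options could read the
%% custom key back, falling back to a default when it is absent.
batch_size(Options) ->
    proplists:get_value(my_app_batch_size, Options, 100).
```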

generic_transform/4

generic_transform(MsgFun, DriverFun, ExecOpts, NumFittings) -> any()

queue_work/2

queue_work(Pipe, Input) -> any()

Equivalent to queue_work(Pipe, Input, infinity).

queue_work/3

queue_work(Pipe::pipe(), Input::term(), Timeout::riak_pipe_vnode:qtimeout()) -> ok | {error, riak_pipe_vnode:qerror()}

Send inputs to the head of the pipe.

Note that the only supported Timeout values are currently infinity and noblock; arbitrary durations are not yet supported.
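
Since noblock can return an error instead of waiting, a caller might try a non-blocking enqueue first and fall back to a blocking one. This sketch takes the queue_work/3 call as a fun so it can be tested without a cluster; the module name queue_fallback and the {ok, noblock} / {ok, blocked} return values are hypothetical. The specific riak_pipe_vnode:qerror() reasons are not listed here, so any error triggers the fallback.

```erlang
-module(queue_fallback).
-export([queue/1]).

%% Hypothetical helper. QueueFun is expected to wrap
%% riak_pipe:queue_work/3 for a single input, e.g.
%%   fun(T) -> riak_pipe:queue_work(Pipe, Input, T) end
%% Try a non-blocking enqueue first; on any error, retry with a
%% blocking enqueue and report which path succeeded.
queue(QueueFun) when is_function(QueueFun, 1) ->
    case QueueFun(noblock) of
        ok ->
            {ok, noblock};
        {error, _Reason} ->
            case QueueFun(infinity) of
                ok -> {ok, blocked};
                {error, _} = Error -> Error
            end
    end.
```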

receive_result/1

receive_result(Pipe::pipe()) -> {result, {From::term(), Result::term()}} | {log, {From::term(), Message::term()}} | eoi | timeout

Pull the next pipeline result out of the sink's mailbox. The From element of the result and log messages will be the name of the fitting that generated them, as specified in the #fitting_spec{} record used to start the pipeline. This function assumes that it is called in the sink's process. The Pipe argument is needed only so that its sink reference can be used to weed out misdirected messages from forgotten pipelines. receive_result/1 uses a static timeout of five seconds, which is hard-coded (TODO); receive_result/2 accepts an explicit Timeout instead.
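
Calling receive_result repeatedly is the incremental alternative to collect_results. The sketch below loops over the four documented return shapes and counts results and log messages until eoi or timeout. It takes the receive call as a fun so it can be tested with a stub; the module name result_counter is hypothetical.

```erlang
-module(result_counter).
-export([drain/1]).

%% Hypothetical helper. ReceiveFun is expected to wrap
%% riak_pipe:receive_result/2, e.g.
%%   fun() -> riak_pipe:receive_result(Pipe, 10000) end
%% Call it repeatedly, counting result and log messages, until it
%% returns eoi or timeout.
drain(ReceiveFun) when is_function(ReceiveFun, 0) ->
    drain(ReceiveFun, 0, 0).

drain(ReceiveFun, Results, Logs) ->
    case ReceiveFun() of
        {result, {_From, _Result}} -> drain(ReceiveFun, Results + 1, Logs);
        {log, {_From, _Message}}   -> drain(ReceiveFun, Results, Logs + 1);
        eoi                        -> {eoi, Results, Logs};
        timeout                    -> {timeout, Results, Logs}
    end.
```

Unlike collect_results, this loop does not accumulate the messages, so memory use stays flat for long-running pipelines; a real consumer would process each result inside the loop.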

receive_result/2

receive_result(Pipe::pipe(), Timeout::integer() | infinity) -> {result, {From::term(), Result::term()}} | {log, {From::term(), Message::term()}} | eoi | timeout

status/1

status(Pipe::pipe()) -> [{FittingName::term(), [PartitionStatus::[stat()]]}]

Retrieve details about the status of the workers in this pipeline. The form of the return is a list with one entry per fitting in the pipe. Each fitting's entry is a 2-tuple of the form {FittingName, WorkerDetails}, where FittingName is the name that was given to the fitting in the call to riak_pipe:exec/2, and WorkerDetails is a list with one entry per worker. Each worker entry is a proplist, of the form returned by riak_pipe_vnode:status/1, with two properties added: node, the node on which the worker is running, and partition, the index of the vnode that the worker belongs to.
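
Since each worker entry is a proplist carrying node and partition properties, a compact view of where a pipeline's workers run can be extracted with proplists. This is a sketch assuming only the return shape described above; the module name status_summary is hypothetical.

```erlang
-module(status_summary).
-export([worker_locations/1]).

%% Hypothetical helper. Reduce the output of riak_pipe:status/1, a list
%% of {FittingName, [WorkerProplist]}, to the {Node, Partition} of each
%% worker, keeping the per-fitting grouping.
%%
%% Usage: status_summary:worker_locations(riak_pipe:status(Pipe))
worker_locations(Status) ->
    [{FittingName, [{proplists:get_value(node, Worker),
                     proplists:get_value(partition, Worker)}
                    || Worker <- Workers]}
     || {FittingName, Workers} <- Status].
```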


Generated by EDoc, Nov 21 2012, 15:49:54.