Introduction to Map/Reduce on Riak

This document describes Riak's implementation of a data processing
system based on the MapReduce[1] programming paradigm popularized by
Google.  It assumes that you have already set up Riak, and know the
basics about dealing with Riak clients.  For more information on these
prerequisites, see riak/doc/basic-setup.txt and

Quick and Dirty Example

If you have a Riak client hanging around, you can execute Map/Reduce
queries on it like this:

1> Count = fun(G, undefined, none) ->
             [dict:from_list([{I, 1} || I <- riak_object:get_value(G)])]
           end.
2> Merge = fun(Gcounts, none) ->
             [lists:foldl(fun(G, Acc) ->
                            dict:merge(fun(_, X, Y) -> X+Y end,
                                       G, Acc)
                          end,
                          dict:new(),
                          Gcounts)]
           end.
3> {ok, [R]} = Client:mapred([{groceries, "mine"},{groceries, "yours"}],
                             [{map, {qfun, Count}, none, false},
                              {reduce, {qfun, Merge}, none, true}]).
4> L = dict:to_list(R).

If the "mine" and "yours" objects in the groceries bucket had values
of ["bread", "cheese"] and ["bread", "butter"] respectively, the
sequence of commands above would result in L being bound to a list
containing {"bread", 2}, {"cheese", 1}, and {"butter", 1}.  The order
of the elements is not guaranteed by dict:to_list.
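
The data flow of the query above can be tried without a cluster.  The
following is only a sketch: the module name groceries_sketch and its
functions are hypothetical, and plain lists stand in for the values
that riak_object:get_value/1 would return from real Riak objects.  It
maps each value to a dict of counts, appends the map results, and
reduces them once.

```erlang
-module(groceries_sketch).
-export([count/1, merge/1, run/0]).

%% Stand-in for the map step: one dict of {Item, 1} per input value.
%% In the real query, Items comes from riak_object:get_value/1.
count(Items) ->
    [dict:from_list([{I, 1} || I <- Items])].

%% Stand-in for the reduce step: merge all dicts, summing the counts.
merge(Dicts) ->
    [lists:foldl(fun(D, Acc) ->
                     dict:merge(fun(_Key, X, Y) -> X + Y end, D, Acc)
                 end,
                 dict:new(),
                 Dicts)].

%% Map every value, append the per-value result lists, reduce once,
%% and sort so the output does not depend on dict ordering.
run() ->
    Values = [["bread", "cheese"], ["bread", "butter"]],
    Mapped = lists:append([count(V) || V <- Values]),
    [R] = merge(Mapped),
    lists:sort(dict:to_list(R)).
```

Calling groceries_sketch:run() returns
[{"bread",2},{"butter",1},{"cheese",1}].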


More importantly, riak_client:mapred takes two lists as arguments.
The first list contains bucket/key pairs, which are the keys to the
"starting values" of the Map/Reduce query.  The second list contains
the steps of the query, in the order they are run.

Map Steps

Map steps expect as input a list of bucket/key pairs, just like the
first argument to the riak_client:mapred function.  Riak executes a
map step by looking up values for keys in the input list and executing
the map function referenced in the step.

Map steps take the form:

{map, FunTerm, Arg, Accumulate}


  FunTerm is a reference to the function that will compute the map of
    each value.  A function referenced by a FunTerm must be arity-3,
    accepting the arguments:

    Value: the value found at a key.  This will be a Riak object
      (defined by the riak_object module) if a value was found, or the
      tuple {error, notfound} if a bucket/key was put in the input
      list, but not found in the Riak cluster.

    Data: An optional piece of data attached to the bucket/key tuple.
      If instead of {Bucket, Key}, {{Bucket, Key}, Data} is passed as
      input to a map step, that Data will be passed to the map
      function in this argument.  Data will be the atom 'undefined' if
      the former form is used.

    Arg: The Arg from the map step definition.  The same Arg is passed
      to every execution of the map function in this step.

    Functions may be referenced in two ways:

      {modfun, Module, Function} where Module and Function are atoms
        that name an Erlang function in a specific module

      {qfun, Function} where Function is a callable fun term

    The function must return a *list* of values.  The lists returned
    by all executions of the map function for this step will be
    appended and passed to the next step.

  Arg: The third argument passed to the function referenced in FunTerm.

  Accumulate: If true, the output of this map step will be included in
    the final return of the mapred function.  If false, the output will
    be discarded after the next step.
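
As an illustration of the argument list described above, here is a
minimal sketch of a map function written for the {modfun, Module,
Function} form.  The module name grocery_map and the function name
count_items are hypothetical; riak_object:get_value/1 is the same call
used in the quick example.  The first clause handles a bucket/key that
was in the input list but not found in the cluster.

```erlang
-module(grocery_map).
-export([count_items/3]).

%% Map function: arity 3, and it must return a list.
%%   Value - a Riak object, or {error, notfound} for a missing key
%%   Data  - 'undefined' unless the input was {{Bucket, Key}, Data}
%%   Arg   - the Arg from the step definition, same for every call
count_items({error, notfound}, _Data, _Arg) ->
    %% Nothing stored at this bucket/key: contribute no values.
    [];
count_items(Object, _Data, _Arg) ->
    Items = riak_object:get_value(Object),
    [dict:from_list([{I, 1} || I <- Items])].
```

Assuming the module is loadable on the Riak nodes, a step using it
would be written as:

  {map, {modfun, grocery_map, count_items}, none, false}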

Reduce Steps

A reduce step takes the form:

{reduce, FunTerm, Arg, Acc}

Where FunTerm, Arg, and Acc are mostly the same as their definitions
for a map step, except that the function referenced in FunTerm must be
arity-2.  Its parameters are:

  ValueList: The list of values produced by the preceding step.

  Arg: The Arg from the step definition.

The function should again produce a list of values, but it must also
be commutative, associative, and idempotent.  That is, if the input
list [a,b,c,d] is valid for a given F, then all of the following must
produce the same result:

  F([a,b,c,d])
  F([a,d] ++ F([c,b]))
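
The property above can be checked by hand for a candidate reduce
function before using it in a query.  The following sketch uses a
hypothetical module, reduce_check, with a simple summing reduce
function; it is an illustration of the requirement, not code from
Riak.  It compares reducing a whole list against reducing a reordered
list that already contains a partial result.

```erlang
-module(reduce_check).
-export([sum/2, holds/0]).

%% Reduce function: arity 2 (ValueList, Arg), returns a list.
sum(Values, _Arg) ->
    [lists:sum(Values)].

%% Compare F([a,b,c,d]) with F([a,d] ++ F([c,b])) for sample input.
holds() ->
    Whole   = sum([1, 2, 3, 4], none),
    Partial = sum([1, 4] ++ sum([3, 2], none), none),
    Whole =:= Partial.
```

Here reduce_check:holds() returns true, because both expressions
evaluate to [10].  A function that, for example, counted the elements
of its input would fail this check, since a partial result would be
counted as a single element.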

Where does the code run?

So, all well and good, but you could code the same abstraction in a
couple of hours, right?  Just fetch each object and run your function.

Well, not so fast.  This map/reduce isn't just an abstraction, it
fully exploits data locality.  That is to say, both map and reduce
functions run on Riak nodes.  Map functions are even run on the node
where the data is already located.

This means a few things to you:

- If you use the {modfun, Module, Function} form of the FunTerm in the
  map/reduce step definition, that Module must be in the code path of
  the Riak node.  This isn't a huge concern for libraries that ship
  with Erlang, but for any of your custom code, you'll need to make
  sure it's loadable.

  The easiest way to get your custom modules in the code path of a
  Riak node is to symlink your application from riak's "deps"
  directory.  The node start script (start-fresh.sh, start-join.sh,
  etc.) automatically adds deps/*/ebin to the code path.

- If you use the {modfun, Module, Function} form of the FunTerm in the
  map/reduce step definition, you'll need to force the Riak nodes to
  reload the Module if you make a change to it.

  The easiest way to reload a module on a Riak node is to get a Riak
  client, then call Client:reload_all(Module).

- If you need to do a Riak 'get' inside of a map or reduce function,
  you can use riak:local_client/0 to get a Riak client instead of
  riak:client_connect/3.  The code is already running on a connected
  node -- there's no need to go through the doorbell/connect
  procedure.

- Your map and reduce functions are running on a Riak node, which
  means that that Riak node is spending CPU time doing something other
  than responding to 'get' and 'put' requests.

- If you use the {qfun, Fun} form, your callable function and its
  environment will be shipped to the Riak cluster and to each node on
  which it runs.  This is both a benefit (in that you have the full
  power of closures) and a danger (in that you must be mindful of
  closing over very large data structures).
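
Two of the points above can be checked with standard Erlang calls.
The sketch below uses a hypothetical helper module, mr_node_checks,
which is not part of Riak.  loadable/1 uses code:ensure_loaded/1 and
would need to be run on the Riak node itself to say anything about
that node's code path.  fun_size/1 uses erlang:external_size/1 to
estimate how many bytes a fun and its environment occupy when encoded,
as a rough guide to what a {qfun, Fun} step would ship around.

```erlang
-module(mr_node_checks).
-export([loadable/1, fun_size/1, closure_sizes/0]).

%% True if Module can be loaded from the code path of the node
%% on which this function is called.
loadable(Module) ->
    case code:ensure_loaded(Module) of
        {module, Module} -> true;
        {error, _Reason} -> false
    end.

%% Upper bound, in bytes, on the external encoding of a fun,
%% including any variables it closes over.
fun_size(Fun) when is_function(Fun) ->
    erlang:external_size(Fun).

%% Compare a fun with no environment against one that closes over
%% a large list.  Returns {SmallSize, LargeSize}.
closure_sizes() ->
    Small = fun(V, _Data, _Arg) -> [V] end,
    Big = lists:seq(1, 100000),
    Closing = fun(V, _Data, _Arg) -> [{V, length(Big)}] end,
    {fun_size(Small), fun_size(Closing)}.
```

The second element returned by closure_sizes/0 is much larger than the
first, because the list bound to Big travels with the fun.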

[1] http://labs.google.com/papers/mapreduce.html