Commits

Bryan Fink  committed 10c5a24

specify link-walking in json-m/r spec with {"link":{"bucket":B,"tag":T,"keep":K}}

  • Participants
  • Parent commits 890b91b

Comments (0)

Files changed (1)

File apps/riak/src/riak_mapred_json.erl

         true ->
             error
     end;
-parse_query([{struct, [{Type, {struct, StepDef}}]}|T], Accum) when Type =:= <<"map">>;
-                                                               Type =:= <<"reduce">> ->
+parse_query([{struct, [{Type, {struct, StepDef}}]}|T], Accum)
+  when Type =:= <<"map">>; Type =:= <<"reduce">>; Type =:= <<"link">> ->
     StepType = case Type of
                    <<"map">> -> map;
-                   <<"reduce">> -> reduce
+                   <<"reduce">> -> reduce;
+                   <<"link">> -> link
                end,
-    Lang = proplists:get_value(<<"language">>, StepDef),
     Keep = proplists:get_value(<<"keep">>, StepDef),
-    case not(Keep =:= true orelse Keep =:= false) of
+    Step = case not(Keep =:= true orelse Keep =:= false) of
+               true -> error;
+               false ->
+                   if StepType == link ->
+                          case parse_link_step(StepDef) of
+                              {ok, {Bucket, Tag}} ->
+                                  {ok, {link, Bucket, Tag, Keep}};
+                              LError ->
+                                  LError
+                          end;
+                      true -> % map or reduce
+                           Lang = proplists:get_value(<<"language">>, StepDef),
+                           case parse_step(Lang, StepDef) of
+                               error ->
+                                   error;
+                               {ok, ParsedStep} ->
+                                   Arg = proplists:get_value(<<"arg">>, StepDef, none),
+                                   {ok, {StepType, ParsedStep, Arg, Keep}}
+                           end
+                   end
+           end,
+    case Step of
+        {ok, S} -> parse_query(T, [S|Accum]);
+        SError  -> SError
+    end;
+parse_query(_, _Accum) ->
+    error.
+
+parse_link_step(StepDef) ->
+    Bucket = proplists:get_value(<<"bucket">>, StepDef, <<"_">>),
+    Tag = proplists:get_value(<<"tag">>, StepDef, <<"_">>),
+    case not(is_binary(Bucket) andalso is_binary(Tag)) of
         true ->
             error;
         false ->
-            case parse_step(Lang, StepDef) of
-                error ->
-                    error;
-                {ok, ParsedStep} ->
-                    Arg = proplists:get_value(<<"arg">>, StepDef, none),
-                    parse_query(T, [{StepType, ParsedStep, Arg, Keep}|Accum])
-            end
-    end;
-parse_query(_, _Accum) ->
-    error.
+            {ok, {if Bucket == <<"_">> -> '_';
+                     true              -> Bucket
+                  end,
+                  if Tag == <<"_">> -> '_';
+                     true           -> Tag
+                  end}}
+    end.
 
 parse_step(<<"javascript">>, StepDef) ->
     Source = proplists:get_value(<<"source">>, StepDef),