couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [03/50] fabric commit: updated refs/heads/master to a71701c
Date Thu, 28 Aug 2014 12:20:38 GMT
Update reduce view coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/df9cb854
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/df9cb854
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/df9cb854

Branch: refs/heads/master
Commit: df9cb854bd4a9adb659572fb54fce091b873318f
Parents: 11112ad
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Wed Sep 4 16:00:01 2013 -0500
Committer: Robert Newson <rnewson@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_reduce.erl | 80 ++++++++++++++++++++---------------------
 1 file changed, 40 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/df9cb854/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 12bed6d..42b889b 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -19,11 +19,35 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
-go(DbName, DDoc, VName, Args, Callback, Acc0, {red, {_, Lang, _}, _}=VInfo) ->
-    Shards = fabric_view:get_shards(DbName, Args),
-    Workers = fabric_util:submit_jobs(Shards, reduce_view, [DDoc, VName, Args]),
+go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
+    go(DbName, DDoc, View, Args, Callback, Acc0, VInfo);
+
+go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
-    RexiMon = fabric_util:create_monitors(Workers),
+    Workers0 = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
+        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
+        Shard#shard{ref = Ref}
+    end, fabric_view:get_shards(DbName, Args)),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    go2(DbName, Workers, Lang, RedSrc, Args, Callback, Acc)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, _} ->
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+go2(DbName, Workers, Lang, RedSrc, Args, Callback, Acc0) ->
     #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
     OsProc = case os_proc_needed(RedSrc) of
         true -> couch_query_servers:get_os_process(Lang);
@@ -52,27 +76,16 @@ go(DbName, DDoc, VName, Args, Callback, Acc0, {red, {_, Lang, _}, _}=VInfo) ->
     {error, Resp} ->
         {ok, Resp}
     after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers),
-        case State#collector.os_proc of
-            nil -> ok;
-            OsProc -> catch couch_query_servers:ret_os_process(OsProc)
+        if OsProc == nil -> ok; true ->
+            catch couch_query_servers:ret_os_process(OsProc)
         end
     end.
 
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
 
 %% HACK: this just sends meta once. Instead we should move the counter logic
 %% from the #view_row handle_message below into this function and and pass the
@@ -100,29 +113,16 @@ handle_message({meta, Meta}, {_Worker, From}, State) ->
 
 handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
     #collector{counters = Counters0, rows = Rows0} = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, State};
-    _ ->
-        Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0),
-        C1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        % TODO time this call, if slow don't do it every time
-        C2 = fabric_view:remove_overlapping_shards(Worker, C1),
-        State1 = State#collector{rows=Rows, counters=C2},
-        fabric_view:maybe_send_row(State1)
-    end;
+    true = fabric_dict:is_key(Worker, Counters0),
+    Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0),
+    C1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    State1 = State#collector{rows=Rows, counters=C1},
+    fabric_view:maybe_send_row(State1);
 
 handle_message(complete, Worker, #collector{counters = Counters0} = State) ->
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        {ok, State};
-    _ ->
-        C1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        C2 = fabric_view:remove_overlapping_shards(Worker, C1),
-        fabric_view:maybe_send_row(State#collector{counters = C2})
-    end.
+    true = fabric_dict:is_key(Worker, Counters0),
+    C1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    fabric_view:maybe_send_row(State#collector{counters = C1}).
 
 complete_worker_test() ->
     meck:new(config),


Mime
View raw message