couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [31/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f
Date Fri, 01 Aug 2014 09:12:19 GMT
Update _all_docs coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/ad460e40
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/ad460e40
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/ad460e40

Branch: refs/heads/windsor-merge-121
Commit: ad460e40eed6e2162d42c7eb2bc7e1306af4f092
Parents: dae200f
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Wed Sep 4 17:28:57 2013 -0500
Committer: Robert Newson <rnewson@apache.org>
Committed: Thu Jul 31 14:37:06 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_all_docs.erl | 127 ++++++++++++++++++++------------------
 1 file changed, 66 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ad460e40/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index e5ed7b3..8dc9173 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -20,29 +20,26 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
-go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc0) ->
-    Workers = fabric_util:submit_jobs(mem3:shards(DbName),all_docs,[Options, QueryArgs]),
-    #mrargs{limit = Limit, skip = Skip} = QueryArgs,
-    State = #collector{
-        query_args = QueryArgs,
-        callback = Callback,
-        counters = fabric_dict:init(Workers, 0),
-        skip = Skip,
-        limit = Limit,
-        user_acc = Acc0
-    },
-    RexiMon = fabric_util:create_monitors(Workers),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
-        State, infinity, 5000) of
-    {ok, NewState} ->
-        {ok, NewState#collector.user_acc};
-    {timeout, NewState} ->
-        Callback({error, timeout}, NewState#collector.user_acc);
-    {error, Resp} ->
-        {ok, Resp}
+go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
+    Shards = mem3:shards(DbName),
+    Workers0 = fabric_util:submit_jobs(
+            Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    go(DbName, Options, Workers, QueryArgs, Callback, Acc)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, _} ->
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
     after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers)
+        rexi_monitor:stop(RexiMon)
     end;
 
 
@@ -81,19 +78,32 @@ go(DbName, Options, QueryArgs, Callback, Acc0) ->
         Callback(timeout, Acc0)
     end.
 
+go(DbName, _Options, Workers, QueryArgs, Callback, Acc0) ->
+    #mrargs{limit = Limit, skip = Skip} = QueryArgs,
+    State = #collector{
+        db_name = DbName,
+        query_args = QueryArgs,
+        callback = Callback,
+        counters = fabric_dict:init(Workers, 0),
+        skip = Skip,
+        limit = Limit,
+        user_acc = Acc0
+    },
+    case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+        State, infinity, 5000) of
+    {ok, NewState} ->
+        {ok, NewState#collector.user_acc};
+    {timeout, NewState} ->
+        Callback({error, timeout}, NewState#collector.user_acc);
+    {error, Resp} ->
+        {ok, Resp}
+    end.
+
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
 
 handle_message({meta, Meta0}, {Worker, From}, State) ->
     Tot = couch_util:get_value(total, Meta0, 0),
@@ -105,35 +115,30 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
         offset = Offset0,
         user_acc = AccIn
     } = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate
-        gen_server:reply(From, stop),
-        {ok, State};
-    0 ->
-        gen_server:reply(From, ok),
-        Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
-        Total = Total0 + Tot,
-        Offset = Offset0 + Off,
-        case fabric_dict:any(0, Counters2) of
-        true ->
-            {ok, State#collector{
-                counters = Counters2,
-                total_rows = Total,
-                offset = Offset
-            }};
-        false ->
-            FinalOffset = erlang:min(Total, Offset+State#collector.skip),
-            Meta = [{total, Total}, {offset, FinalOffset}],
-            {Go, Acc} = Callback({meta, Meta}, AccIn),
-            {Go, State#collector{
-                counters = fabric_dict:decrement_all(Counters2),
-                total_rows = Total,
-                offset = FinalOffset,
-                user_acc = Acc
-            }}
-        end
+    % Assert that we don't have other messages from this
+    % worker when the total_and_offset message arrives.
+    0 = fabric_dict:lookup_element(Worker, Counters0),
+    rexi:stream_ack(From),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    Total = Total0 + Tot,
+    Offset = Offset0 + Off,
+    case fabric_dict:any(0, Counters1) of
+    true ->
+        {ok, State#collector{
+            counters = Counters1,
+            total_rows = Total,
+            offset = Offset
+        }};
+    false ->
+        FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+        Meta = [{total, Total}, {offset, FinalOffset}],
+        {Go, Acc} = Callback({meta, Meta}, AccIn),
+        {Go, State#collector{
+            counters = fabric_dict:decrement_all(Counters1),
+            total_rows = Total,
+            offset = FinalOffset,
+            user_acc = Acc
+        }}
     end;
 
 handle_message(#view_row{} = Row, {Worker, From}, State) ->


Mime
View raw message