couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [05/49] fabric commit: updated refs/heads/windsor-merge to b1c0030
Date Fri, 01 Aug 2014 14:34:26 GMT
Implement fabric_util:stream_start/2

This is a utility function that handles gathering the start of a rexi
stream. It will return a single full hash ring of shards or an error.

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/bea3052a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/bea3052a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/bea3052a

Branch: refs/heads/windsor-merge
Commit: bea3052ae5dc5faeba0f7d072c9419e14487127c
Parents: 28a9e3d
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Fri Sep 6 07:26:32 2013 -0500
Committer: Robert Newson <rnewson@apache.org>
Committed: Fri Aug 1 15:33:40 2014 +0100

----------------------------------------------------------------------
 src/fabric_util.erl | 52 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/bea3052a/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index c93efda..f11abe3 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -16,6 +16,7 @@
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
         remove_down_workers/2]).
 -export([request_timeout/0]).
+-export([stream_start/2]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -44,6 +45,57 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
 cleanup(Workers) ->
     [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
 
+stream_start(Workers0, Keypos) ->  % Gather stream-start replies for Workers0; returns {ok, AckedWorkers} or recv's other result
+    Fun = fun handle_stream_start/3,  % handler passed to rexi_utils:recv/6 below
+    Acc = fabric_dict:init(Workers0, waiting),  % presumably maps every worker to 'waiting' -- confirm in fabric_dict
+    Timeout = request_timeout(),
+    case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
+        {ok, Workers} ->
+            true = fabric_view:is_progress_possible(Workers),  % assertive match: badmatch crash unless this returns true
+            AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->  % From is what handle_stream_start/3 stored
+                rexi:stream_start(From),  % workers are acked only here, after recv has returned {ok, _}
+                [Worker | WorkerAcc]
+            end, [], Workers),
+            {ok, AckedWorkers};
+        Else ->  % any result other than {ok, _} is handed back to the caller unchanged
+            Else
+    end.
+
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->  % recv handler for stream_start/2; this clause: rexi_DOWN
+    case fabric_util:remove_down_workers(State, NodeRef) of  % yields {ok, NewState} or error, per the branches below
+    {ok, NewState} ->
+        {ok, NewState};
+    error ->
+        Reason = {nodedown, <<"progress not possible">>},
+        {error, Reason}
+    end;
+handle_stream_start({rexi_EXIT, Reason}, Worker, State) ->  % a worker exited: drop it from State
+    NewState = fabric_dict:erase(Worker, State),
+    case fabric_view:is_progress_possible(NewState) of  % keep going only if the remaining workers still allow progress
+    true ->
+        {ok, NewState};
+    false ->
+        {error, fabric_util:error_info(Reason)}
+    end;
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, State) ->  % a worker announced the start of its stream
+    case fabric_dict:lookup_element(Worker, State) of
+    undefined ->
+        % This worker lost the race with other partition copies, terminate
+        rexi:stream_cancel(From),
+        {ok, State};
+    waiting ->
+        % Don't ack the worker yet so they don't start sending us
+        % rows until we're ready
+        NewState0 = fabric_dict:store(Worker, From, State),  % replace 'waiting' with From; stream_start/2 acks it later
+        NewState1 = fabric_view:remove_overlapping_shards(Worker, NewState0),  % presumably drops rival copies -- confirm
+        case fabric_dict:any(waiting, NewState1) of  % stop receiving once no entry is still 'waiting'
+            true -> {ok, NewState1};
+            false -> {stop, NewState1}
+        end
+    end;
+handle_stream_start(Else, _, _) ->  % any other message is unexpected: exit the calling process
+    exit({invalid_stream_start, Else}).
+
 recv(Workers, Keypos, Fun, Acc0) ->
     rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).
 


Mime
View raw message