couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [07/50] fabric commit: updated refs/heads/master to a71701c
Date Thu, 28 Aug 2014 12:20:42 GMT
Allow shard replacements in stream_start

This adds fabric_util:stream_start/4 which accepts a start function and
a proplist of replacement shards keyed by range. If a worker exits due
to maintenance mode and prevents progress from being made, the list of
replacement shards is searched for any replacements for the range. If
any are found the start function is called for each possible
replacement. The start function takes the possible replacement and
should return the new worker.

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f2256977
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f2256977
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f2256977

Branch: refs/heads/master
Commit: f225697757ed07fd344016f1a6a85674b800686c
Parents: 0f45478
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Thu Sep 12 10:23:48 2013 -0500
Committer: Robert Newson <rnewson@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_util.erl | 82 +++++++++++++++++++++++++++++++++++-------------
 1 file changed, 60 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f2256977/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index f11abe3..fb6cd8b 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -16,13 +16,19 @@
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
         remove_down_workers/2]).
 -export([request_timeout/0]).
--export([stream_start/2]).
+-export([stream_start/2, stream_start/4]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
+-record(stream_acc, {
+    workers,
+    start_fun,
+    replacements
+}).
+
 remove_down_workers(Workers, BadNode) ->
     Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end,
     NewWorkers = fabric_dict:filter(Filter, Workers),
@@ -45,12 +51,19 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
 cleanup(Workers) ->
     [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
 
-stream_start(Workers0, Keypos) ->
+stream_start(Workers, Keypos) ->
+    stream_start(Workers, Keypos, undefined, undefined).
+
+stream_start(Workers0, Keypos, StartFun, Replacements) ->
     Fun = fun handle_stream_start/3,
-    Acc = fabric_dict:init(Workers0, waiting),
+    Acc = #stream_acc{
+        workers = fabric_dict:init(Workers0, waiting),
+        start_fun = StartFun,
+        replacements = Replacements
+    },
     Timeout = request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
-        {ok, Workers} ->
+        {ok, #stream_acc{workers=Workers}} ->
             true = fabric_view:is_progress_possible(Workers),
             AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
                 rexi:stream_start(From),
@@ -61,36 +74,61 @@ stream_start(Workers0, Keypos) ->
             Else
     end.
 
-handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    case fabric_util:remove_down_workers(State, NodeRef) of
-    {ok, NewState} ->
-        {ok, NewState};
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
+    case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of
+    {ok, Workers} ->
+        {ok, St#stream_acc{workers=Workers}};
     error ->
         Reason = {nodedown, <<"progress not possible">>},
         {error, Reason}
     end;
-handle_stream_start({rexi_EXIT, Reason}, Worker, State) ->
-    NewState = fabric_dict:erase(Worker, State),
-    case fabric_view:is_progress_possible(NewState) of
-    true ->
-        {ok, NewState};
-    false ->
+handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
+    Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
+    case {fabric_view:is_progress_possible(Workers), Reason} of
+    {true, _} ->
+        {ok, St#stream_acc{workers=Workers}};
+    {false, {maintenance_mode, _Node}} ->
+        % Check if we have replacements for this range
+        % and start the new workers if so.
+        case lists:keytake(Worker#shard.range, 1, St#stream_acc.replacements) of
+            {value, {_Range, WorkerReplacements}, NewReplacements} ->
+                FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
+                    NewWorker = (St#stream_acc.start_fun)(Repl),
+                    fabric_dict:store(NewWorker, waiting, NewWorkers)
+                end, Workers, WorkerReplacements),
+                % Assert that our replaced worker provides us
+                % the oppurtunity to make progress.
+                true = fabric_view:is_progress_possible(FinalWorkers),
+                NewRefs = fabric_dict:fetch_keys(FinalWorkers),
+                {new_refs, NewRefs, St#stream_acc{
+                    workers=FinalWorkers,
+                    replacements=NewReplacements
+                }};
+            false ->
+                % If we progress isn't possible and we don't have any
+                % replacements then we're dead in the water.
+                Error = {nodedown, <<"progress not possible">>},
+                {error, Error}
+        end;
+    {false, _} ->
         {error, fabric_util:error_info(Reason)}
     end;
-handle_stream_start(rexi_STREAM_INIT, {Worker, From}, State) ->
-    case fabric_dict:lookup_element(Worker, State) of
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
+    case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of
     undefined ->
         % This worker lost the race with other partition copies, terminate
         rexi:stream_cancel(From),
-        {ok, State};
+        {ok, St};
     waiting ->
         % Don't ack the worker yet so they don't start sending us
         % rows until we're ready
-        NewState0 = fabric_dict:store(Worker, From, State),
-        NewState1 = fabric_view:remove_overlapping_shards(Worker, NewState0),
-        case fabric_dict:any(waiting, NewState1) of
-            true -> {ok, NewState1};
-            false -> {stop, NewState1}
+        Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers),
+        Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0),
+        case fabric_dict:any(waiting, Workers1) of
+            true ->
+                {ok, St#stream_acc{workers=Workers1}};
+            false ->
+                {stop, St#stream_acc{workers=Workers1}}
         end
     end;
 handle_stream_start(Else, _, _) ->


Mime
View raw message