couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [1/2] fabric commit: updated refs/heads/master to e5c9c62
Date Fri, 10 Jun 2016 20:40:53 GMT
Repository: couchdb-fabric
Updated Branches:
  refs/heads/master 0aeaaa084 -> e5c9c62d8


Fix fabric_db_update_listener rexi_DOWN handling

A recent change that fixed the list comprehension ended up uncovering
the fact that we don't handle rexi_DOWN errors properly. This patch
tracks the shards that are still listening (dropping the shards on a
node once that node reports an error) and uses
`fabric_view:is_progress_possible/1` to determine whether we are still
able to continue listening for changes.

Fixes COUCHDB-3036


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/d5511c4f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/d5511c4f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/d5511c4f

Branch: refs/heads/master
Commit: d5511c4fb0efead8d61187279220fb6f06c0e086
Parents: 0aeaaa0
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Fri Jun 10 12:52:42 2016 -0500
Committer: Paul J. Davis <paul.joseph.davis@gmail.com>
Committed: Fri Jun 10 12:52:42 2016 -0500

----------------------------------------------------------------------
 src/fabric_db_update_listener.erl | 38 +++++++++++++++++++++++-----------
 1 file changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/d5511c4f/src/fabric_db_update_listener.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_update_listener.erl b/src/fabric_db_update_listener.erl
index ac4d8a2..f0155de 100644
--- a/src/fabric_db_update_listener.erl
+++ b/src/fabric_db_update_listener.erl
@@ -32,11 +32,13 @@
 
 -record(acc, {
     parent,
-    state
+    state,
+    shards
 }).
 
 go(Parent, ParentRef, DbName, Timeout) ->
-    Notifiers = start_update_notifiers(DbName),
+    Shards = mem3:shards(DbName),
+    Notifiers = start_update_notifiers(Shards),
     MonRefs = lists:usort([rexi_utils:server_pid(N) || #worker{node = N} <- Notifiers]),
     RexiMon = rexi_monitor:start(MonRefs),
     MonPid = start_cleanup_monitor(self(), Notifiers),
@@ -44,8 +46,13 @@ go(Parent, ParentRef, DbName, Timeout) ->
     %% process to communicate via handle_message/3 we "fake" it as a
     %% a spawned worker.
     Workers = [#worker{ref=ParentRef, pid=Parent} | Notifiers],
+    Acc = #acc{
+        parent = Parent,
+        state = unset,
+        shards = Shards
+    },
     Resp = try
-        receive_results(Workers, #acc{parent=Parent, state=unset}, Timeout)
+        receive_results(Workers, Acc, Timeout)
     after
         rexi_monitor:stop(RexiMon),
         stop_cleanup_monitor(MonPid)
@@ -56,10 +63,10 @@ go(Parent, ParentRef, DbName, Timeout) ->
         Error -> erlang:error(Error)
     end.
 
-start_update_notifiers(DbName) ->
+start_update_notifiers(Shards) ->
     EndPointDict = lists:foldl(fun(#shard{node=Node, name=Name}, Acc) ->
         dict:append(Node, Name, Acc)
-    end, dict:new(), mem3:shards(DbName)),
+    end, dict:new(), Shards),
     lists:map(fun({Node, DbNames}) ->
         Ref = rexi:cast(Node, {?MODULE, start_update_notifier, [DbNames]}),
         #worker{ref=Ref, node=Node}
@@ -134,12 +141,12 @@ receive_results(Workers, Acc0, Timeout) ->
     end.
 
 
-handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, _Acc) ->
-    {error, {nodedown, Node}};
-handle_message({rexi_EXIT, _Reason}, Worker, _Acc) ->
-    {error, {worker_exit, Worker}};
-handle_message({gen_event_EXIT, Node, Reason}, _Worker, _Acc) ->
-    {error, {gen_event_exit, Node, Reason}};
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
+    handle_error(Node, {nodedown, Node}, Acc);
+handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
+    handle_error(Worker#worker.node, {worker_exit, Worker}, Acc);
+handle_message({gen_event_EXIT, Node, Reason}, _Worker, Acc) ->
+    handle_error(Node, {gen_event_EXIT, Node, Reason}, Acc);
 handle_message(db_updated, _Worker, #acc{state=waiting}=Acc) ->
     % propagate message to calling controller
     erlang:send(Acc#acc.parent, {state, self(), updated}),
@@ -155,4 +162,11 @@ handle_message(done, _, _) ->
     {stop, ok}.
 
 
-
+handle_error(Node, Reason, #acc{shards = Shards} = Acc) ->
+    Rest = lists:filter(fun(#shard{node = N}) -> N /= Node end, Shards),
+    case fabric_view:is_progress_possible([{R, nil} || R <- Rest]) of
+        true ->
+            {ok, Acc#acc{shards = Rest}};
+        false ->
+            {error, Reason}
+    end.


Mime
View raw message