couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kxe...@apache.org
Subject [2/4] fabric commit: updated refs/heads/master to ba27f81
Date Tue, 12 May 2015 19:55:39 GMT
Add updates_pending field to get_view_group_info

We also change the beheviour of this function to wait
response from all shards.

COUCHDB-2526


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/6351140d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/6351140d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/6351140d

Branch: refs/heads/master
Commit: 6351140d4f8d0621fc4d33da2767d4f09ecb2ad7
Parents: 66b98b0
Author: ILYA Khlopotov <iilyak@ca.ibm.com>
Authored: Mon Dec 22 13:57:38 2014 -0800
Committer: ILYA Khlopotov <iilyak@ca.ibm.com>
Committed: Thu Mar 26 05:39:58 2015 -0700

----------------------------------------------------------------------
 src/fabric.erl            |  5 +++
 src/fabric_group_info.erl | 81 ++++++++++++++++++++++++++++++------------
 2 files changed, 63 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/6351140d/src/fabric.erl
----------------------------------------------------------------------
diff --git a/src/fabric.erl b/src/fabric.erl
index 13cc910..aff0485 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -326,6 +326,11 @@ query_view(DbName, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
             {active, non_neg_integer()} |
             {external, non_neg_integer()} |
             {file, non_neg_integer()}
+        ]} |
+        {updates_pending, [
+            {minimum, non_neg_integer()} |
+            {preferred, non_neg_integer()} |
+            {total, non_neg_integer()}
         ]}
     ]}.
 get_view_group_info(DbName, DesignId) ->

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/6351140d/src/fabric_group_info.erl
----------------------------------------------------------------------
diff --git a/src/fabric_group_info.erl b/src/fabric_group_info.erl
index 85719b6..4d81200 100644
--- a/src/fabric_group_info.erl
+++ b/src/fabric_group_info.erl
@@ -24,11 +24,12 @@ go(DbName, GroupId) when is_binary(GroupId) ->
 
 go(DbName, #doc{id=DDocId}) ->
     Shards = mem3:shards(DbName),
+    Ushards = mem3:ushards(DbName),
     Workers = fabric_util:submit_jobs(Shards, group_info, [DDocId]),
     RexiMon = fabric_util:create_monitors(Shards),
-    Acc0 = {fabric_dict:init(Workers, nil), []},
-    try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
-    {timeout, {WorkersDict, _}} ->
+    Acc = acc_init(Workers, Ushards),
+    try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc) of
+    {timeout, {WorkersDict, _, _}} ->
         DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
         fabric_util:log_timeout(DefunctWorkers, "group_info"),
         {error, timeout};
@@ -38,48 +39,82 @@ go(DbName, #doc{id=DDocId}) ->
         rexi_monitor:stop(RexiMon)
     end.
 
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc, U}) ->
     case fabric_util:remove_down_workers(Counters, NodeRef) of
     {ok, NewCounters} ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, Acc, U}};
     error ->
         {error, {nodedown, <<"progress not possible">>}}
     end;
 
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc, U}) ->
     NewCounters = lists:keydelete(Shard, #shard.ref, Counters),
     case fabric_view:is_progress_possible(NewCounters) of
     true ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, Acc, U}};
     false ->
         {error, Reason}
     end;
 
-handle_message({ok, Info}, Shard, {Counters, Acc}) ->
-    case fabric_dict:lookup_element(Shard, Counters) of
-    undefined ->
-        % already heard from someone else in this range
-        {ok, {Counters, Acc}};
-    nil ->
-        C1 = fabric_dict:store(Shard, ok, Counters),
-        C2 = fabric_view:remove_overlapping_shards(Shard, C1),
-        case fabric_dict:any(nil, C2) of
-        true ->
-            {ok, {C2, [Info|Acc]}};
-        false ->
-            {stop, merge_results(lists:flatten([Info|Acc]))}
-        end
+handle_message({ok, Info}, Shard, {Counters0, Acc, U}) ->
+    NewAcc = append_result(Info, Shard, Acc, U),
+    Counters = fabric_dict:store(Shard, ok, Counters0),
+    case is_complete(Counters) of
+    false ->
+        {ok, {Counters, NewAcc, U}};
+    true ->
+        Pending = aggregate_pending(NewAcc),
+        Infos = get_infos(NewAcc),
+        Results = [{updates_pending, {Pending}} | merge_results(Infos)],
+        {stop, Results}
     end;
 handle_message(_, _, Acc) ->
     {ok, Acc}.
 
+acc_init(Workers, Ushards) ->
+    Set = sets:from_list([{Id, N} || #shard{name = Id, node = N} <- Ushards]),
+    {fabric_dict:init(Workers, nil), dict:new(), Set}.
+
+is_complete(Counters) ->
+    not fabric_dict:any(nil, Counters).
+
+append_result(Info, #shard{name = Name, node = Node}, Acc, Ushards) ->
+    IsPreferred = sets:is_element({Name, Node}, Ushards),
+    dict:append(Name, {Node, IsPreferred, Info}, Acc).
+
+get_infos(Acc) ->
+    Values = [V || {_, V} <- dict:to_list(Acc)],
+    lists:flatten([Info || {_Node, _Pref, Info} <- lists:flatten(Values)]).
+
+aggregate_pending(Dict) ->
+    {Preferred, Total, Minimum} =
+        dict:fold(fun(_Name, Results, {P, T, M}) ->
+            {Preferred, Total, Minimum} = calculate_pending(Results),
+            {P + Preferred, T + Total, M + Minimum}
+        end, {0, 0, 0}, Dict),
+    [
+        {minimum, Minimum},
+        {preferred, Preferred},
+        {total, Total}
+    ].
+
+calculate_pending(Results) ->
+    lists:foldl(fun
+    ({_Node, true, Info}, {P, T, V}) ->
+       Pending = couch_util:get_value(pending_updates, Info),
+       {P + Pending, T + Pending, min(Pending, V)};
+    ({_Node, false, Info}, {P, T, V}) ->
+       Pending = couch_util:get_value(pending_updates, Info),
+       {P, T + Pending, min(Pending, V)}
+    end, {0, 0, infinity}, Results).
+
 merge_results(Info) ->
     Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end,
         orddict:new(), Info),
     orddict:fold(fun
-        (signature, [X|_], Acc) ->
+        (signature, [X | _], Acc) ->
             [{signature, X} | Acc];
-        (language, [X|_], Acc) ->
+        (language, [X | _], Acc) ->
             [{language, X} | Acc];
         (disk_size, X, Acc) -> % legacy
             [{disk_size, lists:sum(X)} | Acc];


Mime
View raw message