couchdb-commits mailing list archives

From dav...@apache.org
Subject [couchdb] 06/08: [08/N] Clustered Purge: Fabric API
Date Wed, 30 May 2018 22:07:41 GMT
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f0922b6c5101bd9634a8a89436a778a281adc2b7
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
AuthorDate: Tue Apr 24 12:27:43 2018 -0500

    [08/N] Clustered Purge: Fabric API
    
    This commit implements the clustered API for performing purge requests.
    It should be a fairly straightforward change for anyone already familiar
    with the general implementation of a fabric coordinator, given that the
    purge API itself is fairly simple.
    
    TODO: This includes pre-emptive changes for read-repair that need to go
    into the next commit (and then probably swap this commit to before we
    implement anti-entropy updates)
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <mayyas@ca.ibm.com>
    Co-authored-by: jiangphcn <jiangph@cn.ibm.com>
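    
    As a reference for reviewers, a minimal usage sketch of the new
    coordinator entry point (the database name, doc id, and rev string
    below are illustrative placeholders, not values from this commit):
    
        % Parse a rev string into the internal {Pos, RevId} form, then ask
        % the cluster to purge it. Health is ok | accepted | error, and
        % Results carries one entry per requested {Id, Revs} pair, in
        % request order.
        Rev = couch_doc:parse_rev(<<"1-967a00dff5e02add41819138abb3284d">>),
        {Health, Results} = fabric:purge_docs(<<"mydb">>, [{<<"docid">>, [Rev]}], []).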
---
 src/fabric/rebar.config             |   4 +-
 src/fabric/src/fabric.erl           |  27 +-
 src/fabric/src/fabric_db_info.erl   |  29 +-
 src/fabric/src/fabric_db_meta.erl   |  26 +-
 src/fabric/src/fabric_doc_open.erl  |  11 +-
 src/fabric/src/fabric_doc_purge.erl | 572 ++++++++++++++++++++++++++++++++++++
 src/fabric/src/fabric_rpc.erl       |  98 +++++-
 7 files changed, 738 insertions(+), 29 deletions(-)

diff --git a/src/fabric/rebar.config b/src/fabric/rebar.config
index 362c878..3f51af3 100644
--- a/src/fabric/rebar.config
+++ b/src/fabric/rebar.config
@@ -10,5 +10,5 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
-{cover_enabled, true}.
-{cover_print_enabled, true}.
+%{cover_enabled, true}.
+%{cover_print_enabled, true}.
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 4a07271..b2e71dc 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -21,12 +21,13 @@
     delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
     set_security/2, set_security/3, get_revs_limit/1, get_security/1,
     get_security/2, get_all_security/1, get_all_security/2,
+    get_purge_infos_limit/1, set_purge_infos_limit/3,
     compact/1, compact/2]).
 
 % Documents
 -export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3,
     get_missing_revs/2, get_missing_revs/3, update_doc/3, update_docs/3,
-    purge_docs/2, att_receiver/2]).
+    purge_docs/3, att_receiver/2]).
 
 % Views
 -export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4,
@@ -137,6 +138,18 @@ set_security(DbName, SecObj) ->
 set_security(DbName, SecObj, Options) ->
     fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).
 
+%% @doc sets the upper bound for the number of stored purge requests
+-spec set_purge_infos_limit(dbname(), pos_integer(), [option()]) -> ok.
+set_purge_infos_limit(DbName, Limit, Options)
+        when is_integer(Limit), Limit > 0 ->
+    fabric_db_meta:set_purge_infos_limit(dbname(DbName), Limit, opts(Options)).
+
+%% @doc retrieves the upper bound for the number of stored purge requests
+-spec get_purge_infos_limit(dbname()) -> pos_integer() | no_return().
+get_purge_infos_limit(DbName) ->
+    {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
+    try couch_db:get_purge_infos_limit(Db) after catch couch_db:close(Db) end.
+
 get_security(DbName) ->
     get_security(DbName, [?ADMIN_CTX]).
 
@@ -267,8 +280,16 @@ update_docs(DbName, Docs, Options) ->
         {aborted, PreCommitFailures}
     end.
 
-purge_docs(_DbName, _IdsRevs) ->
-    not_implemented.
+
+%% @doc purge revisions for a list of '{Id, Revs}' pairs
+%%      returns {ok, {PurgeSeq, Results}}
+-spec purge_docs(dbname(), [{docid(), [revision()]}], [option()]) ->
+    {ok, [{Health, [revision()]}] | {error, any()}} when
+    Health :: ok | accepted.
+purge_docs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
+    IdsRevs2 = [idrevs(IdRs) || IdRs <- IdsRevs],
+    fabric_doc_purge:go(dbname(DbName), IdsRevs2, opts(Options)).
+
 
 %% @doc spawns a process to upload attachment data and
 %%      returns a function that shards can use to communicate
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index 98e8e52..97a31c2 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -23,10 +23,12 @@ go(DbName) ->
     RexiMon = fabric_util:create_monitors(Shards),
     Fun = fun handle_message/3,
     {ok, ClusterInfo} = get_cluster_info(Shards),
-    Acc0 = {fabric_dict:init(Workers, nil), [{cluster, ClusterInfo}]},
+    Acc0 = {fabric_dict:init(Workers, nil), [], [{cluster, ClusterInfo}]},
     try
         case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
-            {ok, Acc} -> {ok, Acc};
+
+            {ok, Acc} ->
+                {ok, Acc};
             {timeout, {WorkersDict, _}} ->
                 DefunctWorkers = fabric_util:remove_done_workers(
                     WorkersDict,
@@ -37,44 +39,49 @@ go(DbName) ->
                     "get_db_info"
                 ),
                 {error, timeout};
-            {error, Error} -> throw(Error)
+            {error, Error} ->
+                throw(Error)
         end
     after
         rexi_monitor:stop(RexiMon)
     end.
 
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
+handle_message({rexi_DOWN,
+        _, {_,NodeRef},_}, _Shard, {Counters, PseqAcc, Acc}) ->
     case fabric_util:remove_down_workers(Counters, NodeRef) of
     {ok, NewCounters} ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, PseqAcc, Acc}};
     error ->
         {error, {nodedown, <<"progress not possible">>}}
     end;
 
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, PseqAcc, Acc}) ->
     NewCounters = fabric_dict:erase(Shard, Counters),
     case fabric_view:is_progress_possible(NewCounters) of
     true ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, PseqAcc, Acc}};
     false ->
         {error, Reason}
     end;
 
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
+handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, PseqAcc, Acc}) ->
     case fabric_dict:lookup_element(Shard, Counters) of
     undefined ->
         % already heard from someone else in this range
-        {ok, {Counters, Acc}};
+        {ok, {Counters, PseqAcc, Acc}};
     nil ->
         Seq = couch_util:get_value(update_seq, Info),
         C1 = fabric_dict:store(Shard, Seq, Counters),
         C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+        PSeq = couch_util:get_value(purge_seq, Info),
+        NewPseqAcc = [{Shard, PSeq}|PseqAcc],
         case fabric_dict:any(nil, C2) of
         true ->
-            {ok, {C2, [Info|Acc]}};
+            {ok, {C2, NewPseqAcc, [Info|Acc]}};
         false ->
             {stop, [
                 {db_name,Name},
+                {purge_seq, fabric_view_changes:pack_seqs(NewPseqAcc)},
                 {update_seq, fabric_view_changes:pack_seqs(C2)} |
                 merge_results(lists:flatten([Info|Acc]))
             ]}
@@ -91,8 +98,6 @@ merge_results(Info) ->
             [{doc_count, lists:sum(X)} | Acc];
         (doc_del_count, X, Acc) ->
             [{doc_del_count, lists:sum(X)} | Acc];
-        (purge_seq, X, Acc) ->
-            [{purge_seq, lists:sum(X)} | Acc];
         (compact_running, X, Acc) ->
             [{compact_running, lists:member(true, X)} | Acc];
         (disk_size, X, Acc) -> % legacy
diff --git a/src/fabric/src/fabric_db_meta.erl b/src/fabric/src/fabric_db_meta.erl
index 367ef06..26e1b37 100644
--- a/src/fabric/src/fabric_db_meta.erl
+++ b/src/fabric/src/fabric_db_meta.erl
@@ -12,7 +12,8 @@
 
 -module(fabric_db_meta).
 
--export([set_revs_limit/3, set_security/3, get_all_security/2]).
+-export([set_revs_limit/3, set_security/3, get_all_security/2,
+    set_purge_infos_limit/3]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -48,6 +49,29 @@ handle_revs_message(Error, _, _Acc) ->
     {error, Error}.
 
 
+set_purge_infos_limit(DbName, Limit, Options) ->
+    Shards = mem3:shards(DbName),
+    Workers = fabric_util:submit_jobs(Shards, set_purge_infos_limit, [Limit, Options]),
+    Handler = fun handle_purge_message/3,
+    Acc0 = {Workers, length(Workers) - 1},
+    case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of
+        {ok, ok} ->
+            ok;
+        {timeout, {DefunctWorkers, _}} ->
+            fabric_util:log_timeout(DefunctWorkers, "set_purged_docs_limit"),
+            {error, timeout};
+        Error ->
+            Error
+    end.
+
+handle_purge_message(ok, _, {_Workers, 0}) ->
+    {stop, ok};
+handle_purge_message(ok, Worker, {Workers, Waiting}) ->
+    {ok, {lists:delete(Worker, Workers), Waiting - 1}};
+handle_purge_message(Error, _, _Acc) ->
+    {error, Error}.
+
+
 set_security(DbName, SecObj, Options) ->
     Shards = mem3:shards(DbName),
     RexiMon = fabric_util:create_monitors(Shards),
diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl
index 93f73a8..9c5c3c2 100644
--- a/src/fabric/src/fabric_doc_open.erl
+++ b/src/fabric/src/fabric_doc_open.erl
@@ -25,7 +25,8 @@
     r,
     state,
     replies,
-    q_reply
+    q_reply,
+    replies_by_node=[] %[{Node, Reply}] used for checking if a doc is purged
 }).
 
 
@@ -83,7 +84,8 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
     end;
 handle_message(Reply, Worker, Acc) ->
     NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
-    NewAcc = Acc#acc{replies = NewReplies},
+    NewNReplies = [{Worker#shard.node, Reply}|Acc#acc.replies_by_node],
+    NewAcc = Acc#acc{replies = NewReplies, replies_by_node = NewNReplies},
     case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
     {true, QuorumReply} ->
         fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
@@ -122,14 +124,15 @@ is_r_met(Workers, Replies, R) ->
         no_more_workers
     end.
 
-read_repair(#acc{dbname=DbName, replies=Replies}) ->
+read_repair(#acc{dbname=DbName, replies=Replies, replies_by_node=NReplies0}) ->
     Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
+    NReplies = [{Node, Doc} || {Node, {ok, #doc{}=Doc}} <- NReplies0],
     case Docs of
     % omit local docs from read repair
     [#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
         choose_reply(Docs);
     [#doc{id=Id} | _] ->
-        Opts = [replicated_changes, ?ADMIN_CTX],
+        Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NReplies}],
         Res = fabric:update_docs(DbName, Docs, Opts),
         case Res of
             {ok, []} ->
diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl
new file mode 100644
index 0000000..61b84d8
--- /dev/null
+++ b/src/fabric/src/fabric_doc_purge.erl
@@ -0,0 +1,572 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_purge).
+
+
+-export([
+    go/3
+]).
+
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+-record(acc, {
+    worker_uuids,
+    resps,
+    uuid_counts,
+    w
+}).
+
+
+go(_, [], _) ->
+    {ok, []};
+go(DbName, IdsRevs, Options) ->
+    % Generate our purge requests of {UUID, DocId, Revs}
+    {UUIDs, Reqs} = create_reqs(IdsRevs, [], []),
+
+    % Fire off rexi workers for each shard.
+    {Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) ->
+        #shard{name = ShardDbName, node = Node} = Shard,
+        Args = [ShardDbName, ShardReqs, Options],
+        Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}),
+        Worker = Shard#shard{ref=Ref},
+        ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs],
+        {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]}
+    end, {[], []}, group_reqs_by_shard(DbName, Reqs)),
+
+    UUIDCounts = lists:foldl(fun({_Worker, WUUIDs}, CountAcc) ->
+        lists:foldl(fun(UUID, InnerCountAcc) ->
+            dict:update_counter(UUID, 1, InnerCountAcc)
+        end, CountAcc, WUUIDs)
+    end, dict:new(), WorkerUUIDs),
+
+    RexiMon = fabric_util:create_monitors(Workers),
+    Timeout = fabric_util:request_timeout(),
+    Acc0 = #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
+        uuid_counts = UUIDCounts,
+        w = w(DbName, Options)
+    },
+    Acc2 = try rexi_utils:recv(Workers, #shard.ref,
+            fun handle_message/3, Acc0, infinity, Timeout) of
+        {ok, Acc1} ->
+            Acc1;
+        {timeout, Acc1} ->
+            #acc{
+                worker_uuids = WorkerUUIDs,
+                resps = Resps
+            } = Acc1,
+            DefunctWorkers = [Worker || {Worker, _} <- WorkerUUIDs],
+            fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
+            NewResps = append_errors(timeout, WorkerUUIDs, Resps),
+            Acc1#acc{worker_uuids = [], resps = NewResps};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end,
+
+    FinalResps = format_resps(UUIDs, Acc2),
+    {resp_health(FinalResps), FinalResps}.
+
+
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    Pred = fun({#shard{node = N}, _}) -> N == Node end,
+    {Failed, Rest} = lists:partition(Pred, WorkerUUIDs),
+    NewResps = append_errors(internal_server_error, Failed, Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({rexi_EXIT, _}, Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    {value, WorkerPair, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+    NewResps = append_errors(internal_server_error, [WorkerPair], Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({ok, Replies}, Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    {value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+    NewResps = append_resps(UUIDs, Replies, Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({bad_request, Msg}, _, _) ->
+    throw({bad_request, Msg}).
+
+
+create_reqs([], UUIDs, Reqs) ->
+    {lists:reverse(UUIDs), lists:reverse(Reqs)};
+
+create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs) ->
+    UUID = couch_uuids:new(),
+    NewUUIDs = [UUID | UUIDs],
+    NewReqs = [{UUID, Id, Revs} | Reqs],
+    create_reqs(RestIdsRevs, NewUUIDs, NewReqs).
+
+
+group_reqs_by_shard(DbName, Reqs) ->
+    lists:foldl(fun({_UUID, Id, _Revs} = Req, D0) ->
+        lists:foldl(fun(Shard, D1) ->
+            dict:append(Shard, Req, D1)
+        end, D0, mem3:shards(DbName, Id))
+    end, dict:new(), Reqs).
+
+
+w(DbName, Options) ->
+    try
+        list_to_integer(couch_util:get_value(w, Options))
+    catch _:_ ->
+        mem3:quorum(DbName)
+    end.
+
+
+append_errors(Type, WorkerUUIDs, Resps) ->
+    lists:foldl(fun({_Worker, UUIDs}, RespAcc) ->
+        Errors = [{error, Type} || _UUID <- UUIDs],
+        append_resps(UUIDs, Errors, RespAcc)
+    end, Resps, WorkerUUIDs).
+
+
+append_resps([], [], Resps) ->
+    Resps;
+append_resps([UUID | RestUUIDs], [Reply | RestReplies], Resps) ->
+    NewResps = dict:append(UUID, Reply, Resps),
+    append_resps(RestUUIDs, RestReplies, NewResps).
+
+
+maybe_stop(#acc{worker_uuids = []} = Acc) ->
+    {stop, Acc};
+maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) ->
+    try
+        dict:fold(fun(UUID, UUIDResps, _) ->
+            UUIDCount = dict:fetch(UUID, Counts),
+            case has_quorum(UUIDResps, UUIDCount, W) of
+                true -> ok;
+                false -> throw(keep_going)
+            end
+        end, nil, Resps),
+        {stop, Acc}
+    catch throw:keep_going ->
+        {ok, Acc}
+    end.
+
+
+format_resps(UUIDs, #acc{} = Acc) ->
+    #acc{
+        resps = Resps,
+        w = W
+    } = Acc,
+    FoldFun = fun(UUID, Replies, ReplyAcc) ->
+        OkReplies = [Reply || {ok, Reply} <- Replies],
+        case OkReplies of
+            [] ->
+                [Error | _] = lists:usort(Replies),
+                [{UUID, Error} | ReplyAcc];
+            _ ->
+                AllRevs = lists:usort(lists:flatten(OkReplies)),
+                IsOk = length(OkReplies) >= W
+                        andalso length(lists:usort(OkReplies)) == 1,
+                Health = if IsOk -> ok; true -> accepted end,
+                [{UUID, {Health, AllRevs}} | ReplyAcc]
+        end
+    end,
+    FinalReplies = dict:fold(FoldFun, {ok, []}, Resps),
+    couch_util:reorder_results(UUIDs, FinalReplies);
+
+format_resps(_UUIDs, Else) ->
+    Else.
+
+
+resp_health(Resps) ->
+    Healths = lists:usort([H || {H, _} <- Resps]),
+    HasError = lists:member(error, Healths),
+    HasAccepted = lists:member(accepted, Healths),
+    AllOk = Healths == [ok],
+    if
+        HasError -> error;
+        HasAccepted -> accepted;
+        AllOk -> ok;
+        true -> error
+    end.
+
+
+has_quorum(Resps, Count, W) ->
+    OkResps = [R || {ok, _} = R <- Resps],
+    OkCounts = lists:foldl(fun(R, Acc) ->
+        dict:update_counter(R, 1, Acc)
+    end, dict:new(), OkResps),
+    MaxOk = lists:max([0 | element(2, lists:unzip(dict:to_list(OkCounts)))]),
+    if
+        MaxOk >= W -> true;
+        length(Resps) >= Count -> true;
+        true -> false
+    end.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+purge_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            t_w2_ok(),
+            t_w3_ok(),
+
+            t_w2_mixed_accepted(),
+            t_w3_mixed_accepted(),
+
+            t_w2_exit1_ok(),
+            t_w2_exit2_accepted(),
+            t_w2_exit3_error(),
+
+            t_w4_accepted(),
+
+            t_mixed_ok_accepted(),
+            t_mixed_errors()
+        ]
+    }.
+
+
+setup() ->
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_, _) -> ok end),
+    meck:expect(couch_log, notice, fun(_, _) -> ok end).
+
+
+teardown(_) ->
+    meck:unload().
+
+
+t_w2_ok() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, true),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(ok, resp_health(Resps))
+    end).
+
+
+t_w3_ok() ->
+    ?_test(begin
+        Acc0 = create_init_acc(3),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(ok, resp_health(Resps))
+    end).
+
+
+t_w2_mixed_accepted() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+        Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [
+            {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+            {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+        ],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_w3_mixed_accepted() ->
+    ?_test(begin
+        Acc0 = create_init_acc(3),
+        Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+        Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [
+            {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+            {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+        ],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_w2_exit1_ok() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(ok, resp_health(Resps))
+    end).
+
+
+t_w2_exit2_accepted() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_w2_exit3_error() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [
+            {error, internal_server_error},
+            {error, internal_server_error}
+        ],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(error, resp_health(Resps))
+    end).
+
+
+t_w4_accepted() ->
+    % Make sure we return when all workers have responded
+    % rather than wait around for a timeout if a user asks
+    % for a quorum with more than the available number of
+    % shards.
+    ?_test(begin
+        Acc0 = create_init_acc(4),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_mixed_ok_accepted() ->
+    ?_test(begin
+        WorkerUUIDs = [
+            {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
+
+            {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
+        ],
+
+        Acc0 = #acc{
+            worker_uuids = WorkerUUIDs,
+            resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+            uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+            w = 2
+        },
+
+        Msg1 = {ok, [{ok, [{1, <<"foo">>}]}]},
+        Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+        {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1),
+        {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
+        {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
+        {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_mixed_errors() ->
+    ?_test(begin
+        WorkerUUIDs = [
+            {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
+
+            {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
+        ],
+
+        Acc0 = #acc{
+            worker_uuids = WorkerUUIDs,
+            resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+            uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+            w = 2
+        },
+
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
+        {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
+        {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(error, resp_health(Resps))
+    end).
+
+
+create_init_acc(W) ->
+    UUID1 = <<"uuid1">>,
+    UUID2 = <<"uuid2">>,
+
+    Nodes = [node1, node2, node3],
+    Shards = mem3_util:create_partition_map(<<"foo">>, 3, 1, Nodes),
+
+    % Create our worker_uuids. We're relying on the fact that
+    % we're using a fake Q=1 db so we don't have to worry
+    % about any hashing here.
+    WorkerUUIDs = lists:map(fun(Shard) ->
+        {Shard#shard{ref = erlang:make_ref()}, [UUID1, UUID2]}
+    end, Shards),
+
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = dict:from_list([{UUID1, []}, {UUID2, []}]),
+        uuid_counts = dict:from_list([{UUID1, 3}, {UUID2, 3}]),
+        w = W
+    }.
+
+
+worker(N, #acc{worker_uuids = WorkerUUIDs}) ->
+    {Worker, _} = lists:nth(N, WorkerUUIDs),
+    Worker.
+
+
+check_quorum(Acc, Expect) ->
+    dict:fold(fun(_Shard, Resps, _) ->
+        ?assertEqual(Expect, has_quorum(Resps, 3, Acc#acc.w))
+    end, nil, Acc#acc.resps).
+
+-endif.
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 4a69e7e..a64d546 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -21,6 +21,7 @@
     delete_shard_db_doc/2]).
 -export([get_all_security/2, open_shard/2]).
 -export([compact/1, compact/2]).
+-export([get_purge_seq/2, purge_docs/3, set_purge_infos_limit/3]).
 
 -export([get_db_info/2, get_doc_count/2, get_update_seq/2,
          changes/4, map_view/5, reduce_view/5, group_info/3, update_mrview/4]).
@@ -201,6 +202,9 @@ get_all_security(DbName, Options) ->
 set_revs_limit(DbName, Limit, Options) ->
     with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
 
+set_purge_infos_limit(DbName, Limit, Options) ->
+    with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}).
+
 open_doc(DbName, DocId, Options) ->
     with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
 
@@ -236,14 +240,25 @@ get_missing_revs(DbName, IdRevsList, Options) ->
     end).
 
 update_docs(DbName, Docs0, Options) ->
-    case proplists:get_value(replicated_changes, Options) of
-    true ->
-        X = replicated_changes;
-    _ ->
-        X = interactive_edit
+    X = case proplists:get_value(replicated_changes, Options) of
+        true -> replicated_changes;
+        _ -> interactive_edit
     end,
-    Docs = make_att_readers(Docs0),
-    with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
+    DocsByNode = couch_util:get_value(read_repair, Options),
+    case {X, DocsByNode} of
+        {_, undefined} ->
+            Docs = make_att_readers(Docs0),
+            with_db(DbName, Options,
+                {couch_db, update_docs, [Docs, Options, X]});
+        {replicated_changes, _} ->
+            update_docs_read_repair(DbName, DocsByNode, Options)
+    end.
+
+get_purge_seq(DbName, Options) ->
+    with_db(DbName, Options, {couch_db, get_purge_seq, []}).
+
+purge_docs(DbName, UUIdsIdsRevs, Options) ->
+    with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, Options]}).
 
 %% @equiv group_info(DbName, DDocId, [])
 group_info(DbName, DDocId) ->
@@ -298,6 +313,75 @@ with_db(DbName, Options, {M,F,A}) ->
         rexi:reply(Error)
     end.
 
+
+update_docs_read_repair(DbName, DocsByNode, Options) ->
+    set_io_priority(DbName, Options),
+    case get_or_create_db(DbName, Options) of
+    {ok, Db} ->
+        % omit Revisions that have been purged
+        Docs = filter_purged_revs(Db, DocsByNode),
+        Docs2 = make_att_readers(Docs),
+        {M,F,A} = {couch_db, update_docs, [Docs2, Options, replicated_changes]},
+        rexi:reply(try
+            apply(M, F, [Db | A])
+        catch Exception ->
+            Exception;
+        error:Reason ->
+            couch_log:error("rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason,
+                clean_stack()]),
+            {error, Reason}
+        end);
+    Error ->
+        rexi:reply(Error)
+    end.
+
+
+% Given [{Node, Doc}], i.e. different revs of the same DocId from different
+% nodes, returns [Doc] with purged docs filtered out.
+% This is done for read repair from fabric_doc_open, so that we do not
+% recreate docs on this node() that were already purged here, when they
+% arrive from Nodes that are out of sync.
+filter_purged_revs(Db, DocsByNode) ->
+    AllowedPSeqLag = config:get_integer("purge", "allowed_purge_seq_lag", 100),
+    {ok, DbPSeq} = couch_db:get_purge_seq(Db),
+    PurgeFoldFun = fun({_P,_U, Id, Revs}, Acc) ->  [{Id, Revs}|Acc]  end,
+    V = "v" ++ config:get("purge", "version", "1") ++ "-",
+    StartKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++  V ++ "mem3-"),
+    EndKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ V ++ "mem31"),
+    Opts = [{start_key, StartKey}, {end_key_gt, EndKey}],
+    % Go through the _local/purge-mem3-.. docs: find the Node that each
+    % local doc corresponds to, and check whether an update from that Node
+    % was recently purged on the current node.
+    LDocsFoldFun = fun(#doc{body={Props}}, Acc) ->
+        {VOps} = couch_util:get_value(<<"verify_options">>, Props),
+        Node = couch_util:get_value(<<"node">>, VOps),
+        Result = lists:keyfind(Node, 1, DocsByNode),
+        NewAcc = if not Result -> Acc; true ->
+            {Node, Doc} = Result,
+            NodePSeq = couch_util:get_value(<<"purge_seq">>, Props),
+            if  NodePSeq == DbPSeq ->
+                    [Doc|Acc];
+                (NodePSeq+AllowedPSeqLag) < DbPSeq ->
+                    % Node is very out of sync, ignore updates from it
+                    Acc;
+                true -> %(NodePSeq+ClientAllowedPSeqLag) >= DbPSeq
+                    % if Doc has been purged recently, then ignore it
+                    {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db,
+                            NodePSeq, PurgeFoldFun, [], []),
+                    {Start, [FirstRevId|_]} = Doc#doc.revs,
+                    DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]},
+                    case lists:member(DocIdRevs, PurgedIdsRevs) of
+                        true -> Acc;
+                        false -> [Doc|Acc]
+                    end
+            end
+        end,
+        {ok, NewAcc}
+    end,
+    {ok, Docs} = couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts),
+    Docs.
+
+
 get_or_create_db(DbName, Options) ->
     couch_db:open_int(DbName, [{create_if_missing, true} | Options]).
 

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.
