couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [couchdb] 08/10: [08/10] Clustered Purge: Update read-repair
Date Fri, 06 Jul 2018 17:37:31 GMT
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit df36be6d07adb9a6b39bb243200c4a0f72a86c88
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
AuthorDate: Wed May 30 17:26:02 2018 -0500

    [08/10] Clustered Purge: Update read-repair
    
    Read-repair needs to know which nodes have requested an update to a
    local doc so that it can determine if the update is applied. The basic
    idea here is that we may have gotten an update from a remote node that
    has yet to apply a purge request. If the local node were to apply this
    update it would effectively undo a successful purge request.
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <mayyas@ca.ibm.com>
    Co-authored-by: jiangphcn <jiangph@cn.ibm.com>
---
 src/fabric/src/fabric_doc_open.erl         |  73 +++++++-
 src/fabric/src/fabric_doc_open_revs.erl    | 262 +++++++++++++++++++++-----
 src/fabric/src/fabric_rpc.erl              | 128 ++++++++++++-
 src/fabric/test/fabric_rpc_purge_tests.erl | 285 +++++++++++++++++++++++++++++
 4 files changed, 692 insertions(+), 56 deletions(-)

diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl
index 93f73a8..0a85346 100644
--- a/src/fabric/src/fabric_doc_open.erl
+++ b/src/fabric/src/fabric_doc_open.erl
@@ -25,6 +25,7 @@
     r,
     state,
     replies,
+    node_revs = [],
     q_reply
 }).
 
@@ -83,7 +84,13 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
     end;
 handle_message(Reply, Worker, Acc) ->
     NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
-    NewAcc = Acc#acc{replies = NewReplies},
+    NewNodeRevs = case Reply of
+        {ok, #doc{revs = {Pos, [Rev | _]}}} ->
+            [{Worker#shard.node, [{Pos, Rev}]} | Acc#acc.node_revs];
+        _ ->
+            Acc#acc.node_revs
+    end,
+    NewAcc = Acc#acc{replies = NewReplies, node_revs = NewNodeRevs},
     case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
     {true, QuorumReply} ->
         fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
@@ -122,14 +129,14 @@ is_r_met(Workers, Replies, R) ->
         no_more_workers
     end.
 
-read_repair(#acc{dbname=DbName, replies=Replies}) ->
+read_repair(#acc{dbname=DbName, replies=Replies, node_revs=NodeRevs}) ->
     Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
     case Docs of
     % omit local docs from read repair
     [#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
         choose_reply(Docs);
     [#doc{id=Id} | _] ->
-        Opts = [replicated_changes, ?ADMIN_CTX],
+        Opts = [?ADMIN_CTX, {read_repair, NodeRevs}],
         Res = fabric:update_docs(DbName, Docs, Opts),
         case Res of
             {ok, []} ->
@@ -205,6 +212,7 @@ open_doc_test_() ->
             t_handle_message_down(),
             t_handle_message_exit(),
             t_handle_message_reply(),
+            t_store_node_revs(),
             t_read_repair(),
             t_handle_response_quorum_met(),
             t_get_doc_info()
@@ -397,6 +405,65 @@ t_handle_message_reply() ->
     end).
 
 
+t_store_node_revs() ->
+    W1 = #shard{node = w1, ref = erlang:make_ref()},
+    W2 = #shard{node = w2, ref = erlang:make_ref()},
+    W3 = #shard{node = w3, ref = erlang:make_ref()},
+    Foo1 = {ok, #doc{id = <<"bar">>, revs = {1, [<<"foo">>]}}},
+    Foo2 = {ok, #doc{id = <<"bar">>, revs = {2, [<<"foo2">>, <<"foo">>]}}},
+    NFM = {not_found, missing},
+
+    InitAcc = #acc{workers = [W1, W2, W3], replies = [], r = 2},
+
+    ?_test(begin
+        meck:expect(rexi, kill, fun(_, _) -> ok end),
+
+        % Simple case
+        {ok, #acc{node_revs = NodeRevs1}} = handle_message(Foo1, W1, InitAcc),
+        ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs1),
+
+        % Make sure we only hold the head rev
+        {ok, #acc{node_revs = NodeRevs2}} = handle_message(Foo2, W1, InitAcc),
+        ?assertEqual([{w1, [{2, <<"foo2">>}]}], NodeRevs2),
+
+        % Make sure we don't capture anything on error
+        {ok, #acc{node_revs = NodeRevs3}} = handle_message(NFM, W1, InitAcc),
+        ?assertEqual([], NodeRevs3),
+
+        % Make sure we accumulate node revs
+        Acc1 = InitAcc#acc{node_revs = [{w1, [{1, <<"foo">>}]}]},
+        {ok, #acc{node_revs = NodeRevs4}} = handle_message(Foo2, W2, Acc1),
+        ?assertEqual(
+                [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}],
+                NodeRevs4
+            ),
+
+        % Make sure rexi_DOWN doesn't modify node_revs
+        Down = {rexi_DOWN, nil, {nil, w1}, nil},
+        {ok, #acc{node_revs = NodeRevs5}} = handle_message(Down, W2, Acc1),
+        ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs5),
+
+        % Make sure rexi_EXIT doesn't modify node_revs
+        Exit = {rexi_EXIT, reason},
+        {ok, #acc{node_revs = NodeRevs6}} = handle_message(Exit, W2, Acc1),
+        ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs6),
+
+        % Make sure an error doesn't remove any node revs
+        {ok, #acc{node_revs = NodeRevs7}} = handle_message(NFM, W2, Acc1),
+        ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs7),
+
+        % Make sure we have all of our node_revs when meeting
+        % quorum
+        {ok, Acc2} = handle_message(Foo1, W1, InitAcc),
+        {ok, Acc3} = handle_message(Foo2, W2, Acc2),
+        {stop, Acc4} = handle_message(NFM, W3, Acc3),
+        ?assertEqual(
+                [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}],
+                Acc4#acc.node_revs
+            )
+    end).
+
+
 t_read_repair() ->
     Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}},
     Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}},
diff --git a/src/fabric/src/fabric_doc_open_revs.erl b/src/fabric/src/fabric_doc_open_revs.erl
index 096722f..234b108 100644
--- a/src/fabric/src/fabric_doc_open_revs.erl
+++ b/src/fabric/src/fabric_doc_open_revs.erl
@@ -29,6 +29,7 @@
     revs,
     latest,
     replies = [],
+    node_revs = [],
     repair = false
 }).
 
@@ -82,6 +83,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
         worker_count = WorkerCount,
         workers = Workers,
         replies = PrevReplies,
+        node_revs = PrevNodeRevs,
         r = R,
         revs = Revs,
         latest = Latest,
@@ -92,7 +94,6 @@ handle_message({ok, RawReplies}, Worker, State) ->
     IsTree = Revs == all orelse Latest,
 
     % Do not count error replies when checking quorum
-
     RealReplyCount = ReplyCount + 1 - ReplyErrorCount,
     QuorumReplies = RealReplyCount >= R,
     {NewReplies, QuorumMet, Repair} = case IsTree of
@@ -102,11 +103,23 @@ handle_message({ok, RawReplies}, Worker, State) ->
             NumLeafs = couch_key_tree:count_leafs(PrevReplies),
             SameNumRevs = length(RawReplies) == NumLeafs,
             QMet = AllInternal andalso SameNumRevs andalso QuorumReplies,
-            {NewReplies0, QMet, Repair0};
+            % Don't set repair=true on the first reply
+            {NewReplies0, QMet, (ReplyCount > 0) and Repair0};
         false ->
             {NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies),
             {NewReplies0, MinCount >= R, false}
     end,
+    NewNodeRevs = if Worker == nil -> PrevNodeRevs; true ->
+        IdRevs = lists:foldl(fun
+            ({ok, #doc{revs = {Pos, [Rev | _]}}}, Acc) ->
+                [{Pos, Rev} | Acc];
+            (_, Acc) ->
+                Acc
+        end, [], RawReplies),
+        if IdRevs == [] -> PrevNodeRevs; true ->
+            [{Worker#shard.node, IdRevs} | PrevNodeRevs]
+        end
+    end,
 
     Complete = (ReplyCount =:= (WorkerCount - 1)),
 
@@ -117,6 +130,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
                     DbName,
                     IsTree,
                     NewReplies,
+                    NewNodeRevs,
                     ReplyCount + 1,
                     InRepair orelse Repair
                 ),
@@ -124,6 +138,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
         false ->
             {ok, State#state{
                 replies = NewReplies,
+                node_revs = NewNodeRevs,
                 reply_count = ReplyCount + 1,
                 workers = lists:delete(Worker, Workers),
                 repair = InRepair orelse Repair
@@ -180,7 +195,7 @@ dict_replies(Dict, [Reply | Rest]) ->
     dict_replies(NewDict, Rest).
 
 
-maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
+maybe_read_repair(Db, IsTree, Replies, NodeRevs, ReplyCount, DoRepair) ->
     Docs = case IsTree of
         true -> tree_repair_docs(Replies, DoRepair);
         false -> dict_repair_docs(Replies, ReplyCount)
@@ -189,7 +204,7 @@ maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
         [] ->
             ok;
         _ ->
-            erlang:spawn(fun() -> read_repair(Db, Docs) end)
+            erlang:spawn(fun() -> read_repair(Db, Docs, NodeRevs) end)
     end.
 
 
@@ -208,8 +223,9 @@ dict_repair_docs(Replies, ReplyCount) ->
     end.
 
 
-read_repair(Db, Docs) ->
-    Res = fabric:update_docs(Db, Docs, [replicated_changes, ?ADMIN_CTX]),
+read_repair(Db, Docs, NodeRevs) ->
+    Opts = [?ADMIN_CTX, {read_repair, NodeRevs}],
+    Res = fabric:update_docs(Db, Docs, Opts),
     case Res of
         {ok, []} ->
             couch_stats:increment_counter([fabric, read_repairs, success]);
@@ -268,20 +284,24 @@ filter_reply(Replies) ->
 setup() ->
     config:start_link([]),
     meck:new([fabric, couch_stats, couch_log]),
+    meck:new(fabric_util, [passthrough]),
     meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end),
     meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
-    meck:expect(couch_log, notice, fun(_, _) -> ok end).
+    meck:expect(couch_log, notice, fun(_, _) -> ok end),
+    meck:expect(fabric_util, cleanup, fun(_) -> ok end).
+
 
 
 teardown(_) ->
-    (catch meck:unload([fabric, couch_stats, couch_log])),
+    (catch meck:unload([fabric, couch_stats, couch_log, fabric_util])),
     config:stop().
 
 
 state0(Revs, Latest) ->
     #state{
         worker_count = 3,
-        workers = [w1, w2, w3],
+        workers =
+            [#shard{node='node1'}, #shard{node='node2'}, #shard{node='node3'}],
         r = 2,
         revs = Revs,
         latest = Latest
@@ -321,6 +341,14 @@ open_doc_revs_test_() ->
             check_worker_error_skipped(),
             check_quorum_only_counts_valid_responses(),
             check_empty_list_when_no_workers_reply(),
+            check_node_rev_stored(),
+            check_node_rev_store_head_only(),
+            check_node_rev_store_multiple(),
+            check_node_rev_dont_store_errors(),
+            check_node_rev_store_non_errors(),
+            check_node_rev_store_concatenate(),
+            check_node_rev_store_concantenate_multiple(),
+            check_node_rev_unmodified_on_down_or_exit(),
             check_not_found_replies_are_removed_when_doc_found(),
             check_not_found_returned_when_one_of_docs_not_found(),
             check_not_found_returned_when_doc_not_found()
@@ -334,27 +362,35 @@ open_doc_revs_test_() ->
 check_empty_response_not_quorum() ->
     % Simple smoke test that we don't think we're
     % done with a first empty response
+    W1 = #shard{node='node1'},
+    W2 = #shard{node='node2'},
+    W3 = #shard{node='node3'},
     ?_assertMatch(
-        {ok, #state{workers = [w2, w3]}},
-        handle_message({ok, []}, w1, state0(all, false))
+        {ok, #state{workers = [W2, W3]}},
+        handle_message({ok, []}, W1, state0(all, false))
     ).
 
 
 check_basic_response() ->
     % Check that we've handle a response
+    W1 = #shard{node='node1'},
+    W2 = #shard{node='node2'},
+    W3 = #shard{node='node3'},
     ?_assertMatch(
-        {ok, #state{reply_count = 1, workers = [w2, w3]}},
-        handle_message({ok, [foo1(), bar1()]}, w1, state0(all, false))
+        {ok, #state{reply_count = 1, workers = [W2, W3]}},
+        handle_message({ok, [foo1(), bar1()]}, W1, state0(all, false))
     ).
 
 
 check_finish_quorum() ->
     % Two messages with the same revisions means we're done
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         Expect = {stop, [bar1(), foo1()]},
-        ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, w2, S1))
+        ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, W2, S1))
     end).
 
 
@@ -363,11 +399,13 @@ check_finish_quorum_newer() ->
     % foo1 should count for foo2 which means we're finished.
     % We also validate that read_repair was triggered.
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         Expect = {stop, [bar1(), foo2()]},
         ok = meck:reset(fabric),
-        ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, w2, S1)),
+        ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, W2, S1)),
         ok = meck:wait(fabric, update_docs, '_', 5000),
         ?assertMatch(
             [{_, {fabric, update_docs, [_, _, _]}, _}],
@@ -380,11 +418,14 @@ check_no_quorum_on_second() ->
     % Quorum not yet met for the foo revision so we
     % would wait for w3
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         ?assertMatch(
-            {ok, #state{workers = [w3]}},
-            handle_message({ok, [bar1()]}, w2, S1)
+            {ok, #state{workers = [W3]}},
+            handle_message({ok, [bar1()]}, W2, S1)
         )
     end).
 
@@ -394,11 +435,14 @@ check_done_on_third() ->
     % what. Every revision seen in this pattern should be
     % included.
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
-        {ok, S2} = handle_message({ok, [bar1()]}, w2, S1),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
+        {ok, S2} = handle_message({ok, [bar1()]}, W2, S1),
         Expect = {stop, [bar1(), foo1()]},
-        ?assertEqual(Expect, handle_message({ok, [bar1()]}, w3, S2))
+        ?assertEqual(Expect, handle_message({ok, [bar1()]}, W3, S2))
     end).
 
 
@@ -407,108 +451,234 @@ check_done_on_third() ->
 
 check_specific_revs_first_msg() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), false),
         ?assertMatch(
-            {ok, #state{reply_count = 1, workers = [w2, w3]}},
-            handle_message({ok, [foo1(), bar1(), bazNF()]}, w1, S0)
+            {ok, #state{reply_count = 1, workers = [W2, W3]}},
+            handle_message({ok, [foo1(), bar1(), bazNF()]}, W1, S0)
         )
     end).
 
 
 check_revs_done_on_agreement() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), false),
         Msg = {ok, [foo1(), bar1(), bazNF()]},
-        {ok, S1} = handle_message(Msg, w1, S0),
+        {ok, S1} = handle_message(Msg, W1, S0),
         Expect = {stop, [bar1(), foo1(), bazNF()]},
-        ?assertEqual(Expect, handle_message(Msg, w2, S1))
+        ?assertEqual(Expect, handle_message(Msg, W2, S1))
     end).
 
 
 check_latest_true() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo2(), bar1(), bazNF()]},
         Msg2 = {ok, [foo2(), bar1(), bazNF()]},
-        {ok, S1} = handle_message(Msg1, w1, S0),
+        {ok, S1} = handle_message(Msg1, W1, S0),
         Expect = {stop, [bar1(), foo2(), bazNF()]},
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1))
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1))
     end).
 
 
 check_ancestor_counted_in_quorum() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), bazNF()]},
         Msg2 = {ok, [foo2(), bar1(), bazNF()]},
         Expect = {stop, [bar1(), foo2(), bazNF()]},
 
         % Older first
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
 
         % Newer first
-        {ok, S2} = handle_message(Msg2, w2, S0),
-        ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+        {ok, S2} = handle_message(Msg2, W2, S0),
+        ?assertEqual(Expect, handle_message(Msg1, W1, S2))
     end).
 
 
 check_not_found_counts_for_descendant() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), bazNF()]},
         Msg2 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
         % not_found first
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
 
         % not_found second
-        {ok, S2} = handle_message(Msg2, w2, S0),
-        ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+        {ok, S2} = handle_message(Msg2, W2, S0),
+        ?assertEqual(Expect, handle_message(Msg1, W1, S2))
     end).
 
 
 check_worker_error_skipped() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), baz1()]},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
     end).
 
 
 check_quorum_only_counts_valid_responses() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {rexi_EXIT, reason},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
     end).
 
 
 check_empty_list_when_no_workers_reply() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {rexi_EXIT, reason},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {rexi_DOWN, nodedown, {nil, node()}, nil},
         Expect = {stop, all_workers_died},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
+    end).
+
+
+check_node_rev_stored() ->
+    ?_test(begin
+        W1 = #shard{node = node1},
+        S0 = state0([], true),
+
+        {ok, S1} = handle_message({ok, [foo1()]}, W1, S0),
+        ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs)
+    end).
+
+
+check_node_rev_store_head_only() ->
+    ?_test(begin
+        W1 = #shard{node = node1},
+        S0 = state0([], true),
+
+        {ok, S1} = handle_message({ok, [foo2()]}, W1, S0),
+        ?assertEqual([{node1, [{2, <<"foo2">>}]}], S1#state.node_revs)
+    end).
+
+
+check_node_rev_store_multiple() ->
+    ?_test(begin
+        W1 = #shard{node = node1},
+        S0 = state0([], true),
+
+        {ok, S1} = handle_message({ok, [foo1(), foo2()]}, W1, S0),
+        ?assertEqual(
+                [{node1, [{2, <<"foo2">>}, {1, <<"foo">>}]}],
+                S1#state.node_revs
+            )
+    end).
+
+
+check_node_rev_dont_store_errors() ->
+    ?_test(begin
+        W1 = #shard{node = node1},
+        S0 = state0([], true),
+
+        {ok, S1} = handle_message({ok, [barNF()]}, W1, S0),
+        ?assertEqual([], S1#state.node_revs)
+    end).
+
+
+check_node_rev_store_non_errors() ->
+    ?_test(begin
+        W1 = #shard{node = node1},
+        S0 = state0([], true),
+
+        {ok, S1} = handle_message({ok, [foo1(), barNF()]}, W1, S0),
+        ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs)
+    end).
+
+
+check_node_rev_store_concatenate() ->
+    ?_test(begin
+        W2 = #shard{node = node2},
+        S0 = state0([], true),
+        S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]},
+
+        {ok, S2} = handle_message({ok, [foo2()]}, W2, S1),
+        ?assertEqual(
+                [{node2, [{2, <<"foo2">>}]}, {node1, [{1, <<"foo">>}]}],
+                S2#state.node_revs
+            )
+    end).
+
+
+check_node_rev_store_concantenate_multiple() ->
+    ?_test(begin
+        W2 = #shard{node = node2},
+        S0 = state0([], true),
+        S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]},
+
+        {ok, S2} = handle_message({ok, [foo2(), bar1()]}, W2, S1),
+        ?assertEqual(
+                [
+                    {node2, [{1, <<"bar">>}, {2, <<"foo2">>}]},
+                    {node1, [{1, <<"foo">>}]}
+                ],
+                S2#state.node_revs
+            )
+    end).
+
+
+check_node_rev_unmodified_on_down_or_exit() ->
+    ?_test(begin
+        W2 = #shard{node = node2},
+        S0 = state0([], true),
+        S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]},
+
+        Down = {rexi_DOWN, nodedown, {nil, node()}, nil},
+        {ok, S2} = handle_message(Down, W2, S1),
+        ?assertEqual(
+                [{node1, [{1, <<"foo">>}]}],
+                S2#state.node_revs
+            ),
+
+        Exit = {rexi_EXIT, reason},
+        {ok, S3} = handle_message(Exit, W2, S1),
+        ?assertEqual(
+                [{node1, [{1, <<"foo">>}]}],
+                S3#state.node_revs
+            )
     end).
 
 
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 4a69e7e..d8a38ba 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -21,6 +21,7 @@
     delete_shard_db_doc/2]).
 -export([get_all_security/2, open_shard/2]).
 -export([compact/1, compact/2]).
+-export([get_purge_seq/2, purge_docs/3, set_purge_infos_limit/3]).
 
 -export([get_db_info/2, get_doc_count/2, get_update_seq/2,
          changes/4, map_view/5, reduce_view/5, group_info/3, update_mrview/4]).
@@ -201,6 +202,9 @@ get_all_security(DbName, Options) ->
 set_revs_limit(DbName, Limit, Options) ->
     with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
 
+set_purge_infos_limit(DbName, Limit, Options) ->
+    with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}).
+
 open_doc(DbName, DocId, Options) ->
     with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
 
@@ -236,14 +240,26 @@ get_missing_revs(DbName, IdRevsList, Options) ->
     end).
 
 update_docs(DbName, Docs0, Options) ->
-    case proplists:get_value(replicated_changes, Options) of
-    true ->
-        X = replicated_changes;
-    _ ->
-        X = interactive_edit
+    {Docs1, Type} = case couch_util:get_value(read_repair, Options) of
+        NodeRevs when is_list(NodeRevs) ->
+            Filtered = read_repair_filter(DbName, Docs0, NodeRevs, Options),
+            {Filtered, replicated_changes};
+        undefined ->
+            X = case proplists:get_value(replicated_changes, Options) of
+                true -> replicated_changes;
+                _ -> interactive_edit
+            end,
+            {Docs0, X}
     end,
-    Docs = make_att_readers(Docs0),
-    with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
+    Docs2 = make_att_readers(Docs1),
+    with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, Type]}).
+
+
+get_purge_seq(DbName, Options) ->
+    with_db(DbName, Options, {couch_db, get_purge_seq, []}).
+
+purge_docs(DbName, UUIdsIdsRevs, Options) ->
+    with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, Options]}).
 
 %% @equiv group_info(DbName, DDocId, [])
 group_info(DbName, DDocId) ->
@@ -298,6 +314,104 @@ with_db(DbName, Options, {M,F,A}) ->
         rexi:reply(Error)
     end.
 
+
+read_repair_filter(DbName, Docs, NodeRevs, Options) ->
+    set_io_priority(DbName, Options),
+    case get_or_create_db(DbName, Options) of
+        {ok, Db} ->
+            try
+                read_repair_filter(Db, Docs, NodeRevs)
+            after
+                couch_db:close(Db)
+            end;
+        Error ->
+            rexi:reply(Error)
+    end.
+
+
+% A read repair operation may have been triggered by a node
+% that was out of sync with the local node. Thus, any time
+% we receive a read repair request we need to check if we
+% may have recently purged any of the given revisions and
+% ignore them if so.
+%
+% This is accomplished by looking at the purge infos that we
+% have locally that have not been replicated to the remote
+% node. The logic here is that we may have received the purge
+% request before the remote shard copy. So to check that we
+% need to look at the purge infos that we have locally but
+% have not yet sent to the remote copy.
+%
+% NodeRevs is a list of the {node(), [rev()]} tuples passed
+% as the read_repair option to update_docs.
+read_repair_filter(Db, Docs, NodeRevs) ->
+    [#doc{id = DocId} | _] = Docs,
+    Nodes = lists:usort([Node || {Node, _} <- NodeRevs, Node /= node()]),
+    NodeSeqs = get_node_seqs(Db, Nodes),
+
+    DbPSeq = couch_db:get_purge_seq(Db),
+    Lag = config:get_integer("couchdb", "read_repair_lag", 100),
+
+    % Filter out read-repair updates from any node that is
+    % so out of date that it would force us to scan a large
+    % number of purge infos
+    NodeFiltFun = fun({Node, _Revs}) ->
+        {Node, NodeSeq} = lists:keyfind(Node, 1, NodeSeqs),
+        NodeSeq >= DbPSeq - Lag
+    end,
+    RecentNodeRevs = lists:filter(NodeFiltFun, NodeRevs),
+
+    % For each node we scan the purge infos to filter out any
+    % revisions that have been locally purged since we last
+    % replicated to the remote node's shard copy.
+    AllowableRevs = lists:foldl(fun({Node, Revs}, RevAcc) ->
+        {Node, StartSeq} = lists:keyfind(Node, 1, NodeSeqs),
+        FoldFun = fun({_PSeq, _UUID, PDocId, PRevs}, InnerAcc) ->
+            if PDocId /= DocId -> {ok, InnerAcc}; true ->
+                {ok, InnerAcc -- PRevs}
+            end
+        end,
+        {ok, FiltRevs} = couch_db:fold_purge_infos(Db, StartSeq, FoldFun, Revs),
+        lists:usort(FiltRevs ++ RevAcc)
+    end, [], RecentNodeRevs),
+
+    % Finally, filter the doc updates to only include revisions
+    % that have not been purged locally.
+    DocFiltFun = fun(#doc{revs = {Pos, [Rev | _]}}) ->
+        lists:member({Pos, Rev}, AllowableRevs)
+    end,
+    lists:filter(DocFiltFun, Docs).
+
+
+get_node_seqs(Db, Nodes) ->
+    % Gather the list of {Node, PurgeSeq} pairs for all nodes
+    % that are present in our read repair group
+    FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+        case Id of
+            <<?LOCAL_DOC_PREFIX, "purge-mem3-", _/binary>> ->
+                TgtNode = couch_util:get_value(<<"target_node">>, Props),
+                PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
+                case lists:keyfind(TgtNode, 1, Acc) of
+                    {_, OldSeq} ->
+                        NewSeq = erlang:max(OldSeq, PurgeSeq),
+                        NewEntry = {TgtNode, NewSeq},
+                        NewAcc = lists:keyreplace(TgtNode, 1, Acc, NewEntry),
+                        {ok, NewAcc};
+                    false ->
+                        {ok, Acc}
+                end;
+            _ ->
+                % We've processed all _local mem3 purge docs
+                {stop, Acc}
+        end
+    end,
+    InitAcc = [{list_to_binary(atom_to_list(Node)), 0} || Node <- Nodes],
+    Opts = [{start_key, <<?LOCAL_DOC_PREFIX, "purge-mem3-">>}],
+    {ok, NodeBinSeqs} = couch_db:fold_local_docs(Db, FoldFun, InitAcc, Opts),
+    [{list_to_existing_atom(binary_to_list(N)), S} || {N, S} <- NodeBinSeqs].
+
+
+
 get_or_create_db(DbName, Options) ->
     couch_db:open_int(DbName, [{create_if_missing, true} | Options]).
 
diff --git a/src/fabric/test/fabric_rpc_purge_tests.erl b/src/fabric/test/fabric_rpc_purge_tests.erl
new file mode 100644
index 0000000..26507cf
--- /dev/null
+++ b/src/fabric/test/fabric_rpc_purge_tests.erl
@@ -0,0 +1,285 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_rpc_purge_tests).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(TDEF(A), {A, fun A/1}).
+
+% TODO: Add tests:
+%         - filter some updates
+%         - allow for an update that was filtered by a node
+%         - ignore lagging nodes
+
+main_test_() ->
+    % One shared couch instance for the whole suite; each group below
+    % gets a fresh fixture per test via foreach.
+    NoPurgeTests = [wrap(T) || T <- [
+        ?TDEF(t_no_purge_no_filter)
+    ]],
+    % The same cases run against both the single and multi purge fixtures.
+    PurgeTests = [wrap(T) || T <- [
+        ?TDEF(t_filter),
+        ?TDEF(t_filter_unknown_node),
+        ?TDEF(t_no_filter_old_node),
+        ?TDEF(t_no_filter_different_node),
+        ?TDEF(t_no_filter_after_repl)
+    ]],
+    NoPurge = {
+        foreach,
+        fun setup_no_purge/0,
+        fun teardown_no_purge/1,
+        NoPurgeTests
+    },
+    SinglePurge = {
+        foreach,
+        fun setup_single_purge/0,
+        fun teardown_single_purge/1,
+        PurgeTests
+    },
+    MultiPurge = {
+        foreach,
+        fun setup_multi_purge/0,
+        fun teardown_multi_purge/1,
+        PurgeTests
+    },
+    {
+        setup,
+        spawn,
+        fun setup_all/0,
+        fun teardown_all/1,
+        [NoPurge, SinglePurge, MultiPurge]
+    }.
+
+
+setup_all() ->
+    % Suite-wide setup; the returned context is handed to teardown_all/1.
+    Ctx = test_util:start_couch(),
+    Ctx.
+
+
+teardown_all(CouchCtx) ->
+    % Suite-wide teardown for the context produced by setup_all/0.
+    test_util:stop_couch(CouchCtx).
+
+
+setup_no_purge() ->
+    % Fixture: a freshly created, populated temp db with nothing purged.
+    % Tests receive the db name rather than the handle.
+    {ok, Db} = create_db(),
+    {ok, _} = populate_db(Db),
+    DbName = couch_db:name(Db),
+    DbName.
+
+
+teardown_no_purge(Name) ->
+    % Drop the fixture db; anything other than ok fails the teardown.
+    ok = couch_server:delete(Name, []).
+
+
+setup_single_purge() ->
+    % Fixture: populated db in which only doc "0003" has been purged.
+    % The doc is read before the purge so tests can replay it later.
+    % Returns {DbName, DocId, PrePurgeDoc, PurgeSeq}.
+    Name = setup_no_purge(),
+    Id = <<"0003">>,
+    {ok, PrePurge} = open_doc(Name, Id),
+    purge_doc(Name, Id),
+    {Name, Id, PrePurge, 1}.
+
+
+teardown_single_purge({Name, _DocId, _OldDoc, _PSeq}) ->
+    % Only the db name from the fixture tuple is needed for cleanup.
+    teardown_no_purge(Name).
+
+
+setup_multi_purge() ->
+    % Fixture: populated db in which docs "0001".."0005" are purged one
+    % at a time, in order. Doc "0003" is read before any purge so tests
+    % can replay it. It is the third doc purged, which is presumably why
+    % the fixture reports purge seq 3 for it.
+    Name = setup_no_purge(),
+    Id = <<"0003">>,
+    {ok, PrePurge} = open_doc(Name, Id),
+    _ = [
+        purge_doc(Name, iolist_to_binary(io_lib:format("~4..0b", [I])))
+        || I <- lists:seq(1, 5)
+    ],
+    {Name, Id, PrePurge, 3}.
+
+
+teardown_multi_purge(Fixture) ->
+    % Same fixture shape as the single purge case, so reuse its cleanup.
+    teardown_single_purge(Fixture).
+
+
+t_no_purge_no_filter(DbName) ->
+    % With no purges in the db, a read-repair update must be applied
+    % as-is: the stored doc becomes the new revision.
+    DocId = <<"0003">>,
+
+    {ok, OldDoc} = open_doc(DbName, DocId),
+    NewDoc = create_update(OldDoc, 2),
+
+    rpc_update_doc(DbName, NewDoc),
+
+    % assert(Not)Equal report both values on failure, unlike a bare
+    % ?assert on a boolean comparison.
+    {ok, CurrDoc} = open_doc(DbName, DocId),
+    ?assertNotEqual(OldDoc, CurrDoc),
+    ?assertEqual(NewDoc, CurrDoc).
+
+
+t_filter({DbName, DocId, OldDoc, _PSeq}) ->
+    % The target node's checkpoint is at purge seq 0, so replaying the
+    % purged revision through read repair must leave the doc missing.
+    Missing = {not_found, missing},
+    ?assertEqual(Missing, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 0),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual(Missing, open_doc(DbName, DocId)).
+
+
+t_filter_unknown_node({DbName, DocId, OldDoc, _PSeq}) ->
+    % The read_repair option names a node with no purge checkpoint in
+    % this db. Unknown nodes are assumed to start at PurgeSeq = 0, so
+    % the replayed revision must be dropped and the doc stays missing.
+    Missing = {not_found, missing},
+    ?assertEqual(Missing, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 0),
+
+    #doc{revs = {Pos, [Rev | _]}} = OldDoc,
+    NodeRevs = [{'blargh@127.0.0.1', [{Pos, Rev}]}],
+    rpc_update_doc(DbName, OldDoc, [{read_repair, NodeRevs}]),
+
+    ?assertEqual(Missing, open_doc(DbName, DocId)).
+
+
+t_no_filter_old_node({DbName, DocId, OldDoc, PSeq}) ->
+    % The target node's checkpoint already covers the purge, so the
+    % replayed revision must be accepted and the doc restored.
+    Missing = {not_found, missing},
+    ?assertEqual(Missing, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, PSeq),
+
+    % A second checkpoint names a random UUID as its target node. The
+    % intent is that converting that name to an existing atom would
+    % raise badarg, so the update only succeeds if it is never converted.
+    StaleNode = couch_uuids:random(),
+    create_purge_checkpoint(DbName, 0, StaleNode),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+t_no_filter_different_node({DbName, DocId, OldDoc, PSeq}) ->
+    % A lagging checkpoint (purge seq 0) that belongs to a node outside
+    % the read repair group must not cause the update to be filtered.
+    Missing = {not_found, missing},
+    ?assertEqual(Missing, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, PSeq),
+
+    % Checkpoint with a well-formed node name that is not the target.
+    OtherNode = 'notfoo@127.0.0.1',
+    OtherNodeBin = list_to_binary(atom_to_list(OtherNode)),
+    create_purge_checkpoint(DbName, 0, OtherNodeBin),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+t_no_filter_after_repl({DbName, DocId, OldDoc, PSeq}) ->
+    % Once the target node's checkpoint has reached the purge seq, the
+    % replayed revision is accepted and the doc is readable again.
+    Missing = {not_found, missing},
+    ?assertEqual(Missing, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, PSeq),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+wrap({Name, Fun}) ->
+    % Turn a {Name, Fun/1} pair into a foreach instantiator: given the
+    % fixture, it yields a titled test with a 60 second timeout. The
+    % test body enables trap_exit before calling Fun with the fixture.
+    fun(Arg) ->
+        Title = atom_to_list(Name),
+        Body = fun() ->
+            process_flag(trap_exit, true),
+            Fun(Arg)
+        end,
+        {timeout, 60, {Title, Body}}
+    end.
+
+
+create_db() ->
+    % Create a db under a generated temp name, using the admin context.
+    % Returns whatever couch_db:create/2 returns.
+    couch_db:create(?tempdb(), [?ADMIN_CTX]).
+
+
+populate_db(Db) ->
+    % Insert 100 docs with zero-padded four digit ids ("0001".."0100").
+    % Each body records its index under "int" and starts at "vsn" 2.
+    MakeDoc = fun(Idx) ->
+        #doc{
+            id = iolist_to_binary(io_lib:format("~4..0b", [Idx])),
+            body = {[{<<"int">>, Idx}, {<<"vsn">>, 2}]}
+        }
+    end,
+    Docs = [MakeDoc(Idx) || Idx <- lists:seq(1, 100)],
+    {ok, _} = couch_db:update_docs(Db, Docs).
+
+
+open_doc(DbName, DocId) ->
+    % Open the db by name just long enough to read one doc, and return
+    % couch_db:open_doc/3's result unchanged.
+    Read = fun(Db) -> couch_db:open_doc(Db, DocId, []) end,
+    couch_util:with_db(DbName, Read).
+
+
+create_update(Doc, NewVsn) ->
+    % Build the next revision of Doc: the "vsn" body property is set to
+    % NewVsn and a new rev is pushed onto the revision path. The rev id
+    % is an md5 over the doc id, the parent rev and the new body, so it
+    % is deterministic for a given input.
+    #doc{
+        id = DocId,
+        revs = {Pos, [Rev | _] = Revs},
+        body = {Props}
+    } = Doc,
+    VsnProp = {<<"vsn">>, NewVsn},
+    NewProps = lists:keyreplace(<<"vsn">>, 1, Props, VsnProp),
+    NewRev = crypto:hash(md5, term_to_binary({DocId, Rev, {NewProps}})),
+    Doc#doc{
+        revs = {Pos + 1, [NewRev | Revs]},
+        body = {NewProps}
+    }.
+
+
+purge_doc(DbName, DocId) ->
+    % Purge the current leaf revision of DocId under a random purge
+    % UUID, and assert that exactly that revision was reported purged.
+    {ok, #doc{revs = {Pos, [Rev | _]}}} = open_doc(DbName, DocId),
+    PurgedRevs = [{Pos, Rev}],
+    PInfo = {couch_uuids:random(), DocId, PurgedRevs},
+    Purge = fun(Db) -> couch_db:purge_docs(Db, [PInfo], []) end,
+    Resp = couch_util:with_db(DbName, Purge),
+    ?assertEqual({ok, [{ok, PurgedRevs}]}, Resp).
+
+
+create_purge_checkpoint(DbName, PurgeSeq) ->
+    % Convenience arity: the checkpoint is written for the default
+    % target node used throughout these tests.
+    DefaultNode = tgt_node_bin(),
+    create_purge_checkpoint(DbName, PurgeSeq, DefaultNode).
+
+
+create_purge_checkpoint(DbName, PurgeSeq, TgtNode) when is_binary(TgtNode) ->
+    % Store a checkpoint doc saying that TgtNode has processed purges up
+    % to PurgeSeq. The doc id comes from mem3_rep:make_purge_id/2 using
+    % this db's uuid and a random target uuid (presumably a _local
+    % "purge-mem3-" id - verify against mem3_rep).
+    WriteCheckpoint = fun(Db) ->
+        SourceUUID = couch_db:get_uuid(Db),
+        TargetUUID = couch_uuids:random(),
+        Id = mem3_rep:make_purge_id(SourceUUID, TargetUUID),
+        Body = {[
+            {<<"target_node">>, TgtNode},
+            {<<"purge_seq">>, PurgeSeq}
+        ]},
+        couch_db:update_docs(Db, [#doc{id = Id, body = Body}], [])
+    end,
+    Resp = couch_util:with_db(DbName, WriteCheckpoint),
+    ?assertMatch({ok, [_]}, Resp).
+
+
+rpc_update_doc(DbName, Doc) ->
+    % Convenience arity: send Doc as a read-repair update that claims
+    % the default target node holds the doc's leaf revision.
+    #doc{revs = {Pos, [Rev | _]}} = Doc,
+    NodeRevs = [{tgt_node(), [{Pos, Rev}]}],
+    rpc_update_doc(DbName, Doc, [{read_repair, NodeRevs}]).
+
+
+rpc_update_doc(DbName, Doc, Opts) ->
+    % Call fabric_rpc:update_docs/3 directly in this process. The caller
+    % pid and a fresh ref are stored under rexi_from in the process
+    % dictionary (presumably where the rpc code looks up its reply
+    % target), then we poll the mailbox for the {Ref, Reply} answer.
+    Ref = erlang:make_ref(),
+    put(rexi_from, {self(), Ref}),
+    fabric_rpc:update_docs(DbName, [Doc], Opts),
+    Poll = fun() ->
+        % Non-blocking check; 'wait' tells test_util:wait/1 to retry.
+        receive
+            {Ref, Msg} ->
+                Msg
+        after 0 ->
+            wait
+        end
+    end,
+    Reply = test_util:wait(Poll),
+    ?assertEqual({ok, []}, Reply).
+
+
+tgt_node() ->
+    % Node name the tests use by default as the read repair target.
+    'foo@127.0.0.1'.
+
+
+tgt_node_bin() ->
+    % Binary form of tgt_node/0, as stored in checkpoint docs.
+    NodeStr = atom_to_list(tgt_node()),
+    list_to_binary(NodeStr).
\ No newline at end of file


Mime
View raw message