From commits-return-34200-archive-asf-public=cust-asf.ponee.io@couchdb.apache.org Thu Aug 9 11:43:21 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id C0BD91807A5 for ; Thu, 9 Aug 2018 11:43:19 +0200 (CEST) Received: (qmail 97675 invoked by uid 500); 9 Aug 2018 09:43:18 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 97532 invoked by uid 99); 9 Aug 2018 09:43:18 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Aug 2018 09:43:18 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8828E85030; Thu, 9 Aug 2018 09:43:17 +0000 (UTC) Date: Thu, 09 Aug 2018 09:43:25 +0000 To: "commits@couchdb.apache.org" Subject: [couchdb] 08/10: [08/10] Clustered Purge: Update read-repair MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: jiangphcn@apache.org In-Reply-To: <153380779701.15303.13985059148980557590@gitbox.apache.org> References: <153380779701.15303.13985059148980557590@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: couchdb X-Git-Refname: refs/heads/COUCHDB-3326-clustered-purge-pr5-implementation X-Git-Reftype: branch X-Git-Rev: 73173efbb907615ca24d1869535ea03746c2d91e X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20180809094317.8828E85030@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation in repository https://gitbox.apache.org/repos/asf/couchdb.git commit 73173efbb907615ca24d1869535ea03746c2d91e Author: Paul J. Davis AuthorDate: Wed May 30 17:26:02 2018 -0500 [08/10] Clustered Purge: Update read-repair Read-repair needs to know which nodes have requested an update to a local doc so that it can determine if the update is applied. The basic idea here is that we may have gotten an update from a remote node that has yet to apply a purge request. If the local node were to apply this update it would effectively undo a succesful purge request. COUCHDB-3326 Co-authored-by: Mayya Sharipova Co-authored-by: jiangphcn --- src/fabric/src/fabric_doc_open.erl | 73 +++++++- src/fabric/src/fabric_doc_open_revs.erl | 262 +++++++++++++++++++++----- src/fabric/src/fabric_rpc.erl | 128 ++++++++++++- src/fabric/test/fabric_rpc_purge_tests.erl | 285 +++++++++++++++++++++++++++++ 4 files changed, 692 insertions(+), 56 deletions(-) diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl index 93f73a8..0a85346 100644 --- a/src/fabric/src/fabric_doc_open.erl +++ b/src/fabric/src/fabric_doc_open.erl @@ -25,6 +25,7 @@ r, state, replies, + node_revs = [], q_reply }). @@ -83,7 +84,13 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) -> end; handle_message(Reply, Worker, Acc) -> NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies), - NewAcc = Acc#acc{replies = NewReplies}, + NewNodeRevs = case Reply of + {ok, #doc{revs = {Pos, [Rev | _]}}} -> + [{Worker#shard.node, [{Pos, Rev}]} | Acc#acc.node_revs]; + _ -> + Acc#acc.node_revs + end, + NewAcc = Acc#acc{replies = NewReplies, node_revs = NewNodeRevs}, case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of {true, QuorumReply} -> fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)), @@ -122,14 +129,14 @@ is_r_met(Workers, Replies, R) -> no_more_workers end. -read_repair(#acc{dbname=DbName, replies=Replies}) -> +read_repair(#acc{dbname=DbName, replies=Replies, node_revs=NodeRevs}) -> Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies], case Docs of % omit local docs from read repair [#doc{id = <>} | _] -> choose_reply(Docs); [#doc{id=Id} | _] -> - Opts = [replicated_changes, ?ADMIN_CTX], + Opts = [?ADMIN_CTX, {read_repair, NodeRevs}], Res = fabric:update_docs(DbName, Docs, Opts), case Res of {ok, []} -> @@ -205,6 +212,7 @@ open_doc_test_() -> t_handle_message_down(), t_handle_message_exit(), t_handle_message_reply(), + t_store_node_revs(), t_read_repair(), t_handle_response_quorum_met(), t_get_doc_info() @@ -397,6 +405,65 @@ t_handle_message_reply() -> end). +t_store_node_revs() -> + W1 = #shard{node = w1, ref = erlang:make_ref()}, + W2 = #shard{node = w2, ref = erlang:make_ref()}, + W3 = #shard{node = w3, ref = erlang:make_ref()}, + Foo1 = {ok, #doc{id = <<"bar">>, revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{id = <<"bar">>, revs = {2, [<<"foo2">>, <<"foo">>]}}}, + NFM = {not_found, missing}, + + InitAcc = #acc{workers = [W1, W2, W3], replies = [], r = 2}, + + ?_test(begin + meck:expect(rexi, kill, fun(_, _) -> ok end), + + % Simple case + {ok, #acc{node_revs = NodeRevs1}} = handle_message(Foo1, W1, InitAcc), + ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs1), + + % Make sure we only hold the head rev + {ok, #acc{node_revs = NodeRevs2}} = handle_message(Foo2, W1, InitAcc), + ?assertEqual([{w1, [{2, <<"foo2">>}]}], NodeRevs2), + + % Make sure we don't capture anything on error + {ok, #acc{node_revs = NodeRevs3}} = handle_message(NFM, W1, InitAcc), + ?assertEqual([], NodeRevs3), + + % Make sure we accumulate node revs + Acc1 = InitAcc#acc{node_revs = [{w1, [{1, <<"foo">>}]}]}, + {ok, #acc{node_revs = NodeRevs4}} = handle_message(Foo2, W2, Acc1), + ?assertEqual( + [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}], + NodeRevs4 + ), + + % Make sure rexi_DOWN doesn't modify node_revs + Down = {rexi_DOWN, nil, {nil, w1}, nil}, + {ok, #acc{node_revs = NodeRevs5}} = handle_message(Down, W2, Acc1), + ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs5), + + % Make sure rexi_EXIT doesn't modify node_revs + Exit = {rexi_EXIT, reason}, + {ok, #acc{node_revs = NodeRevs6}} = handle_message(Exit, W2, Acc1), + ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs6), + + % Make sure an error doesn't remove any node revs + {ok, #acc{node_revs = NodeRevs7}} = handle_message(NFM, W2, Acc1), + ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs7), + + % Make sure we have all of our node_revs when meeting + % quorum + {ok, Acc2} = handle_message(Foo1, W1, InitAcc), + {ok, Acc3} = handle_message(Foo2, W2, Acc2), + {stop, Acc4} = handle_message(NFM, W3, Acc3), + ?assertEqual( + [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}], + Acc4#acc.node_revs + ) + end). + + t_read_repair() -> Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}}, Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}}, diff --git a/src/fabric/src/fabric_doc_open_revs.erl b/src/fabric/src/fabric_doc_open_revs.erl index 096722f..234b108 100644 --- a/src/fabric/src/fabric_doc_open_revs.erl +++ b/src/fabric/src/fabric_doc_open_revs.erl @@ -29,6 +29,7 @@ revs, latest, replies = [], + node_revs = [], repair = false }). @@ -82,6 +83,7 @@ handle_message({ok, RawReplies}, Worker, State) -> worker_count = WorkerCount, workers = Workers, replies = PrevReplies, + node_revs = PrevNodeRevs, r = R, revs = Revs, latest = Latest, @@ -92,7 +94,6 @@ handle_message({ok, RawReplies}, Worker, State) -> IsTree = Revs == all orelse Latest, % Do not count error replies when checking quorum - RealReplyCount = ReplyCount + 1 - ReplyErrorCount, QuorumReplies = RealReplyCount >= R, {NewReplies, QuorumMet, Repair} = case IsTree of @@ -102,11 +103,23 @@ handle_message({ok, RawReplies}, Worker, State) -> NumLeafs = couch_key_tree:count_leafs(PrevReplies), SameNumRevs = length(RawReplies) == NumLeafs, QMet = AllInternal andalso SameNumRevs andalso QuorumReplies, - {NewReplies0, QMet, Repair0}; + % Don't set repair=true on the first reply + {NewReplies0, QMet, (ReplyCount > 0) and Repair0}; false -> {NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies), {NewReplies0, MinCount >= R, false} end, + NewNodeRevs = if Worker == nil -> PrevNodeRevs; true -> + IdRevs = lists:foldl(fun + ({ok, #doc{revs = {Pos, [Rev | _]}}}, Acc) -> + [{Pos, Rev} | Acc]; + (_, Acc) -> + Acc + end, [], RawReplies), + if IdRevs == [] -> PrevNodeRevs; true -> + [{Worker#shard.node, IdRevs} | PrevNodeRevs] + end + end, Complete = (ReplyCount =:= (WorkerCount - 1)), @@ -117,6 +130,7 @@ handle_message({ok, RawReplies}, Worker, State) -> DbName, IsTree, NewReplies, + NewNodeRevs, ReplyCount + 1, InRepair orelse Repair ), @@ -124,6 +138,7 @@ handle_message({ok, RawReplies}, Worker, State) -> false -> {ok, State#state{ replies = NewReplies, + node_revs = NewNodeRevs, reply_count = ReplyCount + 1, workers = lists:delete(Worker, Workers), repair = InRepair orelse Repair @@ -180,7 +195,7 @@ dict_replies(Dict, [Reply | Rest]) -> dict_replies(NewDict, Rest). -maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) -> +maybe_read_repair(Db, IsTree, Replies, NodeRevs, ReplyCount, DoRepair) -> Docs = case IsTree of true -> tree_repair_docs(Replies, DoRepair); false -> dict_repair_docs(Replies, ReplyCount) @@ -189,7 +204,7 @@ maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) -> [] -> ok; _ -> - erlang:spawn(fun() -> read_repair(Db, Docs) end) + erlang:spawn(fun() -> read_repair(Db, Docs, NodeRevs) end) end. @@ -208,8 +223,9 @@ dict_repair_docs(Replies, ReplyCount) -> end. -read_repair(Db, Docs) -> - Res = fabric:update_docs(Db, Docs, [replicated_changes, ?ADMIN_CTX]), +read_repair(Db, Docs, NodeRevs) -> + Opts = [?ADMIN_CTX, {read_repair, NodeRevs}], + Res = fabric:update_docs(Db, Docs, Opts), case Res of {ok, []} -> couch_stats:increment_counter([fabric, read_repairs, success]); @@ -268,20 +284,24 @@ filter_reply(Replies) -> setup() -> config:start_link([]), meck:new([fabric, couch_stats, couch_log]), + meck:new(fabric_util, [passthrough]), meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end), meck:expect(couch_stats, increment_counter, fun(_) -> ok end), - meck:expect(couch_log, notice, fun(_, _) -> ok end). + meck:expect(couch_log, notice, fun(_, _) -> ok end), + meck:expect(fabric_util, cleanup, fun(_) -> ok end). + teardown(_) -> - (catch meck:unload([fabric, couch_stats, couch_log])), + (catch meck:unload([fabric, couch_stats, couch_log, fabric_util])), config:stop(). state0(Revs, Latest) -> #state{ worker_count = 3, - workers = [w1, w2, w3], + workers = + [#shard{node='node1'}, #shard{node='node2'}, #shard{node='node3'}], r = 2, revs = Revs, latest = Latest @@ -321,6 +341,14 @@ open_doc_revs_test_() -> check_worker_error_skipped(), check_quorum_only_counts_valid_responses(), check_empty_list_when_no_workers_reply(), + check_node_rev_stored(), + check_node_rev_store_head_only(), + check_node_rev_store_multiple(), + check_node_rev_dont_store_errors(), + check_node_rev_store_non_errors(), + check_node_rev_store_concatenate(), + check_node_rev_store_concantenate_multiple(), + check_node_rev_unmodified_on_down_or_exit(), check_not_found_replies_are_removed_when_doc_found(), check_not_found_returned_when_one_of_docs_not_found(), check_not_found_returned_when_doc_not_found() @@ -334,27 +362,35 @@ open_doc_revs_test_() -> check_empty_response_not_quorum() -> % Simple smoke test that we don't think we're % done with a first empty response + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, ?_assertMatch( - {ok, #state{workers = [w2, w3]}}, - handle_message({ok, []}, w1, state0(all, false)) + {ok, #state{workers = [W2, W3]}}, + handle_message({ok, []}, W1, state0(all, false)) ). check_basic_response() -> % Check that we've handle a response + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, ?_assertMatch( - {ok, #state{reply_count = 1, workers = [w2, w3]}}, - handle_message({ok, [foo1(), bar1()]}, w1, state0(all, false)) + {ok, #state{reply_count = 1, workers = [W2, W3]}}, + handle_message({ok, [foo1(), bar1()]}, W1, state0(all, false)) ). check_finish_quorum() -> % Two messages with the same revisions means we're done ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), Expect = {stop, [bar1(), foo1()]}, - ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, w2, S1)) + ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, W2, S1)) end). @@ -363,11 +399,13 @@ check_finish_quorum_newer() -> % foo1 should count for foo2 which means we're finished. % We also validate that read_repair was triggered. ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), Expect = {stop, [bar1(), foo2()]}, ok = meck:reset(fabric), - ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, w2, S1)), + ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, W2, S1)), ok = meck:wait(fabric, update_docs, '_', 5000), ?assertMatch( [{_, {fabric, update_docs, [_, _, _]}, _}], @@ -380,11 +418,14 @@ check_no_quorum_on_second() -> % Quorum not yet met for the foo revision so we % would wait for w3 ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), ?assertMatch( - {ok, #state{workers = [w3]}}, - handle_message({ok, [bar1()]}, w2, S1) + {ok, #state{workers = [W3]}}, + handle_message({ok, [bar1()]}, W2, S1) ) end). @@ -394,11 +435,14 @@ check_done_on_third() -> % what. Every revision seen in this pattern should be % included. ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), - {ok, S2} = handle_message({ok, [bar1()]}, w2, S1), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), + {ok, S2} = handle_message({ok, [bar1()]}, W2, S1), Expect = {stop, [bar1(), foo1()]}, - ?assertEqual(Expect, handle_message({ok, [bar1()]}, w3, S2)) + ?assertEqual(Expect, handle_message({ok, [bar1()]}, W3, S2)) end). @@ -407,108 +451,234 @@ check_done_on_third() -> check_specific_revs_first_msg() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), false), ?assertMatch( - {ok, #state{reply_count = 1, workers = [w2, w3]}}, - handle_message({ok, [foo1(), bar1(), bazNF()]}, w1, S0) + {ok, #state{reply_count = 1, workers = [W2, W3]}}, + handle_message({ok, [foo1(), bar1(), bazNF()]}, W1, S0) ) end). check_revs_done_on_agreement() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), false), Msg = {ok, [foo1(), bar1(), bazNF()]}, - {ok, S1} = handle_message(Msg, w1, S0), + {ok, S1} = handle_message(Msg, W1, S0), Expect = {stop, [bar1(), foo1(), bazNF()]}, - ?assertEqual(Expect, handle_message(Msg, w2, S1)) + ?assertEqual(Expect, handle_message(Msg, W2, S1)) end). check_latest_true() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), true), Msg1 = {ok, [foo2(), bar1(), bazNF()]}, Msg2 = {ok, [foo2(), bar1(), bazNF()]}, - {ok, S1} = handle_message(Msg1, w1, S0), + {ok, S1} = handle_message(Msg1, W1, S0), Expect = {stop, [bar1(), foo2(), bazNF()]}, - ?assertEqual(Expect, handle_message(Msg2, w2, S1)) + ?assertEqual(Expect, handle_message(Msg2, W2, S1)) end). check_ancestor_counted_in_quorum() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), true), Msg1 = {ok, [foo1(), bar1(), bazNF()]}, Msg2 = {ok, [foo2(), bar1(), bazNF()]}, Expect = {stop, [bar1(), foo2(), bazNF()]}, % Older first - {ok, S1} = handle_message(Msg1, w1, S0), - ?assertEqual(Expect, handle_message(Msg2, w2, S1)), + {ok, S1} = handle_message(Msg1, W1, S0), + ?assertEqual(Expect, handle_message(Msg2, W2, S1)), % Newer first - {ok, S2} = handle_message(Msg2, w2, S0), - ?assertEqual(Expect, handle_message(Msg1, w1, S2)) + {ok, S2} = handle_message(Msg2, W2, S0), + ?assertEqual(Expect, handle_message(Msg1, W1, S2)) end). check_not_found_counts_for_descendant() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), true), Msg1 = {ok, [foo1(), bar1(), bazNF()]}, Msg2 = {ok, [foo1(), bar1(), baz1()]}, Expect = {stop, [bar1(), baz1(), foo1()]}, % not_found first - {ok, S1} = handle_message(Msg1, w1, S0), - ?assertEqual(Expect, handle_message(Msg2, w2, S1)), + {ok, S1} = handle_message(Msg1, W1, S0), + ?assertEqual(Expect, handle_message(Msg2, W2, S1)), % not_found second - {ok, S2} = handle_message(Msg2, w2, S0), - ?assertEqual(Expect, handle_message(Msg1, w1, S2)) + {ok, S2} = handle_message(Msg2, W2, S0), + ?assertEqual(Expect, handle_message(Msg1, W1, S2)) end). check_worker_error_skipped() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), true), Msg1 = {ok, [foo1(), bar1(), baz1()]}, Msg2 = {rexi_EXIT, reason}, Msg3 = {ok, [foo1(), bar1(), baz1()]}, Expect = {stop, [bar1(), baz1(), foo1()]}, - {ok, S1} = handle_message(Msg1, w1, S0), - {ok, S2} = handle_message(Msg2, w2, S1), - ?assertEqual(Expect, handle_message(Msg3, w3, S2)) + {ok, S1} = handle_message(Msg1, W1, S0), + {ok, S2} = handle_message(Msg2, W2, S1), + ?assertEqual(Expect, handle_message(Msg3, W3, S2)) end). check_quorum_only_counts_valid_responses() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), true), Msg1 = {rexi_EXIT, reason}, Msg2 = {rexi_EXIT, reason}, Msg3 = {ok, [foo1(), bar1(), baz1()]}, Expect = {stop, [bar1(), baz1(), foo1()]}, - {ok, S1} = handle_message(Msg1, w1, S0), - {ok, S2} = handle_message(Msg2, w2, S1), - ?assertEqual(Expect, handle_message(Msg3, w3, S2)) + {ok, S1} = handle_message(Msg1, W1, S0), + {ok, S2} = handle_message(Msg2, W2, S1), + ?assertEqual(Expect, handle_message(Msg3, W3, S2)) end). check_empty_list_when_no_workers_reply() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), true), Msg1 = {rexi_EXIT, reason}, Msg2 = {rexi_EXIT, reason}, Msg3 = {rexi_DOWN, nodedown, {nil, node()}, nil}, Expect = {stop, all_workers_died}, - {ok, S1} = handle_message(Msg1, w1, S0), - {ok, S2} = handle_message(Msg2, w2, S1), - ?assertEqual(Expect, handle_message(Msg3, w3, S2)) + {ok, S1} = handle_message(Msg1, W1, S0), + {ok, S2} = handle_message(Msg2, W2, S1), + ?assertEqual(Expect, handle_message(Msg3, W3, S2)) + end). + + +check_node_rev_stored() -> + ?_test(begin + W1 = #shard{node = node1}, + S0 = state0([], true), + + {ok, S1} = handle_message({ok, [foo1()]}, W1, S0), + ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs) + end). + + +check_node_rev_store_head_only() -> + ?_test(begin + W1 = #shard{node = node1}, + S0 = state0([], true), + + {ok, S1} = handle_message({ok, [foo2()]}, W1, S0), + ?assertEqual([{node1, [{2, <<"foo2">>}]}], S1#state.node_revs) + end). + + +check_node_rev_store_multiple() -> + ?_test(begin + W1 = #shard{node = node1}, + S0 = state0([], true), + + {ok, S1} = handle_message({ok, [foo1(), foo2()]}, W1, S0), + ?assertEqual( + [{node1, [{2, <<"foo2">>}, {1, <<"foo">>}]}], + S1#state.node_revs + ) + end). + + +check_node_rev_dont_store_errors() -> + ?_test(begin + W1 = #shard{node = node1}, + S0 = state0([], true), + + {ok, S1} = handle_message({ok, [barNF()]}, W1, S0), + ?assertEqual([], S1#state.node_revs) + end). + + +check_node_rev_store_non_errors() -> + ?_test(begin + W1 = #shard{node = node1}, + S0 = state0([], true), + + {ok, S1} = handle_message({ok, [foo1(), barNF()]}, W1, S0), + ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs) + end). + + +check_node_rev_store_concatenate() -> + ?_test(begin + W2 = #shard{node = node2}, + S0 = state0([], true), + S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]}, + + {ok, S2} = handle_message({ok, [foo2()]}, W2, S1), + ?assertEqual( + [{node2, [{2, <<"foo2">>}]}, {node1, [{1, <<"foo">>}]}], + S2#state.node_revs + ) + end). + + +check_node_rev_store_concantenate_multiple() -> + ?_test(begin + W2 = #shard{node = node2}, + S0 = state0([], true), + S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]}, + + {ok, S2} = handle_message({ok, [foo2(), bar1()]}, W2, S1), + ?assertEqual( + [ + {node2, [{1, <<"bar">>}, {2, <<"foo2">>}]}, + {node1, [{1, <<"foo">>}]} + ], + S2#state.node_revs + ) + end). + + +check_node_rev_unmodified_on_down_or_exit() -> + ?_test(begin + W2 = #shard{node = node2}, + S0 = state0([], true), + S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]}, + + Down = {rexi_DOWN, nodedown, {nil, node()}, nil}, + {ok, S2} = handle_message(Down, W2, S1), + ?assertEqual( + [{node1, [{1, <<"foo">>}]}], + S2#state.node_revs + ), + + Exit = {rexi_EXIT, reason}, + {ok, S3} = handle_message(Exit, W2, S1), + ?assertEqual( + [{node1, [{1, <<"foo">>}]}], + S3#state.node_revs + ) end). diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index ef4092d..c684229 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -21,6 +21,7 @@ delete_shard_db_doc/2]). -export([get_all_security/2, open_shard/2]). -export([compact/1, compact/2]). +-export([get_purge_seq/2, purge_docs/3, set_purge_infos_limit/3]). -export([get_db_info/2, get_doc_count/2, get_update_seq/2, changes/4, map_view/5, reduce_view/5, group_info/3, update_mrview/4]). @@ -202,6 +203,9 @@ get_all_security(DbName, Options) -> set_revs_limit(DbName, Limit, Options) -> with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). +set_purge_infos_limit(DbName, Limit, Options) -> + with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}). + open_doc(DbName, DocId, Options) -> with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). @@ -237,14 +241,26 @@ get_missing_revs(DbName, IdRevsList, Options) -> end). update_docs(DbName, Docs0, Options) -> - case proplists:get_value(replicated_changes, Options) of - true -> - X = replicated_changes; - _ -> - X = interactive_edit + {Docs1, Type} = case couch_util:get_value(read_repair, Options) of + NodeRevs when is_list(NodeRevs) -> + Filtered = read_repair_filter(DbName, Docs0, NodeRevs, Options), + {Filtered, replicated_changes}; + undefined -> + X = case proplists:get_value(replicated_changes, Options) of + true -> replicated_changes; + _ -> interactive_edit + end, + {Docs0, X} end, - Docs = make_att_readers(Docs0), - with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}). + Docs2 = make_att_readers(Docs1), + with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, Type]}). + + +get_purge_seq(DbName, Options) -> + with_db(DbName, Options, {couch_db, get_purge_seq, []}). + +purge_docs(DbName, UUIdsIdsRevs, Options) -> + with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, Options]}). %% @equiv group_info(DbName, DDocId, []) group_info(DbName, DDocId) -> @@ -299,6 +315,104 @@ with_db(DbName, Options, {M,F,A}) -> rexi:reply(Error) end. + +read_repair_filter(DbName, Docs, NodeRevs, Options) -> + set_io_priority(DbName, Options), + case get_or_create_db(DbName, Options) of + {ok, Db} -> + try + read_repair_filter(Db, Docs, NodeRevs) + after + couch_db:close(Db) + end; + Error -> + rexi:reply(Error) + end. + + +% A read repair operation may have been triggered by a node +% that was out of sync with the local node. Thus, any time +% we receive a read repair request we need to check if we +% may have recently purged any of the given revisions and +% ignore them if so. +% +% This is accomplished by looking at the purge infos that we +% have locally that have not been replicated to the remote +% node. The logic here is that we may have received the purge +% request before the remote shard copy. So to check that we +% need to look at the purge infos that we have locally but +% have not yet sent to the remote copy. +% +% NodeRevs is a list of the {node(), [rev()]} tuples passed +% as the read_repair option to update_docs. +read_repair_filter(Db, Docs, NodeRevs) -> + [#doc{id = DocId} | _] = Docs, + Nodes = lists:usort([Node || {Node, _} <- NodeRevs, Node /= node()]), + NodeSeqs = get_node_seqs(Db, Nodes), + + DbPSeq = couch_db:get_purge_seq(Db), + Lag = config:get_integer("couchdb", "read_repair_lag", 100), + + % Filter out read-repair updates from any node that is + % so out of date that it would force us to scan a large + % number of purge infos + NodeFiltFun = fun({Node, _Revs}) -> + {Node, NodeSeq} = lists:keyfind(Node, 1, NodeSeqs), + NodeSeq >= DbPSeq - Lag + end, + RecentNodeRevs = lists:filter(NodeFiltFun, NodeRevs), + + % For each node we scan the purge infos to filter out any + % revisions that have been locally purged since we last + % replicated to the remote node's shard copy. + AllowableRevs = lists:foldl(fun({Node, Revs}, RevAcc) -> + {Node, StartSeq} = lists:keyfind(Node, 1, NodeSeqs), + FoldFun = fun({_PSeq, _UUID, PDocId, PRevs}, InnerAcc) -> + if PDocId /= DocId -> {ok, InnerAcc}; true -> + {ok, InnerAcc -- PRevs} + end + end, + {ok, FiltRevs} = couch_db:fold_purge_infos(Db, StartSeq, FoldFun, Revs), + lists:usort(FiltRevs ++ RevAcc) + end, [], RecentNodeRevs), + + % Finally, filter the doc updates to only include revisions + % that have not been purged locally. + DocFiltFun = fun(#doc{revs = {Pos, [Rev | _]}}) -> + lists:member({Pos, Rev}, AllowableRevs) + end, + lists:filter(DocFiltFun, Docs). + + +get_node_seqs(Db, Nodes) -> + % Gather the list of {Node, PurgeSeq} pairs for all nodes + % that are present in our read repair group + FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) -> + case Id of + <> -> + TgtNode = couch_util:get_value(<<"target_node">>, Props), + PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props), + case lists:keyfind(TgtNode, 1, Acc) of + {_, OldSeq} -> + NewSeq = erlang:max(OldSeq, PurgeSeq), + NewEntry = {TgtNode, NewSeq}, + NewAcc = lists:keyreplace(TgtNode, 1, Acc, NewEntry), + {ok, NewAcc}; + false -> + {ok, Acc} + end; + _ -> + % We've processed all _local mem3 purge docs + {stop, Acc} + end + end, + InitAcc = [{list_to_binary(atom_to_list(Node)), 0} || Node <- Nodes], + Opts = [{start_key, <>}], + {ok, NodeBinSeqs} = couch_db:fold_local_docs(Db, FoldFun, InitAcc, Opts), + [{list_to_existing_atom(binary_to_list(N)), S} || {N, S} <- NodeBinSeqs]. + + + get_or_create_db(DbName, Options) -> couch_db:open_int(DbName, [{create_if_missing, true} | Options]). diff --git a/src/fabric/test/fabric_rpc_purge_tests.erl b/src/fabric/test/fabric_rpc_purge_tests.erl new file mode 100644 index 0000000..26507cf --- /dev/null +++ b/src/fabric/test/fabric_rpc_purge_tests.erl @@ -0,0 +1,285 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_rpc_purge_tests). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +-define(TDEF(A), {A, fun A/1}). + +% TODO: Add tests: +% - filter some updates +% - allow for an update that was filtered by a node +% - ignore lagging nodes + +main_test_() -> + { + setup, + spawn, + fun setup_all/0, + fun teardown_all/1, + [ + { + foreach, + fun setup_no_purge/0, + fun teardown_no_purge/1, + lists:map(fun wrap/1, [ + ?TDEF(t_no_purge_no_filter) + ]) + }, + { + foreach, + fun setup_single_purge/0, + fun teardown_single_purge/1, + lists:map(fun wrap/1, [ + ?TDEF(t_filter), + ?TDEF(t_filter_unknown_node), + ?TDEF(t_no_filter_old_node), + ?TDEF(t_no_filter_different_node), + ?TDEF(t_no_filter_after_repl) + ]) + }, + { + foreach, + fun setup_multi_purge/0, + fun teardown_multi_purge/1, + lists:map(fun wrap/1, [ + ?TDEF(t_filter), + ?TDEF(t_filter_unknown_node), + ?TDEF(t_no_filter_old_node), + ?TDEF(t_no_filter_different_node), + ?TDEF(t_no_filter_after_repl) + ]) + } + ] + }. + + +setup_all() -> + test_util:start_couch(). + + +teardown_all(Ctx) -> + test_util:stop_couch(Ctx). + + +setup_no_purge() -> + {ok, Db} = create_db(), + populate_db(Db), + couch_db:name(Db). + + +teardown_no_purge(DbName) -> + ok = couch_server:delete(DbName, []). + + +setup_single_purge() -> + DbName = setup_no_purge(), + DocId = <<"0003">>, + {ok, OldDoc} = open_doc(DbName, DocId), + purge_doc(DbName, DocId), + {DbName, DocId, OldDoc, 1}. + + +teardown_single_purge({DbName, _, _, _}) -> + teardown_no_purge(DbName). + + +setup_multi_purge() -> + DbName = setup_no_purge(), + DocId = <<"0003">>, + {ok, OldDoc} = open_doc(DbName, DocId), + lists:foreach(fun(I) -> + PDocId = iolist_to_binary(io_lib:format("~4..0b", [I])), + purge_doc(DbName, PDocId) + end, lists:seq(1, 5)), + {DbName, DocId, OldDoc, 3}. + + +teardown_multi_purge(Ctx) -> + teardown_single_purge(Ctx). + + +t_no_purge_no_filter(DbName) -> + DocId = <<"0003">>, + + {ok, OldDoc} = open_doc(DbName, DocId), + NewDoc = create_update(OldDoc, 2), + + rpc_update_doc(DbName, NewDoc), + + {ok, CurrDoc} = open_doc(DbName, DocId), + ?assert(CurrDoc /= OldDoc), + ?assert(CurrDoc == NewDoc). + + +t_filter({DbName, DocId, OldDoc, _PSeq}) -> + ?assertEqual({not_found, missing}, open_doc(DbName, DocId)), + create_purge_checkpoint(DbName, 0), + + rpc_update_doc(DbName, OldDoc), + + ?assertEqual({not_found, missing}, open_doc(DbName, DocId)). + + +t_filter_unknown_node({DbName, DocId, OldDoc, _PSeq}) -> + % Unknown nodes are assumed to start at PurgeSeq = 0 + ?assertEqual({not_found, missing}, open_doc(DbName, DocId)), + create_purge_checkpoint(DbName, 0), + + {Pos, [Rev | _]} = OldDoc#doc.revs, + RROpt = {read_repair, [{'blargh@127.0.0.1', [{Pos, Rev}]}]}, + rpc_update_doc(DbName, OldDoc, [RROpt]), + + ?assertEqual({not_found, missing}, open_doc(DbName, DocId)). + + +t_no_filter_old_node({DbName, DocId, OldDoc, PSeq}) -> + ?assertEqual({not_found, missing}, open_doc(DbName, DocId)), + create_purge_checkpoint(DbName, PSeq), + + % The random UUID is to generate a badarg exception when + % we try and convert it to an existing atom. + create_purge_checkpoint(DbName, 0, couch_uuids:random()), + + rpc_update_doc(DbName, OldDoc), + + ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)). + + +t_no_filter_different_node({DbName, DocId, OldDoc, PSeq}) -> + ?assertEqual({not_found, missing}, open_doc(DbName, DocId)), + create_purge_checkpoint(DbName, PSeq), + + % Create a valid purge for a different node + TgtNode = list_to_binary(atom_to_list('notfoo@127.0.0.1')), + create_purge_checkpoint(DbName, 0, TgtNode), + + rpc_update_doc(DbName, OldDoc), + + ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)). + + +t_no_filter_after_repl({DbName, DocId, OldDoc, PSeq}) -> + ?assertEqual({not_found, missing}, open_doc(DbName, DocId)), + create_purge_checkpoint(DbName, PSeq), + + rpc_update_doc(DbName, OldDoc), + + ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)). + + +wrap({Name, Fun}) -> + fun(Arg) -> + {timeout, 60, {atom_to_list(Name), fun() -> + process_flag(trap_exit, true), + Fun(Arg) + end}} + end. + + +create_db() -> + DbName = ?tempdb(), + couch_db:create(DbName, [?ADMIN_CTX]). + + +populate_db(Db) -> + Docs = lists:map(fun(Idx) -> + DocId = lists:flatten(io_lib:format("~4..0b", [Idx])), + #doc{ + id = list_to_binary(DocId), + body = {[{<<"int">>, Idx}, {<<"vsn">>, 2}]} + } + end, lists:seq(1, 100)), + {ok, _} = couch_db:update_docs(Db, Docs). + + +open_doc(DbName, DocId) -> + couch_util:with_db(DbName, fun(Db) -> + couch_db:open_doc(Db, DocId, []) + end). + + +create_update(Doc, NewVsn) -> + #doc{ + id = DocId, + revs = {Pos, [Rev | _] = Revs}, + body = {Props} + } = Doc, + NewProps = lists:keyreplace(<<"vsn">>, 1, Props, {<<"vsn">>, NewVsn}), + NewRev = crypto:hash(md5, term_to_binary({DocId, Rev, {NewProps}})), + Doc#doc{ + revs = {Pos + 1, [NewRev | Revs]}, + body = {NewProps} + }. + + +purge_doc(DbName, DocId) -> + {ok, Doc} = open_doc(DbName, DocId), + {Pos, [Rev | _]} = Doc#doc.revs, + PInfo = {couch_uuids:random(), DocId, [{Pos, Rev}]}, + Resp = couch_util:with_db(DbName, fun(Db) -> + couch_db:purge_docs(Db, [PInfo], []) + end), + ?assertEqual({ok, [{ok, [{Pos, Rev}]}]}, Resp). + + +create_purge_checkpoint(DbName, PurgeSeq) -> + create_purge_checkpoint(DbName, PurgeSeq, tgt_node_bin()). + + +create_purge_checkpoint(DbName, PurgeSeq, TgtNode) when is_binary(TgtNode) -> + Resp = couch_util:with_db(DbName, fun(Db) -> + SrcUUID = couch_db:get_uuid(Db), + TgtUUID = couch_uuids:random(), + CPDoc = #doc{ + id = mem3_rep:make_purge_id(SrcUUID, TgtUUID), + body = {[ + {<<"target_node">>, TgtNode}, + {<<"purge_seq">>, PurgeSeq} + ]} + }, + couch_db:update_docs(Db, [CPDoc], []) + end), + ?assertMatch({ok, [_]}, Resp). + + +rpc_update_doc(DbName, Doc) -> + {Pos, [Rev | _]} = Doc#doc.revs, + RROpt = {read_repair, [{tgt_node(), [{Pos, Rev}]}]}, + rpc_update_doc(DbName, Doc, [RROpt]). + + +rpc_update_doc(DbName, Doc, Opts) -> + Ref = erlang:make_ref(), + put(rexi_from, {self(), Ref}), + fabric_rpc:update_docs(DbName, [Doc], Opts), + Reply = test_util:wait(fun() -> + receive + {Ref, Reply} -> + Reply + after 0 -> + wait + end + end), + ?assertEqual({ok, []}, Reply). + + +tgt_node() -> + 'foo@127.0.0.1'. + + +tgt_node_bin() -> + iolist_to_binary(atom_to_list(tgt_node())). \ No newline at end of file