couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [3/3] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to 8b3a6b1
Date Wed, 15 Mar 2017 19:17:02 GMT
Add internal replication of purge requests

COUCHDB-3326


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/8b3a6b19
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/8b3a6b19
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/8b3a6b19

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 8b3a6b19add32dbfe3c78622e3e8176b0a0ccb6a
Parents: b26e394
Author: Mayya Sharipova <mayyas@ca.ibm.com>
Authored: Mon Oct 17 17:22:16 2016 -0400
Committer: Paul J. Davis <paul.joseph.davis@gmail.com>
Committed: Wed Mar 15 14:16:42 2017 -0500

----------------------------------------------------------------------
 src/mem3_rep.erl | 136 ++++++++++++++++++++++++++++++++++++++++++++++----
 src/mem3_rpc.erl |  94 ++++++++++++++++++++++++++++++++--
 2 files changed, 214 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/8b3a6b19/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 64905e3..419af31 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -17,7 +17,9 @@
     go/2,
     go/3,
     make_local_id/2,
-    find_source_seq/4
+    make_local_purge_id/2,
+    find_source_seq/4,
+    mem3_sync_purge/1
 ]).
 
 -export([
@@ -39,7 +41,8 @@
     target,
     filter,
     db,
-    history = {[]}
+    history = {[]},
+    purge_seq = 0
 }).
 
 
@@ -119,6 +122,10 @@ make_local_id(SourceThing, TargetThing, Filter) ->
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
 
 
+make_local_purge_id(SourceUUID, TargetUUID) ->
+   <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+
+
 %% @doc Find and return the largest update_seq in SourceDb
 %% that the client has seen from TargetNode.
 %%
@@ -172,11 +179,55 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID,
TgtSeq) ->
 
 repl(#db{name=DbName}=Db, Acc0) ->
     erlang:put(io_priority, {internal_repl, DbName}),
-    #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
-    Fun = fun ?MODULE:changes_enumerator/2,
-    {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
-    {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
-    {ok, couch_db:count_changes_since(Db, LastSeq)}.
+    #acc{source = Db2} = Acc1 = pull_purges_from_target(Db, Acc0),
+    #acc{seq=Seq} = Acc2 = calculate_start_seq(Acc1),
+    try
+        % this throws an exception: {invalid_start_purge_seq, PurgeSeq0},
+        % when oldest_purge_seq on source > the last source purge_seq known to target
+        Acc3 = replicate_purged_docs(Acc2),
+        Fun = fun ?MODULE:changes_enumerator/2,
+        {ok, Acc4} = couch_db:fold_changes(Db2, Seq, Fun, Acc3),
+        {ok, #acc{seq = LastSeq}} = replicate_batch(Acc4),
+        {ok, couch_db:count_changes_since(Db2, LastSeq)}
+    catch
+        throw:{invalid_start_purge_seq, PurgeSeq} ->
+            couch_log:error(
+                "oldest_purge_seq on source passed purge_seq: ~p known to target for db:
~p",
+                [PurgeSeq, DbName]
+            )
+    end.
+
+
+pull_purges_from_target(Db, #acc{target=#shard{node=TNode, name=DbName}}=Acc) ->
+    SourceUUID = couch_db:get_uuid(Db),
+    {TUUIDsIdsRevs, TargetPDocID, TargetPSeq} =
+            mem3_rpc:load_purges(TNode, DbName, SourceUUID),
+    Acc2 = case TUUIDsIdsRevs of
+        [] -> Acc#acc{source = Db};
+        _ ->
+            % check which Target UUIDs have not been applied to Source
+            UUIDs = [UUID || {UUID, _Id, _Revs} <- TUUIDsIdsRevs],
+            PurgedDocs = couch_db:open_purged_docs(Db, UUIDs),
+            Results = lists:zip(TUUIDsIdsRevs, PurgedDocs),
+            Unapplied = lists:filtermap(fun
+                ({UUIDIdRevs, not_found}) -> {true, UUIDIdRevs};
+                (_) -> false
+            end, Results),
+            Acc1 = case Unapplied of
+                [] -> Acc#acc{source = Db};
+                _ ->
+                    % purge Db on Source and reopen it
+                    couch_db:purge_docs(Db, Unapplied),
+                    couch_db:close(Db),
+                    {ok, Db2} = couch_db:open(DbName, [?ADMIN_CTX]),
+                    Acc#acc{source = Db2}
+            end,
+            % update on Target target_purge_seq known to Source
+            mem3_rpc:save_purge_checkpoint(TNode, DbName, TargetPDocID,
+                    TargetPSeq, node()),
+            Acc1
+    end,
+    Acc2.
 
 
 calculate_start_seq(Acc) ->
@@ -210,7 +261,31 @@ calculate_start_seq(Acc) ->
                     Seq = TargetSeq,
                     History = couch_util:get_value(<<"history">>, TProps, {[]})
             end,
-            Acc1#acc{seq = Seq, history = History};
+            SourcePurgeSeq0 = couch_util:get_value(<<"purge_seq">>, SProps),
+            TargetPurgeSeq0 = couch_util:get_value(<<"purge_seq">>, TProps),
+            % before purge upgrade, purge_seq was not saved in checkpoint file,
+            % thus get purge_seq directly from dbs
+            SourcePurgeSeq = case is_integer(SourcePurgeSeq0) of
+                true ->
+                    SourcePurgeSeq0;
+                false ->
+                    {ok, SPS} = couch_db:get_purge_seq(Db),
+                    SPS
+            end,
+            TargetPurgeSeq = case is_integer(TargetPurgeSeq0) of
+                true ->
+                    TargetPurgeSeq0;
+                false ->
+                    {ok, TPS} = mem3_rpc:get_purge_seq(Node, Name),
+                    TPS
+            end,
+            case SourcePurgeSeq =< TargetPurgeSeq of
+                true ->
+                    PurgeSeq = SourcePurgeSeq;
+                false ->
+                    PurgeSeq = TargetPurgeSeq
+            end,
+            Acc1#acc{seq = Seq, history = History, purge_seq = PurgeSeq};
         {not_found, _} ->
             compare_epochs(Acc1)
     end.
@@ -246,6 +321,27 @@ changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0)
->
     {Go, Acc1}.
 
 
+replicate_purged_docs(Acc0) ->
+    #acc{
+        source = Db,
+        target = #shard{node=Node, name=Name},
+        purge_seq = PurgeSeq0
+    } = Acc0,
+    PFoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) ->
+        [{UUID, Id, Revs} | Acc]
+    end,
+
+    {ok, UUIDsIdsRevs} = couch_db:fold_purged_docs(Db, PurgeSeq0, PFoldFun, [], []),
+    case UUIDsIdsRevs of
+        [] ->
+            Acc0;
+        _ ->
+            ok = purge_on_target(Node, Name, UUIDsIdsRevs),
+            {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+            Acc0#acc{purge_seq = PurgeSeq}
+    end.
+
+
 replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
     case find_missing_revs(Acc) of
     [] ->
@@ -291,8 +387,19 @@ save_on_target(Node, Name, Docs) ->
     ok.
 
 
+purge_on_target(Node, Name, UUIdsIdsRevs) ->
+    mem3_rpc:purge_docs(Node, Name, UUIdsIdsRevs,[
+        replicated_changes,
+        full_commit,
+        ?ADMIN_CTX,
+        {io_priority, {internal_repl, Name}}
+    ]),
+    ok.
+
+
 update_locals(Acc) ->
-    #acc{seq=Seq, source=Db, target=Target, localid=Id, history=History} = Acc,
+    #acc{seq=Seq, purge_seq = PurgeSeq, source=Db, target=Target,
+            localid=Id, history=History} = Acc,
     #shard{name=Name, node=Node} = Target,
     NewEntry = [
         {<<"source_node">>, atom_to_binary(node(), utf8)},
@@ -300,8 +407,9 @@ update_locals(Acc) ->
         {<<"source_seq">>, Seq},
         {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
     ],
-    NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History),
-    {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
+    NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, PurgeSeq,
+            NewEntry, History),
+   {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
 
 
 find_repl_doc(SrcDb, TgtUUIDPrefix) ->
@@ -336,6 +444,12 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     end.
 
 
+% used during compaction to check if _local/purge doc is current
+mem3_sync_purge(Opts)->
+    Node = couch_util:get_value(<<"node">>, Opts),
+    lists:member(mem3:nodes(), Node).
+
+
 is_prefix(Prefix, Subject) ->
     binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
 

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/8b3a6b19/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index 13641dc..07b4f54 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -19,15 +19,21 @@
     find_common_seq/4,
     get_missing_revs/4,
     update_docs/4,
+    get_purge_seq/2,
+    purge_docs/4,
     load_checkpoint/4,
-    save_checkpoint/6
+    save_checkpoint/7,
+    load_purges/3,
+    save_purge_checkpoint/5
 ]).
 
 % Private RPC callbacks
 -export([
     find_common_seq_rpc/3,
     load_checkpoint_rpc/3,
-    save_checkpoint_rpc/5
+    save_checkpoint_rpc/6,
+    load_purges_rpc/2,
+    save_purge_checkpoint_rpc/4
 ]).
 
 
@@ -43,16 +49,34 @@ update_docs(Node, DbName, Docs, Options) ->
     rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
 
 
+get_purge_seq(Node, DbName) ->
+    rexi_call(Node, {fabric_rpc, get_purge_seq, [DbName]}).
+
+
+purge_docs(Node, DbName, PUUIdsIdsRevs, Options) ->
+    rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PUUIdsIdsRevs, Options]}).
+
+
 load_checkpoint(Node, DbName, SourceNode, SourceUUID) ->
     Args = [DbName, SourceNode, SourceUUID],
     rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
 
 
-save_checkpoint(Node, DbName, DocId, Seq, Entry, History) ->
-    Args = [DbName, DocId, Seq, Entry, History],
+save_checkpoint(Node, DbName, DocId, Seq, PurgeSeq, Entry, History) ->
+    Args = [DbName, DocId, Seq, PurgeSeq, Entry, History],
     rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}).
 
 
+load_purges(Node, DbName, SourceUUID) ->
+    Args = [DbName, SourceUUID],
+    rexi_call(Node, {mem3_rpc, load_purges_rpc, Args}).
+
+
+save_purge_checkpoint(Node, DbName, DocId, PurgeSeq, SourceNode) ->
+    Args = [DbName, DocId, PurgeSeq, SourceNode],
+    rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}).
+
+
 find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
     Args = [DbName, SourceUUID, SourceEpochs],
     rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
@@ -81,7 +105,8 @@ load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
     end.
 
 
-save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
+save_checkpoint_rpc(DbName, Id, SourceSeq, SourcePurgeSeq,
+        NewEntry0, History0) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
         {ok, Db} ->
@@ -93,6 +118,7 @@ save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
             ] ++ NewEntry0},
             Body = {[
                 {<<"seq">>, SourceSeq},
+                {<<"purge_seq">>, SourcePurgeSeq},
                 {<<"target_uuid">>, couch_db:get_uuid(Db)},
                 {<<"history">>, add_checkpoint(NewEntry, History0)}
             ]},
@@ -129,6 +155,64 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
     end.
 
 
+load_purges_rpc(DbName, SourceUUID) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+    {ok, Db} ->
+        TargetUUID = couch_db:get_uuid(Db),
+        DocId = mem3_rep:make_local_purge_id(SourceUUID, TargetUUID),
+        LastPSeq = case couch_db:open_doc(Db, DocId, []) of
+            {ok, #doc{body={Props}} } ->
+                couch_util:get_value(<<"purge_seq">>, Props);
+            {not_found, _} ->
+                0
+        end,
+        {ok, CurPSeq} = couch_db:get_purge_seq(Db),
+        UUIDsIdsRevs = if (LastPSeq == CurPSeq) -> []; true ->
+            FoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) ->
+                [{UUID, Id, Revs} | Acc]
+            end,
+            {ok, UUIDsIdsRevs0} = couch_db:fold_purged_docs(
+                Db, LastPSeq, FoldFun, [], []
+            ),
+            UUIDsIdsRevs0
+        end,
+        rexi:reply({ok, {UUIDsIdsRevs, DocId, CurPSeq}});
+    Error ->
+        rexi:reply(Error)
+    end.
+
+
+save_purge_checkpoint_rpc(DbName, Id, PurgeSeq, Node) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            Timestamp = couch_util:utc_string(),
+            Body = {[
+                {<<"purge_seq">>, PurgeSeq},
+                {<<"timestamp_utc">>, Timestamp},
+                {<<"verify_module">>, <<"mem3_rep">>},
+                {<<"verify_function">>, <<"mem3_sync_purge">>},
+                {<<"verify_options">>, {[{<<"node">>, Node}]}},
+                {<<"type">>, <<"internal_replication">>}
+            ]},
+            Doc = #doc{id = Id, body = Body},
+            rexi:reply(try couch_db:update_doc(Db, Doc, []) of
+                {ok, _} ->
+                    {ok, Body};
+                Else ->
+                    {error, Else}
+            catch
+                Exception ->
+                    Exception;
+                error:Reason ->
+                    {error, Reason}
+            end);
+        Error ->
+            rexi:reply(Error)
+    end.
+
+
 %% @doc Return the sequence where two files with the same UUID diverged.
 compare_epochs(SourceEpochs, TargetEpochs) ->
     compare_rev_epochs(


Mime
View raw message