couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [40/48] mem3 commit: updated refs/heads/windsor-merge to ff02b9a
Date Fri, 01 Aug 2014 09:11:26 GMT
Include replication history on checkpoint docs

This changes what we store in internal replication checkpoint
documents and how we store it. There are two major changes: checkpoint
documents are now identified by the database UUIDs (instead of by the
nodes that hosted the databases), and we store a history of checkpoint
information so that we can replace dead shards.

The history is a list of checkpoint entries stored with exponentially
decreasing granularity (older checkpoints are kept progressively
further apart). This allows us to store ~30 checkpoints covering
ranges into the billions of update sequences, which means we won't need
to worry about truncations or other issues for the time being.

There's also a new mem3_rep:find_source_seq/4 helper function that,
given information about a remote shard copy, finds the local update_seq
to use as its replacement. This logic is a bit subtle and should be
reused rather than reimplemented.

BugzId: 21973


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/bf07a7c2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/bf07a7c2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/bf07a7c2

Branch: refs/heads/windsor-merge
Commit: bf07a7c2987fe3dea2088f25be373f05135e3f8a
Parents: 36a1e63
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Fri Dec 6 12:08:07 2013 -0600
Committer: Robert Newson <rnewson@apache.org>
Committed: Wed Jul 23 18:50:19 2014 +0100

----------------------------------------------------------------------
 src/mem3_rep.erl | 259 ++++++++++++++++++++++-----
 src/mem3_rpc.erl | 476 +++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 685 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/bf07a7c2/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 4284518..bdc6fa4 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -16,7 +16,8 @@
 -export([
     go/2,
     go/3,
-    make_local_id/2
+    make_local_id/2,
+    find_source_seq/4
 ]).
 
 -export([
@@ -36,12 +37,13 @@
     batch_count,
     revcount = 0,
     infos = [],
-    seq,
+    seq = 0,
     localid,
     source,
     target,
     filter,
-    db
+    db,
+    history = {[]}
 }).
 
 
@@ -65,11 +67,9 @@ go(#shard{} = Source, #shard{} = Target, Opts) ->
         _ -> 1
     end,
     Filter = proplists:get_value(filter, Opts),
-    LocalId = make_local_id(Source, Target, Filter),
     Acc = #acc{
         batch_size = BatchSize,
         batch_count = BatchCount,
-        localid = LocalId,
         source = Source,
         target = Target,
         filter = Filter
@@ -101,13 +101,17 @@ go(#acc{source=Source, batch_count=BC}=Acc0) ->
     end.
 
 
-make_local_id(#shard{}=Source, #shard{}=Target) ->
+make_local_id(Source, Target) ->
     make_local_id(Source, Target, undefined).
 
 
 make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
-    S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
-    T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
+    make_local_id(SourceNode, TargetNode, Filter);
+
+
+make_local_id(SourceThing, TargetThing, Filter) ->
+    S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceThing))),
+    T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetThing))),
     F = case is_function(Filter) of
         true ->
             {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq),
@@ -119,30 +123,102 @@ make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
 
 
-repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
+%% @doc Find and return the largest update_seq in SourceDb
+%% that the client has seen from TargetNode.
+%%
+%% When reasoning about this function it is very important to
+%% understand the direction of replication for this comparison.
+%% We're only interested in internal replications initiated
+%% by this node to the node being replaced. When doing a
+%% replacement the most important thing is that the client doesn't
+%% miss any updates. This means we can only fast-forward as far
+%% as they've seen updates on this node. We can detect that by
+%% looking for our push replication history and choosing the
+%% largest source_seq that has a target_seq =< TgtSeq.
+find_source_seq(SrcDb, TgtNode, TgtUUID, TgtSeq) ->
+    SrcNode = atom_to_binary(node(), utf8),
+    SrcUUID = couch_db:get_uuid(SrcDb),
+    DocId = make_local_id(SrcUUID, TgtUUID),
+    case couch_db:open_doc(SrcDb, DocId, []) of
+    {ok, Doc} ->
+        find_source_seq_int(Doc, SrcNode, TgtNode, TgtUUID, TgtSeq);
+    {not_found, _} ->
+        0
+    end.
+
+
+find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
+    SrcNode = case is_atom(SrcNode0) of
+        true -> atom_to_binary(SrcNode0, utf8);
+        false -> SrcNode0
+    end,
+    TgtNode = case is_atom(TgtNode0) of
+        true -> atom_to_binary(TgtNode0, utf8);
+        false -> TgtNode0
+    end,
+    % This is split off purely for the ability to run unit tests
+    % against this bit of code without requiring all sorts of mocks.
+    {History} = couch_util:get_value(<<"history">>, Props, {[]}),
+    SrcHistory = couch_util:get_value(SrcNode, History, []),
+    UseableHistory = lists:filter(fun({Entry}) ->
+        couch_util:get_value(<<"target_node">>, Entry) =:= TgtNode andalso
+        couch_util:get_value(<<"target_uuid">>, Entry) =:= TgtUUID andalso
+        couch_util:get_value(<<"target_seq">>,  Entry) =<  TgtSeq
+    end, SrcHistory),
+
+    % This relies on SrcHistory being ordered descending by source
+    % sequence.
+    case UseableHistory of
+        [{Entry} | _] ->
+            couch_util:get_value(<<"source_seq">>, Entry);
+        [] ->
+            0
+    end.
+
+
+repl(#db{name=DbName, seq_tree=Bt}=Db, Acc0) ->
     erlang:put(io_priority, {internal_repl, DbName}),
-    Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
-    Acc1 = Acc0#acc{source=Db, seq=Seq},
+    #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
     Fun = fun ?MODULE:changes_enumerator/3,
     {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
     {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
     {ok, couch_db:count_changes_since(Db, LastSeq)}.
 
 
-calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
-    case couch_db:open_doc(Db, LocalId, [ejson_body]) of
-    {ok, #doc{body = {SProps}}} ->
-        Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
-        try mem3_rpc:load_checkpoint(Node, Name, LocalId, Opts) of
-        #doc{body = {TProps}} ->
+calculate_start_seq(Acc) ->
+    #acc{
+        source = Db,
+        target = #shard{node=Node, name=Name}
+    } = Acc,
+    %% Give the target our UUID and ask it to return the checkpoint doc
+    UUID = couch_db:get_uuid(Db),
+    {NewDocId, Doc} = mem3_rpc:load_checkpoint(Node, Name, node(), UUID),
+    #doc{id=FoundId, body={TProps}} = Doc,
+    Acc1 = Acc#acc{localid = NewDocId},
+    % NewDocId and FoundId may be different the first time
+    % this code runs to save our newly named internal replication
+    % checkpoints. We store NewDocId to use when saving checkpoints
+    % but use FoundId to reuse the same docid that the target used.
+    case couch_db:open_doc(Db, FoundId, [ejson_body]) of
+        {ok, #doc{body = {SProps}}} ->
             SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
             TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
-            erlang:min(SourceSeq, TargetSeq)
-        catch error:{not_found, _} ->
-            0
-        end;
-    {not_found, _} ->
-        0
+            % We resume from the lower update seq stored in the two
+            % shard copies. We also need to be sure and use the
+            % corresponding history. A difference here could result
+            % from either a write failure on one of the nodes or if
+            % either shard was truncated by an operator.
+            case SourceSeq =< TargetSeq of
+                true ->
+                    Seq = SourceSeq,
+                    History = couch_util:get_value(<<"history">>, SProps, {[]});
+                false ->
+                    Seq = TargetSeq,
+                    History = couch_util:get_value(<<"history">>, TProps, {[]})
+            end,
+            Acc1#acc{seq = Seq, history = History};
+        {not_found, _} ->
+            Acc1
     end.
 
 
@@ -173,7 +249,8 @@ replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
     [] ->
         ok;
     Missing ->
-        ok = save_on_target(Node, Name, open_docs(Acc, Missing))
+        Docs = open_docs(Acc, Missing),
+        ok = save_on_target(Node, Name, Docs)
     end,
     update_locals(Acc),
     {ok, Acc#acc{revcount=0, infos=[]}}.
@@ -191,20 +268,10 @@ find_missing_revs(Acc) ->
     ]).
 
 
-save_on_target(Node, Name, Docs) ->
-    mem3_rpc:update_docs(Node, Name, Docs, [
-        replicated_changes,
-        full_commit,
-        {user_ctx, ?CTX},
-        {io_priority, {internal_repl, Name}}
-    ]),
-    ok.
-
-
 open_docs(#acc{source=Source, infos=Infos}, Missing) ->
     lists:flatmap(fun({Id, Revs, _}) ->
         FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
-        RevTree = FDI#full_doc_info.rev_tree,
+        #full_doc_info{rev_tree=RevTree} = FDI,
         {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
         lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
             couch_db:make_doc(Source, Id, IsDel, SummaryPtr, FoundRevPath)
@@ -212,19 +279,27 @@ open_docs(#acc{source=Source, infos=Infos}, Missing) ->
     end, Missing).
 
 
+save_on_target(Node, Name, Docs) ->
+    mem3_rpc:update_docs(Node, Name, Docs, [
+        replicated_changes,
+        full_commit,
+        {user_ctx, ?CTX},
+        {io_priority, {internal_repl, Name}}
+    ]),
+    ok.
+
+
 update_locals(Acc) ->
-    #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
+    #acc{seq=Seq, source=Db, target=Target, localid=Id, history=History} = Acc,
     #shard{name=Name, node=Node} = Target,
-    Doc = #doc{id = Id, body = {[
-        {<<"seq">>, Seq},
-        {<<"node">>, list_to_binary(atom_to_list(Node))},
+    NewEntry = [
+        {<<"source_node">>, atom_to_binary(node(), utf8)},
+        {<<"source_uuid">>, couch_db:get_uuid(Db)},
+        {<<"source_seq">>, Seq},
         {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
-    ]}},
-    {ok, _} = couch_db:update_doc(Db, Doc, []),
-    mem3_rpc:save_checkpoint(Node, Name, Doc, [
-        {user_ctx, ?CTX},
-        {io_priority, {internal_repl, Name}}
-    ]).
+    ],
+    NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History),
+    {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
 
 
 filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
@@ -243,3 +318,97 @@ iso8601_timestamp() ->
     {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
     Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
     io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+find_source_seq_unknown_node_test() ->
+    ?assertEqual(
+        find_source_seq_int(doc_(), <<"foo">>, <<"bing">>, <<"bar_uuid">>, 10),
+        0
+    ).
+
+
+find_source_seq_unknown_uuid_test() ->
+    ?assertEqual(
+        find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"teapot">>, 10),
+        0
+    ).
+
+
+find_source_seq_ok_test() ->
+    ?assertEqual(
+        find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 100),
+        100
+    ).
+
+
+find_source_seq_old_ok_test() ->
+    ?assertEqual(
+        find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 84),
+        50
+    ).
+
+
+find_source_seq_different_node_test() ->
+    ?assertEqual(
+        find_source_seq_int(doc_(), <<"foo2">>, <<"bar">>, <<"bar_uuid">>, 92),
+        31
+    ).
+
+
+-define(SNODE, <<"source_node">>).
+-define(SUUID, <<"source_uuid">>).
+-define(SSEQ, <<"source_seq">>).
+-define(TNODE, <<"target_node">>).
+-define(TUUID, <<"target_uuid">>).
+-define(TSEQ, <<"target_seq">>).
+
+doc_() ->
+    Foo_Bar = [
+        {[
+            {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 100},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 100}
+        ]},
+        {[
+            {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 90},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 85}
+        ]},
+        {[
+            {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 50},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 51}
+        ]},
+        {[
+            {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 40},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 45}
+        ]},
+        {[
+            {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 2},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 2}
+        ]}
+    ],
+    Foo2_Bar = [
+        {[
+            {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 100},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 100}
+        ]},
+        {[
+            {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 92},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 93}
+        ]},
+        {[
+            {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 31},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 30}
+        ]}
+    ],
+    History = {[
+        {<<"foo">>, Foo_Bar},
+        {<<"foo2">>, Foo2_Bar}
+    ]},
+    #doc{
+        body={[{<<"history">>, History}]}
+    }.
+
+-endif.

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/bf07a7c2/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index d71cc93..1e77b57 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -19,7 +19,13 @@
     get_missing_revs/4,
     update_docs/4,
     load_checkpoint/4,
-    save_checkpoint/4
+    save_checkpoint/6
+]).
+
+% Private RPC callbacks
+-export([
+    load_checkpoint_rpc/3,
+    save_checkpoint_rpc/5
 ]).
 
 
@@ -38,12 +44,165 @@ update_docs(Node, DbName, Docs, Options) ->
     rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
 
 
-load_checkpoint(Node, DbName, DocId, Opts) ->
-    rexi_call(Node, {fabric_rpc, open_doc, [DbName, DocId, Opts]}).
+load_checkpoint(Node, DbName, SourceNode, SourceUUID) ->
+    Args = [DbName, SourceNode, SourceUUID],
+    rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
+
+
+save_checkpoint(Node, DbName, DocId, Seq, Entry, History) ->
+    Args = [DbName, DocId, Seq, Entry, History],
+    rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}).
+
+
+load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case couch_db:open_int(DbName, [{user_ctx, ?CTX}]) of
+    {ok, Db} ->
+        TargetUUID = couch_db:get_uuid(Db),
+        NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID),
+        case couch_db:open_doc(Db, NewId, []) of
+        {ok, Doc} ->
+            rexi:reply({ok, {NewId, Doc}});
+        {not_found, _} ->
+            OldId = mem3_rep:make_local_id(SourceNode, node()),
+            case couch_db:open_doc(Db, OldId, []) of
+            {ok, Doc} ->
+                rexi:reply({ok, {NewId, Doc}});
+            {not_found, _} ->
+                rexi:reply({ok, {NewId, #doc{id = NewId}}})
+            end
+        end;
+    Error ->
+        rexi:reply(Error)
+    end.
+
+
+save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case couch_db:open_int(DbName, [{user_ctx, ?CTX}]) of
+        {ok, #db{update_seq = TargetSeq} = Db} ->
+            NewEntry = {[
+                {<<"target_node">>, atom_to_binary(node(), utf8)},
+                {<<"target_uuid">>, couch_db:get_uuid(Db)},
+                {<<"target_seq">>, TargetSeq}
+            ] ++ NewEntry0},
+            Body = {[
+                {<<"seq">>, SourceSeq},
+                {<<"history">>, add_checkpoint(NewEntry, History0)}
+            ]},
+            Doc = #doc{id = Id, body = Body},
+            rexi:reply(try couch_db:update_doc(Db, Doc, []) of
+                {ok, _} ->
+                    {ok, Body};
+                Else ->
+                    {error, Else}
+            catch
+                Exception ->
+                    Exception;
+                error:Reason ->
+                    {error, Reason}
+            end);
+        Error ->
+            rexi:reply(Error)
+    end.
+
 
+%% @doc This adds a new update sequence checkpoint to the replication
+%%      history. Checkpoints are keyed by the source node so that we
+%%      aren't mixing history between source shard moves.
+add_checkpoint({Props}, {History}) ->
+    % Extract the source and target seqs for reference
+    SourceSeq = couch_util:get_value(<<"source_seq">>, Props),
+    TargetSeq = couch_util:get_value(<<"target_seq">>, Props),
 
-save_checkpoint(Node, DbName, Doc, Options) ->
-    rexi_call(Node, {fabric_rpc, update_docs, [DbName, [Doc], Options]}).
+    % Get the history relevant to the source node.
+    SourceNode = couch_util:get_value(<<"source_node">>, Props),
+    SourceHistory = couch_util:get_value(SourceNode, History, []),
+
+    % If either the source or target shard has been truncated
+    % we need to filter out any history that was stored for
+    % any larger update seq than we're currently recording.
+    FilteredHistory = filter_history(SourceSeq, TargetSeq, SourceHistory),
+
+    % Insert the new entry into the history and trim the history
+    % to keep an exponentially increasing delta between checkpoints.
+    % We do this by defining logical buckets of exponentially
+    % increasing size and then keep the smallest and largest values
+    % in each bucket. We keep both min and max points so that
+    % we don't end up with empty buckets as new points are added.
+    %
+    % NB: We're guaranteed to keep the newest entry passed to this
+    % function because we filter out all larger update sequences
+    % which means it is guaranteed to be the smallest value in the
+    % first bucket with a delta of 0.
+    WithNewEntry = [{Props} | FilteredHistory],
+
+    % Tag each entry with the bucket id
+    BucketTagged = lists:map(fun({Entry}) ->
+        EntrySourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
+        BucketTag = case SourceSeq - EntrySourceSeq of
+            0 ->
+                0;
+            N when N > 0 ->
+                % This is int(log2(SourceSeq - EntrySourceSeq))
+                trunc(math:log(N) / math:log(2))
+        end,
+        {BucketTag, SourceSeq - EntrySourceSeq, {Entry}}
+    end, WithNewEntry),
+
+    % Find the min/max entries for each bucket
+    Buckets = lists:foldl(fun({Bucket, Delta, Entry}, BucketAcc) ->
+        {MinEntry, MaxEntry} = case dict:find(Bucket, BucketAcc) of
+            {ok, Value} -> Value;
+            error -> {nil, nil}
+        end,
+        NewMin = case MinEntry of
+            {MinDelta, _} when Delta < MinDelta ->
+                {Delta, Entry};
+            nil ->
+                {Delta, Entry};
+            _ ->
+                MinEntry
+        end,
+        NewMax = case MaxEntry of
+            {MaxDelta, _} when Delta > MaxDelta ->
+                {Delta, Entry};
+            nil ->
+                {Delta, Entry};
+            _ ->
+                MaxEntry
+        end,
+        dict:store(Bucket, {NewMin, NewMax}, BucketAcc)
+    end, dict:new(), BucketTagged),
+
+    % Turn our bucket dict back into a list sorted by increasing
+    % deltas (which corresponds to decreasing source_seq values).
+    NewSourceHistory = lists:flatmap(fun({_Bucket, {Min, Max}}) ->
+        % If there's a single point in a bucket it's both the min
+        % and max entry so we account for that here.
+        if Min == Max ->
+            [element(2, Min)];
+        true ->
+            [element(2, Min), element(2, Max)]
+        end
+    end, lists:sort(dict:to_list(Buckets))),
+
+    % Finally update the source node history and we're done.
+    NodeRemoved = lists:keydelete(SourceNode, 1, History),
+    {[{SourceNode, NewSourceHistory} | NodeRemoved]}.
+
+
+filter_history(SourceSeqThresh, TargetSeqThresh, History) ->
+    SourceFilter = fun({Entry}) ->
+        SourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
+        SourceSeq < SourceSeqThresh
+    end,
+    TargetFilter = fun({Entry}) ->
+        TargetSeq = couch_util:get_value(<<"target_seq">>, Entry),
+        TargetSeq < TargetSeqThresh
+    end,
+    SourceFiltered = lists:filter(SourceFilter, History),
+    lists:filter(TargetFilter, SourceFiltered).
 
 
 rexi_call(Node, MFA) ->
@@ -62,3 +221,310 @@ rexi_call(Node, MFA) ->
     after
         rexi_monitor:stop(Mon)
     end.
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+-define(SNODE, <<"src@localhost">>).
+-define(TNODE, <<"tgt@localhost">>).
+-define(SNODE_KV, {<<"source_node">>, ?SNODE}).
+-define(TNODE_KV, {<<"target_node">>, ?TNODE}).
+-define(SSEQ, <<"source_seq">>).
+-define(TSEQ, <<"target_seq">>).
+-define(ENTRY(S, T), {[?SNODE_KV, {?SSEQ, S}, ?TNODE_KV, {?TSEQ, T}]}).
+
+
+filter_history_data() ->
+    [
+        ?ENTRY(13, 15),
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ].
+
+
+filter_history_remove_none_test() ->
+    ?assertEqual(filter_history(20, 20, filter_history_data()), [
+        ?ENTRY(13, 15),
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]).
+
+
+filter_history_remove_all_test() ->
+    ?assertEqual(filter_history(1, 1, filter_history_data()), []).
+
+
+filter_history_remove_equal_test() ->
+    ?assertEqual(filter_history(10, 10, filter_history_data()), [
+        ?ENTRY(2, 3)
+    ]),
+    ?assertEqual(filter_history(11, 9, filter_history_data()), [
+        ?ENTRY(2, 3)
+    ]).
+
+
+filter_history_remove_for_source_and_target_test() ->
+    ?assertEqual(filter_history(11, 20, filter_history_data()), [
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]),
+    ?assertEqual(filter_history(14, 14, filter_history_data()), [
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]).
+
+
+filter_history_remove_for_both_test() ->
+    ?assertEqual(filter_history(11, 11, filter_history_data()), [
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]).
+
+
+filter_history_remove_for_both_again_test() ->
+    ?assertEqual(filter_history(3, 4, filter_history_data()), [
+        ?ENTRY(2, 3)
+    ]).
+
+
+add_first_checkpoint_test() ->
+    History = {[]},
+    ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[
+        {?SNODE, [
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+add_first_checkpoint_to_empty_test() ->
+    History = {[{?SNODE, []}]},
+    ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[
+        {?SNODE, [
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+add_second_checkpoint_test() ->
+    History = {[{?SNODE, [?ENTRY(2, 3)]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(10, 9), History), {[
+        {?SNODE, [
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+add_third_checkpoint_test() ->
+    History = {[{?SNODE, [
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(11, 10), History), {[
+        {?SNODE, [
+            ?ENTRY(11, 10),
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+add_fourth_checkpoint_test() ->
+    History = {[{?SNODE, [
+        ?ENTRY(11, 10),
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(12, 13), History), {[
+        {?SNODE, [
+            ?ENTRY(12, 13),
+            ?ENTRY(11, 10),
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+add_checkpoint_with_replacement_test() ->
+    History = {[{?SNODE, [
+        ?ENTRY(12, 13),
+        ?ENTRY(11, 10),
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]}]},
+    % Picking a source_seq of 16 to force 10, 11, and 12
+    % into the same bucket to show we drop the 11 entry.
+    ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[
+        {?SNODE, [
+            ?ENTRY(16, 16),
+            ?ENTRY(12, 13),
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+add_checkpoint_drops_redundant_checkpoints_test() ->
+    % I've added comments showing the bucket ID based
+    % on the ?ENTRY passed to add_checkpoint
+    History = {[{?SNODE, [
+        ?ENTRY(15, 15), % Bucket 0
+        ?ENTRY(14, 14), % Bucket 1
+        ?ENTRY(13, 13), % Bucket 1
+        ?ENTRY(12, 12), % Bucket 2
+        ?ENTRY(11, 11), % Bucket 2
+        ?ENTRY(10, 10), % Bucket 2
+        ?ENTRY(9, 9),   % Bucket 2
+        ?ENTRY(8, 8),   % Bucket 3
+        ?ENTRY(7, 7),   % Bucket 3
+        ?ENTRY(6, 6),   % Bucket 3
+        ?ENTRY(5, 5),   % Bucket 3
+        ?ENTRY(4, 4),   % Bucket 3
+        ?ENTRY(3, 3),   % Bucket 3
+        ?ENTRY(2, 2),   % Bucket 3
+        ?ENTRY(1, 1)    % Bucket 3
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[
+        {?SNODE, [
+            ?ENTRY(16, 16), % Bucket 0
+            ?ENTRY(15, 15), % Bucket 0
+            ?ENTRY(14, 14), % Bucket 1
+            ?ENTRY(13, 13), % Bucket 1
+            ?ENTRY(12, 12), % Bucket 2
+            ?ENTRY(9, 9),   % Bucket 2
+            ?ENTRY(8, 8),   % Bucket 3
+            ?ENTRY(1, 1)    % Bucket 3
+        ]}
+    ]}).
+
+
+add_checkpoint_show_not_always_a_drop_test() ->
+    % Depending on the edge conditions of buckets we
+    % may not always drop values when adding new
+    % checkpoints. In this case 12 stays because there's
+    % no longer a value for 10 or 11.
+    %
+    % I've added comments showing the bucket ID based
+    % on the ?ENTRY passed to add_checkpoint
+    History = {[{?SNODE, [
+        ?ENTRY(16, 16), % Bucket 0
+        ?ENTRY(15, 15), % Bucket 1
+        ?ENTRY(14, 14), % Bucket 1
+        ?ENTRY(13, 13), % Bucket 2
+        ?ENTRY(12, 12), % Bucket 2
+        ?ENTRY(9, 9),   % Bucket 3
+        ?ENTRY(8, 8),   % Bucket 3
+        ?ENTRY(1, 1)    % Bucket 4
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(17, 17), History), {[
+        {?SNODE, [
+            ?ENTRY(17, 17), % Bucket 0
+            ?ENTRY(16, 16), % Bucket 0
+            ?ENTRY(15, 15), % Bucket 1
+            ?ENTRY(14, 14), % Bucket 1
+            ?ENTRY(13, 13), % Bucket 2
+            ?ENTRY(12, 12), % Bucket 2
+            ?ENTRY(9, 9),   % Bucket 3
+            ?ENTRY(8, 8),   % Bucket 3
+            ?ENTRY(1, 1)    % Bucket 4
+        ]}
+    ]}).
+
+
+add_checkpoint_big_jump_show_lots_drop_test() ->
+    % I've added comments showing the bucket ID based
+    % on the ?ENTRY passed to add_checkpoint
+    History = {[{?SNODE, [
+        ?ENTRY(16, 16), % Bucket 4
+        ?ENTRY(15, 15), % Bucket 4
+        ?ENTRY(14, 14), % Bucket 4
+        ?ENTRY(13, 13), % Bucket 4
+        ?ENTRY(12, 12), % Bucket 4
+        ?ENTRY(9, 9),   % Bucket 4
+        ?ENTRY(8, 8),   % Bucket 4
+        ?ENTRY(1, 1)    % Bucket 4
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(32, 32), History), {[
+        {?SNODE, [
+            ?ENTRY(32, 32), % Bucket 0
+            ?ENTRY(16, 16), % Bucket 4
+            ?ENTRY(1, 1)    % Bucket 4
+        ]}
+    ]}).
+
+
+add_checkpoint_show_filter_history_test() ->
+    History = {[{?SNODE, [
+        ?ENTRY(16, 16),
+        ?ENTRY(15, 15),
+        ?ENTRY(14, 14),
+        ?ENTRY(13, 13),
+        ?ENTRY(12, 12),
+        ?ENTRY(9, 9),
+        ?ENTRY(8, 8),
+        ?ENTRY(1, 1)
+    ]}]},
+    % Drop for both
+    ?assertEqual(add_checkpoint(?ENTRY(10, 10), History), {[
+        {?SNODE, [
+            ?ENTRY(10, 10),
+            ?ENTRY(9, 9),
+            ?ENTRY(8, 8),
+            ?ENTRY(1, 1)
+        ]}
+    ]}),
+    % Drop for source
+    ?assertEqual(add_checkpoint(?ENTRY(10, 200), History), {[
+        {?SNODE, [
+            ?ENTRY(10, 200),
+            ?ENTRY(9, 9),
+            ?ENTRY(8, 8),
+            ?ENTRY(1, 1)
+        ]}
+    ]}),
+    % Drop for target. Obviously a source_seq of 200
+    % will end up dropping the 8 entry.
+    ?assertEqual(add_checkpoint(?ENTRY(200, 10), History), {[
+        {?SNODE, [
+            ?ENTRY(200, 10),
+            ?ENTRY(9, 9),
+            ?ENTRY(1, 1)
+        ]}
+    ]}).
+
+
+add_checkpoint_from_other_node_test() ->
+    History = {[{<<"not_the_source">>, [
+        ?ENTRY(12, 13),
+        ?ENTRY(11, 10),
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]}]},
+    % No filtering
+    ?assertEqual(add_checkpoint(?ENTRY(1, 1), History), {[
+        {?SNODE, [
+            ?ENTRY(1, 1)
+        ]},
+        {<<"not_the_source">>, [
+            ?ENTRY(12, 13),
+            ?ENTRY(11, 10),
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}),
+    % No dropping
+    ?assertEqual(add_checkpoint(?ENTRY(200, 200), History), {[
+        {?SNODE, [
+            ?ENTRY(200, 200)
+        ]},
+        {<<"not_the_source">>, [
+            ?ENTRY(12, 13),
+            ?ENTRY(11, 10),
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+-endif.


Mime
View raw message