Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 426A211C5B for ; Thu, 28 Aug 2014 12:22:30 +0000 (UTC) Received: (qmail 42713 invoked by uid 500); 28 Aug 2014 12:22:23 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 42566 invoked by uid 500); 28 Aug 2014 12:22:23 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 41413 invoked by uid 99); 28 Aug 2014 12:22:22 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Aug 2014 12:22:22 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7CF33A02DFA; Thu, 28 Aug 2014 12:22:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rnewson@apache.org To: commits@couchdb.apache.org Date: Thu, 28 Aug 2014 12:23:02 -0000 Message-Id: <3fa3c26e54c94327af69b64f5a23c3a0@git.apache.org> In-Reply-To: <29bb38a445e2454d9b33ebc655e7f389@git.apache.org> References: <29bb38a445e2454d9b33ebc655e7f389@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [42/50] mem3 commit: updated refs/heads/master to 64c0c74 Refactor mem3_rpc:add_checkpoint/2 This is based on Adam Kocoloski's original add_checkpoint/2 but uses a body recursive function to avoid the final reverse/filter steps. BugzId: 21973 Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/e64dd028 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/e64dd028 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/e64dd028 Branch: refs/heads/master Commit: e64dd0281f1d9b9b22511b3625c1fb1f97a42042 Parents: e147621 Author: Paul J. Davis Authored: Mon Dec 9 14:04:39 2013 -0600 Committer: Robert Newson Committed: Wed Jul 23 18:51:10 2014 +0100 ---------------------------------------------------------------------- src/mem3_rpc.erl | 133 +++++++++++++++++++++++++++----------------------- 1 file changed, 71 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/e64dd028/src/mem3_rpc.erl ---------------------------------------------------------------------- diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl index 8d8c832..10294a7 100644 --- a/src/mem3_rpc.erl +++ b/src/mem3_rpc.erl @@ -125,68 +125,11 @@ add_checkpoint({Props}, {History}) -> % any larger update seq than we're currently recording. FilteredHistory = filter_history(SourceSeq, TargetSeq, SourceHistory), - % Insert the new entry into the history and trim the history - % to keep an exponentially increasing delta between checkpoints. - % We do this by defining logical buckets of exponentially - % increasing size and then keep the smallest and largest values - % in each bucket. We keep both min and max points so that - % we don't end up with empty buckets as new points are added. - % - % NB: We're guaranteed to keep the newest entry passed to this - % function because we filter out all larger update sequences - % which means it is guaranteed to be the smallest value in the - % first bucket with a delta of 0. - WithNewEntry = [{Props} | FilteredHistory], - - % Tag each entry with the bucket id - BucketTagged = lists:map(fun({Entry}) -> - EntrySourceSeq = couch_util:get_value(<<"source_seq">>, Entry), - BucketTag = case SourceSeq - EntrySourceSeq of - 0 -> - 0; - N when N > 0 -> - % This is int(log2(SourceSeq - EntrySourceSeq)) - trunc(math:log(N) / math:log(2)) - end, - {BucketTag, SourceSeq - EntrySourceSeq, {Entry}} - end, WithNewEntry), - - % Find the min/max entries for each bucket - Buckets = lists:foldl(fun({Bucket, Delta, Entry}, BucketAcc) -> - {MinEntry, MaxEntry} = case dict:find(Bucket, BucketAcc) of - {ok, Value} -> Value; - error -> {nil, nil} - end, - NewMin = case MinEntry of - {MinDelta, _} when Delta < MinDelta -> - {Delta, Entry}; - nil -> - {Delta, Entry}; - _ -> - MinEntry - end, - NewMax = case MaxEntry of - {MaxDelta, _} when Delta > MaxDelta -> - {Delta, Entry}; - nil -> - {Delta, Entry}; - _ -> - MaxEntry - end, - dict:store(Bucket, {NewMin, NewMax}, BucketAcc) - end, dict:new(), BucketTagged), - - % Turn our bucket dict back into a list sorted by increasing - % deltas (which corresponds to decreasing source_seq values). - NewSourceHistory = lists:flatmap(fun({_Bucket, {Min, Max}}) -> - % If there's a single point in a bucket its both the min - % and max entry so we account for that here. - if Min == Max -> - [element(2, Min)]; - true -> - [element(2, Min), element(2, Max)] - end - end, lists:sort(dict:to_list(Buckets))), + % Re-bucket our history based on the most recent source + % sequence. This is where we drop old checkpoints to + % maintain the exponential distribution. + {_, RebucketedHistory} = rebucket(FilteredHistory, SourceSeq, 0), + NewSourceHistory = [{Props} | RebucketedHistory], % Finally update the source node history and we're done. NodeRemoved = lists:keydelete(SourceNode, 1, History), @@ -206,6 +149,72 @@ filter_history(SourceSeqThresh, TargetSeqThresh, History) -> lists:filter(TargetFilter, SourceFiltered). +%% @doc This function adjusts our history to maintain a +%% history of checkpoints that follow an exponentially +%% increasing age from the most recent checkpoint. +%% +%% The terms newest and oldest used in these comments +%% refers to the (NewSeq - CurSeq) difference where smaller +%% values are considered newer. +%% +%% It works by assigning each entry to a bucket and keeping +%% the newest and oldest entry in each bucket. Keeping +%% both the newest and oldest means that we won't end up +%% with empty buckets as checkpoints are promoted to new +%% buckets. +%% +%% The return value of this function is a two-tuple of the +%% form `{BucketId, History}` where BucketId is the id of +%% the bucket for the first entry in History. This is used +%% when recursing to detect the oldest value in a given +%% bucket. +%% +%% This function expects the provided history to be sorted +%% in descending order of source_seq values. +rebucket([], _NewSeq, Bucket) -> + {Bucket+1, []}; +rebucket([{Entry} | RestHistory], NewSeq, Bucket) -> + CurSeq = couch_util:get_value(<<"source_seq">>, Entry), + case find_bucket(NewSeq, CurSeq, Bucket) of + Bucket -> + % This entry is in an existing bucket which means + % we will only keep it if its the oldest value + % in the bucket. To detect this we rebucket the + % rest of the list and only include Entry if the + % rest of the list is in a bigger bucket. + case rebucket(RestHistory, NewSeq, Bucket) of + {Bucket, NewHistory} -> + % There's another entry in this bucket so we drop the + % current entry. + {Bucket, NewHistory}; + {NextBucket, NewHistory} when NextBucket > Bucket -> + % The rest of the history was rebucketed into a larger + % bucket so this is the oldest entry in the current + % bucket. + {Bucket, [{Entry} | NewHistory]} + end; + NextBucket when NextBucket > Bucket -> + % This entry is the newest in NextBucket so we add it + % to our history and continue rebucketing. + {_, NewHistory} = rebucket(RestHistory, NewSeq, NextBucket), + {NextBucket, [{Entry} | NewHistory]} + end. + + +%% @doc Find the bucket id for the given sequence pair. +find_bucket(NewSeq, CurSeq, Bucket) -> + % The +1 constant in this comparison is a bit subtle. The + % reason for it is to make sure that the first entry in + % the history is guaranteed to have a BucketId of 1. This + % also relies on never having a duplicated update + % sequence so adding 1 here guarantees a difference >= 2. + if (NewSeq - CurSeq + 1) > (2 bsl Bucket) -> + find_bucket(NewSeq, CurSeq, Bucket+1); + true -> + Bucket + end. + + rexi_call(Node, MFA) -> Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]), Ref = rexi:cast(Node, self(), MFA, [sync]),