couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1027669 - in /couchdb/branches/new_replicator: etc/couchdb/default.ini.tpl.in src/couchdb/couch_replicator.erl src/couchdb/couch_replicator_doc_copiers.erl src/couchdb/couch_replicator_rev_finders.erl
Date Tue, 26 Oct 2010 18:24:48 GMT
Author: fdmanana
Date: Tue Oct 26 18:24:48 2010
New Revision: 1027669

URL: http://svn.apache.org/viewvc?rev=1027669&view=rev
Log:
New replicator: simplified some code and fixed some cases where a wrong sequence number was
stored in the checkpoints.

Modified:
    couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
    couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl

Modified: couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in?rev=1027669&r1=1027668&r2=1027669&view=diff
==============================================================================
--- couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in Tue Oct 26 18:24:48 2010
@@ -118,5 +118,3 @@ compressible_types = text/*, application
 [replicator]
 ; should be at least 2
 worker_processes = 10
-; integer value in bytes
-buffer_size = 1048576
\ No newline at end of file

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1027669&r1=1027668&r2=1027669&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Tue Oct 26 18:24:48 2010
@@ -32,6 +32,10 @@
 % in couch_rep_sup.erl.
 -define(MAX_RESTARTS, 3).
 
+% maximum number of elements (per iteration) that each missing
+% revs finder process fetches from the missing revs queue
+-define(REV_BATCH_SIZE, 1000).
+
 
 -record(rep_state, {
     rep_details,
@@ -56,7 +60,7 @@
     changes_reader,
     missing_rev_finders,
     doc_copiers,
-    seqs_in_progress = gb_trees:empty(),
+    seqs_in_progress = gb_sets:empty(),
     stats = #rep_stats{}
     }).
 
@@ -217,11 +221,6 @@ do_init(#rep{options = Options} = Rep) -
         start_seq = StartSeq
     } = State = init_state(Rep),
 
-    QueueSize = round(
-        (?l2i(couch_config:get("replicator", "buffer_size", "1048576"))) / 2),
-    {ok, MissingRevsQueue} = couch_work_queue:new(
-        [{max_size, QueueSize}, {multi_workers, true}]),
-
     {RevFindersCount, CopiersCount} = case ?l2i(
         couch_config:get("replicator", "worker_processes", "10")) of
     Small when Small < 2 ->
@@ -231,11 +230,16 @@ do_init(#rep{options = Options} = Rep) -
     N ->
         {N div 2, N div 2 + N rem 2}
     end,
+    {ok, MissingRevsQueue} = couch_work_queue:new([
+        {multi_workers, true}, {max_items, trunc(CopiersCount * 1.5)}
+    ]),
 
     case get_value(doc_ids, Options) of
     undefined ->
-        {ok, ChangesQueue} = couch_work_queue:new(
-            [{max_size, QueueSize}, {multi_workers, true}]),
+        {ok, ChangesQueue} = couch_work_queue:new([
+            {multi_workers, true},
+            {max_items, trunc(RevFindersCount * 1.5) * ?REV_BATCH_SIZE}
+        ]),
 
         % This starts the _changes reader process. It adds the changes from
         % the source db to the ChangesQueue.
@@ -246,8 +250,9 @@ do_init(#rep{options = Options} = Rep) -
         % in the ChangesQueue to see if they exist on the target or not. If not,
         % adds them to MissingRevsQueue.
         MissingRevFinders =
-            couch_replicator_rev_finders:spawn_missing_rev_finders(self(),
-                Target, ChangesQueue, MissingRevsQueue, RevFindersCount);
+            couch_replicator_rev_finders:spawn_missing_rev_finders(
+                self(), Target, ChangesQueue, MissingRevsQueue,
+                RevFindersCount, ?REV_BATCH_SIZE);
     DocIds ->
         ChangesQueue = nil,
         ChangesReader = nil,
@@ -353,17 +358,20 @@ handle_cast(checkpoint, State) ->
 
 handle_cast({seq_start, {LargestSeq, NumChanges}}, State) ->
     #rep_state{
-        seqs_in_progress = SeqsTree,
-        stats = #rep_stats{missing_checked = Mc} = Stats
+        seqs_in_progress = SeqsInProgress,
+        stats = #rep_stats{missing_checked = Mc, missing_found = Mf} = Stats
     } = State,
     NewState = State#rep_state{
-        seqs_in_progress = gb_trees:insert(LargestSeq, NumChanges, SeqsTree),
-        stats = Stats#rep_stats{missing_checked = Mc + NumChanges}
+        seqs_in_progress = gb_sets:insert(LargestSeq, SeqsInProgress),
+        stats = Stats#rep_stats{
+            missing_checked = Mc + NumChanges,
+            missing_found = Mf + NumChanges
+        }
     },
     {noreply, NewState};
 
-handle_cast({seq_changes_done, Changes}, State) ->
-    {noreply, process_seq_changes_done(Changes, State)};
+handle_cast({seq_done, SeqDone}, State) ->
+    {noreply, process_seq_done(SeqDone, State)};
 
 handle_cast({add_stats, StatsInc}, #rep_state{stats = Stats} = State) ->
     {noreply, State#rep_state{stats = sum_stats([Stats, StatsInc])}};
@@ -401,7 +409,7 @@ do_last_checkpoint(State) ->
         current_through_seq = Seq,
         seqs_in_progress = InProgress
     } = State,
-    0 = gb_trees:size(InProgress),
+    0 = gb_sets:size(InProgress),
     LastSeq = case DoneSeqs of
     [] ->
         Seq;
@@ -658,26 +666,20 @@ has_session_id(SessionId, [{Props} | Res
     end.
 
 
-process_seq_changes_done({Seq, NumChangesDone}, State) ->
+process_seq_done(Seq, State) ->
     #rep_state{
         seqs_in_progress = SeqsInProgress,
         next_through_seqs = DoneSeqs,
         current_through_seq = ThroughSeq
     } = State,
 
-    Total = gb_trees:get(Seq, SeqsInProgress),
-    {ThroughSeq2, SeqsInProgress2, DoneSeqs2} = case Total - NumChangesDone of
-    0 ->
-        {NewDoneSeqs, NewThroughSeq} = case gb_trees:smallest(SeqsInProgress) of
-        {Seq, Total} ->
-            {DoneSeqs, Seq};
-        _ ->
-            {ordsets:add_element(Seq, DoneSeqs), ThroughSeq}
-        end,
-        {NewThroughSeq, gb_trees:delete(Seq, SeqsInProgress), NewDoneSeqs};
-    NewTotal when NewTotal > 0 ->
-        {ThroughSeq, gb_trees:update(Seq, NewTotal, SeqsInProgress), DoneSeqs}
+    {DoneSeqs2, ThroughSeq2} = case gb_sets:smallest(SeqsInProgress) of
+    Seq ->
+        {DoneSeqs, Seq};
+    _ ->
+        {ordsets:add_element(Seq, DoneSeqs), ThroughSeq}
     end,
+    SeqsInProgress2 = gb_sets:delete(Seq, SeqsInProgress),
 
     {ThroughSeq3, DoneSeqs3} =
         get_next_through_seq(ThroughSeq2, SeqsInProgress2, DoneSeqs2),
@@ -690,7 +692,7 @@ process_seq_changes_done({Seq, NumChange
 
 
 get_next_through_seq(Current, InProgress, Done) ->
-    case gb_trees:is_empty(InProgress) of
+    case gb_sets:is_empty(InProgress) of
     true ->
         case Done of
         [] ->
@@ -699,7 +701,7 @@ get_next_through_seq(Current, InProgress
             {lists:last(Done), []}
         end;
     false ->
-        {SmallestInProgress, _} = gb_trees:smallest(InProgress),
+        SmallestInProgress = gb_sets:smallest(InProgress),
         get_next_through_seq(Current, SmallestInProgress, Done, [])
     end.
 

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1027669&r1=1027668&r2=1027669&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Tue Oct 26 18:24:48 2010
@@ -17,9 +17,6 @@
 -include("couch_db.hrl").
 -include("couch_api_wrap.hrl").
 
--define(DOC_BATCH_SIZE, 1000).
-
-
 
 spawn_doc_copiers(Cp, Source, Target, MissingRevsQueue, CopiersCount) ->
     lists:map(
@@ -39,9 +36,8 @@ spawn_doc_copiers(Cp, Source, Target, Mi
 }).
 
 doc_copy_loop(Cp, Source, Target, MissingRevsQueue) ->
-    Result = case couch_work_queue:dequeue(MissingRevsQueue, ?DOC_BATCH_SIZE) of
+    Result = case couch_work_queue:dequeue(MissingRevsQueue, 1) of
     closed ->
-        ?LOG_DEBUG("Doc copier ~p got missing revs queue closed", [self()]),
         stop;
 
     {ok, [{doc_id, _} | _] = DocIds} ->
@@ -54,18 +50,9 @@ doc_copy_loop(Cp, Source, Target, Missin
                 Acc2
             end,
             #doc_acc{}, DocIds),
-        {Source, Acc, {nil, 0}};
+        {Source, Acc, nil};
 
-    {ok, [{_, FirstRevs, _, FirstSeq} | RestIdRevList] = IdRevList} ->
-        {LargestSeq, TotalChanges} = lists:foldl(
-            fun({_, Revs, _, Seq}, {Largest, Total}) when Seq > Largest ->
-                {Seq, Total + length(Revs)};
-            ({_, Revs, _, _}, {Largest, Total}) ->
-                {Largest, Total + length(Revs)}
-            end,
-            {FirstSeq, length(FirstRevs)}, RestIdRevList),
-        LastSeqDone = {LargestSeq, TotalChanges},
-        ok = gen_server:cast(Cp, {seq_start, LastSeqDone}),
+    {ok, [{ReportSeq, IdRevList}]} ->
         {NewSource, Acc} = lists:foldl(
             fun({Id, Revs, PossibleAncestors, Seq} = IdRev, {SrcDb, BulkAcc}) ->
                 ?LOG_DEBUG("Doc copier ~p got ~p", [self(), IdRev]),
@@ -77,14 +64,20 @@ doc_copy_loop(Cp, Source, Target, Missin
                 {SrcDb2, BulkAcc2}
             end,
             {Source, #doc_acc{}}, IdRevList),
-        {NewSource, Acc, LastSeqDone}
+        {NewSource, Acc, ReportSeq}
     end,
 
     case Result of
-    {Source2, DocAcc, LargestSeqDone} ->
+    {Source2, DocAcc, SeqDone} ->
+        #doc_acc{
+            written = W,
+            read = R
+        } = DocAcc2 = bulk_write_docs(DocAcc, Target),
         DocAcc2 = bulk_write_docs(DocAcc, Target),
-        seq_done(LargestSeqDone, Cp),
+        seq_done(SeqDone, Cp),
         send_stats(DocAcc2, Cp),
+        ?LOG_DEBUG("Replicator copy process: "
+            "read ~p documents, wrote ~p documents", [R, W]),
         doc_copy_loop(Cp, Source2, Target, MissingRevsQueue);
     stop ->
         ok
@@ -137,10 +130,10 @@ bulk_write_docs(#doc_acc{docs = Docs, wr
     }.
 
 
-seq_done({nil, _}, _Cp) ->
+seq_done(nil, _Cp) ->
     ok;
 seq_done(SeqDone, Cp) ->
-    ok = gen_server:cast(Cp, {seq_changes_done, SeqDone}).
+    ok = gen_server:cast(Cp, {seq_done, SeqDone}).
 
 
 send_stats(#doc_acc{read = R, written = W, wfail = Wf}, Cp) ->

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1027669&r1=1027668&r2=1027669&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Tue Oct 26 18:24:48 2010
@@ -12,15 +12,13 @@
 
 -module(couch_replicator_rev_finders).
 
--export([spawn_missing_rev_finders/5]).
+-export([spawn_missing_rev_finders/6]).
 
 -include("couch_db.hrl").
 
--define(REV_BATCH_SIZE, 1000).
 
 
-
-spawn_missing_rev_finders(_, _, DocIds, MissingRevsQueue, _)
+spawn_missing_rev_finders(_, _, DocIds, MissingRevsQueue, _, _)
     when is_list(DocIds) ->
     lists:foreach(
         fun(DocId) ->
@@ -33,18 +31,18 @@ spawn_missing_rev_finders(_, _, DocIds, 
     [];
 
 spawn_missing_rev_finders(StatsProcess,
-        Target, ChangesQueue, MissingRevsQueue, RevFindersCount) ->
+        Target, ChangesQueue, MissingRevsQueue, RevFindersCount, BatchSize) ->
     lists:map(
         fun(_) ->
             spawn_link(fun() ->
                 missing_revs_finder_loop(StatsProcess,
-                    Target, ChangesQueue, MissingRevsQueue)
+                    Target, ChangesQueue, MissingRevsQueue, BatchSize)
             end)
         end, lists:seq(1, RevFindersCount)).
 
 
-missing_revs_finder_loop(Cp, Target, ChangesQueue, RevsQueue) ->
-    case couch_work_queue:dequeue(ChangesQueue, ?REV_BATCH_SIZE) of
+missing_revs_finder_loop(Cp, Target, ChangesQueue, RevsQueue, BatchSize) ->
+    case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
     closed ->
         ok;
     {ok, DocInfos} ->
@@ -60,24 +58,25 @@ missing_revs_finder_loop(Cp, Target, Cha
         % ancestors that already exist on the target. This enables
         % incremental attachment replication, so the source only needs to send
         % attachments modified since the common ancestor on target.
-
-        IdRevsSeqDict = dict:from_list(
-            [{Id, {[Rev || #rev_info{rev=Rev} <- RevsInfo], Seq}} ||
-                    #doc_info{id=Id, revs=RevsInfo, high_seq=Seq} <- DocInfos]),
-        % Expand out each docs and seq into it's own work item
-        MissingCount = lists:foldl(
-            fun({Id, Revs, PAs}, Count) ->
-                % PA means "possible ancestor"
-                {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
-                ok = couch_work_queue:queue(RevsQueue, {Id, Revs, PAs, Seq}),
-                Count + length(Revs)
-            end, 0, Missing),
-        send_missing_found(MissingCount, Cp),
-        missing_revs_finder_loop(Cp, Target, ChangesQueue, RevsQueue)
+        queue_missing_revs(Missing, DocInfos, RevsQueue, Cp),
+        missing_revs_finder_loop(Cp, Target, ChangesQueue, RevsQueue, BatchSize)
     end.
 
 
-send_missing_found(0, _Cp) ->
+queue_missing_revs([], _, _, _) ->
     ok;
-send_missing_found(Value, Cp) ->
-    ok = gen_server:cast(Cp, {add_stats, #rep_stats{missing_found = Value}}).
+queue_missing_revs(Missing, DocInfos, Queue, Cp) ->
+    IdRevsSeqDict = dict:from_list(
+        [{Id, {[Rev || #rev_info{rev = Rev} <- RevsInfo], Seq}} ||
+            #doc_info{id = Id, revs = RevsInfo, high_seq = Seq} <- DocInfos]),
+    {MissingCount, QueueItemList} = lists:foldl(
+        fun({Id, Revs, PAs}, {Count, Q}) ->
+            {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
+            {Count + length(Revs), [{Id, Revs, PAs, Seq} | Q]}
+        end,
+        {0, []},
+        Missing),
+    [{_, _, _, LargestSeq} | _] = QueueItemList,
+    ok = gen_server:cast(Cp, {seq_start, {LargestSeq, MissingCount}}),
+    ok = couch_work_queue:queue(
+           Queue, {LargestSeq, QueueItemList}).



Mime
View raw message