couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1003151 - in /couchdb/branches/new_replicator: etc/couchdb/default.ini.tpl.in src/couchdb/couch_replicate.erl
Date Thu, 30 Sep 2010 16:45:38 GMT
Author: fdmanana
Date: Thu Sep 30 16:45:37 2010
New Revision: 1003151

URL: http://svn.apache.org/viewvc?rev=1003151&view=rev
Log:
New replicator: add support for multiple missing revs finder processes.

Modified:
    couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl

Modified: couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in?rev=1003151&r1=1003150&r2=1003151&view=diff
==============================================================================
--- couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in Thu Sep 30 16:45:37 2010
@@ -116,4 +116,5 @@ compression_level = 8 ; from 1 (lowest, 
 compressible_types = text/*, application/javascript, application/json,  application/xml
 
 [replicator]
-copy_processes = 10
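+; total replicator workers, split between missing-revs finders and doc copiers; should be at least 2 (smaller values fall back to 1 of each)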
+worker_processes = 10

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=1003151&r1=1003150&r2=1003151&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Thu Sep 30 16:45:37 2010
@@ -31,6 +31,7 @@
 % Can't be greater than the maximum number of child restarts specified
 % in couch_rep_sup.erl.
 -define(MAX_RESTARTS, 3).
+-define(REV_BATCH_SIZE, 100).
 -define(DOC_BATCH_SIZE, 50).
 
 
@@ -64,7 +65,7 @@
     missing_revs_queue,
     changes_queue,
     changes_reader,
-    missing_revs_finder,
+    missing_rev_finders,
     doc_copiers,
     finished_doc_copiers = 0,
     seqs_in_progress = gb_trees:from_orddict([]),
@@ -229,46 +230,50 @@ do_init(#rep{options = Options} = Rep) -
     } = State = init_state(Rep),
 
     {ok, MissingRevsQueue} = couch_work_queue:new(
-        [{max_size, 100000}, {max_items, 500}, {multi_workers, true}]),
+        [{max_size, 100000}, {max_items, 2000}, {multi_workers, true}]),
+
+    {RevFindersCount, CopiersCount} = case ?l2i(
+        couch_config:get("replicator", "worker_processes", "10")) of
+    Small when Small < 2 ->
+        ?LOG_ERROR("The number of worker processes for the replicator "
+            "should be at least 2", []),
+        {1, 1};
+    N ->
+        {N div 2, (N div 2) + (N rem 2)}
+    end,
 
     case get_value(doc_ids, Options) of
     undefined ->
         {ok, ChangesQueue} = couch_work_queue:new(
-            [{max_size, 100000}, {max_items, 500}]),
+            [{max_size, 100000}, {max_items, 500}, {multi_workers, true}]),
 
         % This starts the _changes reader process. It adds the changes from
         % the source db to the ChangesQueue.
         ChangesReader = spawn_changes_reader(self(), StartSeq, Source,
             ChangesQueue, Options),
 
-        % This starts the missing revs finder. It checks the target for changes
+        % This starts the missing rev finders. They check the target for changes
         % in the ChangesQueue to see if they exist on the target or not. If not,
         % adds them to MissingRevsQueue.
-        MissingRevsFinder = spawn_missing_revs_finder(self(), Target,
-            ChangesQueue, MissingRevsQueue);
+        MissingRevFinders = spawn_missing_rev_finders(
+            self(), Target, ChangesQueue, MissingRevsQueue, RevFindersCount);
     DocIds ->
         ChangesQueue = nil,
         ChangesReader = nil,
-        MissingRevsFinder = case DocIds of
-        [] ->
-            % avoid getting the doc copier process get blocked if it dequeues
-            % before the queue is closed
-            couch_work_queue:close(MissingRevsQueue),
-            nil;
-        Ids when is_list(Ids) ->
-            spawn_missing_revs_finder(self(), Target, Ids, MissingRevsQueue)
-        end
+        MissingRevFinders = spawn_missing_rev_finders(
+            self(), Target, DocIds, MissingRevsQueue, RevFindersCount)
     end,
 
     % This starts the doc copy processes. They fetch documents from the
     % MissingRevsQueue and copy them from the source to the target database.
-    DocCopiers = spawn_doc_copiers(self(), Source, Target, MissingRevsQueue),
+    DocCopiers = spawn_doc_copiers(
+        self(), Source, Target, MissingRevsQueue, CopiersCount),
 
     {ok, State#rep_state{
             missing_revs_queue = MissingRevsQueue,
             changes_queue = ChangesQueue,
             changes_reader = ChangesReader,
-            missing_revs_finder = MissingRevsFinder,
+            missing_rev_finders = MissingRevFinders,
             doc_copiers = DocCopiers,
             is_successor_seq = get_value(is_successor_seq, Options,
                 fun(Seq, NextSeq) -> (Seq + 1) =:= NextSeq end)
@@ -358,14 +363,6 @@ handle_info({'EXIT', Pid, Reason}, #rep_
     ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
     {stop, changes_reader_died, State};
 
-handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_finder=Pid} = St) ->
-    {noreply, St};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_finder=Pid} = St) ->
-    cancel_timer(St),
-    ?LOG_ERROR("MissingRevsFinder process died with reason: ~p", [Reason]),
-    {stop, missing_revs_finder_died, St};
-
 handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_queue=Pid} = St) ->
     {noreply, St};
 
@@ -382,22 +379,50 @@ handle_info({'EXIT', Pid, Reason}, #rep_
     ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
     {stop, changes_queue_died, State};
 
+handle_info({'EXIT', Pid, normal}, State) ->
+    #rep_state{
+        doc_copiers = DocCopiers,
+        missing_rev_finders = RevFinders,
+        missing_revs_queue = RevsQueue
+    } = State,
+    case get_value(Pid, RevFinders) of
+    undefined ->
+        case get_value(Pid, DocCopiers) of
+        undefined ->
+            {stop, {unknown_process_died, Pid, normal}, State};
+        _CopierId ->
+            {noreply, State}
+        end;
+    _FinderId ->
+        case lists:keydelete(Pid, 1, RevFinders) of
+        [] ->
+            couch_work_queue:close(RevsQueue),
+            {noreply, State#rep_state{missing_rev_finders = []}};
+        RevFinders2 ->
+            {noreply, State#rep_state{missing_rev_finders = RevFinders2}}
+        end
+    end;
+
 handle_info({'EXIT', Pid, Reason}, State) ->
-    #rep_state{doc_copiers = DocCopiers} = State,
+    #rep_state{
+        doc_copiers = DocCopiers,
+        missing_rev_finders = RevFinders
+    } = State,
+    cancel_timer(State),
     case get_value(Pid, DocCopiers) of
     undefined ->
-        cancel_timer(State),
-        {stop, {unknown_process_died, Pid, Reason}, State};
+        case get_value(Pid, RevFinders) of
+        undefined ->
+            {stop, {unknown_process_died, Pid, Reason}, State};
+        FinderId ->
+            ?LOG_ERROR("RevsFinder process ~p died with reason: ~p",
+                [FinderId, Reason]),
+            {stop, {revs_finder_died, Pid, Reason}, State}
+        end;
     CopierId ->
-        case Reason of
-        normal ->
-            {noreply, State};
-        _ ->
-            cancel_timer(State),
-            ?LOG_ERROR("DocCopier process ~p died with reason: ~p",
-                [CopierId, Reason]),
-            {stop, doc_copier_died, State}
-        end
+        ?LOG_ERROR("DocCopier process ~p died with reason: ~p",
+            [CopierId, Reason]),
+        {stop, {doc_copier_died, Pid, Reason}, State}
     end.
 
 
@@ -531,18 +556,8 @@ spawn_changes_reader(Cp, StartSeq, Sourc
         end).
 
 
-spawn_missing_revs_finder(StatsProcess, 
-        Target, ChangesQueue, MissingRevsQueue) ->
-    % Note, we could spawn more missing revs processes here. Before that's
-    % possible the work_queue code needs to be modified to work with multiple
-    % dequeueing processes
-    spawn_link(fun() ->
-        missing_revs_finder_loop(StatsProcess, 
-                Target, ChangesQueue, MissingRevsQueue)
-        end).
-
-
-missing_revs_finder_loop(_, _, DocIds, MissingRevsQueue) when is_list(DocIds) ->
+spawn_missing_rev_finders(_, _, DocIds, MissingRevsQueue, _)
+    when is_list(DocIds) ->
     lists:foreach(
         fun(DocId) ->
             % Ensure same behaviour as old replicator: accept a list of percent
@@ -550,17 +565,33 @@ missing_revs_finder_loop(_, _, DocIds, M
             Id = ?l2b(couch_httpd:unquote(DocId)),
             ok = couch_work_queue:queue(MissingRevsQueue, {doc_id, Id})
         end, DocIds),
-    couch_work_queue:close(MissingRevsQueue);
+    couch_work_queue:close(MissingRevsQueue),
+    [];
+
+spawn_missing_rev_finders(StatsProcess,
+        Target, ChangesQueue, MissingRevsQueue, RevFindersCount) ->
+    lists:map(
+        fun(FinderId) ->
+            Pid = spawn_link(fun() ->
+                missing_revs_finder_loop(FinderId, StatsProcess,
+                    Target, ChangesQueue, MissingRevsQueue)
+            end),
+            {Pid, FinderId}
+        end, lists:seq(1, RevFindersCount)).
+
 
-missing_revs_finder_loop(Cp, 
-        Target, ChangesQueue, MissingRevsQueue) ->
-    case couch_work_queue:dequeue(ChangesQueue) of
+missing_revs_finder_loop(FinderId, Cp, Target, ChangesQueue, RevsQueue) ->
+    case couch_work_queue:dequeue(ChangesQueue, ?REV_BATCH_SIZE) of
     closed ->
-        couch_work_queue:close(MissingRevsQueue);
+        ok;
     {ok, DocInfos} ->
         IdRevs = [{Id, [Rev || #rev_info{rev=Rev} <- RevsInfo]} ||
                 #doc_info{id=Id, revs=RevsInfo} <- DocInfos],
+        ?LOG_DEBUG("Revs finder ~p got ~p IdRev pairs from queue",
+            [FinderId, length(IdRevs)]),
         {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs),
+        ?LOG_DEBUG("Revs finder ~p found ~p missing IdRev pairs",
+            [FinderId, length(Missing)]),
         % Figured out which on the target are missing.
         % Missing contains the id and revs missing, and any possible
         % ancestors that already exist on the target. This enables
@@ -583,9 +614,9 @@ missing_revs_finder_loop(Cp, 
             % PA means "possible ancestor"
             Cp ! {add_stat, {#stats.missing_found, length(Revs)}},
             {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
-            ok = couch_work_queue:queue(MissingRevsQueue, {Id, Revs, PAs, Seq})
+            ok = couch_work_queue:queue(RevsQueue, {Id, Revs, PAs, Seq})
             end, Missing),
-        missing_revs_finder_loop(Cp, Target, ChangesQueue, MissingRevsQueue)
+        missing_revs_finder_loop(FinderId, Cp, Target, ChangesQueue, RevsQueue)
     end.
 
 
@@ -603,8 +634,7 @@ remove_missing(IdRevsSeqDict, [{MissingI
     end.
 
 
-spawn_doc_copiers(Cp, Source, Target, MissingRevsQueue) ->
-    Count = ?l2i(couch_config:get("replicator", "copy_processes", "10")),
+spawn_doc_copiers(Cp, Source, Target, MissingRevsQueue, CopiersCount) ->
     lists:map(
         fun(CopierId) ->
             Pid = spawn_link(fun() ->
@@ -612,7 +642,7 @@ spawn_doc_copiers(Cp, Source, Target, Mi
             end),
             {Pid, CopierId}
         end,
-        lists:seq(1, Count)).
+        lists:seq(1, CopiersCount)).
 
 
 doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue) ->



Mime
View raw message