couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1004041 - in /couchdb/branches/new_replicator/src/couchdb: couch_replicator.erl couch_replicator_doc_copiers.erl
Date Sun, 03 Oct 2010 20:30:52 GMT
Author: fdmanana
Date: Sun Oct  3 20:30:52 2010
New Revision: 1004041

URL: http://svn.apache.org/viewvc?rev=1004041&view=rev
Log:
New replicator: simplifying some code (handling the exit of worker processes).

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1004041&r1=1004040&r2=1004041&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Sun Oct  3 20:30:52 2010
@@ -56,7 +56,6 @@
     changes_reader,
     missing_rev_finders,
     doc_copiers,
-    finished_doc_copiers = 0,
     seqs_in_progress = gb_trees:empty(),
     stats = #rep_stats{}
     }).
@@ -274,25 +273,22 @@ handle_info({'EXIT', Pid, normal}, #rep_
     {noreply, State};
 
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
-    cancel_timer(State),
     ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, State};
+    {stop, changes_reader_died, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_queue=Pid} = St) ->
     {noreply, St};
 
 handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_queue=Pid} = St) ->
-    cancel_timer(St),
     ?LOG_ERROR("MissingRevsQueue process died with reason: ~p", [Reason]),
-    {stop, missing_revs_queue_died, St};
+    {stop, missing_revs_queue_died, cancel_timer(St)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
     {noreply, State};
 
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
-    cancel_timer(State),
     ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, State};
+    {stop, changes_queue_died, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, State) ->
     #rep_state{
@@ -306,7 +302,12 @@ handle_info({'EXIT', Pid, normal}, State
         undefined ->
             {stop, {unknown_process_died, Pid, normal}, State};
         _CopierId ->
-            {noreply, State}
+            case lists:keydelete(Pid, 1, DocCopiers) of
+            [] ->
+                {stop, normal, do_last_checkpoint(State)};
+            DocCopiers2 ->
+                {noreply, State#rep_state{doc_copiers = DocCopiers2}}
+            end
         end;
     _FinderId ->
         case lists:keydelete(Pid, 1, RevFinders) of
@@ -323,21 +324,21 @@ handle_info({'EXIT', Pid, Reason}, State
         doc_copiers = DocCopiers,
         missing_rev_finders = RevFinders
     } = State,
-    cancel_timer(State),
+    State2 = cancel_timer(State),
     case get_value(Pid, DocCopiers) of
     undefined ->
         case get_value(Pid, RevFinders) of
         undefined ->
-            {stop, {unknown_process_died, Pid, Reason}, State};
+            {stop, {unknown_process_died, Pid, Reason}, State2};
         FinderId ->
             ?LOG_ERROR("RevsFinder process ~p died with reason: ~p",
                 [FinderId, Reason]),
-            {stop, {revs_finder_died, Pid, Reason}, State}
+            {stop, {revs_finder_died, Pid, Reason}, State2}
         end;
     CopierId ->
         ?LOG_ERROR("DocCopier process ~p died with reason: ~p",
             [CopierId, Reason]),
-        {stop, {doc_copier_died, Pid, Reason}, State}
+        {stop, {doc_copier_died, Pid, Reason}, State2}
     end.
 
 
@@ -369,34 +370,6 @@ handle_cast({add_stat, {StatPos, Val}}, 
     NewStats = setelement(StatPos, Stats, Stat + Val),
     {noreply, State#rep_state{stats = NewStats}};
 
-handle_cast({done, _CopierId}, State) ->
-    #rep_state{
-        finished_doc_copiers = Finished,
-        doc_copiers = DocCopiers,
-        next_through_seqs = DoneSeqs,
-        current_through_seq = Seq
-    } = State,
-    State1 = State#rep_state{finished_doc_copiers = Finished + 1},
-    case length(DocCopiers) - 1 of
-    Finished ->
-        % This means all the worker processes have completed their work.
-        % Assert that all the seqs have been processed.
-        0 = gb_trees:size(State#rep_state.seqs_in_progress),
-        LastSeq = case DoneSeqs of
-        [] ->
-            Seq;
-        _ ->
-            lists:max([Seq, lists:last(DoneSeqs)])
-        end,
-        State2 = do_checkpoint(State1#rep_state{
-            current_through_seq = LastSeq,
-            next_through_seqs = []}),
-        cancel_timer(State2),
-        {stop, normal, State2};
-    _ ->
-        {noreply, State1}
-    end;
-
 handle_cast(Msg, State) ->
     ?LOG_ERROR("Replicator received an unexpected asynchronous call: ~p", [Msg]),
     {stop, unexpected_async_message, State}.
@@ -424,6 +397,25 @@ terminate_cleanup(#rep_state{source = So
     couch_api_wrap:db_close(Target).
 
 
+do_last_checkpoint(State) ->
+    #rep_state{
+        next_through_seqs = DoneSeqs,
+        current_through_seq = Seq,
+        seqs_in_progress = InProgress
+    } = State,
+    0 = gb_trees:size(InProgress),
+    LastSeq = case DoneSeqs of
+    [] ->
+        Seq;
+    _ ->
+        lists:max([Seq, lists:last(DoneSeqs)])
+    end,
+    State2 = do_checkpoint(State#rep_state{
+        current_through_seq = LastSeq,
+        next_through_seqs = []}),
+    cancel_timer(State2).
+
+
 start_timer(#rep_state{rep_details = #rep{options = Options}} = State) ->
     case get_value(doc_ids, Options) of
     undefined ->
@@ -440,10 +432,11 @@ start_timer(#rep_state{rep_details = #re
     end.
 
 
-cancel_timer(#rep_state{timer = nil}) ->
-    ok;
-cancel_timer(#rep_state{timer = Timer}) ->
-    {ok, cancel} = timer:cancel(Timer).
+cancel_timer(#rep_state{timer = nil} = State) ->
+    State;
+cancel_timer(#rep_state{timer = Timer} = State) ->
+    {ok, cancel} = timer:cancel(Timer),
+    State#rep_state{timer = nil}.
 
 
 get_result(#rep_state{stats = Stats, rep_details = Rep} = State) ->

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1004041&r1=1004040&r2=1004041&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Sun Oct 
3 20:30:52 2010
@@ -45,7 +45,7 @@ doc_copy_loop(CopierId, Cp, Source, Targ
     case couch_work_queue:dequeue(MissingRevsQueue, ?DOC_BATCH_SIZE) of
     closed ->
         ?LOG_DEBUG("Doc copier ~p got missing revs queue closed", [CopierId]),
-        ok = gen_server:cast(Cp, {done, CopierId});
+        ok;
 
     {ok, [{doc_id, _} | _] = DocIds} ->
         DocAcc = lists:foldl(



Mime
View raw message