couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1004040 - in /couchdb/branches/new_replicator/src/couchdb: couch_replicator.erl couch_replicator_doc_copiers.erl couch_replicator_rev_finders.erl
Date Sun, 03 Oct 2010 20:29:09 GMT
Author: fdmanana
Date: Sun Oct  3 20:29:08 2010
New Revision: 1004040

URL: http://svn.apache.org/viewvc?rev=1004040&view=rev
Log:
New replicator: replacing regular message passing with gen_server casts.

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1004040&r1=1004039&r2=1004040&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Sun Oct  3 20:29:08 2010
@@ -270,53 +270,6 @@ do_init(#rep{options = Options} = Rep) -
     }.
 
 
-handle_info({seq_start, {Seq, NumChanges}}, State) ->
-    #rep_state{
-        seqs_in_progress = SeqsInProgress,
-        stats = #rep_stats{missing_checked = Mc} = Stats
-    } = State,
-    NewState = State#rep_state{
-        seqs_in_progress = gb_trees:insert(Seq, NumChanges, SeqsInProgress),
-        stats = Stats#rep_stats{missing_checked = Mc + NumChanges}
-    },
-    {noreply, NewState};
-
-handle_info({seq_changes_done, Changes}, State) ->
-    {noreply, process_seq_changes_done(Changes, State)};
-
-handle_info({add_stat, {StatPos, Val}}, #rep_state{stats = Stats} = State) ->
-    Stat = element(StatPos, Stats),
-    NewStats = setelement(StatPos, Stats, Stat + Val),
-    {noreply, State#rep_state{stats = NewStats}};
-
-handle_info({done, _CopierId}, State) ->
-    #rep_state{
-        finished_doc_copiers = Finished,
-        doc_copiers = DocCopiers,
-        next_through_seqs = DoneSeqs,
-        current_through_seq = Seq
-    } = State,
-    State1 = State#rep_state{finished_doc_copiers = Finished + 1},
-    case length(DocCopiers) - 1 of
-    Finished ->
-        % This means all the worker processes have completed their work.
-        % Assert that all the seqs have been processed.
-        0 = gb_trees:size(State#rep_state.seqs_in_progress),
-        LastSeq = case DoneSeqs of
-        [] ->
-            Seq;
-        _ ->
-            lists:max([Seq, lists:last(DoneSeqs)])
-        end,
-        State2 = do_checkpoint(State1#rep_state{
-            current_through_seq = LastSeq,
-            next_through_seqs = []}),
-        cancel_timer(State2),
-        {stop, normal, State2};
-    _ ->
-        {noreply, State1}
-    end;
-
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
     {noreply, State};
 
@@ -397,6 +350,53 @@ handle_cast(checkpoint, State) ->
     State2 = do_checkpoint(State),
     {noreply, State2#rep_state{timer = start_timer(State)}};
 
+handle_cast({seq_start, {Seq, NumChanges}}, State) ->
+    #rep_state{
+        seqs_in_progress = SeqsInProgress,
+        stats = #rep_stats{missing_checked = Mc} = Stats
+    } = State,
+    NewState = State#rep_state{
+        seqs_in_progress = gb_trees:insert(Seq, NumChanges, SeqsInProgress),
+        stats = Stats#rep_stats{missing_checked = Mc + NumChanges}
+    },
+    {noreply, NewState};
+
+handle_cast({seq_changes_done, Changes}, State) ->
+    {noreply, process_seq_changes_done(Changes, State)};
+
+handle_cast({add_stat, {StatPos, Val}}, #rep_state{stats = Stats} = State) ->
+    Stat = element(StatPos, Stats),
+    NewStats = setelement(StatPos, Stats, Stat + Val),
+    {noreply, State#rep_state{stats = NewStats}};
+
+handle_cast({done, _CopierId}, State) ->
+    #rep_state{
+        finished_doc_copiers = Finished,
+        doc_copiers = DocCopiers,
+        next_through_seqs = DoneSeqs,
+        current_through_seq = Seq
+    } = State,
+    State1 = State#rep_state{finished_doc_copiers = Finished + 1},
+    case length(DocCopiers) - 1 of
+    Finished ->
+        % This means all the worker processes have completed their work.
+        % Assert that all the seqs have been processed.
+        0 = gb_trees:size(State#rep_state.seqs_in_progress),
+        LastSeq = case DoneSeqs of
+        [] ->
+            Seq;
+        _ ->
+            lists:max([Seq, lists:last(DoneSeqs)])
+        end,
+        State2 = do_checkpoint(State1#rep_state{
+            current_through_seq = LastSeq,
+            next_through_seqs = []}),
+        cancel_timer(State2),
+        {stop, normal, State2};
+    _ ->
+        {noreply, State1}
+    end;
+
 handle_cast(Msg, State) ->
     ?LOG_ERROR("Replicator received an unexpected asynchronous call: ~p", [Msg]),
     {stop, unexpected_async_message, State}.
@@ -510,7 +510,7 @@ spawn_changes_reader(Cp, StartSeq, Sourc
         fun()->
             couch_api_wrap:changes_since(Source, all_docs, StartSeq,
                 fun(#doc_info{high_seq=Seq, revs=Revs} = DocInfo) ->
-                    Cp ! {seq_start, {Seq, length(Revs)}},
+                    ok = gen_server:cast(Cp, {seq_start, {Seq, length(Revs)}}),
                     ok = couch_work_queue:queue(ChangesQueue, DocInfo)
                 end, Options),
             couch_work_queue:close(ChangesQueue)

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1004040&r1=1004039&r2=1004040&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Sun Oct  3 20:29:08 2010
@@ -45,7 +45,7 @@ doc_copy_loop(CopierId, Cp, Source, Targ
     case couch_work_queue:dequeue(MissingRevsQueue, ?DOC_BATCH_SIZE) of
     closed ->
         ?LOG_DEBUG("Doc copier ~p got missing revs queue closed", [CopierId]),
-        Cp ! {done, CopierId};
+        ok = gen_server:cast(Cp, {done, CopierId});
 
     {ok, [{doc_id, _} | _] = DocIds} ->
         DocAcc = lists:foldl(
@@ -154,11 +154,11 @@ seqs_done([], _) ->
 seqs_done([{nil, _} | _], _) ->
     ok;
 seqs_done(SeqCounts, Cp) ->
-    Cp ! {seq_changes_done, SeqCounts}.
+    ok = gen_server:cast(Cp, {seq_changes_done, SeqCounts}).
 
 
 maybe_send_stat(0, _StatPos, _Cp) ->
     ok;
 maybe_send_stat(Value, StatPos, Cp) ->
-    Cp ! {add_stat, {StatPos, Value}}.
+    ok = gen_server:cast(Cp, {add_stat, {StatPos, Value}}).
 

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1004040&r1=1004039&r2=1004040&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Sun Oct  3 20:29:08 2010
@@ -89,16 +89,16 @@ report_non_missing(RevsDict, Cp) ->
     0 ->
         ok;
     N when N > 0 ->
-        Cp ! {seq_changes_done,
-            [{Seq, length(Revs)} ||
-                {_Id, {Revs, Seq}} <- dict:to_list(RevsDict)]}
+        SeqsDone = [{Seq, length(Revs)} ||
+            {_Id, {Revs, Seq}} <- dict:to_list(RevsDict)],
+        ok = gen_server:cast(Cp, {seq_changes_done, SeqsDone})
     end.
 
 
 maybe_add_stat(0, _StatPos, _Cp) ->
     ok;
 maybe_add_stat(Value, StatPos, Cp) ->
-    Cp ! {add_stat, {StatPos, Value}}.
+    ok = gen_server:cast(Cp, {add_stat, {StatPos, Value}}).
 
 
 remove_missing(IdRevsSeqDict, []) ->



Mime
View raw message