couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1091371 - in /couchdb/trunk/src/couchdb: couch_replication_manager.erl couch_replicator.erl
Date Tue, 12 Apr 2011 11:06:53 GMT
Author: fdmanana
Date: Tue Apr 12 11:06:52 2011
New Revision: 1091371

URL: http://svn.apache.org/viewvc?rev=1091371&view=rev
Log:
Replication manager refactoring

Update the state of replication documents outside the replication manager
gen_server. This allows for a faster transition of replication states without
adding substantial complexity, more or less similar to what is done in the
1.1.x branch.

Modified:
    couchdb/trunk/src/couchdb/couch_replication_manager.erl
    couchdb/trunk/src/couchdb/couch_replicator.erl

Modified: couchdb/trunk/src/couchdb/couch_replication_manager.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replication_manager.erl?rev=1091371&r1=1091370&r2=1091371&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replication_manager.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replication_manager.erl Tue Apr 12 11:06:52 2011
@@ -13,6 +13,10 @@
 -module(couch_replication_manager).
 -behaviour(gen_server).
 
+% public API
+-export([replication_started/1, replication_completed/1, replication_error/2]).
+
+% gen_server callbacks
 -export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
 -export([code_change/3, terminate/2]).
 
@@ -28,8 +32,7 @@
     rep,
     starting,
     retries_left,
-    max_retries,
-    last_error
+    max_retries
 }).
 
 -import(couch_replicator_utils, [
@@ -44,7 +47,6 @@
 -record(state, {
     changes_feed_loop = nil,
     db_notifier = nil,
-    rep_notifier = nil,
     rep_db_name = nil,
     rep_start_pids = [],
     max_retries
@@ -54,6 +56,44 @@
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
+
+replication_started(#rep{id = {BaseId, _} = RepId}) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        update_rep_doc(DocId, [
+            {<<"_replication_state">>, <<"triggered">>},
+            {<<"_replication_id">>, ?l2b(BaseId)}]),
+        ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
+        ?LOG_INFO("Document `~s` triggered replication `~s`",
+            [DocId, pp_rep_id(RepId)])
+    end.
+
+
+replication_completed(#rep{id = RepId}) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]),
+        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
+        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
+            [pp_rep_id(RepId), DocId])
+    end.
+
+
+replication_error(#rep{id = RepId}, Error) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        % TODO: maybe add error reason to replication document
+        update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]),
+        ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
+    end.
+
+
 init(_) ->
     process_flag(trap_exit, true),
     ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
@@ -72,7 +112,6 @@ init(_) ->
         changes_feed_loop = Loop,
         rep_db_name = RepDbName,
         db_notifier = db_update_notifier(),
-        rep_notifier = rep_notifier(),
         max_retries = list_to_integer(
             couch_config:get("replicator", "max_replication_retry_count", "10"))
     }}.
@@ -81,41 +120,22 @@ init(_) ->
 handle_call({rep_db_update, Change}, _From, State) ->
     {reply, ok, process_update(State, Change)};
 
-handle_call({rep_started, {BaseId, _Ext} = RepId}, _From, State) ->
+handle_call({rep_started, RepId}, _From, State) ->
     case rep_state(RepId) of
     nil ->
         ok;
-    #rep_state{rep = #rep{doc_id = DocId}} = RepState ->
+    RepState ->
         true = ets:insert(
-            ?REP_TO_STATE, {RepId, RepState#rep_state{starting = false}}),
-        update_rep_doc(DocId, [
-            {<<"_replication_state">>, <<"triggered">>},
-            {<<"_replication_id">>, ?l2b(BaseId)}]),
-        ?LOG_INFO("Document `~s` triggered replication `~s`",
-            [DocId, pp_rep_id(RepId)])
+            ?REP_TO_STATE, {RepId, RepState#rep_state{starting = false}})
     end,
     {reply, ok, State};
 
 handle_call({rep_complete, RepId}, _From, State) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}])
-    end,
+    true = ets:delete(?REP_TO_STATE, RepId),
     {reply, ok, State};
 
 handle_call({rep_error, RepId, Error}, _From, State) ->
-    case rep_state(RepId) of
-    nil ->
-        {reply, ok, State};
-    #rep_state{rep = #rep{doc_id = DocId}} = RepState ->
-        NewRepState = RepState#rep_state{last_error = Error},
-        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
-        % TODO: maybe add error reason to replication document
-        update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]),
-        {reply, ok, State}
-    end;
+    {reply, ok, replication_error(State, RepId, Error)};
 
 handle_call(Msg, From, State) ->
     ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
@@ -151,10 +171,6 @@ handle_info({'EXIT', From, Reason}, #sta
     ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
     {stop, {db_update_notifier_died, Reason}, State};
 
-handle_info({'EXIT', From, Reason}, #state{rep_notifier = From} = State) ->
-    ?LOG_ERROR("Replication notifier died. Reason: ~p", [Reason]),
-    {stop, {rep_notifier_died, Reason}, State};
-
 handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
     % one of the replication start processes terminated successfully
     {noreply, State#state{rep_start_pids = Pids -- [From]}};
@@ -168,8 +184,7 @@ terminate(_Reason, State) ->
     #state{
         rep_start_pids = StartPids,
         changes_feed_loop = Loop,
-        db_notifier = DbNotifier,
-        rep_notifier = RepNotifier
+        db_notifier = DbNotifier
     } = State,
     stop_all_replications(),
     lists:foreach(
@@ -180,7 +195,6 @@ terminate(_Reason, State) ->
         [Loop | StartPids]),
     true = ets:delete(?REP_TO_STATE),
     true = ets:delete(?DOC_TO_REP),
-    couch_replication_notifier:stop(RepNotifier),
     couch_db_update_notifier:stop(DbNotifier).
 
 
@@ -249,31 +263,6 @@ db_update_notifier() ->
     Notifier.
 
 
-rep_notifier() ->
-    Server = self(),
-    {ok, Notifier} = couch_replication_notifier:start_link(
-        fun({finished, RepId, _CheckPointHistory}) ->
-            case rep_state(RepId) of
-            nil ->
-                ok;
-            #rep_state{} ->
-                % TODO: maybe add replication stats to the doc
-                ok = gen_server:call(Server, {rep_complete, RepId}, infinity)
-            end;
-        ({error, RepId, Error}) ->
-            case rep_state(RepId) of
-            nil ->
-                ok;
-            #rep_state{} ->
-                ok = gen_server:call(
-                    Server, {rep_error, RepId, Error}, infinity)
-            end;
-        (_) ->
-            ok
-        end),
-    Notifier.
-
-
 restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
     stop_all_replications(),
     lists:foreach(
@@ -306,8 +295,8 @@ process_update(State, {Change}) ->
         <<"completed">> ->
             replication_complete(DocId),
             State;
-        <<"error">> ->
-            replication_error(State, DocId)
+        _ ->
+            State
         end
     end.
 
@@ -379,11 +368,13 @@ start_replication(Server, #rep{id = RepI
 replication_complete(DocId) ->
     case ets:lookup(?DOC_TO_REP, DocId) of
     [{DocId, RepId}] ->
-        couch_replicator:cancel_replication(RepId),
-        true = ets:delete(?REP_TO_STATE, RepId),
-        true = ets:delete(?DOC_TO_REP, DocId),
-        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
-            [pp_rep_id(RepId), DocId]);
+        case rep_state(RepId) of
+        nil ->
+            couch_replicator:cancel_replication(RepId);
+        #rep_state{} ->
+            ok
+        end,
+        true = ets:delete(?DOC_TO_REP, DocId);
     _ ->
         ok
     end.
@@ -402,19 +393,18 @@ rep_doc_deleted(DocId) ->
     end.
 
 
-replication_error(State, DocId) ->
-    case ets:lookup(?DOC_TO_REP, DocId) of
-    [{DocId, RepId}] ->
-        maybe_retry_replication(rep_state(RepId), State);
-    _ ->
-        State
+replication_error(State, RepId, Error) ->
+    case rep_state(RepId) of
+    nil ->
+        State;
+    RepState ->
+        maybe_retry_replication(RepState, Error, State)
     end.
 
-maybe_retry_replication(#rep_state{retries_left = 0} = RepState, State) ->
+maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
     #rep_state{
         rep = #rep{id = RepId, doc_id = DocId},
-        max_retries = MaxRetries,
-        last_error = Error
+        max_retries = MaxRetries
     } = RepState,
     couch_replicator:cancel_replication(RepId),
     true = ets:delete(?REP_TO_STATE, RepId),
@@ -424,11 +414,10 @@ maybe_retry_replication(#rep_state{retri
         [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
     State;
 
-maybe_retry_replication(RepState, State) ->
+maybe_retry_replication(RepState, Error, State) ->
     #rep_state{
         rep = #rep{id = RepId, doc_id = DocId} = Rep,
-        retries_left = RetriesLeft,
-        last_error = Error
+        retries_left = RetriesLeft
     } = RepState,
     NewRepState = RepState#rep_state{retries_left = RetriesLeft - 1},
     true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),

Modified: couchdb/trunk/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1091371&r1=1091370&r2=1091371&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.erl Tue Apr 12 11:06:52 2011
@@ -278,6 +278,8 @@ do_init(#rep{options = Options, id = {Ba
     ?LOG_DEBUG("Missing rev finder pids are: ~p", [MissingRevFinders]),
     ?LOG_DEBUG("Worker pids are: ~p", [Workers]),
 
+    couch_replication_manager:replication_started(Rep),
+
     {ok, State#rep_state{
             missing_revs_queue = MissingRevsQueue,
             changes_queue = ChangesQueue,
@@ -443,10 +445,11 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-terminate(normal, #rep_state{rep_details = #rep{id = RepId},
+terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
     checkpoint_history = CheckpointHistory} = State) ->
     terminate_cleanup(State),
-    couch_replication_notifier:notify({finished, RepId, CheckpointHistory});
+    couch_replication_notifier:notify({finished, RepId, CheckpointHistory}),
+    couch_replication_manager:replication_completed(Rep);
 
 terminate(shutdown, State) ->
     % cancelled replication throught ?MODULE:cancel_replication/1
@@ -456,12 +459,13 @@ terminate(Reason, State) ->
     #rep_state{
         source_name = Source,
         target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId}
+        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
     } = State,
     ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
         [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
     terminate_cleanup(State),
-    couch_replication_notifier:notify({error, RepId, Reason}).
+    couch_replication_notifier:notify({error, RepId, Reason}),
+    couch_replication_manager:replication_error(Rep, Reason).
 
 
 terminate_cleanup(State) ->



Mime
View raw message