couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1085023 - /couchdb/trunk/src/couchdb/couch_replication_manager.erl
Date Thu, 24 Mar 2011 16:18:55 GMT
Author: fdmanana
Date: Thu Mar 24 16:18:55 2011
New Revision: 1085023

URL: http://svn.apache.org/viewvc?rev=1085023&view=rev
Log:
Replication manager: fix occasional race conditions

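Fix rare race conditions in replication state transitions.
They are most likely to occur with very short-lived replications, like the ones in the
test suite, or when the replicator database is deleted or changed at runtime.
The manager now ignores notifications for replications it no longer tracks, and it
performs cleanup and retries only when it processes the document update that sets
_replication_state to "completed" or "error".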

These race conditions were introduced by the refactoring in revision 1079483 (which
adapted the replication manager to the new replicator code).


Modified:
    couchdb/trunk/src/couchdb/couch_replication_manager.erl

Modified: couchdb/trunk/src/couchdb/couch_replication_manager.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replication_manager.erl?rev=1085023&r1=1085022&r2=1085023&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replication_manager.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replication_manager.erl Thu Mar 24 16:18:55 2011
@@ -28,7 +28,8 @@
     rep,
     starting,
     retries_left,
-    max_retries
+    max_retries,
+    last_error
 }).
 
 -import(couch_replicator_utils, [
@@ -81,69 +82,40 @@ handle_call({rep_db_update, Change}, _Fr
     {reply, ok, process_update(State, Change)};
 
 handle_call({rep_started, {BaseId, _Ext} = RepId}, _From, State) ->
-    #rep_state{rep = #rep{doc_id = DocId}} = RepState = rep_state(RepId),
-    true = ets:insert(
-        ?REP_TO_STATE, {RepId, RepState#rep_state{starting = false}}),
-    update_rep_doc(DocId, [
-        {<<"_replication_state">>, <<"triggered">>},
-        {<<"_replication_id">>, ?l2b(BaseId)}]),
-    {reply, ok, State};
-
-handle_call({rep_start_failure, Rep, Error}, _From, State) ->
-    #rep{id = {BaseId, _} = RepId, doc_id = DocId} = Rep,
-    #rep_state{max_retries = MaxRetries} = rep_state(RepId),
-    ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
-        "the document `~s`. Last error reason was: ~p",
-        [pp_rep_id(RepId), MaxRetries, DocId, Error]),
-    update_rep_doc(
-        DocId,
-        [{<<"_replication_state">>, <<"error">>},
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{rep = #rep{doc_id = DocId}} = RepState ->
+        true = ets:insert(
+            ?REP_TO_STATE, {RepId, RepState#rep_state{starting = false}}),
+        update_rep_doc(DocId, [
+            {<<"_replication_state">>, <<"triggered">>},
             {<<"_replication_id">>, ?l2b(BaseId)}]),
-    true = ets:delete(?REP_TO_STATE, RepId),
-    true = ets:delete(?DOC_TO_REP, DocId),
+        ?LOG_INFO("Document `~s` triggered replication `~s`",
+            [DocId, pp_rep_id(RepId)])
+    end,
     {reply, ok, State};
 
 handle_call({rep_complete, RepId}, _From, State) ->
-    #rep_state{rep = #rep{doc_id = DocId}} = rep_state(RepId),
-    couch_replicator:cancel_replication(RepId),
-    true = ets:delete(?REP_TO_STATE, RepId),
-    true = ets:delete(?DOC_TO_REP, DocId),
-    ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
-        [pp_rep_id(RepId), DocId]),
-    update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]),
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}])
+    end,
     {reply, ok, State};
 
 handle_call({rep_error, RepId, Error}, _From, State) ->
-    #rep_state{
-        rep = #rep{doc_id = DocId} = Rep,
-        retries_left = RetriesLeft,
-        max_retries = MaxRetries
-    } = RepState = rep_state(RepId),
-    NewState = case RetriesLeft > 0 of
-    false ->
-        couch_replicator:cancel_replication(RepId),
-        true = ets:delete(?REP_TO_STATE, RepId),
-        true = ets:delete(?DOC_TO_REP, DocId),
-        ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
-            "~nReached maximum retry attempts (~p).",
-            [pp_rep_id(RepId), DocId,
-                to_binary(error_reason(Error)), MaxRetries]),
-        State;
-    true ->
-        NewRepState = RepState#rep_state{retries_left = RetriesLeft - 1},
+    case rep_state(RepId) of
+    nil ->
+        {reply, ok, State};
+    #rep_state{rep = #rep{doc_id = DocId}} = RepState ->
+        NewRepState = RepState#rep_state{last_error = Error},
         true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
-        Wait = wait_period(NewRepState),
-        ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
-            "~nRestarting replication in ~p seconds.",
-            [pp_rep_id(RepId), DocId,
-                to_binary(error_reason(Error)), Wait]),
-        Server = self(),
-        Pid = spawn_link(fun() -> start_replication(Server, Rep, Wait) end),
-        State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}
-    end,
-    % TODO: maybe add error reason to replication document
-     update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]),
-    {reply, ok, NewState};
+        % TODO: maybe add error reason to replication document
+        update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]),
+        {reply, ok, State}
+    end;
 
 handle_call(Msg, From, State) ->
     ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
@@ -196,7 +168,8 @@ terminate(_Reason, State) ->
     #state{
         rep_start_pids = StartPids,
         changes_feed_loop = Loop,
-        db_notifier = Notifier
+        db_notifier = DbNotifier,
+        rep_notifier = RepNotifier
     } = State,
     stop_all_replications(),
     lists:foreach(
@@ -207,7 +180,8 @@ terminate(_Reason, State) ->
         [Loop | StartPids]),
     true = ets:delete(?REP_TO_STATE),
     true = ets:delete(?DOC_TO_REP),
-    couch_db_update_notifier:stop(Notifier).
+    couch_replication_notifier:stop(RepNotifier),
+    couch_db_update_notifier:stop(DbNotifier).
 
 
 code_change(_OldVsn, State, _Extra) ->
@@ -329,8 +303,11 @@ process_update(State, {Change}) ->
             maybe_start_replication(State, DocId, JsonRepDoc);
         <<"triggered">> ->
             maybe_start_replication(State, DocId, JsonRepDoc);
-        _ ->
-           State
+        <<"completed">> ->
+            replication_complete(DocId),
+            State;
+        <<"error">> ->
+            replication_error(State, DocId)
         end
     end.
 
@@ -389,18 +366,29 @@ maybe_tag_rep_doc(DocId, {RepProps}, Rep
     end.
 
 
-start_replication(Server, #rep{id = RepId, doc_id = DocId} = Rep, Wait) ->
+start_replication(Server, #rep{id = RepId} = Rep, Wait) ->
     ok = timer:sleep(Wait * 1000),
     case (catch couch_replicator:async_replicate(Rep)) of
     {ok, _} ->
-        ok = gen_server:call(Server, {rep_started, RepId}, infinity),
-        ?LOG_INFO("Document `~s` triggered replication `~s`",
-            [DocId, pp_rep_id(RepId)]);
+        ok = gen_server:call(Server, {rep_started, RepId}, infinity);
     Error ->
         ok = gen_server:call(Server, {rep_error, RepId, Error}, infinity)
     end.
 
 
+replication_complete(DocId) ->
+    case ets:lookup(?DOC_TO_REP, DocId) of
+    [{DocId, RepId}] ->
+        couch_replicator:cancel_replication(RepId),
+        true = ets:delete(?REP_TO_STATE, RepId),
+        true = ets:delete(?DOC_TO_REP, DocId),
+        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
+            [pp_rep_id(RepId), DocId]);
+    _ ->
+        ok
+    end.
+
+
 rep_doc_deleted(DocId) ->
     case ets:lookup(?DOC_TO_REP, DocId) of
     [{DocId, RepId}] ->
@@ -414,6 +402,45 @@ rep_doc_deleted(DocId) ->
     end.
 
 
+replication_error(State, DocId) ->
+    case ets:lookup(?DOC_TO_REP, DocId) of
+    [{DocId, RepId}] ->
+        maybe_retry_replication(rep_state(RepId), State);
+    _ ->
+        State
+    end.
+
+maybe_retry_replication(#rep_state{retries_left = 0} = RepState, State) ->
+    #rep_state{
+        rep = #rep{id = RepId, doc_id = DocId},
+        max_retries = MaxRetries,
+        last_error = Error
+    } = RepState,
+    couch_replicator:cancel_replication(RepId),
+    true = ets:delete(?REP_TO_STATE, RepId),
+    true = ets:delete(?DOC_TO_REP, DocId),
+    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+        "~nReached maximum retry attempts (~p).",
+        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
+    State;
+
+maybe_retry_replication(RepState, State) ->
+    #rep_state{
+        rep = #rep{id = RepId, doc_id = DocId} = Rep,
+        retries_left = RetriesLeft,
+        last_error = Error
+    } = RepState,
+    NewRepState = RepState#rep_state{retries_left = RetriesLeft - 1},
+    true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
+    Wait = wait_period(NewRepState),
+    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+        "~nRestarting replication in ~p seconds.",
+        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
+    Server = self(),
+    Pid = spawn_link(fun() -> start_replication(Server, Rep, Wait) end),
+    State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
+
+
 stop_all_replications() ->
     ?LOG_INFO("Stopping all ongoing replications because the replicator"
         " database was deleted or changed", []),
@@ -429,8 +456,12 @@ stop_all_replications() ->
 update_rep_doc(RepDocId, KVs) ->
     {ok, RepDb} = ensure_rep_db_exists(),
     try
-        {ok, LatestRepDoc} = couch_db:open_doc(RepDb, RepDocId, []),
-        update_rep_doc(RepDb, LatestRepDoc, KVs)
+        case couch_db:open_doc(RepDb, RepDocId, []) of
+        {ok, LatestRepDoc} ->
+            update_rep_doc(RepDb, LatestRepDoc, KVs);
+        _ ->
+            ok
+        end
     catch throw:conflict ->
         % Shouldn't happen, as by default only the role _replicator can
         % update replication documents.



Mime
View raw message