couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1062783 - in /couchdb/trunk: share/www/script/test/replicator_db.js src/couchdb/couch_rep_db_listener.erl
Date Mon, 24 Jan 2011 14:09:07 GMT
Author: fdmanana
Date: Mon Jan 24 14:09:06 2011
New Revision: 1062783

URL: http://svn.apache.org/viewvc?rev=1062783&view=rev
Log:
Replicator DB: on restart, make several attempts to restart the replications

Now, on restart, the replicator database listener retries starting each
replication up to 10 times after the initial attempt fails. Between attempts
it waits according to an exponential backoff strategy: the first wait is
5 seconds, and each subsequent wait doubles.
This is useful because, when a server restarts, other servers that are
endpoints of its replications may not be online yet.


Modified:
    couchdb/trunk/share/www/script/test/replicator_db.js
    couchdb/trunk/src/couchdb/couch_rep_db_listener.erl

Modified: couchdb/trunk/share/www/script/test/replicator_db.js
URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/test/replicator_db.js?rev=1062783&r1=1062782&r2=1062783&view=diff
==============================================================================
--- couchdb/trunk/share/www/script/test/replicator_db.js (original)
+++ couchdb/trunk/share/www/script/test/replicator_db.js Mon Jan 24 14:09:06 2011
@@ -805,9 +805,16 @@ couchTests.replicator_db = function(debu
   restartServer();
   continuous_replication_survives_restart();
 
-  repDb.deleteDb();
-  restartServer();
-  run_on_modified_server(server_config, error_state_replication);
+/*
+ * Disabled, since error state would be set on the document only after
+ * the exponential backoff retry done by the replicator database listener
+ * terminates, which takes too much time for a unit test.
+ */
+/*
+ * repDb.deleteDb();
+ * restartServer();
+ * run_on_modified_server(server_config, error_state_replication);
+ */
 
 
   // cleanup

Modified: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_listener.erl?rev=1062783&r1=1062782&r2=1062783&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Mon Jan 24 14:09:06 2011
@@ -20,6 +20,8 @@
 
 -define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
 -define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
+-define(MAX_RETRIES, 10).
+-define(INITIAL_WAIT, 5).
 
 -record(state, {
     changes_feed_loop = nil,
@@ -58,6 +60,29 @@ init(_) ->
 handle_call({rep_db_update, Change}, _From, State) ->
     {reply, ok, process_update(State, Change)};
 
+handle_call({triggered, {BaseId, _}}, _From, State) ->
+    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
+    [{BaseId, {DocId, true}}] ->
+        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}});
+    _ ->
+        ok
+    end,
+    {reply, ok, State};
+
+handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
+    DocId = get_value(<<"_id">>, Props),
+    [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
+    ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
+        "the document `~s`. Last error reason was: ~p",
+        [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]),
+    couch_rep:update_rep_doc(
+        RepDoc,
+        [{<<"_replication_state">>, <<"error">>},
+            {<<"_replication_id">>, ?l2b(BaseId)}]),
+    true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
+    true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
+    {reply, ok, State};
+
 handle_call(Msg, From, State) ->
     ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",
         [Msg, From]),
@@ -239,18 +264,24 @@ maybe_start_replication(State, DocId, Js
     {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
     case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
     [] ->
-        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}),
+        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
         true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
+        Server = self(),
         Pid = spawn_link(fun() ->
-            start_replication(JsonRepDoc, RepId, UserCtx)
+            start_replication(Server, JsonRepDoc, RepId, UserCtx)
         end),
         State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
-    [{BaseId, DocId}] ->
+    [{BaseId, {DocId, _}}] ->
         State;
-    [{BaseId, OtherDocId}] ->
+    [{BaseId, {OtherDocId, false}}] ->
         ?LOG_INFO("The replication specified by the document `~s` was already"
             " triggered by the document `~s`", [DocId, OtherDocId]),
         maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+        State;
+    [{BaseId, {OtherDocId, true}}] ->
+        ?LOG_INFO("The replication specified by the document `~s` is already"
+            " being triggered by the document `~s`", [DocId, OtherDocId]),
+        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
         State
     end.
 
@@ -264,21 +295,37 @@ maybe_tag_rep_doc({Props} = JsonRepDoc, 
     end.
 
 
-start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->
+start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) ->
     case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
     Pid when is_pid(Pid) ->
         ?LOG_INFO("Document `~s` triggered replication `~s`",
             [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
+        ok = gen_server:call(Server, {triggered, RepId}, infinity),
         couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
     Error ->
-        couch_rep:update_rep_doc(
-            RepDoc,
-            [
-                {<<"_replication_state">>, <<"error">>},
-                {<<"_replication_id">>, ?l2b(Base)}
-            ]
-        ),
-        ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error])
+        keep_retrying(
+            Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES)
+    end.
+
+
+keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) ->
+    ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity);
+
+keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) ->
+    ?LOG_ERROR("Error starting replication `~s`: ~p. "
+        "Retrying in ~p seconds", [pp_rep_id(RepId), Error, Wait]),
+    ok = timer:sleep(Wait * 1000),
+    case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
+    Pid when is_pid(Pid) ->
+        ok = gen_server:call(Server, {triggered, RepId}, infinity),
+        {RepProps} = RepDoc,
+        ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
+            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId),
+                ?MAX_RETRIES - RetriesLeft + 1]),
+        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
+    NewError ->
+        keep_retrying(
+            Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1)
     end.
 
 



Mime
View raw message