couchdb-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Benoit Chesneau <bchesn...@gmail.com>
Subject Re: svn commit: r1062783 - in /couchdb/trunk: share/www/script/test/replicator_db.js src/couchdb/couch_rep_db_listener.erl
Date Tue, 25 Jan 2011 17:23:13 GMT
On Mon, Jan 24, 2011 at 3:09 PM,  <fdmanana@apache.org> wrote:
> Author: fdmanana
> Date: Mon Jan 24 14:09:06 2011
> New Revision: 1062783
>
> URL: http://svn.apache.org/viewvc?rev=1062783&view=rev
> Log:
> Replicator DB: on restart, make several attempts to restart the replications
>
> Now on restart, the replicator database listener will make up to 10 attempts
> to restart each replication. Before each attempt, it waits, using an exponential
> backoff strategy, before doing the next attempt.
> This is very useful because when one server restarts, other servers that are
> endpoints of its replications, may not be online yet.
>
>
> Modified:
>    couchdb/trunk/share/www/script/test/replicator_db.js
>    couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
>
> Modified: couchdb/trunk/share/www/script/test/replicator_db.js
> URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/test/replicator_db.js?rev=1062783&r1=1062782&r2=1062783&view=diff
> ==============================================================================
> --- couchdb/trunk/share/www/script/test/replicator_db.js (original)
> +++ couchdb/trunk/share/www/script/test/replicator_db.js Mon Jan 24 14:09:06 2011
> @@ -805,9 +805,16 @@ couchTests.replicator_db = function(debu
>   restartServer();
>   continuous_replication_survives_restart();
>
> -  repDb.deleteDb();
> -  restartServer();
> -  run_on_modified_server(server_config, error_state_replication);
> +/*
> + * Disabled, since error state would be set on the document only after
> + * the exponential backoff retry done by the replicator database listener
> + * terminates, which takes too much time for a unit test.
> + */
> +/*
> + * repDb.deleteDb();
> + * restartServer();
> + * run_on_modified_server(server_config, error_state_replication);
> + */
>
>
>   // cleanup
>
> Modified: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
> URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_listener.erl?rev=1062783&r1=1062782&r2=1062783&view=diff
> ==============================================================================
> --- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (original)
> +++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Mon Jan 24 14:09:06 2011
> @@ -20,6 +20,8 @@
>
>  -define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
>  -define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
> +-define(MAX_RETRIES, 10).
> +-define(INITIAL_WAIT, 5).
>
>  -record(state, {
>     changes_feed_loop = nil,
> @@ -58,6 +60,29 @@ init(_) ->
>  handle_call({rep_db_update, Change}, _From, State) ->
>     {reply, ok, process_update(State, Change)};
>
> +handle_call({triggered, {BaseId, _}}, _From, State) ->
> +    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
> +    [{BaseId, {DocId, true}}] ->
> +        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}});
> +    _ ->
> +        ok
> +    end,
> +    {reply, ok, State};
> +
> +handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
> +    DocId = get_value(<<"_id">>, Props),
> +    [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
> +    ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
> +        "the document `~s`. Last error reason was: ~p",
> +        [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]),
> +    couch_rep:update_rep_doc(
> +        RepDoc,
> +        [{<<"_replication_state">>, <<"error">>},
> +            {<<"_replication_id">>, ?l2b(BaseId)}]),
> +    true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
> +    true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
> +    {reply, ok, State};
> +
>  handle_call(Msg, From, State) ->
>     ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",
>         [Msg, From]),
> @@ -239,18 +264,24 @@ maybe_start_replication(State, DocId, Js
>     {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
>     case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
>     [] ->
> -        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}),
> +        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
>         true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
> +        Server = self(),
>         Pid = spawn_link(fun() ->
> -            start_replication(JsonRepDoc, RepId, UserCtx)
> +            start_replication(Server, JsonRepDoc, RepId, UserCtx)
>         end),
>         State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
> -    [{BaseId, DocId}] ->
> +    [{BaseId, {DocId, _}}] ->
>         State;
> -    [{BaseId, OtherDocId}] ->
> +    [{BaseId, {OtherDocId, false}}] ->
>         ?LOG_INFO("The replication specified by the document `~s` was already"
>             " triggered by the document `~s`", [DocId, OtherDocId]),
>         maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
> +        State;
> +    [{BaseId, {OtherDocId, true}}] ->
> +        ?LOG_INFO("The replication specified by the document `~s` is already"
> +            " being triggered by the document `~s`", [DocId, OtherDocId]),
> +        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
>         State
>     end.
>
> @@ -264,21 +295,37 @@ maybe_tag_rep_doc({Props} = JsonRepDoc,
>     end.
>
>
> -start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->
> +start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) ->
>     case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
>     Pid when is_pid(Pid) ->
>         ?LOG_INFO("Document `~s` triggered replication `~s`",
>             [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
> +        ok = gen_server:call(Server, {triggered, RepId}, infinity),
>         couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
>     Error ->
> -        couch_rep:update_rep_doc(
> -            RepDoc,
> -            [
> -                {<<"_replication_state">>, <<"error">>},
> -                {<<"_replication_id">>, ?l2b(Base)}
> -            ]
> -        ),
> -        ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error])
> +        keep_retrying(
> +            Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES)
> +    end.
> +
> +
> +keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) ->
> +    ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity);
> +
> +keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) ->
> +    ?LOG_ERROR("Error starting replication `~s`: ~p. "
> +        "Retrying in ~p seconds", [pp_rep_id(RepId), Error, Wait]),
> +    ok = timer:sleep(Wait * 1000),
> +    case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
> +    Pid when is_pid(Pid) ->
> +        ok = gen_server:call(Server, {triggered, RepId}, infinity),
> +        {RepProps} = RepDoc,
> +        ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
> +            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId),
> +                ?MAX_RETRIES - RetriesLeft + 1]),
> +        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
> +    NewError ->
> +        keep_retrying(
> +            Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1)
>     end.
>
>
>
>
>

Shouldn't MAX_RETRIES be a config setting?

Mime
View raw message