couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1142258 - in /couchdb/branches/1.1.x: etc/couchdb/default.ini.tpl.in src/couchdb/couch_rep.erl src/couchdb/couch_replication_manager.erl
Date Sat, 02 Jul 2011 18:44:00 GMT
Author: fdmanana
Date: Sat Jul  2 18:44:00 2011
New Revision: 1142258

URL: http://svn.apache.org/viewvc?rev=1142258&view=rev
Log:
Restart replications on error

If a replication transitions to the "error" state, attempt
to restart it up to "max_replication_retry_count" times
(.ini configuration parameter). This number of retry attempts
can now be set to "infinity" as well.

This was already current behaviour in trunk (upcoming 1.2).

Closes COUCHDB-1194.


Modified:
    couchdb/branches/1.1.x/etc/couchdb/default.ini.tpl.in
    couchdb/branches/1.1.x/src/couchdb/couch_rep.erl
    couchdb/branches/1.1.x/src/couchdb/couch_replication_manager.erl

Modified: couchdb/branches/1.1.x/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/branches/1.1.x/etc/couchdb/default.ini.tpl.in?rev=1142258&r1=1142257&r2=1142258&view=diff
==============================================================================
--- couchdb/branches/1.1.x/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/branches/1.1.x/etc/couchdb/default.ini.tpl.in Sat Jul  2 18:44:00 2011
@@ -137,6 +137,7 @@ compressible_types = text/*, application
 
 [replicator]
 db = _replicator
+; Maximum replicaton retry count can be a non-negative integer or "infinity".
 max_replication_retry_count = 10
 max_http_sessions = 20
 max_http_pipeline_size = 50

Modified: couchdb/branches/1.1.x/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.1.x/src/couchdb/couch_rep.erl?rev=1142258&r1=1142257&r2=1142258&view=diff
==============================================================================
--- couchdb/branches/1.1.x/src/couchdb/couch_rep.erl (original)
+++ couchdb/branches/1.1.x/src/couchdb/couch_rep.erl Sat Jul  2 18:44:00 2011
@@ -16,12 +16,10 @@
     code_change/3]).
 
 -export([replicate/2, checkpoint/1]).
--export([ensure_rep_db_exists/0, make_replication_id/2]).
+-export([make_replication_id/2]).
 -export([start_replication/3, end_replication/1, get_result/4]).
--export([update_rep_doc/2]).
 
 -include("couch_db.hrl").
--include("couch_js_functions.hrl").
 -include("../ibrowse/ibrowse.hrl").
 
 -define(REP_ID_VERSION, 2).
@@ -54,7 +52,6 @@
     committed_seq = 0,
 
     stats = nil,
-    rep_doc = nil,
     source_db_update_notifier = nil,
     target_db_update_notifier = nil
 }).
@@ -94,11 +91,11 @@ end_replication({BaseId, Extension}) ->
         end
     end.
 
-start_replication(RepDoc, {BaseId, Extension}, UserCtx) ->
+start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx) ->
     Replicator = {
         BaseId ++ Extension,
         {gen_server, start_link,
-            [?MODULE, [BaseId, RepDoc, UserCtx], []]},
+            [?MODULE, [RepId, RepDoc, UserCtx], []]},
         temporary,
         1,
         worker,
@@ -135,7 +132,7 @@ init(InitArgs) ->
         {stop, Error}
     end.
 
-do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
+do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) ->
     process_flag(trap_exit, true),
 
     SourceProps = couch_util:get_value(<<"source">>, PostProps),
@@ -152,10 +149,8 @@ do_init([RepId, {PostProps} = RepDoc, Us
     SourceInfo = dbinfo(Source),
     TargetInfo = dbinfo(Target),
 
-    maybe_set_triggered(RepDoc, RepId),
-
     [SourceLog, TargetLog] = find_replication_logs(
-        [Source, Target], RepId, {PostProps}, UserCtx),
+        [Source, Target], BaseId, {PostProps}, UserCtx),
     {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
 
     {ok, ChangesFeed} =
@@ -174,10 +169,12 @@ do_init([RepId, {PostProps} = RepDoc, Us
     ets:insert(Stats, {docs_written, 0}),
     ets:insert(Stats, {doc_write_failures, 0}),
 
-    {ShortId, _} = lists:split(6, RepId),
+    {ShortId, _} = lists:split(6, BaseId),
     couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
         [ShortId, dbname(Source), dbname(Target)]), "Starting"),
 
+    couch_replication_manager:replication_started(RepId),
+
     State = #state{
         changes_feed = ChangesFeed,
         missing_revs = MissingRevs,
@@ -200,7 +197,6 @@ do_init([RepId, {PostProps} = RepDoc, Us
         rep_starttime = httpd_util:rfc1123_date(),
         src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
         tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
-        rep_doc = RepDoc,
         source_db_update_notifier = source_db_update_notifier(Source),
         target_db_update_notifier = target_db_update_notifier(Target)
     },
@@ -272,27 +268,24 @@ handle_info({'EXIT', _Pid, {Err, Reason}
 handle_info({'EXIT', _Pid, Reason}, State) ->
     {stop, Reason, State}.
 
-terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
+terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId | _]} = State) ->
     do_terminate(State),
-    update_rep_doc(
-        State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]);
+    couch_replication_manager:replication_completed(RepId);
     
-terminate(normal, State) ->
+terminate(normal, #state{init_args=[RepId | _]} = State) ->
     timer:cancel(State#state.checkpoint_scheduled),
     do_terminate(do_checkpoint(State)),
-    update_rep_doc(
-        State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]);
+    couch_replication_manager:replication_completed(RepId);
 
 terminate(shutdown, #state{listeners = Listeners} = State) ->
     % continuous replication stopped
     [gen_server:reply(L, {ok, stopped}) || L <- Listeners],
     terminate_cleanup(State);
 
-terminate(Reason, #state{listeners = Listeners} = State) ->
+terminate(Reason, #state{listeners = Listeners, init_args=[RepId | _]} = State) ->
     [gen_server:reply(L, {error, Reason}) || L <- Listeners],
     terminate_cleanup(State),
-    update_rep_doc(
-        State#state.rep_doc, [{<<"_replication_state">>, <<"error">>}]).
+    couch_replication_manager:replication_error(RepId, Reason).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -698,7 +691,7 @@ do_checkpoint(State) ->
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
         stats = Stats,
-        rep_doc = {RepDoc}
+        init_args = [_RepId, {RepDoc} | _]
     } = State,
     case commit_to_both(Source, Target, NewSeqNum) of
     {SrcInstanceStartTime, TgtInstanceStartTime} ->
@@ -901,113 +894,6 @@ parse_proxy_params(ProxyUrl) ->
             [{proxy_user, User}, {proxy_password, Passwd}]
         end.
 
-update_rep_doc({Props} = _RepDoc, KVs) ->
-    case couch_util:get_value(<<"_id">>, Props) of
-    undefined ->
-        % replication triggered by POSTing to _replicate/
-        ok;
-    RepDocId ->
-        % replication triggered by adding a Rep Doc to the replicator DB
-        {ok, RepDb} = ensure_rep_db_exists(),
-        case couch_db:open_doc(RepDb, RepDocId, []) of
-        {ok, LatestRepDoc} ->
-            update_rep_doc(RepDb, LatestRepDoc, KVs);
-        _ ->
-            ok
-        end,
-        couch_db:close(RepDb)
-    end.
-
-update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
-    NewRepDocBody = lists:foldl(
-        fun({<<"_replication_state">> = K, State} = KV, Body) ->
-                case couch_util:get_value(K, Body) of
-                State ->
-                    Body;
-                _ ->
-                    Body1 = lists:keystore(K, 1, Body, KV),
-                    lists:keystore(
-                        <<"_replication_state_time">>, 1,
-                        Body1, {<<"_replication_state_time">>, timestamp()})
-                end;
-            ({K, _V} = KV, Body) ->
-                lists:keystore(K, 1, Body, KV)
-        end,
-        RepDocBody,
-        KVs
-    ),
-    case NewRepDocBody of
-    RepDocBody ->
-        ok;
-    _ ->
-       % might not succeed - when the replication doc is deleted right
-       % before this update (not an error)
-        couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
-    end.
-
-% RFC3339 timestamps.
-% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
-timestamp() ->
-    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
-    UTime = erlang:universaltime(),
-    LocalTime = calendar:universal_time_to_local_time(UTime),
-    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
-        calendar:datetime_to_gregorian_seconds(UTime),
-    zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
-    iolist_to_binary(
-        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
-            [Year, Month, Day, Hour, Min, Sec,
-                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
-
-zone(Hr, Min) when Hr >= 0, Min >= 0 ->
-    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
-zone(Hr, Min) ->
-    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
-
-
-maybe_set_triggered({RepProps} = RepDoc, RepId) ->
-    case couch_util:get_value(<<"_replication_state">>, RepProps) of
-    <<"triggered">> ->
-        ok;
-    _ ->
-        update_rep_doc(
-            RepDoc,
-            [
-                {<<"_replication_state">>, <<"triggered">>},
-                {<<"_replication_id">>, ?l2b(RepId)}
-            ]
-        )
-    end.
-
-ensure_rep_db_exists() ->
-    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
-    Opts = [
-        {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
-        sys_db
-    ],
-    case couch_db:open(DbName, Opts) of
-    {ok, Db} ->
-        Db;
-    _Error ->
-        {ok, Db} = couch_db:create(DbName, Opts)
-    end,
-    ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
-    {ok, Db}.
-
-ensure_rep_ddoc_exists(RepDb, DDocID) ->
-    case couch_db:open_doc(RepDb, DDocID, []) of
-    {ok, _Doc} ->
-        ok;
-    _ ->
-        DDoc = couch_doc:from_json_obj({[
-            {<<"_id">>, DDocID},
-            {<<"language">>, <<"javascript">>},
-            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-        ]}),
-        {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
-    end,
-    ok.
-
 source_db_update_notifier(#db{name = DbName}) ->
     Server = self(),
     {ok, Notifier} = couch_db_update_notifier:start_link(

Modified: couchdb/branches/1.1.x/src/couchdb/couch_replication_manager.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.1.x/src/couchdb/couch_replication_manager.erl?rev=1142258&r1=1142257&r2=1142258&view=diff
==============================================================================
--- couchdb/branches/1.1.x/src/couchdb/couch_replication_manager.erl (original)
+++ couchdb/branches/1.1.x/src/couchdb/couch_replication_manager.erl Sat Jul  2 18:44:00 2011
@@ -13,14 +13,20 @@
 -module(couch_replication_manager).
 -behaviour(gen_server).
 
+% public API
+-export([replication_started/1, replication_completed/1, replication_error/2]).
+
+% gen_server callbacks
 -export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
 -export([code_change/3, terminate/2]).
 
 -include("couch_db.hrl").
+-include("couch_js_functions.hrl").
 
--define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
--define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
--define(INITIAL_WAIT, 5).
+-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
+-define(REP_TO_STATE, couch_rep_id_to_rep_state).
+-define(INITIAL_WAIT, 2.5). % seconds
+-define(MAX_WAIT, 600).     % seconds
 
 -record(state, {
     changes_feed_loop = nil,
@@ -30,6 +36,16 @@
     max_retries
 }).
 
+-record(rep_state, {
+    doc_id,
+    user_ctx,
+    doc,
+    starting,
+    retries_left,
+    max_retries,
+    wait = ?INITIAL_WAIT
+}).
+
 -import(couch_util, [
     get_value/2,
     get_value/3,
@@ -40,17 +56,56 @@
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
+
+replication_started({BaseId, _} = RepId) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{doc_id = DocId} ->
+        update_rep_doc(DocId, [
+            {<<"_replication_state">>, <<"triggered">>},
+            {<<"_replication_id">>, ?l2b(BaseId)}]),
+        ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
+        ?LOG_INFO("Document `~s` triggered replication `~s`",
+            [DocId, pp_rep_id(RepId)])
+    end.
+
+
+replication_completed(RepId) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{doc_id = DocId} = St ->
+        update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]),
+        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
+        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
+            [pp_rep_id(RepId), DocId])
+    end.
+
+
+replication_error({BaseId, _} = RepId, Error) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{doc_id = DocId} ->
+        % TODO: maybe add error reason to replication document
+        update_rep_doc(DocId, [
+            {<<"_replication_state">>, <<"error">>},
+            {<<"_replication_id">>, ?l2b(BaseId)}]),
+        ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
+    end.
+
+
 init(_) ->
     process_flag(trap_exit, true),
-    _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]),
-    _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
+    ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
+    ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]),
     Server = self(),
     ok = couch_config:register(
         fun("replicator", "db", NewName) ->
             ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)});
-        ("replicator", "max_replication_retry_count", NewMaxRetries1) ->
-            NewMaxRetries = list_to_integer(NewMaxRetries1),
-            ok = gen_server:cast(Server, {set_max_retries, NewMaxRetries})
+        ("replicator", "max_replication_retry_count", V) ->
+            ok = gen_server:cast(Server, {set_max_retries, retries_value(V)})
         end
     ),
     {Loop, RepDbName} = changes_feed_loop(),
@@ -58,7 +113,7 @@ init(_) ->
         changes_feed_loop = Loop,
         rep_db_name = RepDbName,
         db_notifier = db_update_notifier(),
-        max_retries = list_to_integer(
+        max_retries = retries_value(
             couch_config:get("replicator", "max_replication_retry_count", "10"))
     }}.
 
@@ -68,32 +123,35 @@ handle_call({rep_db_update, {ChangeProps
         process_update(State, Change)
     catch
     _Tag:Error ->
-        JsonRepDoc = get_value(doc, ChangeProps),
-        rep_db_update_error(Error, JsonRepDoc),
+        {RepProps} = get_value(doc, ChangeProps),
+        DocId = get_value(<<"_id">>, RepProps),
+        rep_db_update_error(Error, DocId),
         State
     end,
     {reply, ok, NewState};
 
-handle_call({triggered, {BaseId, _}}, _From, State) ->
-    [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId),
-    true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}),
+handle_call({rep_started, RepId}, _From, State) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    RepState ->
+        NewRepState = RepState#rep_state{
+            starting = false,
+            retries_left = State#state.max_retries,
+            max_retries = State#state.max_retries,
+            wait = ?INITIAL_WAIT
+        },
+        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
+    end,
     {reply, ok, State};
 
-handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
-    DocId = get_value(<<"_id">>, Props),
-    [{DocId, {{BaseId, _} = RepId, MaxRetries}}] = ets:lookup(
-        ?DOC_ID_TO_REP_ID, DocId),
-    ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
-        "the document `~s`. Last error reason was: ~p",
-        [pp_rep_id(RepId), MaxRetries, DocId, Error]),
-    couch_rep:update_rep_doc(
-        RepDoc,
-        [{<<"_replication_state">>, <<"error">>},
-            {<<"_replication_id">>, ?l2b(BaseId)}]),
-    true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
-    true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
+handle_call({rep_complete, RepId}, _From, State) ->
+    true = ets:delete(?REP_TO_STATE, RepId),
     {reply, ok, State};
 
+handle_call({rep_error, RepId, Error}, _From, State) ->
+    {reply, ok, replication_error(State, RepId, Error)};
+
 handle_call(Msg, From, State) ->
     ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
         [Msg, From]),
@@ -150,8 +208,8 @@ terminate(_Reason, State) ->
             catch exit(Pid, stop)
         end,
         [Loop | StartPids]),
-    true = ets:delete(?REP_ID_TO_DOC_ID),
-    true = ets:delete(?DOC_ID_TO_REP_ID),
+    true = ets:delete(?REP_TO_STATE),
+    true = ets:delete(?DOC_TO_REP),
     couch_db_update_notifier:stop(Notifier).
 
 
@@ -160,7 +218,7 @@ code_change(_OldVsn, State, _Extra) ->
 
 
 changes_feed_loop() ->
-    {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
+    {ok, RepDb} = ensure_rep_db_exists(),
     Server = self(),
     Pid = spawn_link(
         fun() ->
@@ -245,21 +303,20 @@ process_update(State, {Change}) ->
         State;
     false ->
         case get_value(<<"_replication_state">>, RepProps) of
+        undefined ->
+            maybe_start_replication(State, DocId, JsonRepDoc);
+        <<"triggered">> ->
+            maybe_start_replication(State, DocId, JsonRepDoc);
         <<"completed">> ->
             replication_complete(DocId),
             State;
-        <<"error">> ->
-            stop_replication(DocId),
-            State;
-        <<"triggered">> ->
-            maybe_start_replication(State, DocId, JsonRepDoc);
-        undefined ->
-            maybe_start_replication(State, DocId, JsonRepDoc)
+        _ ->
+            State
         end
     end.
 
 
-rep_db_update_error(Error, {Props} = JsonRepDoc) ->
+rep_db_update_error(Error, DocId) ->
     case Error of
     {bad_rep_doc, Reason} ->
         ok;
@@ -267,9 +324,8 @@ rep_db_update_error(Error, {Props} = Jso
         Reason = to_binary(Error)
     end,
     ?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
-        [get_value(<<"_id">>, Props), Reason]),
-    couch_rep:update_rep_doc(
-        JsonRepDoc, [{<<"_replication_state">>, <<"error">>}]).
+        [DocId, Reason]),
+    update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]).
 
 
 rep_user_ctx({RepDoc}) ->
@@ -284,30 +340,39 @@ rep_user_ctx({RepDoc}) ->
     end.
 
 
-maybe_start_replication(#state{max_retries = MaxRetries} = State,
-        DocId, JsonRepDoc) ->
-    UserCtx = rep_user_ctx(JsonRepDoc),
-    {BaseId, _} = RepId = make_rep_id(JsonRepDoc, UserCtx),
-    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
-    [] ->
-        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
-        true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}),
+maybe_start_replication(State, DocId, RepDoc) ->
+    UserCtx = rep_user_ctx(RepDoc),
+    {BaseId, _} = RepId = make_rep_id(RepDoc, UserCtx),
+    case rep_state(RepId) of
+    nil ->
+        RepState = #rep_state{
+            doc_id = DocId,
+            user_ctx = UserCtx,
+            doc = RepDoc,
+            starting = true,
+            retries_left = State#state.max_retries,
+            max_retries = State#state.max_retries
+        },
+        true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
+        true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
+        ?LOG_INFO("Attempting to start replication `~s` (document `~s`).",
+            [pp_rep_id(RepId), DocId]),
         Server = self(),
         Pid = spawn_link(fun() ->
-            start_replication(Server, JsonRepDoc, RepId, UserCtx, MaxRetries)
+            start_replication(Server, RepDoc, RepId, UserCtx, 0)
         end),
         State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
-    [{BaseId, {DocId, _}}] ->
+    #rep_state{doc_id = DocId} ->
         State;
-    [{BaseId, {OtherDocId, false}}] ->
+    #rep_state{starting = false, doc_id = OtherDocId} ->
         ?LOG_INFO("The replication specified by the document `~s` was already"
             " triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
         State;
-    [{BaseId, {OtherDocId, true}}] ->
+    #rep_state{starting = true, doc_id = OtherDocId} ->
         ?LOG_INFO("The replication specified by the document `~s` is already"
             " being triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
         State
     end.
 
@@ -323,98 +388,233 @@ make_rep_id(RepDoc, UserCtx) ->
     end.
 
 
-maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
-    case get_value(<<"_replication_id">>, Props) of
+maybe_tag_rep_doc(DocId, {RepProps}, RepId) ->
+    case get_value(<<"_replication_id">>, RepProps) of
     RepId ->
         ok;
     _ ->
-        couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
+        update_rep_doc(DocId, [{<<"_replication_id">>, RepId}])
     end.
 
 
-start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx, MaxRetries) ->
+start_replication(Server, RepDoc, RepId, UserCtx, Wait) ->
+    ok = timer:sleep(Wait * 1000),
     case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
     Pid when is_pid(Pid) ->
-        ?LOG_INFO("Document `~s` triggered replication `~s`",
-            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
-        ok = gen_server:call(Server, {triggered, RepId}, infinity),
+        ok = gen_server:call(Server, {rep_started, RepId}, infinity),
         couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
     Error ->
-        couch_rep:update_rep_doc(
-            RepDoc,
-            [{<<"_replication_state">>, <<"error">>},
-                {<<"_replication_id">>, ?l2b(element(1, RepId))}]),
-        keep_retrying(
-            Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, MaxRetries)
+        replication_error(RepId, Error)
     end.
 
 
-keep_retrying(Server, _RepId, RepDoc, _UserCtx, Error, _Wait, 0) ->
-    ok = gen_server:call(Server, {restart_failure, RepDoc, Error}, infinity);
-
-keep_retrying(Server, RepId, RepDoc, UserCtx, Error, Wait, RetriesLeft) ->
-    {RepProps} = RepDoc,
-    DocId = get_value(<<"_id">>, RepProps),
-    ?LOG_ERROR("Error starting replication `~s` (document `~s`): ~p. "
-        "Retrying in ~p seconds", [pp_rep_id(RepId), DocId, Error, Wait]),
-    ok = timer:sleep(Wait * 1000),
-    case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
-    Pid when is_pid(Pid) ->
-        ok = gen_server:call(Server, {triggered, RepId}, infinity),
-        [{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
-        ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
-            [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]),
-        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
-    NewError ->
-        keep_retrying(
-            Server, RepId, RepDoc, UserCtx, NewError, Wait * 2, RetriesLeft - 1)
+replication_complete(DocId) ->
+    case ets:lookup(?DOC_TO_REP, DocId) of
+    [{DocId, RepId}] ->
+        case rep_state(RepId) of
+        nil ->
+            couch_rep:end_replication(RepId);
+        #rep_state{} ->
+            ok
+        end,
+        true = ets:delete(?DOC_TO_REP, DocId);
+    _ ->
+        ok
     end.
 
 
 rep_doc_deleted(DocId) ->
-    case stop_replication(DocId) of
-    {ok, RepId} ->
+    case ets:lookup(?DOC_TO_REP, DocId) of
+    [{DocId, RepId}] ->
+        couch_rep:end_replication(RepId),
+        true = ets:delete(?REP_TO_STATE, RepId),
+        true = ets:delete(?DOC_TO_REP, DocId),
         ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
             " was deleted", [pp_rep_id(RepId), DocId]);
-    none ->
+    [] ->
         ok
     end.
 
 
-replication_complete(DocId) ->
-    case stop_replication(DocId) of
-    {ok, RepId} ->
-        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
-            [pp_rep_id(RepId), DocId]);
-    none ->
-        ok
+replication_error(State, RepId, Error) ->
+    case rep_state(RepId) of
+    nil ->
+        State;
+    RepState ->
+        maybe_retry_replication(RepId, RepState, Error, State)
     end.
 
-
-stop_replication(DocId) ->
-    case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
-    [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] ->
-        couch_rep:end_replication(RepId),
-        true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
-        true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
-        {ok, RepId};
-    [] ->
-        none
-    end.
+maybe_retry_replication(RepId, #rep_state{retries_left = 0} = RepState, Error, State) ->
+    #rep_state{
+        doc_id = DocId,
+        max_retries = MaxRetries
+    } = RepState,
+    couch_rep:end_replication(RepId),
+    true = ets:delete(?REP_TO_STATE, RepId),
+    true = ets:delete(?DOC_TO_REP, DocId),
+    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+        "~nReached maximum retry attempts (~p).",
+        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
+    State;
+
+maybe_retry_replication(RepId, RepState, Error, State) ->
+    #rep_state{
+        doc_id = DocId,
+        user_ctx = UserCtx,
+        doc = RepDoc
+    } = RepState,
+    #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
+    true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
+    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+        "~nRestarting replication in ~p seconds.",
+        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
+    Server = self(),
+    Pid = spawn_link(fun() ->
+        start_replication(Server, RepDoc, RepId, UserCtx, Wait)
+    end),
+    State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
 
 
 stop_all_replications() ->
     ?LOG_INFO("Stopping all ongoing replications because the replicator"
         " database was deleted or changed", []),
     ets:foldl(
-        fun({_, {RepId, _}}, _) ->
+        fun({_, RepId}, _) ->
             couch_rep:end_replication(RepId)
         end,
-        ok, ?DOC_ID_TO_REP_ID),
-    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
-    true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).
+        ok, ?DOC_TO_REP),
+    true = ets:delete_all_objects(?REP_TO_STATE),
+    true = ets:delete_all_objects(?DOC_TO_REP).
+
+
+update_rep_doc(RepDocId, KVs) ->
+    {ok, RepDb} = ensure_rep_db_exists(),
+    try
+        case couch_db:open_doc(RepDb, RepDocId, []) of
+        {ok, LatestRepDoc} ->
+            update_rep_doc(RepDb, LatestRepDoc, KVs);
+        _ ->
+            ok
+        end
+    catch throw:conflict ->
+        % Shouldn't happen, as by default only the role _replicator can
+        % update replication documents.
+        ?LOG_ERROR("Conflict error when updating replication document `~s`."
+            " Retrying.", [RepDocId]),
+        ok = timer:sleep(5),
+        update_rep_doc(RepDocId, KVs)
+    after
+        couch_db:close(RepDb)
+    end.
+
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+    NewRepDocBody = lists:foldl(
+        fun({<<"_replication_state">> = K, State} = KV, Body) ->
+                case get_value(K, Body) of
+                State ->
+                    Body;
+                _ ->
+                    Body1 = lists:keystore(K, 1, Body, KV),
+                    lists:keystore(
+                        <<"_replication_state_time">>, 1, Body1,
+                        {<<"_replication_state_time">>, timestamp()})
+                end;
+            ({K, _V} = KV, Body) ->
+                lists:keystore(K, 1, Body, KV)
+        end,
+        RepDocBody, KVs),
+    case NewRepDocBody of
+    RepDocBody ->
+        ok;
+    _ ->
+        % Might not succeed - when the replication doc is deleted right
+        % before this update (not an error, ignore).
+        couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
+    end.
+
+
+% RFC3339 timestamps.
+% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
+timestamp() ->
+    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
+    UTime = erlang:universaltime(),
+    LocalTime = calendar:universal_time_to_local_time(UTime),
+    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
+        calendar:datetime_to_gregorian_seconds(UTime),
+    zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
+    iolist_to_binary(
+        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
+            [Year, Month, Day, Hour, Min, Sec,
+                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
+
+zone(Hr, Min) when Hr >= 0, Min >= 0 ->
+    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
+zone(Hr, Min) ->
+    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
+
+
+ensure_rep_db_exists() ->
+    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+    Opts = [
+        {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
+        sys_db
+    ],
+    case couch_db:open(DbName, Opts) of
+    {ok, Db} ->
+        Db;
+    _Error ->
+        {ok, Db} = couch_db:create(DbName, Opts)
+    end,
+    ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+    {ok, Db}.
+
+
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+    case couch_db:open_doc(RepDb, DDocID, []) of
+    {ok, _Doc} ->
+        ok;
+    _ ->
+        DDoc = couch_doc:from_json_obj({[
+            {<<"_id">>, DDocID},
+            {<<"language">>, <<"javascript">>},
+            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+        ]}),
+        {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+    end,
+    ok.
 
 
 % pretty-print replication id
 pp_rep_id({Base, Extension}) ->
     Base ++ Extension.
+
+
+rep_state(RepId) ->
+    case ets:lookup(?REP_TO_STATE, RepId) of
+    [{RepId, RepState}] ->
+        RepState;
+    [] ->
+        nil
+    end.
+
+
+error_reason({error, Reason}) ->
+    Reason;
+error_reason(Reason) ->
+    Reason.
+
+
+retries_value("infinity") ->
+    infinity;
+retries_value(Value) ->
+    list_to_integer(Value).
+
+
+state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
+    Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
+    case Left of
+    infinity ->
+        State#rep_state{wait = Wait2};
+    _ ->
+        State#rep_state{retries_left = Left - 1, wait = Wait2}
+    end.



Mime
View raw message