couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1078273 - in /couchdb/trunk/src/couchdb: couch_api_wrap.erl couch_replicator.erl
Date Sat, 05 Mar 2011 11:20:31 GMT
Author: fdmanana
Date: Sat Mar  5 11:20:31 2011
New Revision: 1078273

URL: http://svn.apache.org/viewvc?rev=1078273&view=rev
Log:
Replicator: better behaviour on checkpoint failure

Replicator processes now terminate immediately after a checkpoint failure.
Checkpoint error detection and logging are improved as well.

Issue identified by, and initial patch provided by, Randall. Thanks.
Closes COUCHDB-1080.


Modified:
    couchdb/trunk/src/couchdb/couch_api_wrap.erl
    couchdb/trunk/src/couchdb/couch_replicator.erl

Modified: couchdb/trunk/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_api_wrap.erl?rev=1078273&r1=1078272&r2=1078273&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/trunk/src/couchdb/couch_api_wrap.erl Sat Mar  5 11:20:31 2011
@@ -123,7 +123,9 @@ ensure_full_commit(#httpdb{} = Db) ->
         [{method, post}, {path, "_ensure_full_commit"}, {direct, true},
             {headers, [{"Content-Type", "application/json"}]}],
         fun(201, _, {Props}) ->
-            {ok, get_value(<<"instance_start_time">>, Props)}
+            {ok, get_value(<<"instance_start_time">>, Props)};
+        (_, _, {Props}) ->
+            {error, get_value(<<"error">>, Props)}
         end);
 ensure_full_commit(Db) ->
     couch_db:ensure_full_commit(Db).

Modified: couchdb/trunk/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1078273&r1=1078272&r2=1078273&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.erl Sat Mar  5 11:20:31 2011
@@ -30,7 +30,8 @@
 
 -import(couch_util, [
     get_value/2,
-    get_value/3
+    get_value/3,
+    to_binary/1
 ]).
 
 -import(couch_replicator_utils, [
@@ -326,7 +327,7 @@ handle_info({'EXIT', Pid, normal}, State
         true ->
             case Workers -- [Pid] of
             [] ->
-                {stop, normal, do_last_checkpoint(State)};
+                do_last_checkpoint(State);
             Workers2 ->
                 {noreply, State#rep_state{workers = Workers2}}
             end
@@ -378,8 +379,12 @@ handle_cast({db_compacted, DbName},
     {noreply, State#rep_state{target = NewTarget}};
 
 handle_cast(checkpoint, State) ->
-    State2 = do_checkpoint(State),
-    {noreply, State2#rep_state{timer = start_timer(State)}};
+    case do_checkpoint(State) of
+    {ok, NewState} ->
+        {noreply, NewState#rep_state{timer = start_timer(State)}};
+    Error ->
+        {stop, Error, State}
+    end;
 
 handle_cast({report_seq, Seq},
     #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
@@ -444,10 +449,17 @@ terminate(shutdown, State) ->
     % cancelled replication throught ?MODULE:cancel_replication/1
     terminate_cleanup(State);
 
-terminate(Reason, #rep_state{rep_details = Rep} = State) ->
+terminate(Reason, State) ->
+    #rep_state{
+        source_name = Source,
+        target_name = Target,
+        rep_details = #rep{id = {BaseId, Ext} = RepId, doc = RepDoc}
+    } = State,
+    ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
+        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
     terminate_cleanup(State),
-    update_rep_doc(Rep#rep.doc, [{<<"_replication_state">>, <<"error">>}]),
-    couch_replication_notifier:notify({error, Rep#rep.id, Reason}).
+    update_rep_doc(RepDoc, [{<<"_replication_state">>, <<"error">>}]),
+    couch_replication_notifier:notify({error, RepId, Reason}).
 
 
 terminate_cleanup(State) ->
@@ -460,10 +472,15 @@ terminate_cleanup(State) ->
 
 do_last_checkpoint(#rep_state{seqs_in_progress = [],
     highest_seq_done = ?LOWEST_SEQ} = State) ->
-    cancel_timer(State);
+    {stop, normal, cancel_timer(State)};
 do_last_checkpoint(#rep_state{seqs_in_progress = [],
     highest_seq_done = Seq} = State) ->
-    cancel_timer(do_checkpoint(State#rep_state{current_through_seq = Seq})).
+    case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
+    {ok, NewState} ->
+        {stop, normal, cancel_timer(NewState)};
+    Error ->
+        {stop, Error, State}
+    end.
 
 
 start_timer(State) ->
@@ -569,7 +586,7 @@ checkpoint_interval(_State) ->
     5000.
 
 do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
-    State;
+    {ok, State};
 do_checkpoint(State) ->
     #rep_state{
         source_name=SourceName,
@@ -585,10 +602,16 @@ do_checkpoint(State) ->
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
         stats = Stats,
-        rep_details = #rep{options = Options, id = {BaseId, Ext}},
+        rep_details = #rep{options = Options},
         session_id = SessionId
     } = State,
     case commit_to_both(Source, Target) of
+    {source_error, Reason} ->
+         {checkpoint_commit_failure,
+             <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
+    {target_error, Reason} ->
+         {checkpoint_commit_failure,
+             <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
     {SrcInstanceStartTime, TgtInstanceStartTime} ->
         ?LOG_INFO("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
             [SourceName, TargetName, NewSeq]),
@@ -632,27 +655,51 @@ do_checkpoint(State) ->
         },
 
         try
-            {ok, {SrcRevPos,SrcRevId}} = couch_api_wrap:update_doc(Source,
-                SourceLog#doc{body=NewRepHistory}, [delay_commit]),
-            {ok, {TgtRevPos,TgtRevId}} = couch_api_wrap:update_doc(Target,
-                TargetLog#doc{body=NewRepHistory}, [delay_commit]),
-            State#rep_state{
+            {SrcRevPos, SrcRevId} = update_checkpoint(
+                Source, SourceLog#doc{body = NewRepHistory}, source),
+            {TgtRevPos, TgtRevId} = update_checkpoint(
+                Target, TargetLog#doc{body = NewRepHistory}, target),
+            NewState = State#rep_state{
                 checkpoint_history = NewRepHistory,
                 committed_seq = NewSeq,
                 source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
                 target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
-            }
-        catch throw:conflict ->
-            ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
-                "yourself?)", []),
-            State
+            },
+            {ok, NewState}
+        catch throw:{checkpoint_commit_failure, _} = Failure ->
+            Failure
         end;
-    _Else ->
-        ?LOG_INFO("rebooting replication `~s` (`~s` -> `~s`) from last known "
-            "replication checkpoint", [BaseId ++ Ext, SourceName, TargetName]),
-        RepInfo = io_lib:format("replication `~s` (`~s` -> `~s`)",
-            [BaseId ++ Ext, SourceName, TargetName]),
-        exit({checkpoint_commit_failure, lists:flatten(RepInfo)})
+    {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+        {checkpoint_commit_failure, <<"Target database out of sync. "
+            "Try to increase max_dbs_open at the target's server.">>};
+    {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
+        {checkpoint_commit_failure, <<"Source database out of sync. "
+            "Try to increase max_dbs_open at the source's server.">>};
+    {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+        {checkpoint_commit_failure, <<"Source and target databases out of "
+            "sync. Try to increase max_dbs_open at both servers.">>}
+    end.
+
+
+update_checkpoint(Db, Doc, DbType) ->
+    try
+        update_checkpoint(Db, Doc)
+    catch throw:{checkpoint_commit_failure, Reason} ->
+        throw({checkpoint_commit_failure,
+            <<"Error updating the ", (to_binary(DbType))/binary,
+                " checkpoint document: ", (to_binary(Reason))/binary>>})
+    end.
+
+update_checkpoint(Db, Doc) ->
+    try
+        case couch_api_wrap:update_doc(Db, Doc, [delay_commit]) of
+        {ok, PosRevId} ->
+            PosRevId;
+        {error, Reason} ->
+            throw({checkpoint_commit_failure, Reason})
+        end
+    catch throw:conflict ->
+        throw({checkpoint_commit_failure, conflict})
     end.
 
 
@@ -661,24 +708,32 @@ commit_to_both(Source, Target) ->
     ParentPid = self(),
     SrcCommitPid = spawn_link(
         fun() ->
-            ParentPid ! {self(), couch_api_wrap:ensure_full_commit(Source)}
+            Result = (catch couch_api_wrap:ensure_full_commit(Source)),
+            ParentPid ! {self(), Result}
         end),
 
     % commit tgt sync
-    {ok, TargetStartTime} = couch_api_wrap:ensure_full_commit(Target),
+    TargetResult = (catch couch_api_wrap:ensure_full_commit(Target)),
 
-    SourceStartTime =
-    receive
-    {SrcCommitPid, {ok, Timestamp}} ->
-        receive
-        {'EXIT', SrcCommitPid, normal} ->
-            ok
-        end,
-        Timestamp;
-    {'EXIT', SrcCommitPid, _} ->
-        exit(replication_link_failure)
+    SourceResult = receive
+    {SrcCommitPid, Result} ->
+        unlink(SrcCommitPid),
+        receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
+        Result;
+    {'EXIT', SrcCommitPid, Reason} ->
+        {error, Reason}
     end,
-    {SourceStartTime, TargetStartTime}.
+    case TargetResult of
+    {ok, TargetStartTime} ->
+        case SourceResult of
+        {ok, SourceStartTime} ->
+            {SourceStartTime, TargetStartTime};
+        SourceError ->
+            {source_error, SourceError}
+        end;
+    TargetError ->
+        {target_error, TargetError}
+    end.
 
 
 compare_replication_logs(SrcDoc, TgtDoc) ->



Mime
View raw message