couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: kocol...@apache.org
Subject: svn commit: r830742 - in /couchdb/branches/0.10.x: ./ etc/default/couchdb src/couchdb/couch_rep.erl src/couchdb/couch_rep_reader.erl
Date: Wed, 28 Oct 2009 19:44:27 GMT
Author: kocolosk
Date: Wed Oct 28 19:44:26 2009
New Revision: 830742

URL: http://svn.apache.org/viewvc?rev=830742&view=rev
Log:
reboot replication from last checkpoint if DB is compacted or server restarts

Modified:
    couchdb/branches/0.10.x/   (props changed)
    couchdb/branches/0.10.x/etc/default/couchdb   (props changed)
    couchdb/branches/0.10.x/src/couchdb/couch_rep.erl
    couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl

Propchange: couchdb/branches/0.10.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 28 19:44:26 2009
@@ -4,4 +4,4 @@
 /couchdb/branches/list-iterator:782292-784593
 /couchdb/branches/tail_header:775760-778477
 /couchdb/tags/0.10.0:825400
-/couchdb/trunk:806983,807208-807478,807771,808574,808632,808716,808876,809134,809977,810015,810028,810350,810358,810435,811910,813803,815921,817278,817398,817400,817403,817749,817793,818249,818357,819091,819341,819343,819436,819799,819977,820344,820469,820495,820851,825407,826692,829871,829919-829920
+/couchdb/trunk:806983,807208-807478,807771,808574,808632,808716,808876,809134,809977,810015,810028,810350,810358,810435,811910,813803,815921,817278,817398,817400,817403,817749,817793,818249,818357,819091,819341,819343,819436,819799,819977,820344,820469,820495,820851,825407,826692,829871,829919-829920,830737

Propchange: couchdb/branches/0.10.x/etc/default/couchdb
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 28 19:44:26 2009
@@ -4,5 +4,5 @@
 /couchdb/branches/list-iterator/etc/default/couchdb:782292-784593
 /couchdb/branches/tail_header/etc/default/couchdb:775760-778477
 /couchdb/tags/0.10.0/etc/default/couchdb:825400
-/couchdb/trunk/etc/default/couchdb:806983,807208-807478,807771,808574,808632,808716,808876,809134,809977,810015,810028,810350,810358,810435,811910,813803,815921,817277-817278,817398,817400,817403,817749,817793,818249,818357,819091,819341,819343,819436,819799,819977,820344,820469,820495,820851,825407,826692,829871,829919-829920
+/couchdb/trunk/etc/default/couchdb:806983,807208-807478,807771,808574,808632,808716,808876,809134,809977,810015,810028,810350,810358,810435,811910,813803,815921,817277-817278,817398,817400,817403,817749,817793,818249,818357,819091,819341,819343,819436,819799,819977,820344,820469,820495,820851,825407,826692,829871,829919-829920,830737
 /incubator/couchdb/trunk/etc/default/couchdb:642419-694440

Modified: couchdb/branches/0.10.x/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.10.x/src/couchdb/couch_rep.erl?rev=830742&r1=830741&r2=830742&view=diff
==============================================================================
--- couchdb/branches/0.10.x/src/couchdb/couch_rep.erl (original)
+++ couchdb/branches/0.10.x/src/couchdb/couch_rep.erl Wed Oct 28 19:44:26 2009
@@ -482,53 +482,62 @@
         tgt_starttime = TgtInstanceStartTime,
         stats = Stats
     } = State,
-    ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
-    RecordSeqNum = case commit_to_both(Source, Target, NewSeqNum) of
+    case commit_to_both(Source, Target, NewSeqNum) of
     {SrcInstanceStartTime, TgtInstanceStartTime} ->
-        NewSeqNum;
+        ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
+            [dbname(Source), dbname(Target), NewSeqNum]),
+        SessionId = couch_uuids:random(),
+        NewHistoryEntry = {[
+            {<<"session_id">>, SessionId},
+            {<<"start_time">>, list_to_binary(ReplicationStartTime)},
+            {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+            {<<"start_last_seq">>, StartSeqNum},
+            {<<"end_last_seq">>, NewSeqNum},
+            {<<"recorded_seq">>, NewSeqNum},
+            {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+            {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+            {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+            {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+            {<<"doc_write_failures">>, 
+                ets:lookup_element(Stats, doc_write_failures, 2)}
+        ]},
+        % limit history to 50 entries
+        NewRepHistory = {[
+            {<<"session_id">>, SessionId},
+            {<<"source_last_seq">>, NewSeqNum},
+            {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
+        ]},
+
+        try
+        {SrcRevPos,SrcRevId} =
+            update_local_doc(Source, SourceLog#doc{body=NewRepHistory}),
+        {TgtRevPos,TgtRevId} =
+            update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
+        State#state{
+            checkpoint_scheduled = nil,
+            checkpoint_history = NewRepHistory,
+            source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+            target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+        }
+        catch throw:conflict ->
+        ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
+            "yourself?)", []),
+        State
+        end;
     _Else ->
-        ?LOG_INFO("A server has restarted since replication start. "
-            "Not recording the new sequence number to ensure the "
-            "replication is redone and documents reexamined.", []),
-        StartSeqNum
-    end,
-    SessionId = couch_util:new_uuid(),
-    NewHistoryEntry = {[
-        {<<"session_id">>, SessionId},
-        {<<"start_time">>, list_to_binary(ReplicationStartTime)},
-        {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
-        {<<"start_last_seq">>, StartSeqNum},
-        {<<"end_last_seq">>, NewSeqNum},
-        {<<"recorded_seq">>, RecordSeqNum},
-        {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
-        {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
-        {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
-        {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
-        {<<"doc_write_failures">>, 
-            ets:lookup_element(Stats, doc_write_failures, 2)}
-    ]},
-    % limit history to 50 entries
-    NewRepHistory = {[
-        {<<"session_id">>, SessionId},
-        {<<"source_last_seq">>, RecordSeqNum},
-        {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
-    ]},
-
-    try
-    {SrcRevPos,SrcRevId} =
-        update_local_doc(Source, SourceLog#doc{body=NewRepHistory}),
-    {TgtRevPos,TgtRevId} =
-        update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
-    State#state{
-        checkpoint_scheduled = nil,
-        checkpoint_history = NewRepHistory,
-        source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
-        target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
-    }
-    catch throw:conflict ->
-    ?LOG_ERROR("checkpoint failure: conflict (are you replicating to yourself?)",
-        []),
-    State
+        ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
+            [dbname(Source), dbname(Target)]),
+        #state{
+            changes_feed = CF,
+            missing_revs = MR,
+            reader = Reader,
+            writer = Writer
+        } = State,
+        Pids = [CF, MR, Reader, Writer],
+        [unlink(Pid) || Pid <- Pids],
+        [exit(Pid, shutdown) || Pid <- Pids],
+        {ok, NewState} = init(State#state.init_args),
+        NewState
     end.
 
 commit_to_both(Source, Target, RequiredSeq) ->
@@ -565,12 +574,12 @@
     InstanceStartTime = NewDb#db.instance_start_time,
     couch_db:close(NewDb),
     if UpdateSeq > CommitSeq ->
-        ?LOG_DEBUG("replication needs a full commit: update ~p commit ~p",
+        ?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
             [UpdateSeq, CommitSeq]),
         {ok, DbStartTime} = couch_db:ensure_full_commit(Target),
         DbStartTime;
     true ->
-        ?LOG_DEBUG("replication doesn't need a full commit", []),
+        ?LOG_DEBUG("target doesn't need a full commit", []),
         InstanceStartTime
     end.
 
@@ -592,9 +601,12 @@
     InstanceStartTime = NewDb#db.instance_start_time,
     couch_db:close(NewDb),
     if RequiredSeq > CommitSeq ->
+        ?LOG_DEBUG("source needs a full commit: required ~p committed ~p",
+            [RequiredSeq, CommitSeq]),
         {ok, DbStartTime} = couch_db:ensure_full_commit(Source),
         DbStartTime;
     true ->
+        ?LOG_DEBUG("source doesn't need a full commit", []),
         InstanceStartTime
     end.
 

Modified: couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl?rev=830742&r1=830741&r2=830742&view=diff
==============================================================================
--- couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl (original)
+++ couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl Wed Oct 28 19:44:26 2009
@@ -262,6 +262,7 @@
 
 maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
     {ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]),
+    couch_db:close(Db),
     NewDb;
 maybe_reopen_db(Db, _HighSeq) ->
     Db.



Mime
View raw message