couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1004195 - /couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
Date Mon, 04 Oct 2010 10:55:47 GMT
Author: fdmanana
Date: Mon Oct  4 10:55:47 2010
New Revision: 1004195

URL: http://svn.apache.org/viewvc?rev=1004195&view=rev
Log:
New replicator: accumulate seq_changes_done messages for documents not written in bulk mode
as well.

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1004195&r1=1004194&r2=1004195&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Mon Oct 
4 10:55:47 2010
@@ -37,8 +37,7 @@ spawn_doc_copiers(Cp, Source, Target, Mi
     seqs = [],
     read = 0,
     written = 0,
-    wfail = 0,
-    cp
+    wfail = 0
 }).
 
 doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue) ->
@@ -56,9 +55,12 @@ doc_copy_loop(CopierId, Cp, Source, Targ
                     fun(R, A) -> doc_handler(R, nil, Target, A) end, Acc),
                 Acc2
             end,
-            #doc_acc{cp = Cp}, DocIds),
+            #doc_acc{}, DocIds),
         maybe_send_stat(DocAcc#doc_acc.read, #rep_stats.docs_read, Cp),
         #doc_acc{written = W, wfail = Wf} = bulk_write_docs(DocAcc, Target),
+        #doc_acc{written = W, wfail = Wf, seqs = SeqsDone} =
+            bulk_write_docs(DocAcc, Target),
+        seqs_done(SeqsDone, Cp),
         maybe_send_stat(W, #rep_stats.docs_written, Cp),
         maybe_send_stat(Wf, #rep_stats.doc_write_failures, Cp),
         doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue);
@@ -74,9 +76,11 @@ doc_copy_loop(CopierId, Cp, Source, Targ
                     BulkAcc),
                 {SrcDb2, BulkAcc2}
             end,
-            {Source, #doc_acc{cp = Cp}}, IdRevList),
+            {Source, #doc_acc{}}, IdRevList),
         maybe_send_stat(DocAcc#doc_acc.read, #rep_stats.docs_read, Cp),
-        #doc_acc{written = W, wfail = Wf} = bulk_write_docs(DocAcc, Target),
+        #doc_acc{written = W, wfail = Wf, seqs = SeqsDone} =
+            bulk_write_docs(DocAcc, Target),
+        seqs_done(SeqsDone, Cp),
         maybe_send_stat(W, #rep_stats.docs_written, Cp),
         maybe_send_stat(Wf, #rep_stats.doc_write_failures, Cp),
         doc_copy_loop(CopierId, Cp, Source2, Target, MissingRevsQueue)
@@ -115,23 +119,27 @@ update_bulk_doc_acc(#doc_acc{seqs = Seqs
 
 
 write_doc(Doc, Seq, Db, #doc_acc{written = W, wfail = F, read = R} = Acc) ->
-    Acc2 = case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
+    SeqsDone = case Acc#doc_acc.seqs of
+    [{Seq, Count} | RestSeqs] ->
+        [{Seq, Count + 1} | RestSeqs];
+    RestSeqs ->
+        [{Seq, 1} | RestSeqs]
+    end,
+    case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
     {ok, _} ->
-        Acc#doc_acc{written = W + 1, read = R + 1};
+        Acc#doc_acc{written = W + 1, read = R + 1, seqs = SeqsDone};
     {error, <<"unauthorized">>} ->
         ?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
             [Doc#doc.id, couch_api_wrap:db_uri(Db)]),
-        Acc#doc_acc{wfail = F + 1, read = R + 1};
+        Acc#doc_acc{wfail = F + 1, read = R + 1, seqs = SeqsDone};
     _ ->
-        Acc#doc_acc{wfail = F + 1, read = R + 1}
-    end,
-    seqs_done([{Seq, 1}], Acc#doc_acc.cp),
-    Acc2.
+        Acc#doc_acc{wfail = F + 1, read = R + 1, seqs = SeqsDone}
+    end.
 
 
 bulk_write_docs(#doc_acc{docs = []} = Acc, _) ->
     Acc;
-bulk_write_docs(#doc_acc{docs = Docs, seqs = Seqs, cp = Cp} = Acc, Db) ->
+bulk_write_docs(#doc_acc{docs = Docs, written = W, wfail = Wf} = Acc, Db) ->
     {ok, Errors} = couch_api_wrap:update_docs(
         Db, Docs, [delay_commit], replicated_changes),
     DbUri = couch_api_wrap:db_uri(Db),
@@ -142,10 +150,9 @@ bulk_write_docs(#doc_acc{docs = Docs, se
             (_) ->
                 ok
         end, Errors),
-    seqs_done(Seqs, Cp),
     Acc#doc_acc{
-        wfail = Acc#doc_acc.wfail + length(Errors),
-        written = Acc#doc_acc.written + length(Docs) - length(Errors)
+        wfail = Wf + length(Errors),
+        written = W + length(Docs) - length(Errors)
     }.
 
 



Mime
View raw message