Author: fdmanana
Date: Sat Oct 2 00:53:15 2010
New Revision: 1003724

URL: http://svn.apache.org/viewvc?rev=1003724&view=rev
Log:
New replicator: accumulate stats where possible to avoid flooding replication gen_servers with tons of messages.

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1003724&r1=1003723&r2=1003724&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Sat Oct 2 00:53:15 2010
@@ -32,99 +32,121 @@ spawn_doc_copiers(Cp, Source, Target, Mi
lists:seq(1, CopiersCount)). +-record(doc_acc, { + docs = [], + seqs = [], + read = 0, + written = 0, + wfail = 0, + cp +}). + doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue) -> case couch_work_queue:dequeue(MissingRevsQueue, ?DOC_BATCH_SIZE) of closed -> ?LOG_DEBUG("Doc copier ~p got missing revs queue closed", [CopierId]), Cp !
{done, CopierId}; + {ok, [{doc_id, _} | _] = DocIds} -> - {BulkList, []} = lists:foldl( + DocAcc = lists:foldl( fun({doc_id, Id}, Acc) -> ?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [CopierId, Id]), {ok, Acc2} = couch_api_wrap:open_doc_revs( Source, Id, all, [], - fun(R, A) -> doc_handler(R, nil, Target, Cp, A) end, Acc), + fun(R, A) -> doc_handler(R, nil, Target, A) end, Acc), Acc2 end, - {[], []}, DocIds), - bulk_write_docs(lists:reverse(BulkList), [], Target, Cp), + #doc_acc{cp = Cp}, DocIds), + maybe_send_stat(DocAcc#doc_acc.read, #rep_stats.docs_read, Cp), + #doc_acc{written = W, wfail = Wf} = bulk_write_docs(DocAcc, Target), + maybe_send_stat(W, #rep_stats.docs_written, Cp), + maybe_send_stat(Wf, #rep_stats.doc_write_failures, Cp), doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue); + {ok, IdRevList} -> - {Source2, {BulkList, SeqList}} = lists:foldl( + {Source2, DocAcc} = lists:foldl( fun({Id, Revs, PossibleAncestors, Seq} = IdRev, {SrcDb, BulkAcc}) -> ?LOG_DEBUG("Doc copier ~p got ~p", [CopierId, IdRev]), SrcDb2 = couch_api_wrap:maybe_reopen_db(SrcDb, Seq), {ok, BulkAcc2} = couch_api_wrap:open_doc_revs( SrcDb2, Id, Revs, [{atts_since, PossibleAncestors}], - fun(R, A) -> doc_handler(R, Seq, Target, Cp, A) end, + fun(R, A) -> doc_handler(R, Seq, Target, A) end, BulkAcc), {SrcDb2, BulkAcc2} end, - {Source, {[], []}}, IdRevList), - bulk_write_docs( - lists:reverse(BulkList), - lists:reverse(SeqList), - Target, - Cp), + {Source, #doc_acc{cp = Cp}}, IdRevList), + maybe_send_stat(DocAcc#doc_acc.read, #rep_stats.docs_read, Cp), + #doc_acc{written = W, wfail = Wf} = bulk_write_docs(DocAcc, Target), + maybe_send_stat(W, #rep_stats.docs_written, Cp), + maybe_send_stat(Wf, #rep_stats.doc_write_failures, Cp), doc_copy_loop(CopierId, Cp, Source2, Target, MissingRevsQueue) end. -doc_handler({ok, #doc{atts = []} = Doc}, Seq, _Target, Cp, Acc) -> - Cp ! 
{add_stat, {#rep_stats.docs_read, 1}}, +doc_handler({ok, #doc{atts = []} = Doc}, Seq, _Target, Acc) -> update_bulk_doc_acc(Acc, Seq, Doc); -doc_handler({ok, Doc}, Seq, Target, Cp, Acc) -> - Cp ! {add_stat, {#rep_stats.docs_read, 1}}, - write_doc(Doc, Seq, Target, Cp), - Acc; +doc_handler({ok, Doc}, Seq, Target, Acc) -> + write_doc(Doc, Seq, Target, Acc); -doc_handler(_, _, _, _, Acc) -> +doc_handler(_, _, _, Acc) -> Acc. -update_bulk_doc_acc({DocAcc, SeqAcc}, nil, Doc) -> - {[Doc | DocAcc], SeqAcc}; -update_bulk_doc_acc({DocAcc, [{Seq, Count} | RestSeq]}, Seq, Doc) -> - {[Doc | DocAcc], [{Seq, Count + 1} | RestSeq]}; -update_bulk_doc_acc({DocAcc, SeqAcc}, Seq, Doc) -> - {[Doc | DocAcc], [{Seq, 1} | SeqAcc]}. +update_bulk_doc_acc(#doc_acc{docs = Docs, read = Read} = Acc, nil, Doc) -> + Acc#doc_acc{ + docs = [Doc | Docs], + read = Read + 1 + }; + +update_bulk_doc_acc(#doc_acc{seqs = [{Seq, Count} | Rest]} = Acc, Seq, Doc) -> + Acc#doc_acc{ + docs = [Doc | Acc#doc_acc.docs], + seqs = [{Seq, Count + 1} | Rest], + read = Acc#doc_acc.read + 1 + }; + +update_bulk_doc_acc(#doc_acc{seqs = Seqs, read = Read} = Acc, Seq, Doc) -> + Acc#doc_acc{ + docs = [Doc | Acc#doc_acc.docs], + seqs = [{Seq, 1} | Seqs], + read = Read + 1 + }. -write_doc(Doc, Seq, Db, Cp) -> - case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of +write_doc(Doc, Seq, Db, #doc_acc{written = W, wfail = F, read = R} = Acc) -> + Acc2 = case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of {ok, _} -> - Cp ! {add_stat, {#rep_stats.docs_written, 1}}; + Acc#doc_acc{written = W + 1, read = R + 1}; {error, <<"unauthorized">>} -> - Cp ! {add_stat, {#rep_stats.doc_write_failures, 1}}, ?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s", - [Doc#doc.id, couch_api_wrap:db_uri(Db)]); + [Doc#doc.id, couch_api_wrap:db_uri(Db)]), + Acc#doc_acc{wfail = F + 1, read = R + 1}; _ -> - Cp ! 
{add_stat, {#rep_stats.doc_write_failures, 1}} + Acc#doc_acc{wfail = F + 1, read = R + 1} end, - seqs_done([{Seq, 1}], Cp). + seqs_done([{Seq, 1}], Acc#doc_acc.cp), + Acc2. -bulk_write_docs([], _, _, _) -> - ok; -bulk_write_docs(Docs, Seqs, Db, Cp) -> - case couch_api_wrap:update_docs( - Db, Docs, [delay_commit], replicated_changes) of - {ok, []} -> - Cp ! {add_stat, {#rep_stats.docs_written, length(Docs)}}; - {ok, Errors} -> - Cp ! {add_stat, {#rep_stats.doc_write_failures, length(Errors)}}, - Cp ! {add_stat, {#rep_stats.docs_written, length(Docs) - length(Errors)}}, - DbUri = couch_api_wrap:db_uri(Db), - lists:foreach( - fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) -> - ?LOG_ERROR("Replicator: unauthorized to write document" - " ~s to ~s", [Id, DbUri]); - (_) -> - ok - end, Errors) - end, - seqs_done(Seqs, Cp). +bulk_write_docs(#doc_acc{docs = []} = Acc, _) -> + Acc; +bulk_write_docs(#doc_acc{docs = Docs, seqs = Seqs, cp = Cp} = Acc, Db) -> + {ok, Errors} = couch_api_wrap:update_docs( + Db, Docs, [delay_commit], replicated_changes), + DbUri = couch_api_wrap:db_uri(Db), + lists:foreach( + fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) -> + ?LOG_ERROR("Replicator: unauthorized to write document" + " ~s to ~s", [Id, DbUri]); + (_) -> + ok + end, Errors), + seqs_done(Seqs, Cp), + Acc#doc_acc{ + wfail = Acc#doc_acc.wfail + length(Errors), + written = Acc#doc_acc.written + length(Docs) - length(Errors) + }. seqs_done([], _) -> @@ -133,3 +155,10 @@ seqs_done([{nil, _} | _], _) -> ok; seqs_done(SeqCounts, Cp) -> Cp ! {seq_changes_done, SeqCounts}. + + +maybe_send_stat(0, _StatPos, _Cp) -> + ok; +maybe_send_stat(Value, StatPos, Cp) -> + Cp ! {add_stat, {StatPos, Value}}. 
+

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1003724&r1=1003723&r2=1003724&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Sat Oct 2 00:53:15 2010
@@ -74,12 +74,14 @@ missing_revs_finder_loop(FinderId, Cp, T
{_Id, {Revs, Seq}} <- dict:to_list(NonMissingIdRevsSeqDict)]}, % Expand out each docs and seq into it's own work item - lists:foreach(fun({Id, Revs, PAs}) -> - % PA means "possible ancestor" - Cp ! {add_stat, {#rep_stats.missing_found, length(Revs)}}, - {_, Seq} = dict:fetch(Id, IdRevsSeqDict), - ok = couch_work_queue:queue(RevsQueue, {Id, Revs, PAs, Seq}) - end, Missing), + MissingCount = lists:foldl( + fun({Id, Revs, PAs}, Count) -> + % PA means "possible ancestor" + {_, Seq} = dict:fetch(Id, IdRevsSeqDict), + ok = couch_work_queue:queue(RevsQueue, {Id, Revs, PAs, Seq}), + Count + length(Revs) + end, 0, Missing), + Cp ! {add_stat, {#rep_stats.missing_found, MissingCount}}, missing_revs_finder_loop(FinderId, Cp, Target, ChangesQueue, RevsQueue) end.