Return-Path: Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: (qmail 43265 invoked from network); 4 Mar 2009 02:54:10 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Mar 2009 02:54:10 -0000 Received: (qmail 8751 invoked by uid 500); 4 Mar 2009 02:54:10 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 8728 invoked by uid 500); 4 Mar 2009 02:54:10 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 8719 invoked by uid 99); 4 Mar 2009 02:54:10 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2009 18:54:10 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Mar 2009 02:54:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E7E3E2388999; Wed, 4 Mar 2009 02:53:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r749886 - in /couchdb/branches/rep_security: share/www/script/couch_tests.js src/couchdb/couch_db.erl src/couchdb/couch_rep.erl Date: Wed, 04 Mar 2009 02:53:48 -0000 To: commits@couchdb.apache.org From: damien@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090304025348.E7E3E2388999@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: damien Date: Wed Mar 4 02:53:48 2009 New Revision: 749886 URL: http://svn.apache.org/viewvc?rev=749886&view=rev Log: changes to how replication stats are tracked. 
Modified: couchdb/branches/rep_security/share/www/script/couch_tests.js couchdb/branches/rep_security/src/couchdb/couch_db.erl couchdb/branches/rep_security/src/couchdb/couch_rep.erl Modified: couchdb/branches/rep_security/share/www/script/couch_tests.js URL: http://svn.apache.org/viewvc/couchdb/branches/rep_security/share/www/script/couch_tests.js?rev=749886&r1=749885&r2=749886&view=diff ============================================================================== --- couchdb/branches/rep_security/share/www/script/couch_tests.js [utf-8] (original) +++ couchdb/branches/rep_security/share/www/script/couch_tests.js [utf-8] Wed Mar 4 02:53:48 2009 @@ -3111,6 +3111,9 @@ // Now delete document T(user2Db.deleteDoc(doc).ok); + + + // Now test replication var AuthHeaders = {"WWW-Authenticate": "X-Couch-Test-Auth Christopher Lenz:dog food"}; var host = CouchDB.host; var dbPairs = [ Modified: couchdb/branches/rep_security/src/couchdb/couch_db.erl URL: http://svn.apache.org/viewvc/couchdb/branches/rep_security/src/couchdb/couch_db.erl?rev=749886&r1=749885&r2=749886&view=diff ============================================================================== --- couchdb/branches/rep_security/src/couchdb/couch_db.erl (original) +++ couchdb/branches/rep_security/src/couchdb/couch_db.erl Wed Mar 4 02:53:48 2009 @@ -483,7 +483,7 @@ close(Db2), case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of {ok, Conflicts} -> {ok, Conflicts}; - Else -> throw(Else) + retry -> throw({update_error, compaction_retry}) end end. 
Modified: couchdb/branches/rep_security/src/couchdb/couch_rep.erl URL: http://svn.apache.org/viewvc/couchdb/branches/rep_security/src/couchdb/couch_rep.erl?rev=749886&r1=749885&r2=749886&view=diff ============================================================================== --- couchdb/branches/rep_security/src/couchdb/couch_rep.erl (original) +++ couchdb/branches/rep_security/src/couchdb/couch_rep.erl Wed Mar 4 02:53:48 2009 @@ -19,8 +19,17 @@ headers }). +-record(rep_stats, { + docs_checked=0, + docs_missing=0, + docs_read=0, + docs_written=0, + doc_write_failures=0 +}). + -export([replicate/2, replicate/3]). + url_encode(Bin) when is_binary(Bin) -> url_encode(binary_to_list(Bin)); url_encode([H|T]) -> @@ -110,7 +119,8 @@ false -> ?LOG_INFO("Replication records differ. " "Performing full replication instead of incremental.", []), - ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", [OldRepHistoryProps, OldRepHistoryPropsTrg]), + ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", + [OldRepHistoryProps, OldRepHistoryPropsTrg]), 0 end, @@ -138,13 +148,16 @@ "replication is redone and documents reexamined.", []), SeqNum end, - + [rep_stats | StatsList] = tuple_to_list(Stats), + StatFieldNames = + [?l2b(tuple_to_list(T)) || T <- record_info(fields, rep_stats)], + StatProps = lists:zip(StatFieldNames, StatsList), HistEntries =[ { [{<<"start_time">>, list_to_binary(ReplicationStartTime)}, {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, {<<"start_last_seq">>, SeqNum}, - {<<"end_last_seq">>, NewSeqNum} | Stats]} + {<<"end_last_seq">>, NewSeqNum} | StatProps]} | proplists:get_value("history", OldRepHistoryProps, [])], % something changed, record results NewRepHistory = @@ -160,7 +173,7 @@ pull_rep(DbTarget, DbSource, SourceSeqNum) -> {ok, {NewSeq, Stats}} = - enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, []}), + enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, #rep_stats{}}), {NewSeq, Stats}. 
do_http_request(Url, Action, Headers) -> @@ -209,34 +222,49 @@ do_http_request(Url, Action, Headers, JsonBody, Retries - 1) end. -save_docs_buffer(DbTarget, DocsBuffer, []) -> +save_docs_buffer(DbTarget, DocsBuffer, [], Stats) -> receive {Src, shutdown} -> - {ok, _UpdateErrors} = update_docs(DbTarget, lists:reverse(DocsBuffer), [], replicated_changes), - Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]} + Stats2 = save_docs_with_stats(DbTarget, DocsBuffer, Stats), + Src ! {done, self(), Stats2} end; -save_docs_buffer(DbTarget, DocsBuffer, UpdateSequences) -> +save_docs_buffer(DbTarget, DocsBuffer, UpdateSequences, Stats) -> [NextSeq|Rest] = UpdateSequences, receive {Src, skip, NextSeq} -> Src ! got_it, - save_docs_buffer(DbTarget, DocsBuffer, Rest); + save_docs_buffer(DbTarget, DocsBuffer, Rest, Stats); {Src, docs, {NextSeq, Docs}} -> Src ! got_it, case couch_util:should_flush() of true -> - {ok, _UpdateErrors} = update_docs(DbTarget, lists:reverse(Docs++DocsBuffer), [], - replicated_changes), - save_docs_buffer(DbTarget, [], Rest); + Stats2 = + save_docs_with_stats(DbTarget, Docs++DocsBuffer, Stats), + save_docs_buffer(DbTarget, [], Rest, Stats2); false -> - save_docs_buffer(DbTarget, Docs++DocsBuffer, Rest) + save_docs_buffer(DbTarget, Docs++DocsBuffer, Rest, Stats) end; - {Src, shutdown} -> + {Src, shutdown} -> ?LOG_ERROR("received shutdown while waiting for more update_seqs", []), - {ok, _Errors} = update_docs(DbTarget, lists:reverse(DocsBuffer), [], replicated_changes), - Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]} + Stats2 = save_docs_with_stats(DbTarget,DocsBuffer,Stats), + Src ! {done, self(), Stats2} end. +save_docs_with_stats(Db, Docs, Stats) -> + {ok, Errors} = update_docs(Db, Docs, [], replicated_changes), + dump_update_errors(Errors), + Stats#rep_stats{ + docs_written=Stats#rep_stats.docs_written+length(Docs)-length(Errors), + doc_write_failures=Stats#rep_stats.doc_write_failures+length(Errors)}. 
+ + +dump_update_errors([]) -> ok; +dump_update_errors([{{Id, Rev}, Error}|Rest]) -> + ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p", + [Id, couch_doc:rev_to_str(Rev), Error]), + dump_update_errors(Rest). + + pmap(F,List) -> [wait_result(Worker) || Worker <- [spawn_worker(self(),F,E) || E <- List]]. @@ -251,14 +279,15 @@ enum_docs_parallel(DbS, DbT, InfoList) -> UpdateSeqs = [Seq || {_, Seq, _, _} <- InfoList], - SaveDocsPid = spawn_link(fun() -> save_docs_buffer(DbT,[],UpdateSeqs) end), + SaveDocsPid = spawn_link(fun() -> + save_docs_buffer(DbT,[],UpdateSeqs, #rep_stats{}) end), - Stats = pmap(fun({Id, Seq, SrcRevs, MissingRevs}) -> + ReadStatsList = pmap(fun({Id, Seq, SrcRevs, MissingRevs}) -> case MissingRevs of [] -> SaveDocsPid ! {self(), skip, Seq}, receive got_it -> ok end, - [{missing_checked, length(SrcRevs)}]; + #rep_stats{docs_checked=length(SrcRevs)}; _ -> {ok, DocResults} = open_doc_revs(DbS, Id, MissingRevs, [latest]), @@ -268,29 +297,33 @@ % include update_seq so we save docs in order SaveDocsPid ! {self(), docs, {Seq, Docs}}, receive got_it -> ok end, - [{missing_checked, length(SrcRevs)}, - {missing_found, length(MissingRevs)}, - {docs_read, length(Docs)}] + #rep_stats{docs_checked=length(SrcRevs), + docs_missing=length(MissingRevs), + docs_read=length(Docs)} end end, InfoList), SaveDocsPid ! {self(), shutdown}, - {MissingChecked, MissingFound, DocsRead} = lists:foldl(fun(S, {C, F, R}) -> - C1 = C + proplists:get_value(missing_checked, S, 0), - F1 = F + proplists:get_value(missing_found, S, 0), - R1 = R + proplists:get_value(docs_read, S, 0), - {C1, F1, R1} - end, {0, 0, 0}, Stats), - receive - {done, SaveDocsPid, [{<<"docs_written">>, DocsWritten}]} -> ok + {done, SaveDocsPid, WriteStats} -> ok end, - [ {<<"missing_checked">>, MissingChecked}, - {<<"missing_found">>, MissingFound}, - {<<"docs_read">>, DocsRead}, - {<<"docs_written">>, DocsWritten} ]. 
+ lists:foldl( + fun(StatIn, AccStat) -> + sum_rep_stats(StatIn, AccStat) + end, #rep_stats{}, [WriteStats | ReadStatsList]). + + +sum_rep_stats(StatsA, StatsB) -> + % Quick and dirty sum matching members of the records + % convert to lists + [rep_stats | MembersA] = tuple_to_list(StatsA), + [rep_stats | MembersB] = tuple_to_list(StatsB), + % pairwise add the members and convert back to the record + list_to_tuple([rep_stats | + lists:zipwith(fun(A,B) -> A + B end, MembersA, MembersB)]). + fix_url(UrlBin) -> Url = binary_to_list(UrlBin), @@ -358,11 +391,11 @@ end, {0, []}), lists:reverse(DocInfoList). -enum_docs_since(DbSource, DbTarget, StartSeq, InAcc) -> +enum_docs_since(DbSource, DbTarget, StartSeq, {AccLastSeq, AccStats}) -> DocInfoList = get_doc_info_list(DbSource, StartSeq), case DocInfoList of [] -> - {ok, InAcc}; + {ok, {AccLastSeq, AccStats}}; _ -> UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList], SrcRevsList = lists:map(fun(SrcDocInfo) -> @@ -380,21 +413,7 @@ {Id, Seq, SrcRevs, MissingRevs} end, lists:zip(SrcRevsList, UpdateSeqs)), Stats = enum_docs_parallel(DbSource, DbTarget, InfoList), - OldStats = element(2, InAcc), - TotalStats = [ - {<<"missing_checked">>, - proplists:get_value(<<"missing_checked">>, OldStats, 0) + - proplists:get_value(<<"missing_checked">>, Stats, 0)}, - {<<"missing_found">>, - proplists:get_value(<<"missing_found">>, OldStats, 0) + - proplists:get_value(<<"missing_found">>, Stats, 0)}, - {<<"docs_read">>, - proplists:get_value(<<"docs_read">>, OldStats, 0) + - proplists:get_value(<<"docs_read">>, Stats, 0)}, - {<<"docs_written">>, - proplists:get_value(<<"docs_written">>, OldStats, 0) + - proplists:get_value(<<"docs_written">>, Stats, 0)} - ], + TotalStats = sum_rep_stats(Stats, AccStats), #doc_info{update_seq=LastSeq} = lists:last(DocInfoList), enum_docs_since(DbSource, DbTarget, LastSeq, {LastSeq, TotalStats})