Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 15CBF6B6F for ; Fri, 8 Jul 2011 18:39:11 +0000 (UTC) Received: (qmail 11104 invoked by uid 500); 8 Jul 2011 18:39:10 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 10975 invoked by uid 500); 8 Jul 2011 18:39:10 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 10960 invoked by uid 99); 8 Jul 2011 18:39:09 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jul 2011 18:39:09 +0000 X-ASF-Spam-Status: No, hits=-1996.4 required=5.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jul 2011 18:39:06 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id D434223889E3 for ; Fri, 8 Jul 2011 18:38:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1144419 - in /couchdb/trunk/src/couchdb: couch_replicator.erl couch_replicator_doc_copier.erl couch_replicator_rev_finder.erl Date: Fri, 08 Jul 2011 18:38:44 -0000 To: commits@couchdb.apache.org From: fdmanana@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110708183844.D434223889E3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: fdmanana Date: Fri Jul 8 18:38:44 2011 New Revision: 1144419 URL: http://svn.apache.org/viewvc?rev=1144419&view=rev Log: Make replicator agnostic about the update seq type To determine 
which row, coming from a _changes feed, came earlier (or later), the replicator compared their respective update seq fields, which are numbers in Apache CouchDB. This worked fine, as rows coming from the _changes stream are sorted by their update seq field. However, it didn't work when the update seq is not a number, such as in BigCouch, where it is a string. This change allows pull replication from a BigCouch cluster, simplifies some code, and makes the replicator more robust overall. Modified: couchdb/trunk/src/couchdb/couch_replicator.erl couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl Modified: couchdb/trunk/src/couchdb/couch_replicator.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1144419&r1=1144418&r2=1144419&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_replicator.erl (original) +++ couchdb/trunk/src/couchdb/couch_replicator.erl Fri Jul 8 18:38:44 2011 @@ -591,16 +591,22 @@ spawn_changes_reader(StartSeq, #httpdb{} spawn_link(fun() -> put(last_seq, StartSeq), put(retries_left, Db#httpdb.retries), + put(row_ts, 1), read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options) end); spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) -> - spawn_link(fun() -> read_changes(StartSeq, Db, ChangesQueue, Options) end). + spawn_link(fun() -> + put(row_ts, 1), + read_changes(StartSeq, Db, ChangesQueue, Options) + end). 
read_changes(StartSeq, Db, ChangesQueue, Options) -> try couch_api_wrap:changes_since(Db, all_docs, StartSeq, fun(#doc_info{high_seq = Seq} = DocInfo) -> - ok = couch_work_queue:queue(ChangesQueue, DocInfo), + Ts = get(row_ts), + ok = couch_work_queue:queue(ChangesQueue, {Ts, DocInfo}), + put(row_ts, Ts + 1), put(last_seq, Seq) end, Options), couch_work_queue:close(ChangesQueue) @@ -641,7 +647,7 @@ do_checkpoint(State) -> target = Target, history = OldHistory, start_seq = StartSeq, - current_through_seq = NewSeq, + current_through_seq = {_Ts, NewSeq} = NewTsSeq, source_log = SourceLog, target_log = TargetLog, rep_starttime = ReplicationStartTime, @@ -708,7 +714,7 @@ do_checkpoint(State) -> Target, TargetLog#doc{body = NewRepHistory}, target), NewState = State#rep_state{ checkpoint_history = NewRepHistory, - committed_seq = NewSeq, + committed_seq = NewTsSeq, source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} }, Modified: couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl?rev=1144419&r1=1144418&r2=1144419&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl (original) +++ couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl Fri Jul 8 18:38:44 2011 @@ -50,7 +50,6 @@ -record(state, { loop, - cp, max_parallel_conns, source, target, @@ -58,7 +57,6 @@ writer = nil, pending_fetch = nil, flush_waiter = nil, - highest_seq_seen = ?LOWEST_SEQ, stats = #rep_stats{}, source_db_compaction_notifier = nil, target_db_compaction_notifier = nil, @@ -69,7 +67,7 @@ start_link(Cp, #db{} = Source, Target, MissingRevsQueue, _MaxConns) -> Pid = spawn_link( - fun() -> queue_fetch_loop(Source, Target, Cp, MissingRevsQueue) end), + fun() -> queue_fetch_loop(Source, Target, Cp, Cp, MissingRevsQueue) end), {ok, Pid}; start_link(Cp, 
Source, Target, MissingRevsQueue, MaxConns) -> @@ -81,10 +79,9 @@ init({Cp, Source, Target, MissingRevsQue process_flag(trap_exit, true), Parent = self(), LoopPid = spawn_link( - fun() -> queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) end + fun() -> queue_fetch_loop(Source, Target, Parent, Cp, MissingRevsQueue) end ), State = #state{ - cp = Cp, max_parallel_conns = MaxConns, loop = LoopPid, source = open_db(Source), @@ -97,19 +94,18 @@ init({Cp, Source, Target, MissingRevsQue {ok, State}. -handle_call({seq_done, Seq, RevCount}, {Pid, _}, - #state{loop = Pid, highest_seq_seen = HighSeq, stats = Stats} = State) -> +handle_call({seq_done, _Seq, RevCount}, {Pid, _}, + #state{loop = Pid, stats = Stats} = State) -> NewState = State#state{ - highest_seq_seen = lists:max([Seq, HighSeq]), stats = Stats#rep_stats{ missing_checked = Stats#rep_stats.missing_checked + RevCount } }, {reply, ok, NewState}; -handle_call({fetch_doc, {_Id, Revs, _PAs, Seq} = Params}, {Pid, _} = From, +handle_call({fetch_doc, {_Id, Revs, _PAs, _Seq} = Params}, {Pid, _} = From, #state{loop = Pid, readers = Readers, pending_fetch = nil, - highest_seq_seen = HighSeq, stats = Stats, source = Src, target = Tgt, + stats = Stats, source = Src, target = Tgt, max_parallel_conns = MaxConns} = State) -> Stats2 = Stats#rep_stats{ missing_checked = Stats#rep_stats.missing_checked + length(Revs), @@ -119,14 +115,12 @@ handle_call({fetch_doc, {_Id, Revs, _PAs Size when Size < MaxConns -> Reader = spawn_doc_reader(Src, Tgt, Params), NewState = State#state{ - highest_seq_seen = lists:max([Seq, HighSeq]), stats = Stats2, readers = [Reader | Readers] }, {reply, ok, NewState}; _ -> NewState = State#state{ - highest_seq_seen = lists:max([Seq, HighSeq]), stats = Stats2, pending_fetch = {From, Params} }, @@ -243,33 +237,33 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. 
-queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) -> +queue_fetch_loop(Source, Target, Parent, Cp, MissingRevsQueue) -> case couch_work_queue:dequeue(MissingRevsQueue, 1) of closed -> ok; - {ok, [IdRevs]} -> + {ok, [{ReportSeq, IdRevs}]} -> case Source of #db{} -> Source2 = open_db(Source), Target2 = open_db(Target), - {Stats, HighSeqDone} = local_process_batch( - IdRevs, Source2, Target2, #batch{}, #rep_stats{}, ?LOWEST_SEQ), + Stats = local_process_batch( + IdRevs, Source2, Target2, #batch{}, #rep_stats{}), close_db(Source2), - close_db(Target2), - ok = gen_server:cast(Parent, {report_seq_done, HighSeqDone, Stats}), - ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone]); + close_db(Target2); #httpdb{} -> - remote_process_batch(IdRevs, Parent) + remote_process_batch(IdRevs, Parent), + {ok, Stats} = gen_server:call(Parent, flush, infinity) end, - queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) + ok = gen_server:cast(Cp, {report_seq_done, ReportSeq, Stats}), + ?LOG_DEBUG("Worker reported completion of seq ~p", [ReportSeq]), + queue_fetch_loop(Source, Target, Parent, Cp, MissingRevsQueue) end. 
-local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats, HighestSeqDone) -> - {Stats, HighestSeqDone}; +local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats) -> + Stats; -local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size}, - Stats, HighestSeqDone) -> +local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size}, Stats) -> case Target of #httpdb{} -> ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]); @@ -277,14 +271,13 @@ local_process_batch([], _Source, Target, ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size]) end, {Written, WriteFailures} = flush_docs(Target, Docs), - Stats2 = Stats#rep_stats{ + Stats#rep_stats{ docs_written = Stats#rep_stats.docs_written + Written, doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures - }, - {Stats2, HighestSeqDone}; + }; local_process_batch([{Seq, {Id, Revs, NotMissingCount, PAs}} | Rest], - Source, Target, Batch, Stats, HighestSeqSeen) -> + Source, Target, Batch, Stats) -> {ok, {_, DocList, Written0, WriteFailures0}} = fetch_doc( Source, {Id, Revs, PAs, Seq}, fun local_doc_handler/2, {Target, [], 0, 0}), @@ -303,12 +296,11 @@ local_process_batch([{Seq, {Id, Revs, No docs_written = Stats#rep_stats.docs_written + Written, doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures }, - local_process_batch( - Rest, Source, Target, Batch2, Stats2, lists:max([Seq, HighestSeqSeen])). + local_process_batch(Rest, Source, Target, Batch2, Stats2). -remote_process_batch([], Parent) -> - ok = gen_server:call(Parent, flush, infinity); +remote_process_batch([], _Parent) -> + ok; remote_process_batch([{Seq, {Id, Revs, NotMissing, PAs}} | Rest], Parent) -> case NotMissing > 0 of @@ -416,17 +408,13 @@ spawn_writer(Target, #batch{docs = DocLi end). 
-after_full_flush(#state{cp = Cp, stats = Stats, flush_waiter = Waiter, - highest_seq_seen = HighSeqDone} = State) -> - ok = gen_server:cast(Cp, {report_seq_done, HighSeqDone, Stats}), - ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone]), - gen_server:reply(Waiter, ok), +after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) -> + gen_server:reply(Waiter, {ok, Stats}), State#state{ stats = #rep_stats{}, flush_waiter = nil, writer = nil, - batch = #batch{}, - highest_seq_seen = ?LOWEST_SEQ + batch = #batch{} }. Modified: couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl?rev=1144419&r1=1144418&r2=1144419&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl (original) +++ couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl Fri Jul 8 18:38:44 2011 @@ -35,23 +35,24 @@ missing_revs_finder_loop(Cp, Target, Cha closed -> ok; {ok, DocInfos} -> - #doc_info{high_seq = ReportSeq} = lists:last(DocInfos), + {Ts, #doc_info{high_seq = Seq}} = lists:last(DocInfos), + ReportSeq = {Ts, Seq}, ok = gen_server:cast(Cp, {report_seq, ReportSeq}), ?LOG_DEBUG("Missing revs finder defined report seq ~p", [ReportSeq]), IdRevs = [{Id, [Rev || #rev_info{rev = Rev} <- RevsInfo]} || - #doc_info{id = Id, revs = RevsInfo} <- DocInfos], + {_, #doc_info{id = Id, revs = RevsInfo}} <- DocInfos], Target2 = open_db(Target), {ok, Missing} = couch_api_wrap:get_missing_revs(Target2, IdRevs), close_db(Target2), - queue_missing_revs(Missing, DocInfos, RevsQueue), + queue_missing_revs(Missing, DocInfos, ReportSeq, RevsQueue), missing_revs_finder_loop(Cp, Target2, ChangesQueue, RevsQueue, BatchSize) end. 
-queue_missing_revs(Missing, DocInfos, Queue) -> +queue_missing_revs(Missing, DocInfos, ReportSeq, Queue) -> IdRevsSeqDict = dict:from_list( [{Id, {[Rev || #rev_info{rev = Rev} <- RevsInfo], Seq}} || - #doc_info{id = Id, revs = RevsInfo, high_seq = Seq} <- DocInfos]), + {_, #doc_info{id = Id, revs = RevsInfo, high_seq = Seq}} <- DocInfos]), AllDict = lists:foldl( fun({Id, MissingRevs, PAs}, Acc) -> {_, Seq} = dict:fetch(Id, IdRevsSeqDict), @@ -71,7 +72,7 @@ queue_missing_revs(Missing, DocInfos, Qu AllDict, non_missing(IdRevsSeqDict, Missing)), ?LOG_DEBUG("Missing revs finder adding batch of ~p IdRevs to work queue", [dict:size(AllDict2)]), - ok = couch_work_queue:queue(Queue, dict:to_list(AllDict2)). + ok = couch_work_queue:queue(Queue, {ReportSeq, dict:to_list(AllDict2)}). non_missing(NonMissingDict, []) ->