Author: fdmanana
Date: Fri Oct 1 12:25:43 2010
New Revision: 1003509

URL: http://svn.apache.org/viewvc?rev=1003509&view=rev
Log:
New replicator: moving specific code to new files, to make the code shorter
and easier to read/modify.

Added:
    couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
Modified:
    couchdb/branches/new_replicator/src/couchdb/Makefile.am
    couchdb/branches/new_replicator/src/couchdb/couch_db.hrl
    couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl

Modified: couchdb/branches/new_replicator/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/Makefile.am?rev=1003509&r1=1003508&r2=1003509&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/Makefile.am (original)
+++ couchdb/branches/new_replicator/src/couchdb/Makefile.am Fri Oct 1 12:25:43 2010
@@ -79,7 +79,9 @@ source_files = \
     couch_view_group.erl \
     couch_db_updater.erl \
     couch_work_queue.erl \
-    couch_replicate.erl \
+    couch_replicator.erl \
+    couch_replicator_rev_finders.erl \
+    couch_replicator_doc_copiers.erl \
     couch_replicator_utils.erl \
     couch_replication_notifier.erl \
     couch_httpd_rep.erl \
@@ -144,7 +146,9 @@ compiled_files = \
     couch_view_group.beam \
     couch_db_updater.beam \
     couch_work_queue.beam \
-    couch_replicate.beam \
+    couch_replicator.beam \
+    couch_replicator_rev_finders.beam \
+    couch_replicator_doc_copiers.beam \
     couch_replicator_utils.beam \
     couch_replication_notifier.beam \
     couch_httpd_rep.beam \

Modified: couchdb/branches/new_replicator/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_db.hrl?rev=1003509&r1=1003508&r2=1003509&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_db.hrl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_db.hrl Fri Oct 1 12:25:43 2010
@@ -288,3 +288,10 @@
     include_docs = false
 }).

+-record(rep_stats, {
+    missing_checked = 0,
+    missing_found = 0,
+    docs_read = 0,
+    docs_written = 0,
+    doc_write_failures = 0
+}).
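
A note on the new #rep_stats record: an Erlang field reference such as
#rep_stats.docs_read evaluates to the field's position in the record tuple.
That is what lets the replicator workers below report statistics as plain
{add_stat, {Position, Increment}} messages, which the gen_server folds in
with element/2 and setelement/3. A minimal sketch, not part of this commit:

    Stats = #rep_stats{},          %% the tuple {rep_stats, 0, 0, 0, 0, 0}
    Pos = #rep_stats.docs_read,    %% 4: position of docs_read in that tuple
    Stats2 = setelement(Pos, Stats, element(Pos, Stats) + 1),
    1 = Stats2#rep_stats.docs_read.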

Modified: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=1003509&r1=1003508&r2=1003509&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Fri Oct 1 12:25:43 2010
@@ -26,7 +26,7 @@
 handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
     RepDoc = couch_httpd:json_body_obj(Req),
     {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, UserCtx),
-    case couch_replicate:replicate(Rep) of
+    case couch_replicator:replicate(Rep) of
     {error, Reason} ->
         try
             send_json(Req, 500, {[{error, Reason}]})

Added: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1003509&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (added)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Fri Oct 1 12:25:43 2010
@@ -0,0 +1,712 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%     http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator).
+-behaviour(gen_server).
+
+% public API
+-export([replicate/1]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
+% Can't be greater than the maximum number of child restarts specified
+% in couch_rep_sup.erl.
+-define(MAX_RESTARTS, 3).
+
+
+-record(rep_state, {
+    rep_details,
+    source_name,
+    target_name,
+    source,
+    target,
+    history,
+    checkpoint_history,
+    start_seq,
+    current_through_seq,
+    next_through_seqs = ordsets:new(),
+    is_successor_seq,
+    committed_seq,
+    source_log,
+    target_log,
+    rep_starttime,
+    src_starttime,
+    tgt_starttime,
+    timer, % checkpoint timer
+    missing_revs_queue,
+    changes_queue,
+    changes_reader,
+    missing_rev_finders,
+    doc_copiers,
+    finished_doc_copiers = 0,
+    seqs_in_progress = gb_trees:from_orddict([]),
+    stats = #rep_stats{}
+    }).
+
+
+replicate(#rep{id = RepId, options = Options} = Rep) ->
+    case get_value(cancel, Options, false) of
+    true ->
+        end_replication(RepId);
+    false ->
+        {ok, Listener} = rep_result_listener(RepId),
+        Result = do_replication_loop(Rep),
+        couch_replication_notifier:stop(Listener),
+        Result
+    end.
+
+
+do_replication_loop(#rep{options = Options, source = Src} = Rep) ->
+    DocIds = get_value(doc_ids, Options),
+    Continuous = get_value(continuous, Options, false),
+    Seq = case {DocIds, Continuous} of
+    {undefined, false} ->
+        last_seq(Src, Rep#rep.user_ctx);
+    _ ->
+        undefined
+    end,
+    do_replication_loop(Rep, Seq).
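
The public entry point is replicate/1, driven by the HTTP layer shown in the
couch_httpd_rep.erl hunk above. A hedged usage sketch (the JSON body and
database names are illustrative, not from this commit; UserCtx is a
#user_ctx{} assumed to be in scope):

    %% parse_rep_doc/2 builds a #rep{} whose id identifies this replication
    {ok, Rep} = couch_replicator_utils:parse_rep_doc(
        {[{<<"source">>, <<"http://host:5984/db_a">>},
          {<<"target">>, <<"db_b">>}]}, UserCtx),
    {ok, _Result} = couch_replicator:replicate(Rep),
    %% a #rep{} carrying the cancel option instead takes the true branch of
    %% replicate/1 and tears the running replication down via
    %% end_replication/1, returning {ok, {cancelled, BaseId}}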
+
+do_replication_loop(#rep{id = {BaseId, _} = Id, options = Options} = Rep, Seq) ->
+    case start_replication(Rep) of
+    {ok, _Pid} ->
+        case get_value(continuous, Options, false) of
+        true ->
+            {ok, {continuous, ?l2b(BaseId)}};
+        false ->
+            Result = wait_for_result(Id),
+            maybe_retry(Result, Rep, Seq)
+        end;
+    Error ->
+        Error
+    end.
+
+
+maybe_retry(RepResult, _Rep, undefined) ->
+    RepResult;
+maybe_retry({ok, {Props}} = Result, Rep, Seq) ->
+    case get_value(source_last_seq, Props) >= Seq of
+    true ->
+        Result;
+    false ->
+        do_replication_loop(Rep, Seq)
+    end;
+maybe_retry(RepResult, _Rep, _Seq) ->
+    RepResult.
+
+
+last_seq(DbName, UserCtx) ->
+    case (catch couch_api_wrap:db_open(DbName, [{user_ctx, UserCtx}])) of
+    {ok, Db} ->
+        {ok, DbInfo} = couch_api_wrap:get_db_info(Db),
+        couch_api_wrap:db_close(Db),
+        get_value(<<"update_seq">>, DbInfo);
+    _ ->
+        undefined
+    end.
+
+
+start_replication(#rep{id = {BaseId, Extension}} = Rep) ->
+    RepChildId = BaseId ++ Extension,
+    ChildSpec = {
+        RepChildId,
+        {gen_server, start_link, [?MODULE, Rep, []]},
+        transient,
+        1,
+        worker,
+        [?MODULE]
+    },
+    case supervisor:start_child(couch_rep_sup, ChildSpec) of
+    {ok, Pid} ->
+        ?LOG_INFO("starting new replication ~p at ~p", [RepChildId, Pid]),
+        {ok, Pid};
+    {error, already_present} ->
+        case supervisor:restart_child(couch_rep_sup, RepChildId) of
+        {ok, Pid} ->
+            ?LOG_INFO("starting replication ~p at ~p", [RepChildId, Pid]),
+            {ok, Pid};
+        {error, running} ->
+            %% this error occurs if multiple replicators are racing
+            %% each other to start and somebody else won. Just grab
+            %% the Pid by calling start_child again.
+            {error, {already_started, Pid}} =
+                supervisor:start_child(couch_rep_sup, ChildSpec),
+            ?LOG_DEBUG("replication ~p already running at ~p",
+                [RepChildId, Pid]),
+            {ok, Pid};
+        {error, _} = Err ->
+            Err
+        end;
+    {error, {already_started, Pid}} ->
+        ?LOG_DEBUG("replication ~p already running at ~p", [RepChildId, Pid]),
+        {ok, Pid};
+    {error, {Error, _}} ->
+        {error, Error}
+    end.
+
+
+rep_result_listener(RepId) ->
+    ReplyTo = self(),
+    {ok, _Listener} = couch_replication_notifier:start_link(
+        fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
+                ReplyTo ! Ev;
+            (_) ->
+                ok
+        end).
+
+
+wait_for_result(RepId) ->
+    wait_for_result(RepId, ?MAX_RESTARTS).
+
+wait_for_result(RepId, RetriesLeft) ->
+    receive
+    {finished, RepId, RepResult} ->
+        {ok, RepResult};
+    {error, RepId, Reason} ->
+        case RetriesLeft > 0 of
+        true ->
+            wait_for_result(RepId, RetriesLeft - 1);
+        false ->
+            {error, Reason}
+        end
+    end.
+
+
+end_replication({BaseId, Extension}) ->
+    FullRepId = BaseId ++ Extension,
+    case supervisor:terminate_child(couch_rep_sup, FullRepId) of
+    {error, not_found} = R ->
+        R;
+    ok ->
+        ok = supervisor:delete_child(couch_rep_sup, FullRepId),
+        {ok, {cancelled, ?l2b(BaseId)}}
+    end.
+
+
+init(InitArgs) ->
+    try
+        do_init(InitArgs)
+    catch
+    throw:Error ->
+        {stop, Error}
+    end.
+
+do_init(#rep{options = Options} = Rep) ->
+    process_flag(trap_exit, true),
+
+    #rep_state{
+        source = Source,
+        target = Target,
+        start_seq = StartSeq
+    } = State = init_state(Rep),
+
+    {ok, MissingRevsQueue} = couch_work_queue:new(
+        [{max_size, 100000}, {max_items, 2000}, {multi_workers, true}]),
+
+    {RevFindersCount, CopiersCount} = case ?l2i(
+        couch_config:get("replicator", "worker_processes", "10")) of
+    Small when Small < 2 ->
+        ?LOG_ERROR("The number of worker processes for the replicator "
+            "should be at least 2", []),
+        {1, 1};
+    N ->
+        {N div 2, (N div 2) + (N rem 2)}
+    end,
+
+    case get_value(doc_ids, Options) of
+    undefined ->
+        {ok, ChangesQueue} = couch_work_queue:new(
+            [{max_size, 100000}, {max_items, 500}, {multi_workers, true}]),
+
+        % This starts the _changes reader process. It adds the changes from
+        % the source db to the ChangesQueue.
+        ChangesReader = spawn_changes_reader(self(), StartSeq, Source,
+            ChangesQueue, Options),
+
+        % This starts the missing rev finders. They check the changes in the
+        % ChangesQueue to see if they exist on the target or not. If not,
+        % they add them to MissingRevsQueue.
+        MissingRevFinders =
+            couch_replicator_rev_finders:spawn_missing_rev_finders(self(),
+                Target, ChangesQueue, MissingRevsQueue, RevFindersCount);
+    DocIds ->
+        ChangesQueue = nil,
+        ChangesReader = nil,
+        MissingRevFinders =
+            couch_replicator_rev_finders:spawn_missing_rev_finders(self(),
+                Target, DocIds, MissingRevsQueue, RevFindersCount)
+    end,
+
+    % This starts the doc copy processes. They fetch documents from the
+    % MissingRevsQueue and copy them from the source to the target database.
+    DocCopiers = couch_replicator_doc_copiers:spawn_doc_copiers(
+        self(), Source, Target, MissingRevsQueue, CopiersCount),
+
+    {ok, State#rep_state{
+            missing_revs_queue = MissingRevsQueue,
+            changes_queue = ChangesQueue,
+            changes_reader = ChangesReader,
+            missing_rev_finders = MissingRevFinders,
+            doc_copiers = DocCopiers,
+            is_successor_seq = get_value(is_successor_seq, Options,
+                fun(Seq, NextSeq) -> (Seq + 1) =:= NextSeq end)
+        }
+    }.
+
+
+handle_info({seq_start, {Seq, NumChanges}}, State) ->
+    SeqsInProgress2 = gb_trees:insert(Seq, NumChanges,
+        State#rep_state.seqs_in_progress),
+    {noreply, State#rep_state{seqs_in_progress = SeqsInProgress2}};
+
+handle_info({seq_changes_done, {Seq, NumChangesDone}}, State) ->
+    #rep_state{
+        seqs_in_progress = SeqsInProgress,
+        next_through_seqs = DoneSeqs,
+        is_successor_seq = IsSuccFun
+    } = State,
+    % Decrement the # changes for this seq by NumChangesDone.
+    TotalChanges = gb_trees:get(Seq, SeqsInProgress),
+    NewState = case TotalChanges - NumChangesDone of
+    0 ->
+        % This seq is completely processed. Check to see if it was the
+        % smallest seq in progress. If so, we've made progress that can
+        % be checkpointed.
+        State2 = case gb_trees:smallest(SeqsInProgress) of
+        {Seq, _} ->
+            {CheckpointSeq, DoneSeqs2} = next_seq_before_gap(
+                Seq, DoneSeqs, IsSuccFun),
+            State#rep_state{
+                current_through_seq = CheckpointSeq,
+                next_through_seqs = DoneSeqs2
+            };
+        _ ->
+            DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs),
+            State#rep_state{next_through_seqs = DoneSeqs2}
+        end,
+        State2#rep_state{
+            seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress)
+        };
+    NewTotalChanges when NewTotalChanges > 0 ->
+        % There are still some changes that need work done.
+        % Put the new count back.
+        State#rep_state{
+            seqs_in_progress =
+                gb_trees:update(Seq, NewTotalChanges, SeqsInProgress)
+        }
+    end,
+    {noreply, NewState};
+
+handle_info({add_stat, {StatPos, Val}}, #rep_state{stats = Stats} = State) ->
+    Stat = element(StatPos, Stats),
+    NewStats = setelement(StatPos, Stats, Stat + Val),
+    {noreply, State#rep_state{stats = NewStats}};
+
+handle_info({done, _CopierId}, State) ->
+    #rep_state{
+        finished_doc_copiers = Finished,
+        doc_copiers = DocCopiers,
+        next_through_seqs = DoneSeqs,
+        current_through_seq = Seq
+    } = State,
+    State1 = State#rep_state{finished_doc_copiers = Finished + 1},
+    case length(DocCopiers) - 1 of
+    Finished ->
+        % This means all the worker processes have completed their work.
+        % Assert that all the seqs have been processed.
+        0 = gb_trees:size(State#rep_state.seqs_in_progress),
+        LastSeq = case DoneSeqs of
+        [] ->
+            Seq;
+        _ ->
+            lists:max([Seq, lists:last(DoneSeqs)])
+        end,
+        State2 = do_checkpoint(State1#rep_state{current_through_seq = LastSeq}),
+        cancel_timer(State2),
+        {stop, normal, State2};
+    _ ->
+        {noreply, State1}
+    end;
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+    cancel_timer(State),
+    ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
+    {stop, changes_reader_died, State};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_queue=Pid} = St) ->
+    {noreply, St};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_queue=Pid} = St) ->
+    cancel_timer(St),
+    ?LOG_ERROR("MissingRevsQueue process died with reason: ~p", [Reason]),
+    {stop, missing_revs_queue_died, St};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+    cancel_timer(State),
+    ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
+    {stop, changes_queue_died, State};
+
+handle_info({'EXIT', Pid, normal}, State) ->
+    #rep_state{
+        doc_copiers = DocCopiers,
+        missing_rev_finders = RevFinders,
+        missing_revs_queue = RevsQueue
+    } = State,
+    case get_value(Pid, RevFinders) of
+    undefined ->
+        case get_value(Pid, DocCopiers) of
+        undefined ->
+            {stop, {unknown_process_died, Pid, normal}, State};
+        _CopierId ->
+            {noreply, State}
+        end;
+    _FinderId ->
+        case lists:keydelete(Pid, 1, RevFinders) of
+        [] ->
+            couch_work_queue:close(RevsQueue),
+            {noreply, State#rep_state{missing_rev_finders = []}};
+        RevFinders2 ->
+            {noreply, State#rep_state{missing_rev_finders = RevFinders2}}
+        end
+    end;
+
+handle_info({'EXIT', Pid, Reason}, State) ->
+    #rep_state{
+        doc_copiers = DocCopiers,
+        missing_rev_finders = RevFinders
+    } = State,
+    cancel_timer(State),
+    case get_value(Pid, DocCopiers) of
+    undefined ->
+        case get_value(Pid, RevFinders) of
+        undefined ->
+            {stop, {unknown_process_died, Pid, Reason}, State};
+        FinderId ->
+            ?LOG_ERROR("RevsFinder process ~p died with reason: ~p",
+                [FinderId, Reason]),
+            {stop, {revs_finder_died, Pid, Reason}, State}
+        end;
+    CopierId ->
+        ?LOG_ERROR("DocCopier process ~p died with reason: ~p",
+            [CopierId, Reason]),
+        {stop, {doc_copier_died, Pid, Reason}, State}
+    end.
+
+
+handle_call(Msg, _From, State) ->
+    ?LOG_ERROR("Replicator received an unexpected synchronous call: ~p", [Msg]),
+    {stop, unexpected_sync_message, State}.
+
+
+handle_cast(checkpoint, State) ->
+    State2 = do_checkpoint(State),
+    {noreply, State2#rep_state{timer = start_timer(State)}};
+
+handle_cast(Msg, State) ->
+    ?LOG_ERROR("Replicator received an unexpected asynchronous call: ~p", [Msg]),
+    {stop, unexpected_async_message, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+terminate(normal, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+    terminate_cleanup(State),
+    couch_replication_notifier:notify({finished, RepId, get_result(State)});
+
+terminate(shutdown, State) ->
+    % replication was cancelled through ?MODULE:end_replication/1
+    terminate_cleanup(State);
+
+terminate(Reason, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+    terminate_cleanup(State),
+    couch_replication_notifier:notify({error, RepId, Reason}).
+
+
+terminate_cleanup(#rep_state{source = Source, target = Target}) ->
+    couch_api_wrap:db_close(Source),
+    couch_api_wrap:db_close(Target).
+
+
+start_timer(#rep_state{rep_details = #rep{options = Options}} = State) ->
+    case get_value(doc_ids, Options) of
+    undefined ->
+        After = checkpoint_interval(State),
+        case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
+        {ok, Ref} ->
+            Ref;
+        Error ->
+            ?LOG_ERROR("Replicator, error scheduling checkpoint: ~p", [Error]),
+            nil
+        end;
+    _DocIdList ->
+        nil
+    end.
+
+
+cancel_timer(#rep_state{timer = nil}) ->
+    ok;
+cancel_timer(#rep_state{timer = Timer}) ->
+    {ok, cancel} = timer:cancel(Timer).
+
+
+get_result(#rep_state{stats = Stats, rep_details = Rep} = State) ->
+    case get_value(doc_ids, Rep#rep.options) of
+    undefined ->
+        State#rep_state.checkpoint_history;
+    _DocIdList ->
+        {[
+            {<<"start_time">>, ?l2b(State#rep_state.rep_starttime)},
+            {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
+            {<<"docs_read">>, Stats#rep_stats.docs_read},
+            {<<"docs_written">>, Stats#rep_stats.docs_written},
+            {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+        ]}
+    end.
+
+
+init_state(Rep) ->
+    #rep{
+        id = {BaseId, _Ext},
+        source = Src, target = Tgt,
+        options = Options, user_ctx = UserCtx
+    } = Rep,
+    {ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
+    {ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
+        get_value(create_target, Options, false)),
+
+    {ok, SourceInfo} = couch_api_wrap:get_db_info(Source),
+    {ok, TargetInfo} = couch_api_wrap:get_db_info(Target),
+
+    DocId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
+    case couch_api_wrap:open_doc(Source, DocId, []) of
+    {ok, SourceLog} -> SourceLog;
+    _ -> SourceLog = #doc{id=DocId}
+    end,
+    case couch_api_wrap:open_doc(Target, DocId, []) of
+    {ok, TargetLog} -> TargetLog;
+    _ -> TargetLog = #doc{id=DocId}
+    end,
+    {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
+    #doc{body={CheckpointHistory}} = SourceLog,
+    State = #rep_state{
+        rep_details = Rep,
+        source_name = couch_api_wrap:db_uri(Source),
+        target_name = couch_api_wrap:db_uri(Target),
+        source = Source,
+        target = Target,
+        history = History,
+        checkpoint_history = {[{<<"no_changes">>, true} | CheckpointHistory]},
+        start_seq = StartSeq,
+        current_through_seq = StartSeq,
+        committed_seq = StartSeq,
+        source_log = SourceLog,
+        target_log = TargetLog,
+        rep_starttime = httpd_util:rfc1123_date(),
+        src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
+        tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo)
+    },
+    State#rep_state{timer = start_timer(State)}.
+
+
+spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue, Options) ->
+    spawn_link(
+        fun() ->
+            couch_api_wrap:changes_since(Source, all_docs, StartSeq,
+                fun(#doc_info{high_seq = Seq, revs = Revs} = DocInfo) ->
+                    Cp ! {seq_start, {Seq, length(Revs)}},
+                    Cp ! {add_stat, {#rep_stats.missing_checked, length(Revs)}},
+                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
+                end, Options),
+            couch_work_queue:close(ChangesQueue)
+        end).
+
+
+checkpoint_interval(_State) ->
+    5000.
+
+do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
+    State;
+do_checkpoint(State) ->
+    #rep_state{
+        source_name=SourceName,
+        target_name=TargetName,
+        source = Source,
+        target = Target,
+        history = OldHistory,
+        start_seq = StartSeq,
+        current_through_seq = NewSeq,
+        source_log = SourceLog,
+        target_log = TargetLog,
+        rep_starttime = ReplicationStartTime,
+        src_starttime = SrcInstanceStartTime,
+        tgt_starttime = TgtInstanceStartTime,
+        stats = Stats
+    } = State,
+    case commit_to_both(Source, Target) of
+    {SrcInstanceStartTime, TgtInstanceStartTime} ->
+        ?LOG_INFO("recording a checkpoint for ~p -> ~p at source update_seq ~p",
+            [SourceName, TargetName, NewSeq]),
+        SessionId = couch_uuids:random(),
+        NewHistoryEntry = {[
+            {<<"session_id">>, SessionId},
+            {<<"start_time">>, list_to_binary(ReplicationStartTime)},
+            {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+            {<<"start_last_seq">>, StartSeq},
+            {<<"end_last_seq">>, NewSeq},
+            {<<"recorded_seq">>, NewSeq},
+            {<<"missing_checked">>, Stats#rep_stats.missing_checked},
+            {<<"missing_found">>, Stats#rep_stats.missing_found},
+            {<<"docs_read">>, Stats#rep_stats.docs_read},
+            {<<"docs_written">>, Stats#rep_stats.docs_written},
+            {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+        ]},
+        % limit history to 50 entries
+        NewRepHistory = {[
+            {<<"session_id">>, SessionId},
+            {<<"source_last_seq">>, NewSeq},
+            {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
+        ]},
+
+        try
+            {ok, {SrcRevPos,SrcRevId}} = couch_api_wrap:update_doc(Source,
+                SourceLog#doc{body=NewRepHistory}, [delay_commit]),
+            {ok, {TgtRevPos,TgtRevId}} = couch_api_wrap:update_doc(Target,
+                TargetLog#doc{body=NewRepHistory}, [delay_commit]),
+            State#rep_state{
+                checkpoint_history = NewRepHistory,
+                committed_seq = NewSeq,
+                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+            }
+        catch throw:conflict ->
+            ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
+                "yourself?)", []),
+            State
+        end;
+    _Else ->
+        ?LOG_INFO("rebooting ~p -> ~p from last known replication checkpoint",
+            [SourceName, TargetName]),
+        throw(restart)
+    end.
+
+
+next_seq_before_gap(Seq, [], _IsSuccFun) ->
+    {Seq, []};
+
+next_seq_before_gap(Seq, [Next | NextSeqs] = AllSeqs, IsSuccFun) ->
+    case IsSuccFun(Seq, Next) of
+    false ->
+        {Seq, AllSeqs};
+    true ->
+        next_seq_before_gap(Next, NextSeqs, IsSuccFun)
+    end.
+
+
+commit_to_both(Source, Target) ->
+    % commit the src async
+    ParentPid = self(),
+    SrcCommitPid = spawn_link(
+        fun() ->
+            ParentPid ! {self(), couch_api_wrap:ensure_full_commit(Source)}
+        end),
+
+    % commit tgt sync
+    {ok, TargetStartTime} = couch_api_wrap:ensure_full_commit(Target),
+
+    SourceStartTime =
+    receive
+    {SrcCommitPid, {ok, Timestamp}} ->
+        receive
+        {'EXIT', SrcCommitPid, normal} ->
+            ok
+        end,
+        Timestamp;
+    {'EXIT', SrcCommitPid, _} ->
+        exit(replication_link_failure)
+    end,
+    {SourceStartTime, TargetStartTime}.
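
next_seq_before_gap/3 is what lets current_through_seq jump over a run of
contiguous completed seqs while stopping at the first gap, so a checkpoint
never claims a seq that is still in flight. A small worked example (values
illustrative), using the default successor test from do_init/1:

    IsSucc = fun(Seq, Next) -> Seq + 1 =:= Next end,
    {5, []} = next_seq_before_gap(3, [4, 5], IsSucc),  %% 3,4,5 contiguous
    {4, [6]} = next_seq_before_gap(3, [4, 6], IsSucc). %% stops at the gap before 6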
+
+
+compare_replication_logs(SrcDoc, TgtDoc) ->
+    #doc{body={RepRecProps}} = SrcDoc,
+    #doc{body={RepRecPropsTgt}} = TgtDoc,
+    case get_value(<<"session_id">>, RepRecProps) ==
+            get_value(<<"session_id">>, RepRecPropsTgt) of
+    true ->
+        % if the records have the same session id,
+        % then we have a valid replication history
+        OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, 0),
+        OldHistory = get_value(<<"history">>, RepRecProps, []),
+        {OldSeqNum, OldHistory};
+    false ->
+        SourceHistory = get_value(<<"history">>, RepRecProps, []),
+        TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
+        ?LOG_INFO("Replication records differ. "
+            "Scanning histories to find a common ancestor.", []),
+        ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+            [RepRecProps, RepRecPropsTgt]),
+        compare_rep_history(SourceHistory, TargetHistory)
+    end.
+
+compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
+    ?LOG_INFO("no common ancestry -- performing full replication", []),
+    {0, []};
+compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
+    SourceId = get_value(<<"session_id">>, S),
+    case has_session_id(SourceId, Target) of
+    true ->
+        RecordSeqNum = get_value(<<"recorded_seq">>, S, 0),
+        ?LOG_INFO("found a common replication record with source_seq ~p",
+            [RecordSeqNum]),
+        {RecordSeqNum, SourceRest};
+    false ->
+        TargetId = get_value(<<"session_id">>, T),
+        case has_session_id(TargetId, SourceRest) of
+        true ->
+            RecordSeqNum = get_value(<<"recorded_seq">>, T, 0),
+            ?LOG_INFO("found a common replication record with source_seq ~p",
+                [RecordSeqNum]),
+            {RecordSeqNum, TargetRest};
+        false ->
+            compare_rep_history(SourceRest, TargetRest)
+        end
+    end.
+
+
+has_session_id(_SessionId, []) ->
+    false;
+has_session_id(SessionId, [{Props} | Rest]) ->
+    case get_value(<<"session_id">>, Props, nil) of
+    SessionId ->
+        true;
+    _Else ->
+        has_session_id(SessionId, Rest)
+    end.

Added: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1003509&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (added)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Fri Oct 1 12:25:43 2010
@@ -0,0 +1,133 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%     http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_doc_copiers).
+
+-export([spawn_doc_copiers/5]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+
+-define(DOC_BATCH_SIZE, 50).
+
+
+
+spawn_doc_copiers(Cp, Source, Target, MissingRevsQueue, CopiersCount) ->
+    lists:map(
+        fun(CopierId) ->
+            Pid = spawn_link(fun() ->
+                doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue)
+            end),
+            {Pid, CopierId}
+        end,
+        lists:seq(1, CopiersCount)).
+
+
+doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue) ->
+    case couch_work_queue:dequeue(MissingRevsQueue, ?DOC_BATCH_SIZE) of
+    closed ->
+        ?LOG_DEBUG("Doc copier ~p got missing revs queue closed", [CopierId]),
+        Cp ! {done, CopierId};
+    {ok, [{doc_id, _} | _] = DocIds} ->
+        {BulkList, []} = lists:foldl(
+            fun({doc_id, Id}, Acc) ->
+                ?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [CopierId, Id]),
+                {ok, Acc2} = couch_api_wrap:open_doc_revs(
+                    Source, Id, all, [],
+                    fun(R, A) -> doc_handler(R, nil, Target, Cp, A) end, Acc),
+                Acc2
+            end,
+            {[], []}, DocIds),
+        bulk_write_docs(lists:reverse(BulkList), [], Target, Cp),
+        doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue);
+    {ok, IdRevList} ->
+        {Source2, {BulkList, SeqList}} = lists:foldl(
+            fun({Id, Revs, PossibleAncestors, Seq} = IdRev, {SrcDb, BulkAcc}) ->
+                ?LOG_DEBUG("Doc copier ~p got ~p", [CopierId, IdRev]),
+                SrcDb2 = couch_api_wrap:maybe_reopen_db(SrcDb, Seq),
+                {ok, BulkAcc2} = couch_api_wrap:open_doc_revs(
+                    SrcDb2, Id, Revs, [{atts_since, PossibleAncestors}],
+                    fun(R, A) -> doc_handler(R, Seq, Target, Cp, A) end,
+                    BulkAcc),
+                {SrcDb2, BulkAcc2}
+            end,
+            {Source, {[], []}}, IdRevList),
+        bulk_write_docs(
+            lists:reverse(BulkList),
+            lists:reverse(SeqList),
+            Target,
+            Cp),
+        doc_copy_loop(CopierId, Cp, Source2, Target, MissingRevsQueue)
+    end.
+
+
+doc_handler({ok, #doc{atts = []} = Doc}, Seq, _Target, Cp, Acc) ->
+    Cp ! {add_stat, {#rep_stats.docs_read, 1}},
+    update_bulk_doc_acc(Acc, Seq, Doc);
+
+doc_handler({ok, Doc}, Seq, Target, Cp, Acc) ->
+    Cp ! {add_stat, {#rep_stats.docs_read, 1}},
+    write_doc(Doc, Seq, Target, Cp),
+    Acc;
+
+doc_handler(_, _, _, _, Acc) ->
+    Acc.
+
+
+update_bulk_doc_acc({DocAcc, SeqAcc}, nil, Doc) ->
+    {[Doc | DocAcc], SeqAcc};
+update_bulk_doc_acc({DocAcc, [{Seq, Count} | RestSeq]}, Seq, Doc) ->
+    {[Doc | DocAcc], [{Seq, Count + 1} | RestSeq]};
+update_bulk_doc_acc({DocAcc, SeqAcc}, Seq, Doc) ->
+    {[Doc | DocAcc], [{Seq, 1} | SeqAcc]}.
+
+
+write_doc(Doc, Seq, Db, Cp) ->
+    case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
+    {ok, _} ->
+        Cp ! {add_stat, {#rep_stats.docs_written, 1}};
+    {error, <<"unauthorized">>} ->
+        Cp ! {add_stat, {#rep_stats.doc_write_failures, 1}},
+        ?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
+            [Doc#doc.id, couch_api_wrap:db_uri(Db)]);
+    _ ->
+        Cp ! {add_stat, {#rep_stats.doc_write_failures, 1}}
+    end,
+    seqs_done([{Seq, 1}], Cp).
+
+
+bulk_write_docs(Docs, Seqs, Db, Cp) ->
+    case couch_api_wrap:update_docs(
+        Db, Docs, [delay_commit], replicated_changes) of
+    {ok, []} ->
+        Cp ! {add_stat, {#rep_stats.docs_written, length(Docs)}};
+    {ok, Errors} ->
+        Cp ! {add_stat, {#rep_stats.doc_write_failures, length(Errors)}},
+        Cp ! {add_stat,
+            {#rep_stats.docs_written, length(Docs) - length(Errors)}},
+        DbUri = couch_api_wrap:db_uri(Db),
+        lists:foreach(
+            fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) ->
+                ?LOG_ERROR("Replicator: unauthorized to write document"
+                    " ~s to ~s", [Id, DbUri]);
+            (_) ->
+                ok
+            end, Errors)
+    end,
+    seqs_done(Seqs, Cp).
+
+
+seqs_done(SeqCounts, Cp) ->
+    lists:foreach(fun({nil, _}) ->
+            ok;
+        (SeqCount) ->
+            Cp ! {seq_changes_done, SeqCount}
+        end, SeqCounts).
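
update_bulk_doc_acc/3 groups the docs read for one source seq so that
seqs_done/2 can later emit a single {seq_changes_done, {Seq, Count}} message
per seq. A worked trace (DocA/DocB/DocC stand in for #doc{} values and are
illustrative only):

    Acc0 = {[], []},
    Acc1 = update_bulk_doc_acc(Acc0, 7, DocA),
    %% Acc1 =:= {[DocA], [{7, 1}]}
    Acc2 = update_bulk_doc_acc(Acc1, 7, DocB),
    %% Acc2 =:= {[DocB, DocA], [{7, 2}]} -- same seq, count bumped in place
    Acc3 = update_bulk_doc_acc(Acc2, 9, DocC).
    %% Acc3 =:= {[DocC, DocB, DocA], [{9, 1}, {7, 2}]}; both lists are
    %% reversed before bulk_write_docs/4 is called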
Added: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1003509&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (added)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Fri Oct 1 12:25:43 2010
@@ -0,0 +1,99 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%     http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_rev_finders).
+
+-export([spawn_missing_rev_finders/5]).
+
+-include("couch_db.hrl").
+
+-define(REV_BATCH_SIZE, 100).
+
+
+
+spawn_missing_rev_finders(_, _, DocIds, MissingRevsQueue, _)
+        when is_list(DocIds) ->
+    lists:foreach(
+        fun(DocId) ->
+            % Ensure same behaviour as old replicator: accept a list of
+            % percent encoded doc IDs.
+            Id = ?l2b(couch_httpd:unquote(DocId)),
+            ok = couch_work_queue:queue(MissingRevsQueue, {doc_id, Id})
+        end, DocIds),
+    couch_work_queue:close(MissingRevsQueue),
+    [];
+
+spawn_missing_rev_finders(StatsProcess,
+        Target, ChangesQueue, MissingRevsQueue, RevFindersCount) ->
+    lists:map(
+        fun(FinderId) ->
+            Pid = spawn_link(fun() ->
+                missing_revs_finder_loop(FinderId, StatsProcess,
+                    Target, ChangesQueue, MissingRevsQueue)
+            end),
+            {Pid, FinderId}
+        end, lists:seq(1, RevFindersCount)).
+
+
+missing_revs_finder_loop(FinderId, Cp, Target, ChangesQueue, RevsQueue) ->
+    case couch_work_queue:dequeue(ChangesQueue, ?REV_BATCH_SIZE) of
+    closed ->
+        ok;
+    {ok, DocInfos} ->
+        IdRevs = [{Id, [Rev || #rev_info{rev=Rev} <- RevsInfo]} ||
+            #doc_info{id=Id, revs=RevsInfo} <- DocInfos],
+        ?LOG_DEBUG("Revs finder ~p got ~p IdRev pairs from queue",
+            [FinderId, length(IdRevs)]),
+        {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs),
+        ?LOG_DEBUG("Revs finder ~p found ~p missing IdRev pairs",
+            [FinderId, length(Missing)]),
+        % We've now figured out which revs are missing on the target.
+        % Missing contains the id and revs missing, and any possible
+        % ancestors that already exist on the target. This enables
+        % incremental attachment replication, so the source only needs to
+        % send attachments modified since the common ancestor on target.
+
+        % Signal to the checkpointer that any revs already on the target
+        % are now complete.
+        IdRevsSeqDict = dict:from_list(
+            [{Id, {[Rev || #rev_info{rev=Rev} <- RevsInfo], Seq}} ||
+                #doc_info{id=Id, revs=RevsInfo, high_seq=Seq} <- DocInfos]),
+        NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing),
+        % signal the completion of the revs that aren't missing
+        lists:foreach(fun({_Id, {Revs, Seq}}) ->
+            Cp ! {seq_changes_done, {Seq, length(Revs)}}
+        end, dict:to_list(NonMissingIdRevsSeqDict)),
+
+        % Expand each doc's missing revs and seq into its own work item.
+        lists:foreach(fun({Id, Revs, PAs}) ->
+            % PA means "possible ancestor"
+            Cp ! {add_stat, {#rep_stats.missing_found, length(Revs)}},
+            {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
+            ok = couch_work_queue:queue(RevsQueue, {Id, Revs, PAs, Seq})
+        end, Missing),
+        missing_revs_finder_loop(FinderId, Cp, Target, ChangesQueue, RevsQueue)
+    end.
+
+
+remove_missing(IdRevsSeqDict, []) ->
+    IdRevsSeqDict;
+
+remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _} | Rest]) ->
+    {AllChangedRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
+    case AllChangedRevs -- MissingRevs of
+    [] ->
+        remove_missing(dict:erase(MissingId, IdRevsSeqDict), Rest);
+    NotMissingRevs ->
+        IdRevsSeqDict2 =
+            dict:store(MissingId, {NotMissingRevs, Seq}, IdRevsSeqDict),
+        remove_missing(IdRevsSeqDict2, Rest)
+    end.
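
A worked example of remove_missing/2 (the doc id, revs and seq are
illustrative). Suppose the changes feed reported doc <<"a">> with revs Rev1
and Rev2 at seq 5, and the target said only Rev2 is missing:

    Rev1 = {1, <<"abc">>}, Rev2 = {2, <<"def">>},
    D0 = dict:from_list([{<<"a">>, {[Rev1, Rev2], 5}}]),
    D1 = remove_missing(D0, [{<<"a">>, [Rev2], []}]),
    {[Rev1], 5} = dict:fetch(<<"a">>, D1).
    %% Rev1 already exists on the target, so the finder loop reports one of
    %% seq 5's two changes done via {seq_changes_done, {5, 1}}; had both revs
    %% been missing, the <<"a">> entry would have been erased instead.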