couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [03/44] couch-replicator commit: updated refs/heads/63012-defensive to 1afa5ea
Date Tue, 07 Jun 2016 11:05:07 GMT
Introduce couch_replicator_scheduler

BugzID: 63012


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/bb34ad58
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/bb34ad58
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/bb34ad58

Branch: refs/heads/63012-defensive
Commit: bb34ad585c6471b4a54e1e0daf390b41acd27350
Parents: 13326bb
Author: Robert Newson <rnewson@apache.org>
Authored: Mon Apr 11 20:12:26 2016 +0100
Committer: Robert Newson <rnewson@apache.org>
Committed: Fri May 13 15:21:20 2016 +0100

----------------------------------------------------------------------
 src/couch_replicator_scheduler.erl     | 357 +++++++++++
 src/couch_replicator_scheduler.hrl     |  15 +
 src/couch_replicator_scheduler_job.erl | 881 ++++++++++++++++++++++++++++
 src/couch_replicator_scheduler_sup.erl |  54 ++
 src/couch_replicator_sup.erl           |  12 +
 5 files changed, 1319 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/bb34ad58/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
new file mode 100644
index 0000000..c8cb8f5
--- /dev/null
+++ b/src/couch_replicator_scheduler.erl
@@ -0,0 +1,357 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler).
+-behaviour(gen_server).
+-behaviour(config_listener).
+-vsn(1).
+
+-include("couch_replicator_scheduler.hrl").
+-include("couch_replicator.hrl").
+
+%% public api
+-export([start_link/0, add_job/1, remove_job/1]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+-export([format_status/2]).
+
+%% config_listener callback
+-export([handle_config_change/5, handle_config_terminate/3]).
+
+%% types
+-type event_type() :: started | stopped | crashed.
+-type event() :: {Type:: event_type(), When :: erlang:timestamp()}.
+-type history() :: [Events :: event()].
+
+%% definitions
+
+%% Maximum number of events retained per job (see update_history/3).
+-define(MAX_HISTORY, 20).
+%% A job is not restarted until this much time has passed since its most
+%% recent crash. The value is in microseconds because it is compared against
+%% the result of timer:now_diff/2. Parenthesised so the macro expands safely
+%% inside any surrounding expression.
+-define(MINIMUM_CRASH_INTERVAL, (60 * 1000000)).
+
+%% Defaults for the "replicator" config section.
+-define(DEFAULT_MAX_JOBS, 100).
+-define(DEFAULT_MAX_CHURN, 20).
+%% Handed to timer:send_after/2, i.e. milliseconds.
+-define(DEFAULT_SCHEDULER_INTERVAL, 60000).
+
+%% gen_server state: reschedule interval, the pending reschedule timer and
+%% the two scheduling limits.
+-record(state, {interval, timer, max_jobs, max_churn}).
+
+%% One row of the ?MODULE ETS table, keyed on id.
+-record(job, {
+          id :: job_id(),
+          rep :: #rep{},
+          %% undefined while the job is pending, a pid while it is running
+          pid :: undefined | pid(),
+          history :: history()}).
+
+%% public functions
+
+%% Starts the scheduler as a locally registered gen_server named ?MODULE.
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+
+%% Registers a replication with the scheduler. The replication id becomes the
+%% job id; the job starts with an empty history and no pid.
+-spec add_job(#rep{}) -> ok | {error, already_added}.
+add_job(#rep{id = Id} = Rep) when Id /= undefined ->
+    NewJob = #job{id = Id, rep = Rep, history = []},
+    gen_server:call(?MODULE, {add_job, NewJob}).
+
+
+%% Asks the scheduler to stop (if running) and forget the job with this id.
+-spec remove_job(job_id()) -> ok.
+remove_job(Id) ->
+    Request = {remove_job, Id},
+    gen_server:call(?MODULE, Request).
+
+
+%% gen_server functions
+
+%% Creates the named job table (keyed on #job.id), subscribes to config
+%% changes, reads the scheduling limits from the "replicator" config section
+%% and arms the first reschedule timer.
+init(_) ->
+    TableOpts = [named_table, {keypos, #job.id}],
+    ?MODULE = ets:new(?MODULE, TableOpts),
+    ok = config:listen_for_changes(?MODULE, self()),
+    GetInt = fun(Key, Default) ->
+        config:get_integer("replicator", Key, Default)
+    end,
+    Interval = GetInt("interval", ?DEFAULT_SCHEDULER_INTERVAL),
+    MaxJobs = GetInt("max_jobs", ?DEFAULT_MAX_JOBS),
+    MaxChurn = GetInt("max_churn", ?DEFAULT_MAX_CHURN),
+    {ok, Timer} = timer:send_after(Interval, reschedule),
+    State = #state{
+        interval = Interval,
+        max_jobs = MaxJobs,
+        max_churn = MaxChurn,
+        timer = Timer
+    },
+    {ok, State}.
+
+
+%% Synchronous API.
+%%
+%% {add_job, Job}   - inserts the job; replies ok, or {error, already_added}
+%%                    when a job with the same id is already in the table.
+%%                    A successful insert immediately tries to start pending
+%%                    jobs up to max_jobs.
+%% {remove_job, Id} - stops the job if it is running and deletes it; replies
+%%                    ok even when the id is unknown.
+%% anything else    - replies {error, {invalid_call, Msg}}. Returning noreply
+%%                    without ever replying would leave the caller blocked
+%%                    until its call timeout expired.
+handle_call({add_job, Job}, _From, State) ->
+    case add_job_int(Job) of
+        true ->
+            start_pending_jobs(State#state.max_jobs),
+            {reply, ok, State};
+        false ->
+            {reply, {error, already_added}, State}
+    end;
+
+handle_call({remove_job, Id}, _From, State) ->
+    case job_by_id(Id) of
+        {ok, Job} ->
+            ok = stop_job_int(Job),
+            true = remove_job_int(Job),
+            {reply, ok, State};
+        {error, not_found} ->
+            {reply, ok, State}
+    end;
+
+handle_call(Msg, _From, State) ->
+    {reply, {error, {invalid_call, Msg}}, State}.
+
+
+%% Runtime tuning casts (sent by handle_config_change/5). Each accepts only a
+%% positive integer; anything else falls through to the last clause and is
+%% ignored. A new interval is only stored here; it is read the next time the
+%% reschedule timer is armed.
+handle_cast({set_max_jobs, N}, State) when is_integer(N), N > 0 ->
+    couch_log:notice("~p: max_jobs set to ~B", [?MODULE, N]),
+    NewState = State#state{max_jobs = N},
+    {noreply, NewState};
+
+handle_cast({set_max_churn, N}, State) when is_integer(N), N > 0 ->
+    couch_log:notice("~p: max_churn set to ~B", [?MODULE, N]),
+    NewState = State#state{max_churn = N},
+    {noreply, NewState};
+
+handle_cast({set_interval, Millis}, State) when is_integer(Millis), Millis > 0 ->
+    couch_log:notice("~p: interval set to ~B", [?MODULE, Millis]),
+    NewState = State#state{interval = Millis},
+    {noreply, NewState};
+
+handle_cast(_Other, State) ->
+    {noreply, State}.
+
+
+%% reschedule - periodic tick: run one scheduling pass, then re-arm the timer
+%%              with the current interval.
+%% 'DOWN'     - a monitored job process exited. If the job is still in the
+%%              table it is marked pending again with a `crashed` history
+%%              event and pending jobs are started; if it was removed in the
+%%              meantime nothing is reinserted.
+%% other      - ignored.
+handle_info(reschedule, State) ->
+    MaxJobs = State#state.max_jobs,
+    MaxChurn = State#state.max_churn,
+    ok = reschedule(MaxJobs, MaxChurn),
+    {ok, cancel} = timer:cancel(State#state.timer),
+    {ok, NewTimer} = timer:send_after(State#state.interval, reschedule),
+    {noreply, State#state{timer = NewTimer}};
+
+handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
+    case job_by_pid(Pid) of
+        {error, not_found} ->
+            % removed in remove_job and should not be reinserted.
+            {noreply, State};
+        {ok, #job{} = Job} ->
+            couch_log:notice("~p: Job ~p died with reason: ~p",
+                             [?MODULE, Job#job.id, Reason]),
+            Pending = Job#job{pid = undefined},
+            Crashed = update_history(Pending, crashed, os:timestamp()),
+            true = ets:insert(?MODULE, Crashed),
+            start_pending_jobs(State#state.max_jobs),
+            {noreply, State}
+    end;
+
+handle_info(_Other, State) ->
+    {noreply, State}.
+
+
+%% No state migration is needed between versions; the state is kept as is.
+code_change(_FromVsn, State, _Extra) ->
+    {ok, State}.
+
+
+%% Nothing is cleaned up explicitly on shutdown.
+terminate(_Why, _State) ->
+    ok.
+
+
+%% Status summary: the configured job limit plus the current running and
+%% pending job counts read from the job table.
+format_status(_Opt, [_PDict, State]) ->
+    MaxJobs = State#state.max_jobs,
+    Running = running_job_count(),
+    Pending = pending_job_count(),
+    [{max_jobs, MaxJobs}, {running_jobs, Running}, {pending_jobs, Pending}].
+
+
+%% config listener functions
+
+%% Forwards changes to the three "replicator" scheduling settings to the
+%% scheduler process as casts. The value is converted with list_to_integer/1,
+%% so a value that is not a string of digits raises badarg here.
+%% All other config changes are ignored.
+handle_config_change("replicator", "max_jobs", V, _, Pid) ->
+    Msg = {set_max_jobs, list_to_integer(V)},
+    ok = gen_server:cast(Pid, Msg),
+    {ok, Pid};
+
+handle_config_change("replicator", "max_churn", V, _, Pid) ->
+    Msg = {set_max_churn, list_to_integer(V)},
+    ok = gen_server:cast(Pid, Msg),
+    {ok, Pid};
+
+handle_config_change("replicator", "interval", V, _, Pid) ->
+    Msg = {set_interval, list_to_integer(V)},
+    ok = gen_server:cast(Pid, Msg),
+    {ok, Pid};
+
+handle_config_change(_Section, _Key, _Value, _Persist, Pid) ->
+    {ok, Pid}.
+
+
+%% On a clean `stop` nothing is done. For any other reason a helper process
+%% waits five seconds and then re-subscribes to config changes using the
+%% first callback argument.
+handle_config_terminate(_, stop, _) ->
+    ok;
+
+handle_config_terminate(Self, _, _) ->
+    Resubscribe = fun() ->
+        timer:sleep(5000),
+        config:listen_for_changes(?MODULE, Self)
+    end,
+    spawn(Resubscribe).
+
+
+%% private functions
+
+%% Starts up to Count pending jobs, least recently started first, skipping
+%% any job whose latest crash is more recent than ?MINIMUM_CRASH_INTERVAL.
+start_jobs(Count) ->
+    Sorted = lists:sort(fun oldest_job_first/2, pending_jobs()),
+    Eligible = [Job || Job <- Sorted, not_recently_crashed(Job)],
+    ToStart = lists:sublist(Eligible, Count),
+    lists:foreach(fun start_job_int/1, ToStart).
+
+
+%% Stops up to Count running jobs, least recently started first.
+stop_jobs(Count) ->
+    Sorted = lists:sort(fun oldest_job_first/2, running_jobs()),
+    ToStop = lists:sublist(Sorted, Count),
+    lists:foreach(fun stop_job_int/1, ToStop).
+
+
+%% Ordering fun for lists:sort/2: true when A's most recent start is not
+%% later than B's.
+oldest_job_first(#job{} = A, #job{} = B) ->
+    StartedA = last_started(A),
+    StartedB = last_started(B),
+    StartedA =< StartedB.
+
+
+%% True when the job has never crashed, or when its most recent crash is at
+%% least ?MINIMUM_CRASH_INTERVAL microseconds in the past.
+not_recently_crashed(#job{} = Job) ->
+    case crash_history(Job) of
+        [{crashed, LastCrash} | _] ->
+            Elapsed = timer:now_diff(os:timestamp(), LastCrash),
+            Elapsed >= ?MINIMUM_CRASH_INTERVAL;
+        [] ->
+            true
+    end.
+
+
+%% The `crashed` events of the job's history, in history order (newest
+%% first, since update_history/3 prepends).
+crash_history(#job{} = Job) ->
+    IsCrash = fun
+        ({crashed, _When}) -> true;
+        (_) -> false
+    end,
+    lists:filter(IsCrash, Job#job.history).
+
+
+%% Inserts the job unless its id is already present in the table.
+%% Returns true when the row was inserted, false otherwise.
+-spec add_job_int(#job{}) -> boolean().
+add_job_int(#job{} = NewJob) ->
+    ets:insert_new(?MODULE, NewJob).
+
+
+%% Starts a pending job under couch_replicator_scheduler_sup, monitors the
+%% new process and records a `started` history event. A job that already has
+%% a pid is left alone. A failed start is only logged; the row is not
+%% modified, so the job remains pending.
+start_job_int(#job{pid = Pid}) when Pid /= undefined ->
+    ok;
+
+start_job_int(#job{} = Job0) ->
+    case couch_replicator_scheduler_sup:start_child(Job0#job.rep) of
+        {ok, Child} ->
+            monitor(process, Child),
+            Job1 = update_history(Job0#job{pid = Child}, started, os:timestamp()),
+            true = ets:insert(?MODULE, Job1),
+            couch_log:notice("~p: Job ~p started as ~p",
+                [?MODULE, Job1#job.id, Job1#job.pid]);
+        {error, Reason} ->
+            % Log the job id only, as the other messages do. The full #job{}
+            % embeds the #rep{} record, whose source/target may carry
+            % credentials that must not end up in the log.
+            couch_log:notice("~p: Job ~p failed to start for reason ~p",
+                [?MODULE, Job0#job.id, Reason])
+    end.
+
+
+%% Terminates a running job through the supervisor, clears its pid and
+%% records a `stopped` history event. A job without a pid is a no-op.
+-spec stop_job_int(#job{}) -> ok | {error, term()}.
+stop_job_int(#job{pid = undefined}) ->
+    ok;
+
+stop_job_int(#job{} = Job) ->
+    OldPid = Job#job.pid,
+    ok = couch_replicator_scheduler_sup:terminate_child(OldPid),
+    Stopped = update_history(Job#job{pid = undefined}, stopped, os:timestamp()),
+    true = ets:insert(?MODULE, Stopped),
+    couch_log:notice("~p: Job ~p stopped as ~p",
+        [?MODULE, Job#job.id, OldPid]).
+
+
+%% Deletes the job's row from the table by id.
+-spec remove_job_int(#job{}) -> true.
+remove_job_int(#job{} = Job) ->
+    JobId = Job#job.id,
+    ets:delete(?MODULE, JobId).
+
+
+%% Number of running jobs: the table size minus the pending count.
+-spec running_job_count() -> non_neg_integer().
+running_job_count() ->
+    Total = ets:info(?MODULE, size),
+    Total - pending_job_count().
+
+
+%% All jobs that currently have a process.
+%%
+%% Selected in a single table pass with an is_pid guard, mirroring the
+%% `not is_pid` guard in pending_job_count/0. This avoids copying the whole
+%% table and then subtracting the pending list with `--`, which is quadratic
+%% in the number of jobs.
+-spec running_jobs() -> [#job{}].
+running_jobs() ->
+    MatchSpec = [{#job{pid = '$1', _ = '_'}, [{'is_pid', '$1'}], ['$_']}],
+    ets:select(?MODULE, MatchSpec).
+
+
+%% Counts the rows whose pid field is not a pid.
+-spec pending_job_count() -> non_neg_integer().
+pending_job_count() ->
+    Pattern = #job{pid='$1', _='_'},
+    NotRunning = {'not', {'is_pid', '$1'}},
+    ets:select_count(?MODULE, [{Pattern, [NotRunning], [true]}]).
+
+
+%% All rows whose pid field is `undefined`.
+-spec pending_jobs() -> [#job{}].
+pending_jobs() ->
+    Pattern = #job{pid=undefined, _='_'},
+    ets:match_object(?MODULE, Pattern).
+
+
+%% Finds the job currently running as Pid. Pid is not the table key, so this
+%% is a match over the table rather than a key lookup.
+-spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}.
+job_by_pid(Pid) when is_pid(Pid) ->
+    Pattern = #job{pid=Pid, _='_'},
+    case ets:match_object(?MODULE, Pattern) of
+        [#job{}=Found] ->
+            {ok, Found};
+        [] ->
+            {error, not_found}
+    end.
+
+%% Looks a job up by its id, which is the table key.
+-spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}.
+job_by_id(Id) ->
+    case ets:lookup(?MODULE, Id) of
+        [#job{}=Found] ->
+            {ok, Found};
+        [] ->
+            {error, not_found}
+    end.
+
+
+%% One scheduling pass: stop jobs above the limit, start pending jobs into
+%% free slots, then rotate. The running/pending counts are sampled once, up
+%% front, and the same values are handed to all three steps.
+-spec reschedule(MaxJobs :: non_neg_integer(), MaxChurn :: non_neg_integer()) -> ok.
+reschedule(MaxJobs, MaxChurn)
+  when is_integer(MaxJobs), MaxJobs > 0, is_integer(MaxChurn), MaxChurn > 0 ->
+    RunningCount = running_job_count(),
+    PendingCount = pending_job_count(),
+    stop_excess_jobs(MaxJobs, RunningCount),
+    start_pending_jobs(MaxJobs, RunningCount, PendingCount),
+    rotate_jobs(MaxJobs, MaxChurn, RunningCount, PendingCount).
+
+
+%% Stops as many jobs as the running count exceeds Max by; otherwise a no-op.
+stop_excess_jobs(Max, Running) when Running > Max ->
+    Excess = Running - Max,
+    stop_jobs(Excess);
+
+stop_excess_jobs(_Max, _Running) ->
+    ok.
+
+
+%% Convenience wrapper that samples the current running and pending counts.
+start_pending_jobs(Max) ->
+    Running = running_job_count(),
+    Pending = pending_job_count(),
+    start_pending_jobs(Max, Running, Pending).
+
+
+%% Fills the free slots (Max - Running) when there is something pending;
+%% otherwise a no-op.
+start_pending_jobs(Max, Running, Pending) when Running < Max, Pending > 0 ->
+    FreeSlots = Max - Running,
+    start_jobs(FreeSlots);
+
+start_pending_jobs(_Max, _Running, _Pending) ->
+    ok.
+
+%% When every slot is taken and jobs are waiting, swap some out: stop the N
+%% least recently started running jobs and start up to N pending ones, where
+%% N is the smallest of Pending, Running and MaxChurn.
+rotate_jobs(MaxJobs, MaxChurn, Running, Pending) when Running == MaxJobs, Pending > 0 ->
+    Churn = min([Pending, Running, MaxChurn]),
+    stop_jobs(Churn),
+    start_jobs(Churn);
+
+rotate_jobs(_MaxJobs, _MaxChurn, _Running, _Pending) ->
+    ok.
+
+
+%% Smallest element of a non-empty list. Delegates to lists:min/1, a single
+%% linear pass, instead of sorting the whole list to take its head.
+min(List) ->
+    lists:min(List).
+
+
+%% Timestamp of the job's most recent `started` event, or {0, 0, 0} when it
+%% has never been started. History is newest first, so the first `started`
+%% entry found is the latest one.
+-spec last_started(#job{}) -> erlang:timestamp().
+last_started(#job{} = Job) ->
+    case lists:keyfind(started, 1, Job#job.history) of
+        {started, When} ->
+            When;
+        false ->
+            {0, 0, 0}
+    end.
+
+
+%% Prepends a {Type, When} event to the job's history and trims the history
+%% to the newest ?MAX_HISTORY entries.
+-spec update_history(#job{}, event_type(), erlang:timestamp()) -> #job{}.
+update_history(Job, Type, When) ->
+    Event = {Type, When},
+    Trimmed = lists:sublist([Event | Job#job.history], ?MAX_HISTORY),
+    Job#job{history = Trimmed}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/bb34ad58/src/couch_replicator_scheduler.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.hrl b/src/couch_replicator_scheduler.hrl
new file mode 100644
index 0000000..5203b0c
--- /dev/null
+++ b/src/couch_replicator_scheduler.hrl
@@ -0,0 +1,15 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-type job_id() :: term(). % type of the id field of the scheduler's #job{} record
+-type job_args() :: term(). % NOTE(review): no use of this type is visible in this commit's scheduler module - confirm it is needed

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/bb34ad58/src/couch_replicator_scheduler_job.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler_job.erl b/src/couch_replicator_scheduler_job.erl
new file mode 100644
index 0000000..5240ddd
--- /dev/null
+++ b/src/couch_replicator_scheduler_job.erl
@@ -0,0 +1,881 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler_job).
+-behaviour(gen_server).
+-vsn(1).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator_scheduler.hrl").
+-include("couch_replicator.hrl").
+
+%% public api
+-export([start_link/1]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+-export([format_status/2]).
+
+
+%% imports
+-import(couch_util, [
+    get_value/2,
+    get_value/3,
+    to_binary/1
+]).
+
+-import(couch_replicator_utils, [
+    start_db_compaction_notifier/2,
+    stop_db_compaction_notifier/1
+]).
+
+%% definitions
+-define(LOWEST_SEQ, 0).
+-define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
+-record(rep_state, {
+    rep_details, % the #rep{} record the job was started with
+    source_name, % couch_replicator_api_wrap:db_uri/1 of the opened source
+    target_name, % couch_replicator_api_wrap:db_uri/1 of the opened target
+    source, % opened source db handle
+    target, % opened target db handle
+    history,
+    checkpoint_history,
+    start_seq, % {Ts, Seq} tuple
+    committed_seq, % {Ts, Seq} tuple
+    current_through_seq,
+    seqs_in_progress = [], % ordset of seqs reported via report_seq and not yet done
+    highest_seq_done = {0, ?LOWEST_SEQ},
+    source_log,
+    target_log,
+    rep_starttime,
+    src_starttime,
+    tgt_starttime,
+    timer, % checkpoint timer
+    changes_queue, % couch_work_queue fed by the changes reader
+    changes_manager,
+    changes_reader,
+    workers, % list of couch_replicator_worker pids
+    stats = couch_replicator_stats:new(),
+    session_id,
+    source_db_compaction_notifier = nil,
+    target_db_compaction_notifier = nil,
+    source_monitor = nil, % monitor ref matched against 'DOWN' messages
+    target_monitor = nil, % monitor ref matched against 'DOWN' messages
+    source_seq = nil,
+    use_checkpoints = true,
+    checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL, % delay start_timer/1 passes to timer:apply_after/4
+    type = db,
+    view = nil
+}).
+
+%% Starts a replication job process for Rep, registered globally as
+%% {?MODULE, RepId} so that a replication id maps to at most one registered
+%% job process.
+%%
+%% Returns {ok, Pid} or {error, Reason}. The outcome is logged either way.
+%% The failure path logs through couch_log:warning/2 (couch_log has no
+%% warn/2, so the previous call would have raised undef instead of returning
+%% the error) and now includes the failure reason.
+start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
+    RepChildId = BaseId ++ Ext,
+    Source = couch_replicator_api_wrap:db_uri(Src),
+    Target = couch_replicator_api_wrap:db_uri(Tgt),
+    ServerName = {global, {?MODULE, Rep#rep.id}},
+
+    case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
+        {ok, Pid} ->
+            couch_log:notice("starting new replication `~s` at ~p (`~s` -> `~s`)",
+                             [RepChildId, Pid, Source, Target]),
+            {ok, Pid};
+        {error, Reason} ->
+            couch_log:warning("failed to start replication `~s` (`~s` -> `~s`): ~p",
+                              [RepChildId, Source, Target, Reason]),
+            {error, Reason}
+    end.
+
+%% Returns immediately with the init args as provisional state and a zero
+%% timeout; the real setup runs in handle_info(timeout, ...), which calls
+%% do_init/1.
+init(Rep) ->
+    Timeout = 0,
+    {ok, Rep, Timeout}.
+
+do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) -> % deferred setup, called from handle_info(timeout, ...)
+    process_flag(trap_exit, true), % exits of linked processes arrive as 'EXIT' messages in handle_info/2
+
+    #rep_state{
+        source = Source,
+        target = Target,
+        source_name = SourceName,
+        target_name = TargetName,
+        start_seq = {_Ts, StartSeq},
+        committed_seq = {_, CommittedSeq},
+        highest_seq_done = {_, HighestSeq},
+        checkpoint_interval = CheckpointInterval
+    } = State = init_state(Rep),
+
+    NumWorkers = get_value(worker_processes, Options),
+    BatchSize = get_value(worker_batch_size, Options),
+    {ok, ChangesQueue} = couch_work_queue:new([
+        {max_items, BatchSize * NumWorkers * 2},
+        {max_size, 100 * 1024 * NumWorkers}
+    ]),
+    % Start the _changes reader process. It adds the changes from
+    % the source db to the ChangesQueue.
+    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
+        StartSeq, Source, ChangesQueue, Options
+    ),
+    % Changes manager - responsible for dequeuing batches from the changes
+    % queue and delivering them to the worker processes.
+    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
+    % Start the worker processes. Each asks the changes manager for a batch
+    % of _changes rows, checks which revs are missing in the target, and
+    % copies the missing ones from the source to the target.
+    MaxConns = get_value(http_connections, Options),
+    Workers = lists:map(
+        fun(_) ->
+            couch_stats:increment_counter([couch_replicator, workers_started]),
+            {ok, Pid} = couch_replicator_worker:start_link(
+                self(), Source, Target, ChangesManager, MaxConns),
+            Pid
+        end,
+        lists:seq(1, NumWorkers)),
+
+    couch_task_status:add_task([
+        {type, replication},
+        {user, UserCtx#user_ctx.name},
+        {replication_id, ?l2b(BaseId ++ Ext)},
+        {database, Rep#rep.db_name},
+        {doc_id, Rep#rep.doc_id},
+        {source, ?l2b(SourceName)},
+        {target, ?l2b(TargetName)},
+        {continuous, get_value(continuous, Options, false)},
+        {revisions_checked, 0},
+        {missing_revisions_found, 0},
+        {docs_read, 0},
+        {docs_written, 0},
+        {changes_pending, get_pending_count(State)},
+        {doc_write_failures, 0},
+        {source_seq, HighestSeq},
+        {checkpointed_source_seq, CommittedSeq},
+        {checkpoint_interval, CheckpointInterval}
+    ]),
+    couch_task_status:set_update_frequency(1000),
+
+    % Until OTP R14B03:
+    %
+    % Restarting a temporary supervised child implies that the original arguments
+    % (#rep{} record) specified in the MFA component of the supervisor
+    % child spec will always be used whenever the child is restarted.
+    % This implies the same replication performance tuning parameters will
+    % always be used. The solution is to delete the child spec (see
+    % cancel_replication/1) and then start the replication again, but this is
+    % unfortunately not immune to race conditions.
+
+    couch_log:notice("Replication `~p` is using:~n"
+        "~c~p worker processes~n"
+        "~ca worker batch size of ~p~n"
+        "~c~p HTTP connections~n"
+        "~ca connection timeout of ~p milliseconds~n"
+        "~c~p retries per request~n"
+        "~csocket options are: ~s~s",
+        [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
+            MaxConns, $\t, get_value(connection_timeout, Options),
+            $\t, get_value(retries, Options),
+            $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
+            case StartSeq of
+            ?LOWEST_SEQ ->
+                "";
+            _ ->
+                io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
+            end]),
+
+    couch_log:debug("Worker pids are: ~p", [Workers]),
+
+    couch_replicator_manager:replication_started(Rep),
+
+    {ok, State#rep_state{
+            changes_queue = ChangesQueue,
+            changes_manager = ChangesManager,
+            changes_reader = ChangesReader,
+            workers = Workers
+        }
+    }.
+
+%% Raises http_connections from 1 to 2 on an #httpdb{} source and logs the
+%% adjustment; any other source is returned unchanged.
+adjust_maxconn(#httpdb{http_connections = 1} = Src, RepId) ->
+    couch_log:notice(
+        "Adjusting minimum number of HTTP source connections to 2 for ~p",
+        [RepId]),
+    Src#httpdb{http_connections = 2};
+
+adjust_maxconn(Src, _RepId) ->
+    Src.
+
+handle_info(shutdown, St) -> % explicit shutdown message
+    {stop, shutdown, St};
+
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) -> % source monitor fired
+    couch_log:error("Source database is down. Reason: ~p", [Why]),
+    {stop, source_db_down, St};
+
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) -> % target monitor fired
+    couch_log:error("Target database is down. Reason: ~p", [Why]),
+    {stop, target_db_down, St};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) -> % reader finished normally
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) -> % reader crashed: stop the job
+    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+    couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
+    {stop, changes_reader_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> % manager finished normally
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) -> % manager crashed: stop the job
+    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
+    couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
+    {stop, changes_manager_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> % queue finished normally
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) -> % queue crashed: stop the job
+    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
+    couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
+    {stop, changes_queue_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) -> % any other normal exit: check the worker list
+    case Workers -- [Pid] of
+    Workers -> % list unchanged, so Pid was not one of our workers
+        couch_log:error("unknown pid bit the dust ~p ~n",[Pid]),
+        {noreply, State#rep_state{workers = Workers}};
+        %% not clear why a stop was here before
+        %%{stop, {unknown_process_died, Pid, normal}, State};
+    [] -> % last worker finished: kill the changes manager and do the final checkpoint
+        catch unlink(State#rep_state.changes_manager),
+        catch exit(State#rep_state.changes_manager, kill),
+        do_last_checkpoint(State);
+    Workers2 -> % other workers are still running
+        {noreply, State#rep_state{workers = Workers2}}
+    end;
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) -> % abnormal exit of a worker or an unknown process
+    State2 = cancel_timer(State),
+    case lists:member(Pid, Workers) of
+    false ->
+        {stop, {unknown_process_died, Pid, Reason}, State2};
+    true ->
+        couch_stats:increment_counter([couch_replicator, worker_deaths]),
+        couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
+        {stop, {worker_died, Pid, Reason}, State2}
+    end;
+
+handle_info(timeout, InitArgs) -> % zero timeout set by init/1: the state is still the init args here
+    try do_init(InitArgs) of {ok, State} ->
+        {noreply, State}
+    catch Class:Error ->
+        Stack = erlang:get_stacktrace(),
+        {stop, shutdown, {error, Class, Error, Stack, InitArgs}} % handled by the terminate(shutdown, {error, ...}) clause
+    end.
+
+handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) -> % returns the #rep{} stored in the state
+    {reply, {ok, Rep}, State};
+
+handle_call({add_stats, Stats}, From, State) ->
+    gen_server:reply(From, ok), % reply first so the caller does not wait for the merge
+    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
+    {noreply, State#rep_state{stats = NewStats}};
+
+handle_call({report_seq_done, Seq, StatsInc}, From,
+    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
+        current_through_seq = ThroughSeq, stats = Stats} = State) ->
+    gen_server:reply(From, ok), % reply first; the bookkeeping below needs no answer
+    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+    [] ->
+        {Seq, []};
+    [Seq | Rest] -> % Seq is the head of the ordset, i.e. the lowest seq in progress
+        {Seq, Rest};
+    [_ | _] -> % a later seq finished first: the through seq stays where it was
+        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+    end,
+    NewHighestDone = lists:max([HighestDone, Seq]),
+    NewThroughSeq = case NewSeqsInProgress of
+    [] -> % nothing in flight: the through seq can jump to the highest seq done
+        lists:max([NewThroughSeq0, NewHighestDone]);
+    _ ->
+        NewThroughSeq0
+    end,
+    couch_log:debug("Worker reported seq ~p, through seq was ~p, "
+        "new through seq is ~p, highest seq done was ~p, "
+        "new highest seq done is ~p~n"
+        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+    NewState = State#rep_state{
+        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
+        current_through_seq = NewThroughSeq,
+        seqs_in_progress = NewSeqsInProgress,
+        highest_seq_done = NewHighestDone
+    },
+    update_task(NewState),
+    {noreply, NewState}.
+
+
+handle_cast({db_compacted, DbName},
+    #rep_state{source = #db{name = DbName} = Source} = State) -> % the compacted db is our #db{} source: reopen the handle
+    {ok, NewSource} = couch_db:reopen(Source),
+    {noreply, State#rep_state{source = NewSource}};
+
+handle_cast({db_compacted, DbName},
+    #rep_state{target = #db{name = DbName} = Target} = State) -> % the compacted db is our #db{} target: reopen the handle
+    {ok, NewTarget} = couch_db:reopen(Target),
+    {noreply, State#rep_state{target = NewTarget}};
+
+handle_cast(checkpoint, State) -> % sent by the timer armed in start_timer/1
+    #rep_state{rep_details = #rep{} = Rep} = State,
+    case couch_replicator_manager:continue(Rep) of
+    {true, _} ->
+        case do_checkpoint(State) of
+        {ok, NewState} ->
+            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+            {noreply, NewState#rep_state{timer = start_timer(State)}}; % re-arm for the next checkpoint
+        Error ->
+            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+            {stop, Error, State}
+        end;
+    {false, Owner} -> % the manager says this job should not continue: report Owner and stop
+        couch_replicator_manager:replication_usurped(Rep, Owner),
+        {stop, shutdown, State}
+    end;
+
+handle_cast({report_seq, Seq},
+    #rep_state{seqs_in_progress = SeqsInProgress} = State) -> % record Seq as in progress
+    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
+    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
+
+
+%% No state migration is needed; only a #rep_state{} state is accepted.
+code_change(_FromVsn, #rep_state{} = St, _Extra) ->
+    {ok, St}.
+
+
+%% Replaces the value of every "authorization" header (matched
+%% case-insensitively) with "****". Header order is preserved; entries
+%% already in Acc, which is held in reverse order, come first in the result.
+headers_strip_creds(Headers, Acc) ->
+    Mask = fun({Key, Value}) ->
+        case string:to_lower(Key) of
+            "authorization" -> {Key, "****"};
+            _ -> {Key, Value}
+        end
+    end,
+    lists:reverse(Acc, lists:map(Mask, Headers)).
+
+
+%% Returns a copy of an #httpdb{} with the password removed from its url and
+%% its authorization headers masked. Anything that is not an #httpdb{} is
+%% returned untouched.
+httpdb_strip_creds(#httpdb{} = HttpDb) ->
+    #httpdb{url = Url, headers = Headers} = HttpDb,
+    CleanUrl = couch_util:url_strip_password(Url),
+    CleanHeaders = headers_strip_creds(Headers, []),
+    HttpDb#httpdb{url = CleanUrl, headers = CleanHeaders};
+httpdb_strip_creds(LocalDb) ->
+    LocalDb.
+
+
+%% Strips credentials from both endpoints of a #rep{}.
+rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
+    CleanSource = httpdb_strip_creds(Source),
+    CleanTarget = httpdb_strip_creds(Target),
+    Rep#rep{source = CleanSource, target = CleanTarget}.
+
+
+%% Strips credentials from a #rep_state{}. The source and target appear both
+%% at the top level of the state and inside the nested rep_details record, so
+%% all three places are cleaned.
+state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
+    CleanRep = rep_strip_creds(Rep),
+    CleanSource = httpdb_strip_creds(Source),
+    CleanTarget = httpdb_strip_creds(Target),
+    State#rep_state{
+        rep_details = CleanRep,
+        source = CleanSource,
+        target = CleanTarget
+    }.
+
+
+%% gen_server terminate callback, one clause per way a job can end:
+%%   normal                 - replication finished: clean up, notify
+%%                            `finished` and report completion with stats.
+%%   shutdown, #rep_state{} - replication was cancelled: notify and clean up.
+%%   shutdown, {error, ...} - do_init/1 failed, so the state is still the
+%%                            error tuple built in handle_info(timeout, ...);
+%%                            log with credentials stripped and report the
+%%                            error.
+%%   anything else          - replication failed: log, clean up, report.
+terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
+    checkpoint_history = CheckpointHistory} = State) ->
+    terminate_cleanup(State),
+    Finished = {finished, RepId, CheckpointHistory},
+    couch_replicator_notifier:notify(Finished),
+    couch_replicator_manager:replication_completed(Rep, rep_stats(State));
+
+terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+    % replication was cancelled
+    Cancelled = {error, RepId, <<"cancelled">>},
+    couch_replicator_notifier:notify(Cancelled),
+    terminate_cleanup(State);
+
+terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
+    #rep{id=RepId} = InitArgs,
+    couch_stats:increment_counter([couch_replicator, failed_starts]),
+    couch_log:error("~p:~p: Replication failed to start for args ~p: ~p",
+             [Class, Error, rep_strip_creds(InitArgs), Stack]),
+    NotifyError = case Error of
+        {unauthorized, DbUri} ->
+            {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
+        {db_not_found, DbUri} ->
+            {db_not_found, <<"could not open ", DbUri/binary>>};
+        Other ->
+            Other
+    end,
+    couch_replicator_notifier:notify({error, RepId, NotifyError}),
+    couch_replicator_manager:replication_error(InitArgs, NotifyError);
+terminate(Reason, State) ->
+    #rep_state{
+        source_name = SourceName,
+        target_name = TargetName,
+        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
+    } = State,
+    LogArgs = [BaseId ++ Ext, SourceName, TargetName, to_binary(Reason)],
+    couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s", LogArgs),
+    terminate_cleanup(State),
+    couch_replicator_notifier:notify({error, RepId, Reason}),
+    couch_replicator_manager:replication_error(Rep, Reason).
+
+%% Shared shutdown steps: push a final task status update, stop both
+%% compaction notifiers, then close the source and target handles.
+terminate_cleanup(State) ->
+    update_task(State),
+    SrcNotifier = State#rep_state.source_db_compaction_notifier,
+    TgtNotifier = State#rep_state.target_db_compaction_notifier,
+    stop_db_compaction_notifier(SrcNotifier),
+    stop_db_compaction_notifier(TgtNotifier),
+    couch_replicator_api_wrap:db_close(State#rep_state.source),
+    couch_replicator_api_wrap:db_close(State#rep_state.target).
+
+
+%% Status output shows the state with credentials stripped.
+format_status(_Opt, [_PDict, State]) ->
+    Redacted = state_strip_creds(State),
+    [{data, [{"State", Redacted}]}].
+
+
+%% Called once all workers have finished and nothing is in progress.
+%% If no seq was ever completed (highest_seq_done is still ?LOWEST_SEQ) the
+%% job stops normally without checkpointing. Otherwise a final checkpoint is
+%% written at highest_seq_done; a checkpoint failure becomes the stop reason.
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+    highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
+    {stop, normal, cancel_timer(State)};
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+    highest_seq_done = Seq} = State) ->
+    FinalState = State#rep_state{current_through_seq = Seq},
+    case do_checkpoint(FinalState) of
+    {ok, Checkpointed} ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+        {stop, normal, cancel_timer(Checkpointed)};
+    Failure ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+        {stop, Failure, State}
+    end.
+
+
+%% Schedules a `checkpoint` cast to this process after checkpoint_interval.
+%% Returns the timer reference, or nil (after logging) when the timer could
+%% not be created.
+start_timer(State) ->
+    Interval = State#rep_state.checkpoint_interval,
+    CastArgs = [self(), checkpoint],
+    case timer:apply_after(Interval, gen_server, cast, CastArgs) of
+    {ok, TRef} ->
+        TRef;
+    Failure ->
+        couch_log:error("Replicator, error scheduling checkpoint:  ~p", [Failure]),
+        nil
+    end.
+
+
+%% Cancels the checkpoint timer, if one is set, and clears it in the state.
+cancel_timer(#rep_state{timer = nil} = State) ->
+    State;
+cancel_timer(#rep_state{timer = TRef} = State) ->
+    {ok, cancel} = timer:cancel(TRef),
+    State#rep_state{timer = nil}.
+
+
+% Builds the initial #rep_state{} for Rep: opens the source and target,
+% reads their db info and stored replication logs, works out the starting
+% sequence and finally starts the checkpoint timer.
+init_state(Rep) ->
+    #rep{
+        id = {BaseId, _},
+        source = SrcSpec0,
+        target = TgtSpec,
+        options = Opts,
+        user_ctx = UserCtx,
+        type = Type,
+        view = View
+    } = Rep,
+    DbOpts = [{user_ctx, UserCtx}],
+    % Adjust minimum number of http source connections to 2 to avoid deadlock
+    SrcSpec = adjust_maxconn(SrcSpec0, BaseId),
+    {ok, SrcDb} = couch_replicator_api_wrap:db_open(SrcSpec, DbOpts),
+    CreateTarget = get_value(create_target, Opts, false),
+    {ok, TgtDb} = couch_replicator_api_wrap:db_open(TgtSpec, DbOpts, CreateTarget),
+
+    {ok, SrcInfo} = couch_replicator_api_wrap:get_db_info(SrcDb),
+    {ok, TgtInfo} = couch_replicator_api_wrap:get_db_info(TgtDb),
+
+    [SrcLog, TgtLog] = find_replication_logs([SrcDb, TgtDb], Rep),
+
+    % A since_seq option takes precedence over the sequence found in the logs.
+    {LoggedSeq, History} = compare_replication_logs(SrcLog, TgtLog),
+    StartSeq = {0, get_value(since_seq, Opts, LoggedSeq)},
+
+    SrcSeq = get_value(<<"update_seq">>, SrcInfo, ?LOWEST_SEQ),
+
+    #doc{body = {PrevCheckpoint}} = SrcLog,
+    St = #rep_state{
+        rep_details = Rep,
+        source_name = couch_replicator_api_wrap:db_uri(SrcDb),
+        target_name = couch_replicator_api_wrap:db_uri(TgtDb),
+        source = SrcDb,
+        target = TgtDb,
+        history = History,
+        checkpoint_history = {[{<<"no_changes">>, true} | PrevCheckpoint]},
+        start_seq = StartSeq,
+        current_through_seq = StartSeq,
+        committed_seq = StartSeq,
+        source_log = SrcLog,
+        target_log = TgtLog,
+        rep_starttime = httpd_util:rfc1123_date(),
+        src_starttime = get_value(<<"instance_start_time">>, SrcInfo),
+        tgt_starttime = get_value(<<"instance_start_time">>, TgtInfo),
+        session_id = couch_uuids:random(),
+        source_db_compaction_notifier =
+            start_db_compaction_notifier(SrcDb, self()),
+        target_db_compaction_notifier =
+            start_db_compaction_notifier(TgtDb, self()),
+        source_monitor = db_monitor(SrcDb),
+        target_monitor = db_monitor(TgtDb),
+        source_seq = SrcSeq,
+        use_checkpoints = get_value(use_checkpoints, Opts, true),
+        checkpoint_interval = get_value(checkpoint_interval, Opts,
+                                        ?DEFAULT_CHECKPOINT_INTERVAL),
+        type = Type,
+        view = View
+    },
+    % The timer is started last, from the otherwise complete state.
+    St#rep_state{timer = start_timer(St)}.
+
+
+% Returns one replication log document per database in DbList, looked up
+% under the local doc id derived from the replication's base id.
+find_replication_logs(DbList, #rep{id = {BaseId, _Ext}} = Rep) ->
+    CurrentLogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
+    Vsn = ?REP_ID_VERSION,
+    fold_replication_logs(DbList, Vsn, CurrentLogId, CurrentLogId, Rep, []).
+
+
+% Collects one log document per database, in input order.
+%
+% For each database LogId is tried first. When it is missing and Vsn > 1,
+% the same database is retried with the id computed for the previous
+% replication id version. When no version matches, an empty #doc{} carrying
+% NewId is used. A log found under an older id is re-keyed to NewId, keeping
+% only its body.
+fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
+    lists:reverse(Acc);
+
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
+    % Move on to the next database, starting again from the current version.
+    Next = fun(Log) ->
+        fold_replication_logs(
+            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Log | Acc])
+    end,
+    case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
+    {error, <<"not_found">>} when Vsn > 1 ->
+        OlderVsn = Vsn - 1,
+        OlderRepId = couch_replicator_utils:replication_id(Rep, OlderVsn),
+        OlderLogId = ?l2b(?LOCAL_DOC_PREFIX ++ OlderRepId),
+        fold_replication_logs(Dbs, OlderVsn, OlderLogId, NewId, Rep, Acc);
+    {error, <<"not_found">>} ->
+        Next(#doc{id = NewId});
+    {ok, Doc} when LogId =:= NewId ->
+        Next(Doc);
+    {ok, Doc} ->
+        Next(#doc{id = NewId, body = Doc#doc.body})
+    end.
+
+
+% Spawns a linked process running changes_manager_loop_open/4, with the
+% batch counter starting at 1. Returns its pid.
+spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
+    FirstTs = 1,
+    Loop = fun() ->
+        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, FirstTs)
+    end,
+    spawn_link(Loop).
+
+% Serves {get_changes, From} requests from the work queue, forever.
+%
+% Each request is answered with either {closed, self()} or
+% {changes, self(), Changes, {Ts, Seq}}, where Seq is the high_seq of the
+% last change in the batch. The same {Ts, Seq} is reported to Parent before
+% the reply is sent. Ts grows by one per request, including closed ones.
+changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
+    receive
+    {get_changes, Requester} ->
+        Reply = case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
+            closed ->
+                {closed, self()};
+            {ok, Batch} ->
+                #doc_info{high_seq = LastSeq} = lists:last(Batch),
+                TsSeq = {Ts, LastSeq},
+                ok = gen_server:cast(Parent, {report_seq, TsSeq}),
+                {changes, self(), Batch, TsSeq}
+        end,
+        Requester ! Reply,
+        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
+    end.
+
+
+% Records a checkpoint for the current replication progress.
+%
+% Returns {ok, NewState} on success, or {checkpoint_commit_failure, Reason}
+% when a commit fails, an instance start time has changed, or a checkpoint
+% document cannot be updated.
+do_checkpoint(#rep_state{use_checkpoints = false} = St) ->
+    % Checkpoints are switched off: only note that fact in the state.
+    Disabled = {[{<<"use_checkpoints">>, false}]},
+    {ok, St#rep_state{checkpoint_history = Disabled}};
+do_checkpoint(#rep_state{current_through_seq = Same, committed_seq = Same} = St) ->
+    % Nothing beyond the committed sequence yet: just refresh the task.
+    update_task(St),
+    {ok, St};
+do_checkpoint(St) ->
+    #rep_state{
+        source_name = SrcName,
+        target_name = TgtName,
+        source = Src,
+        target = Tgt,
+        history = PrevHistory,
+        start_seq = {_, FirstSeq},
+        current_through_seq = {_, ThroughSeq} = ThroughTsSeq,
+        source_log = SrcLog,
+        target_log = TgtLog,
+        rep_starttime = RepStarted,
+        src_starttime = SrcStarted,
+        tgt_starttime = TgtStarted,
+        stats = Stats,
+        rep_details = #rep{options = Opts},
+        session_id = Session
+    } = St,
+    % SrcStarted and TgtStarted are already bound, so the tuple patterns
+    % below compare the freshly returned start times with the stored ones.
+    case commit_to_both(Src, Tgt) of
+    {source_error, Reason} ->
+        {checkpoint_commit_failure,
+            <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
+    {target_error, Reason} ->
+        {checkpoint_commit_failure,
+            <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
+    {SrcStarted, TgtStarted} ->
+        couch_log:notice("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
+            [SrcName, TgtName, ThroughSeq]),
+        Began = ?l2b(RepStarted),
+        Ended = ?l2b(httpd_util:rfc1123_date()),
+        Entry = {[
+            {<<"session_id">>, Session},
+            {<<"start_time">>, Began},
+            {<<"end_time">>, Ended},
+            {<<"start_last_seq">>, FirstSeq},
+            {<<"end_last_seq">>, ThroughSeq},
+            {<<"recorded_seq">>, ThroughSeq},
+            {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
+            {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
+            {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+            {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+            {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
+        ]},
+        DocIdsCompat = case get_value(doc_ids, Opts) of
+        undefined ->
+            [];
+        _ ->
+            % backwards compatibility with the result of a replication by
+            % doc IDs in versions 0.11.x and 1.0.x
+            % TODO: deprecate (use same history format, simplify code)
+            [
+                {<<"start_time">>, Began},
+                {<<"end_time">>, Ended},
+                {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+                {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+                {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
+            ]
+        end,
+        Header = [
+            {<<"session_id">>, Session},
+            {<<"source_last_seq">>, ThroughSeq},
+            {<<"replication_id_version">>, ?REP_ID_VERSION}
+        ] ++ DocIdsCompat,
+        % limit history to 50 entries
+        Trimmed = lists:sublist([Entry | PrevHistory], 50),
+        NewBody = {Header ++ [{<<"history">>, Trimmed}]},
+
+        try
+            % The source document is written first, then the target one.
+            {SrcPos, SrcRev} = update_checkpoint(
+                Src, SrcLog#doc{body = NewBody}, source),
+            {TgtPos, TgtRev} = update_checkpoint(
+                Tgt, TgtLog#doc{body = NewBody}, target),
+            NewSt = St#rep_state{
+                checkpoint_history = NewBody,
+                committed_seq = ThroughTsSeq,
+                source_log = SrcLog#doc{revs = {SrcPos, [SrcRev]}},
+                target_log = TgtLog#doc{revs = {TgtPos, [TgtRev]}}
+            },
+            update_task(NewSt),
+            {ok, NewSt}
+        catch throw:{checkpoint_commit_failure, _} = Failure ->
+            Failure
+        end;
+    {SrcStarted, _OtherTgtStart} ->
+        {checkpoint_commit_failure, <<"Target database out of sync. "
+            "Try to increase max_dbs_open at the target's server.">>};
+    {_OtherSrcStart, TgtStarted} ->
+        {checkpoint_commit_failure, <<"Source database out of sync. "
+            "Try to increase max_dbs_open at the source's server.">>};
+    {_OtherSrcStart, _OtherTgtStart} ->
+        {checkpoint_commit_failure, <<"Source and target databases out of "
+            "sync. Try to increase max_dbs_open at both servers.">>}
+    end.
+
+
+% Same as update_checkpoint/2, but a thrown checkpoint_commit_failure is
+% rethrown with a binary message that names the database role (DbType) and
+% includes the original reason.
+update_checkpoint(Db, Doc, DbType) ->
+    try
+        update_checkpoint(Db, Doc)
+    catch throw:{checkpoint_commit_failure, Cause} ->
+        Which = to_binary(DbType),
+        Why = to_binary(Cause),
+        Msg = <<"Error updating the ", Which/binary,
+            " checkpoint document: ", Why/binary>>,
+        throw({checkpoint_commit_failure, Msg})
+    end.
+
+% Writes the checkpoint document Doc to Db and returns {Pos, RevId} of the
+% stored revision. Throws {checkpoint_commit_failure, Reason} when the update
+% is rejected or an unexplained conflict is hit.
+update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
+    try
+        Outcome = couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]),
+        case Outcome of
+        {ok, PosRevId} ->
+            PosRevId;
+        {error, Reason} ->
+            throw({checkpoint_commit_failure, Reason})
+        end
+    catch throw:conflict ->
+        Reread = (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])),
+        case Reread of
+        {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
+            % The stored body equals the one we just sent, so an earlier
+            % attempt did succeed but its response was lost (connection
+            % error, timeout, ...). The retry then conflicted because the
+            % revision we sent was no longer the current one. Treat this as
+            % success and return the stored revision.
+            {Pos, RevId};
+        _ ->
+            throw({checkpoint_commit_failure, conflict})
+        end
+    end.
+
+
+% Runs ensure_full_commit on both endpoints, the source in a linked helper
+% process and the target in the calling process.
+%
+% Returns {SourceStartTime, TargetStartTime} when both succeed, otherwise
+% {target_error, Error} or {source_error, Error}. A target error takes
+% precedence over a source error.
+commit_to_both(Source, Target) ->
+    % commit the src async
+    Caller = self(),
+    Committer = spawn_link(fun() ->
+        SrcOutcome = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
+        Caller ! {self(), SrcOutcome}
+    end),
+
+    % commit tgt sync
+    TgtOutcome = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
+
+    SrcResult = receive
+    {Committer, Reply} ->
+        unlink(Committer),
+        % Flush the helper's 'EXIT' message if it is already in the mailbox.
+        receive {'EXIT', Committer, _} -> ok after 0 -> ok end,
+        Reply;
+    {'EXIT', Committer, Reason} ->
+        {error, Reason}
+    end,
+    case {TgtOutcome, SrcResult} of
+    {{ok, TgtStartTime}, {ok, SrcStartTime}} ->
+        {SrcStartTime, TgtStartTime};
+    {{ok, _}, SrcError} ->
+        {source_error, SrcError};
+    {TgtError, _} ->
+        {target_error, TgtError}
+    end.
+
+
+% Works out where to resume from, given the source and target log documents.
+% Returns {StartSeq, History}.
+compare_replication_logs(SrcDoc, TgtDoc) ->
+    #doc{body = {SrcProps}} = SrcDoc,
+    #doc{body = {TgtProps}} = TgtDoc,
+    SrcSession = get_value(<<"session_id">>, SrcProps),
+    TgtSession = get_value(<<"session_id">>, TgtProps),
+    if
+    SrcSession == TgtSession ->
+        % if the records have the same session id,
+        % then we have a valid replication history
+        LastSeq = get_value(<<"source_last_seq">>, SrcProps, ?LOWEST_SEQ),
+        PrevHistory = get_value(<<"history">>, SrcProps, []),
+        {LastSeq, PrevHistory};
+    true ->
+        SrcHistory = get_value(<<"history">>, SrcProps, []),
+        TgtHistory = get_value(<<"history">>, TgtProps, []),
+        couch_log:notice("Replication records differ. "
+                "Scanning histories to find a common ancestor.", []),
+        couch_log:debug("Record on source:~p~nRecord on target:~p~n",
+                [SrcProps, TgtProps]),
+        compare_rep_history(SrcHistory, TgtHistory)
+    end.
+
+% Walks the source and target histories in step, looking for a session id
+% they have in common. Returns {RecordedSeq, RemainingHistory} for the first
+% common record, or {?LOWEST_SEQ, []} when either history runs out first.
+compare_rep_history(S, T) when S =:= []; T =:= [] ->
+    couch_log:notice("no common ancestry -- performing full replication", []),
+    {?LOWEST_SEQ, []};
+compare_rep_history([{SrcEntry} | SrcOlder], [{TgtEntry} | TgtOlder] = TgtAll) ->
+    % Logs the match and returns its recorded_seq with the older entries.
+    Found = fun(Entry, Remaining) ->
+        RecordedSeq = get_value(<<"recorded_seq">>, Entry, ?LOWEST_SEQ),
+        couch_log:notice("found a common replication record with source_seq ~p",
+            [RecordedSeq]),
+        {RecordedSeq, Remaining}
+    end,
+    SrcSession = get_value(<<"session_id">>, SrcEntry),
+    case has_session_id(SrcSession, TgtAll) of
+    true ->
+        Found(SrcEntry, SrcOlder);
+    false ->
+        TgtSession = get_value(<<"session_id">>, TgtEntry),
+        case has_session_id(TgtSession, SrcOlder) of
+        true ->
+            Found(TgtEntry, TgtOlder);
+        false ->
+            compare_rep_history(SrcOlder, TgtOlder)
+        end
+    end.
+
+
+% True when some history entry carries exactly SessionId as its session_id.
+% Entries without a session_id are compared as nil.
+has_session_id(SessionId, [{Props} | Rest]) ->
+    get_value(<<"session_id">>, Props, nil) =:= SessionId
+        orelse has_session_id(SessionId, Rest);
+has_session_id(_SessionId, []) ->
+    false.
+
+
+% Monitors a #db{} handle through couch_db:monitor/1; any other kind of
+% handle gets nil instead of a monitor.
+db_monitor(Db) ->
+    case Db of
+    #db{} ->
+        couch_db:monitor(Db);
+    _ ->
+        nil
+    end.
+
+% Returns the pending-changes count, cached in the process dictionary under
+% pending_count_state as {Timestamp, Count}. The cached count is reused until
+% it is older than the connection_timeout option (scaled by 1000 to
+% microseconds for timer:now_diff/2), and is then fetched again.
+get_pending_count(St) ->
+    Rep = St#rep_state.rep_details,
+    Timeout = get_value(connection_timeout, Rep#rep.options),
+    TimeoutMicro = Timeout * 1000,
+    % Fetches a fresh count and stores it with the time of the fetch.
+    Refresh = fun() ->
+        Fresh = get_pending_count_int(St),
+        put(pending_count_state, {os:timestamp(), Fresh}),
+        Fresh
+    end,
+    case get(pending_count_state) of
+        undefined ->
+            Refresh();
+        {LastUpdate, Cached} ->
+            Age = timer:now_diff(os:timestamp(), LastUpdate),
+            case Age > TimeoutMicro of
+                true ->
+                    Refresh();
+                false ->
+                    Cached
+            end
+    end.
+
+
+% Asks the source how many changes are pending after highest_seq_done.
+% For an #httpdb{} source the request runs with retries = 3 and any failure
+% yields null. For any other source the call must return {ok, Pending}.
+get_pending_count_int(#rep_state{source = #httpdb{} = HttpDb} = St) ->
+    {_, DoneSeq} = St#rep_state.highest_seq_done,
+    Limited = HttpDb#httpdb{retries = 3},
+    Outcome = (catch couch_replicator_api_wrap:get_pending_count(Limited, DoneSeq)),
+    case Outcome of
+    {ok, Count} ->
+        Count;
+    _ ->
+        null
+    end;
+get_pending_count_int(#rep_state{source = OtherDb} = St) ->
+    {_, DoneSeq} = St#rep_state.highest_seq_done,
+    {ok, Count} = couch_replicator_api_wrap:get_pending_count(OtherDb, DoneSeq),
+    Count.
+
+
+% Publishes the job's progress through couch_task_status:update/1: the
+% rep_stats/1 properties followed by source_seq and through_seq.
+update_task(State) ->
+    #rep_state{
+        highest_seq_done = {_, HighestSeq},
+        current_through_seq = {_, ThroughSeq}
+    } = State,
+    SeqProps = [
+        {source_seq, HighestSeq},
+        {through_seq, ThroughSeq}
+    ],
+    couch_task_status:update(rep_stats(State) ++ SeqProps).
+
+
+% Builds the statistics part of the task status as a property list.
+% changes_pending comes from get_pending_count/1; the other counters are read
+% from the state's stats, and checkpointed_source_seq from committed_seq.
+rep_stats(State) ->
+    #rep_state{
+        stats = Stats,
+        committed_seq = {_, CommittedSeq}
+    } = State,
+    Checked = couch_replicator_stats:missing_checked(Stats),
+    Found = couch_replicator_stats:missing_found(Stats),
+    Read = couch_replicator_stats:docs_read(Stats),
+    Written = couch_replicator_stats:docs_written(Stats),
+    Pending = get_pending_count(State),
+    Failures = couch_replicator_stats:doc_write_failures(Stats),
+    [
+        {revisions_checked, Checked},
+        {missing_revisions_found, Found},
+        {docs_read, Read},
+        {docs_written, Written},
+        {changes_pending, Pending},
+        {doc_write_failures, Failures},
+        {checkpointed_source_seq, CommittedSeq}
+    ].

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/bb34ad58/src/couch_replicator_scheduler_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator_scheduler_sup.erl
new file mode 100644
index 0000000..a2cf9bd
--- /dev/null
+++ b/src/couch_replicator_scheduler_sup.erl
@@ -0,0 +1,54 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler_sup).
+-behaviour(supervisor).
+-vsn(1).
+
+%% includes
+-include("couch_replicator.hrl").
+
+%% public api
+-export([start_link/0, start_child/1, terminate_child/1]).
+
+%% supervisor api
+-export([init/1]).
+
+%% public functions
+
+%% Starts the supervisor, registered locally under the module name.
+start_link() ->
+    RegName = {local, ?MODULE},
+    supervisor:start_link(RegName, ?MODULE, []).
+
+
+%% Starts one child for Rep. With a simple_one_for_one supervisor the list
+%% given here is appended to the child spec's start arguments.
+start_child(#rep{} = Rep) ->
+    ExtraArgs = [Rep],
+    supervisor:start_child(?MODULE, ExtraArgs).
+
+
+%% Asks this supervisor to terminate the child process Pid.
+terminate_child(Pid) ->
+    Sup = ?MODULE,
+    supervisor:terminate_child(Sup, Pid).
+
+%% supervisor functions
+
+%% Supervisor callback: a simple_one_for_one supervisor whose children are
+%% couch_replicator_scheduler_job workers, with a restart intensity of 10
+%% restarts per 3 seconds and a 5000 ms shutdown timeout per child.
+init(_Args) ->
+    JobMod = couch_replicator_scheduler_job,
+    JobSpec = {
+        undefined,
+        {JobMod, start_link, []},
+        temporary, % A crashed job is not entitled to immediate restart.
+        5000,
+        worker,
+        [JobMod]
+    },
+    SupFlags = {simple_one_for_one, 10, 3},
+    {ok, {SupFlags, [JobSpec]}}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/bb34ad58/src/couch_replicator_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_sup.erl b/src/couch_replicator_sup.erl
index 57ad63b..b744a74 100644
--- a/src/couch_replicator_sup.erl
+++ b/src/couch_replicator_sup.erl
@@ -20,6 +20,18 @@ start_link() ->
 
 init(_Args) ->
     Children = [
+        {couch_replicator_scheduler,
+            {couch_replicator_scheduler, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_scheduler]},
+        {couch_replicator_scheduler_sup,
+            {couch_replicator_scheduler_sup, start_link, []},
+            permanent,
+            infinity,
+            supervisor,
+            [couch_replicator_scheduler_sup]},
         {couch_replication_event,
             {gen_event, start_link, [{local, couch_replication}]},
             permanent,


Mime
View raw message