couchdb-commits mailing list archives

From vatam...@apache.org
Subject [couchdb] 01/09: Introduce couch_replicator_scheduler
Date Wed, 05 Apr 2017 19:08:47 GMT
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f63267f4ff734a6925902756979ecefdee60e992
Author: Robert Newson <rnewson@apache.org>
AuthorDate: Mon Apr 11 20:12:26 2016 +0100

    Introduce couch_replicator_scheduler
    
    The scheduling replicator can run a large number of replication jobs by
    scheduling a subset of them at a time: it periodically stops some running
    jobs and starts pending ones. Jobs that fail are penalized with an
    exponential backoff.
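
    The scheduler is controlled by the interval, max_jobs, max_churn,
    max_history and health_threshold settings in the "replicator"
    configuration section.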
    
    Jira: COUCHDB-3324
---
 .../src/couch_replicator_scheduler.erl             | 1380 ++++++++++++++++++++
 .../src/couch_replicator_scheduler.hrl             |   15 +
 .../src/couch_replicator_scheduler_job.erl         |  945 ++++++++++++++
 .../src/couch_replicator_scheduler_sup.erl         |   54 +
 4 files changed, 2394 insertions(+)

diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
new file mode 100644
index 0000000..a741a9f
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -0,0 +1,1380 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler).
+-behaviour(gen_server).
+-behaviour(config_listener).
+-vsn(1).
+
+-include("couch_replicator_scheduler.hrl").
+-include("couch_replicator.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% public api
+-export([start_link/0, add_job/1, remove_job/1, reschedule/0]).
+-export([rep_state/1, find_jobs_by_dbname/1, find_jobs_by_doc/2]).
+-export([job_summary/2, health_threshold/0]).
+-export([jobs/0, job/1]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+-export([format_status/2]).
+
+%% config_listener callback
+-export([handle_config_change/5, handle_config_terminate/3]).
+
+%% types
+-type event_type() :: added | started | stopped | {crashed, any()}.
+-type event() :: {Type :: event_type(), When :: erlang:timestamp()}.
+-type history() :: nonempty_list(event()).
+
+%% definitions
+-define(MAX_BACKOFF_EXPONENT, 10).
+-define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000).
+-define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60).
+-define(RELISTEN_DELAY, 5000).
+
+-define(DEFAULT_MAX_JOBS, 500).
+-define(DEFAULT_MAX_CHURN, 20).
+-define(DEFAULT_MAX_HISTORY, 20).
+-define(DEFAULT_SCHEDULER_INTERVAL, 60000).
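+% Note on units: the scheduler interval is in milliseconds (it is passed to
+% timer:send_after/2), the backoff interval is in microseconds (it is
+% compared against timer:now_diff/2 results), and the health threshold is
+% in seconds.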
+-record(state, {interval, timer, max_jobs, max_churn, max_history}).
+-record(job, {
+          id :: job_id() | '$1' | '_',
+          rep :: #rep{} | '_',
+          pid :: undefined | pid() | '$1' | '_',
+          monitor :: undefined | reference() | '_',
+          history :: history() | '_'}).
+
+-record(stats_acc, {
+          now  :: erlang:timestamp(),
+          pending_t = 0 :: non_neg_integer(),
+          running_t = 0 :: non_neg_integer(),
+          crashed_t = 0 :: non_neg_integer(),
+          pending_n = 0 :: non_neg_integer(),
+          running_n = 0 :: non_neg_integer(),
+          crashed_n = 0 :: non_neg_integer()}).
+
+
+%% public functions
+
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+-spec add_job(#rep{}) -> ok.
+add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
+    Job = #job{
+        id = Rep#rep.id,
+        rep = Rep,
+        history = [{added, os:timestamp()}]},
+    gen_server:call(?MODULE, {add_job, Job}, infinity).
+
+
+-spec remove_job(job_id()) -> ok.
+remove_job(Id) ->
+    gen_server:call(?MODULE, {remove_job, Id}, infinity).
+
+
+-spec reschedule() -> ok.
+% Trigger a manual reschedule. Used for testing and/or ops.
+reschedule() ->
+    gen_server:call(?MODULE, reschedule, infinity).
+
+
+-spec rep_state(rep_id()) -> #rep{} | nil.
+rep_state(RepId) ->
+    case (catch ets:lookup_element(?MODULE, RepId, #job.rep)) of
+        {'EXIT',{badarg, _}} ->
+            nil;
+        Rep ->
+            Rep
+    end.
+
+
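+% Summarize a job as a property list: its state is reported as `pending`
+% when the job is not running and healthy, `crashing` when it is not running
+% and has recent consecutive crashes, and `running` when it has a live
+% process.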
+-spec job_summary(job_id(), non_neg_integer()) -> [_] | nil.
+job_summary(JobId, HealthThreshold) ->
+    case job_by_id(JobId) of
+        {ok, #job{pid = Pid, history = History, rep = Rep}} ->
+            ErrorCount = consecutive_crashes(History, HealthThreshold),
+            {State, Info} = case {Pid, ErrorCount} of
+                {undefined, 0}  ->
+                    {pending, null};
+                {undefined, ErrorCount} when ErrorCount > 0 ->
+                     [{{crashed, Error}, _When} | _] = History,
+                     ErrMsg = couch_replicator_utils:rep_error_to_binary(Error),
+                     {crashing, ErrMsg};
+                {Pid, ErrorCount} when is_pid(Pid) ->
+                     {running, null}
+            end,
+            [
+                {source, iolist_to_binary(ejson_url(Rep#rep.source))},
+                {target, iolist_to_binary(ejson_url(Rep#rep.target))},
+                {state, State},
+                {info, Info},
+                {error_count, ErrorCount},
+                {last_updated, last_updated(History)},
+                {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)},
+                {proxy, job_proxy_url(Rep#rep.source)}
+            ];
+        {error, not_found} ->
+            nil  % Job might have just completed
+    end.
+
+
+job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) ->
+    list_to_binary(couch_util:url_strip_password(ProxyUrl));
+
+job_proxy_url(_Endpoint) ->
+    null.
+
+
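+% Health threshold, in seconds, is the period used by consecutive_crashes/2
+% and not_recently_crashed/3 to decide whether a new crash continues a run
+% of consecutive crashes.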
+-spec health_threshold() -> non_neg_integer().
+health_threshold() ->
+    config:get_integer("replicator", "health_threshold",
+        ?DEFAULT_HEALTH_THRESHOLD_SEC).
+
+
+-spec find_jobs_by_dbname(binary()) -> list(job_id()).
+find_jobs_by_dbname(DbName) ->
+    Rep = #rep{db_name = DbName, _ = '_'},
+    MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
+    [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
+
+
+-spec find_jobs_by_doc(binary(), binary()) -> list(job_id()).
+find_jobs_by_doc(DbName, DocId) ->
+    Rep =  #rep{db_name = DbName, doc_id = DocId, _ = '_'},
+    MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
+    [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
+
+
+%% gen_server functions
+
+init(_) ->
+    EtsOpts = [named_table, {read_concurrency, true}, {keypos, #job.id}],
+    ?MODULE = ets:new(?MODULE, EtsOpts),
+    ok = config:listen_for_changes(?MODULE, nil),
+    Interval = config:get_integer("replicator", "interval", ?DEFAULT_SCHEDULER_INTERVAL),
+    MaxJobs = config:get_integer("replicator", "max_jobs", ?DEFAULT_MAX_JOBS),
+    MaxChurn = config:get_integer("replicator", "max_churn", ?DEFAULT_MAX_CHURN),
+    MaxHistory = config:get_integer("replicator", "max_history", ?DEFAULT_MAX_HISTORY),
+    {ok, Timer} = timer:send_after(Interval, reschedule),
+    State = #state{
+        interval = Interval,
+        max_jobs = MaxJobs,
+        max_churn = MaxChurn,
+        max_history = MaxHistory,
+        timer = Timer
+    },
+    {ok, State}.
+
+
+handle_call({add_job, Job}, _From, State) ->
+    ok = maybe_remove_job_int(Job#job.id, State),
+    true = add_job_int(Job),
+    ok = maybe_start_newly_added_job(Job, State),
+    couch_stats:increment_counter([couch_replicator, jobs, adds]),
+    TotalJobs = ets:info(?MODULE, size),
+    couch_stats:update_gauge([couch_replicator, jobs, total], TotalJobs),
+    {reply, ok, State};
+
+handle_call({remove_job, Id}, _From, State) ->
+    ok = maybe_remove_job_int(Id, State),
+    {reply, ok, State};
+
+handle_call(reschedule, _From, State) ->
+    ok = reschedule(State),
+    {reply, ok, State};
+
+handle_call(_, _From, State) ->
+    {noreply, State}.
+
+
+handle_cast({set_max_jobs, MaxJobs}, State) when is_integer(MaxJobs), MaxJobs >= 0 ->
+    couch_log:notice("~p: max_jobs set to ~B", [?MODULE, MaxJobs]),
+    {noreply, State#state{max_jobs = MaxJobs}};
+
+handle_cast({set_max_churn, MaxChurn}, State) when is_integer(MaxChurn), MaxChurn > 0 ->
+    couch_log:notice("~p: max_churn set to ~B", [?MODULE, MaxChurn]),
+    {noreply, State#state{max_churn = MaxChurn}};
+
+handle_cast({set_max_history, MaxHistory}, State) when is_integer(MaxHistory), MaxHistory > 0 ->
+    couch_log:notice("~p: max_history set to ~B", [?MODULE, MaxHistory]),
+    {noreply, State#state{max_history = MaxHistory}};
+
+handle_cast({set_interval, Interval}, State) when is_integer(Interval), Interval > 0 ->
+    couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
+    {noreply, State#state{interval = Interval}};
+
+handle_cast(_, State) ->
+    {noreply, State}.
+
+
+handle_info(reschedule, State) ->
+    ok = reschedule(State),
+    {ok, cancel} = timer:cancel(State#state.timer),
+    {ok, Timer} = timer:send_after(State#state.interval, reschedule),
+    {noreply, State#state{timer = Timer}};
+
+handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
+    {ok, Job} = job_by_pid(Pid),
+    couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
+    remove_job_int(Job),
+    update_running_jobs_stats(),
+    {noreply, State};
+
+handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
+    {ok, Job} = job_by_pid(Pid),
+    ok = handle_crashed_job(Job, Reason, State),
+    {noreply, State};
+
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {noreply, State};
+
+handle_info(_, State) ->
+    {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+terminate(_Reason, _State) ->
+    ok.
+
+
+format_status(_Opt, [_PDict, State]) ->
+    [{max_jobs, State#state.max_jobs},
+     {running_jobs, running_job_count()},
+     {pending_jobs, pending_job_count()}].
+
+
+%% config listener functions
+
+handle_config_change("replicator", "max_jobs", V, _, S) ->
+    ok = gen_server:cast(?MODULE, {set_max_jobs, list_to_integer(V)}),
+    {ok, S};
+
+handle_config_change("replicator", "max_churn", V, _, S) ->
+    ok = gen_server:cast(?MODULE, {set_max_churn, list_to_integer(V)}),
+    {ok, S};
+
+handle_config_change("replicator", "interval", V, _, S) ->
+    ok = gen_server:cast(?MODULE, {set_interval, list_to_integer(V)}),
+    {ok, S};
+
+handle_config_change("replicator", "max_history", V, _, S) ->
+    ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}),
+    {ok, S};
+
+handle_config_change(_, _, _, _, S) ->
+    {ok, S}.
+
+
+handle_config_terminate(_, stop, _) ->
+    ok;
+
+handle_config_terminate(_, _, _) ->
+    Pid = whereis(?MODULE),
+    erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
+
+
+%% private functions
+
+
+% Handle crashed jobs. Handling differs between transient and permanent jobs.
+% Transient jobs are those posted to the _replicate endpoint. They don't have
+% a db associated with them. When those jobs crash, they are not restarted;
+% that is consistent with what happens when the node they run on crashes:
+% they do not migrate to other nodes. Permanent jobs are those created from
+% replicator documents. Those jobs, once they pass basic validation and end
+% up in the scheduler, will be retried indefinitely (with appropriate
+% exponential backoffs).
+-spec handle_crashed_job(#job{}, any(), #state{}) -> ok.
+handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, _State) ->
+    Msg = "~p : Transient job ~p failed, removing. Error: ~p",
+    ErrorBinary = couch_replicator_utils:rep_error_to_binary(Reason),
+    couch_log:error(Msg, [?MODULE, Job#job.id, ErrorBinary]),
+    remove_job_int(Job),
+    update_running_jobs_stats(),
+    ok;
+
+handle_crashed_job(Job, Reason, State) ->
+    ok = update_state_crashed(Job, Reason, State),
+    case couch_replicator_doc_processor:update_docs() of
+        true ->
+            couch_replicator_docs:update_error(Job#job.rep, Reason);
+        false ->
+            ok
+    end,
+    case ets:info(?MODULE, size) < State#state.max_jobs of
+        true ->
+            % Starting pending jobs is an O(TotalJobsCount) operation. Only do
+            % it if there is a relatively small number of jobs. Otherwise the
+            % scheduler could be blocked if there is a cascade of many jobs
+            % failing in a row.
+            start_pending_jobs(State),
+            update_running_jobs_stats(),
+            ok;
+        false ->
+            ok
+    end.
+
+
+% Attempt to start a newly added job. First quickly check whether the total
+% number of jobs has already reached max_jobs, then do a more expensive
+% check which runs a select (an O(n) operation) to compute the running job
+% count.
+-spec maybe_start_newly_added_job(#job{}, #state{}) -> ok.
+maybe_start_newly_added_job(Job, State) ->
+    MaxJobs = State#state.max_jobs,
+    TotalJobs = ets:info(?MODULE, size),
+    case TotalJobs < MaxJobs andalso running_job_count() < MaxJobs of
+        true ->
+            start_job_int(Job, State),
+            update_running_jobs_stats(),
+            ok;
+        false ->
+            ok
+    end.
+
+% Return up to a given number of oldest, not recently crashed jobs. Try to be
+% memory efficient and use ets:foldl to accumulate jobs.
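+%
+% The result is sorted oldest-first: the accumulator is keyed on
+% {LastStarted, Job} and gb_sets:to_list/1 returns elements in ascending
+% order.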
+-spec pending_jobs(non_neg_integer()) -> [#job{}].
+pending_jobs(0) ->
+    % Handle this case as the user could set max_churn to 0. If this were
+    % passed to the other function clause it would crash, as gb_sets:largest
+    % assumes the set is not empty.
+    [];
+
+pending_jobs(Count) when is_integer(Count), Count > 0 ->
+    Set0 = gb_sets:new(),  % [{LastStart, Job},...]
+    Now = os:timestamp(),
+    Acc0 = {Set0, Now, Count, health_threshold()},
+    {Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE),
+    [Job || {_Started, Job} <- gb_sets:to_list(Set1)].
+
+
+pending_fold(Job, {Set, Now, Count, HealthThreshold}) ->
+    Set1 = case {not_recently_crashed(Job, Now, HealthThreshold),
+        gb_sets:size(Set) >= Count} of
+        {true, true} ->
+             % Job is healthy but already reached accumulated limit, so might
+             % have to replace one of the accumulated jobs
+             pending_maybe_replace(Job, Set);
+        {true, false} ->
+             % Job is healthy and we haven't reached the limit, so add job
+             % to accumulator
+             gb_sets:add_element({last_started(Job), Job}, Set);
+        {false, _} ->
+             % This job is not healthy (has crashed too recently), so skip it.
+             Set
+    end,
+    {Set1, Now, Count, HealthThreshold}.
+
+
+% Replace Job in the accumulator if it is older than the youngest job there.
+pending_maybe_replace(Job, Set) ->
+    Started = last_started(Job),
+    {Youngest, YoungestJob} = gb_sets:largest(Set),
+    case Started < Youngest of
+        true ->
+            Set1 = gb_sets:delete({Youngest, YoungestJob}, Set),
+            gb_sets:add_element({Started, Job}, Set1);
+        false ->
+            Set
+    end.
+
+
+start_jobs(Count, State) ->
+    [start_job_int(Job, State) || Job <- pending_jobs(Count)],
+    ok.
+
+
+-spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer().
+stop_jobs(Count, IsContinuous, State) ->
+    Running0 = running_jobs(),
+    ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end,
+    Running1 = lists:filter(ContinuousPred, Running0),
+    Running2 = lists:sort(fun oldest_job_first/2, Running1),
+    Running3 = lists:sublist(Running2, Count),
+    length([stop_job_int(Job, State) || Job <- Running3]).
+
+
+oldest_job_first(#job{} = A, #job{} = B) ->
+    last_started(A) =< last_started(B).
+
+
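+% A job is eligible to start if it was only just added, if it was last
+% cleanly stopped, or if enough time has passed since its latest crash to
+% satisfy the exponential backoff for its current consecutive crash count.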
+not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
+    case History of
+        [{added, _When}] ->
+            true;
+        [{stopped, _When} | _] ->
+            true;
+        _ ->
+            LatestCrashT = latest_crash_timestamp(History),
+            CrashCount = consecutive_crashes(History, HealthThreshold),
+            timer:now_diff(Now, LatestCrashT) >= backoff_micros(CrashCount)
+    end.
+
+
+% Count consecutive crashes. A crash happens when there is a `crashed` event
+% within a short period of time (configurable) after any other event. It
+% could be `crashed, started` for jobs crashing quickly after starting;
+% `crashed, crashed` or `crashed, stopped` if a job repeatedly failed to
+% start and was then stopped; or `crashed, added` if it crashed immediately
+% after being added during start.
+%
+% The run of consecutive crashes ends when a crashed event is separated from
+% the previous event by more than the threshold.
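+%
+% For example (with illustrative reasons R1, R2 and timestamps T0 < T1 < T2
+% < T3, each within the threshold of the previous), the history
+%   [{{crashed, R2}, T3}, {started, T2}, {{crashed, R1}, T1}, {added, T0}]
+% counts as 2 consecutive crashes.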
+-spec consecutive_crashes(history(), non_neg_integer()) -> non_neg_integer().
+consecutive_crashes(History, HealthThreshold) when is_list(History) ->
+    consecutive_crashes(History, HealthThreshold, 0).
+
+
+-spec consecutive_crashes(history(), non_neg_integer(), non_neg_integer()) ->
+     non_neg_integer().
+consecutive_crashes([], _HealthThreshold, Count) ->
+    Count;
+
+consecutive_crashes([{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest],
+    HealthThreshold, Count) ->
+    case timer:now_diff(CrashT, PrevT) > HealthThreshold * 1000000 of
+        true ->
+            Count;
+        false ->
+            consecutive_crashes([PrevEvent | Rest], HealthThreshold, Count + 1)
+    end;
+
+consecutive_crashes([{stopped, _}, {started, _} | _], _HealthThreshold, Count) ->
+    Count;
+
+consecutive_crashes([_ | Rest], HealthThreshold, Count) ->
+    consecutive_crashes(Rest, HealthThreshold, Count).
+
+
+-spec latest_crash_timestamp(history()) -> erlang:timestamp().
+latest_crash_timestamp([]) ->
+    {0, 0, 0};  % Used to avoid special-casing "no crash" when doing now_diff
+
+latest_crash_timestamp([{{crashed, _Reason}, When} | _]) ->
+    When;
+
+latest_crash_timestamp([_Event | Rest]) ->
+    latest_crash_timestamp(Rest).
+
+
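+% The penalty doubles with each consecutive crash: with the 30 second base
+% interval and a maximum exponent of 10, a job waits 30 sec after its first
+% crash, 60 sec after the second, and at most 2^10 * 30 sec (about 8.5
+% hours) once the cap is reached.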
+-spec backoff_micros(non_neg_integer()) -> non_neg_integer().
+backoff_micros(CrashCount) ->
+    BackoffExp = erlang:min(CrashCount - 1, ?MAX_BACKOFF_EXPONENT),
+    (1 bsl BackoffExp) * ?BACKOFF_INTERVAL_MICROS.
+
+
+-spec add_job_int(#job{}) -> boolean().
+add_job_int(#job{} = Job) ->
+    ets:insert_new(?MODULE, Job).
+
+
+-spec maybe_remove_job_int(job_id(), #state{}) -> ok.
+maybe_remove_job_int(JobId, State) ->
+    case job_by_id(JobId) of
+        {ok, Job} ->
+            ok = stop_job_int(Job, State),
+            true = remove_job_int(Job),
+            couch_stats:increment_counter([couch_replicator, jobs, removes]),
+            TotalJobs = ets:info(?MODULE, size),
+            couch_stats:update_gauge([couch_replicator, jobs, total], TotalJobs),
+            update_running_jobs_stats(),
+            ok;
+        {error, not_found} ->
+            ok
+    end.
+
+
+start_job_int(#job{pid = Pid}, _State) when Pid /= undefined ->
+    ok;
+
+start_job_int(#job{} = Job0, State) ->
+    Job = maybe_optimize_job_for_rate_limiting(Job0),
+    case couch_replicator_scheduler_sup:start_child(Job#job.rep) of
+        {ok, Child} ->
+            Ref = monitor(process, Child),
+            ok = update_state_started(Job, Child, Ref, State),
+            couch_log:notice("~p: Job ~p started as ~p",
+                [?MODULE, Job#job.id, Child]);
+        {error, {already_started, OtherPid}} when node(OtherPid) =:= node() ->
+            Ref = monitor(process, OtherPid),
+            ok = update_state_started(Job, OtherPid, Ref, State),
+            couch_log:notice("~p: Job ~p already running as ~p. Most likely"
+                " because replicator scheduler was restarted",
+                [?MODULE, Job#job.id, OtherPid]);
+        {error, {already_started, OtherPid}} when node(OtherPid) =/= node() ->
+            CrashMsg = "Duplicate replication running on another node",
+            couch_log:notice("~p: Job ~p already running as ~p. Most likely"
+                " because a duplicate replication is running on another node",
+                [?MODULE, Job#job.id, OtherPid]),
+            ok = update_state_crashed(Job, CrashMsg, State);
+        {error, Reason} ->
+            couch_log:notice("~p: Job ~p failed to start for reason ~p",
+                [?MODULE, Job, Reason]),
+            ok = update_state_crashed(Job, Reason, State)
+    end.
+
+
+-spec stop_job_int(#job{}, #state{}) -> ok | {error, term()}.
+stop_job_int(#job{pid = undefined}, _State) ->
+    ok;
+
+stop_job_int(#job{} = Job, State) ->
+    ok = couch_replicator_scheduler_sup:terminate_child(Job#job.pid),
+    demonitor(Job#job.monitor, [flush]),
+    ok = update_state_stopped(Job, State),
+    couch_log:notice("~p: Job ~p stopped as ~p",
+        [?MODULE, Job#job.id, Job#job.pid]).
+
+
+-spec remove_job_int(#job{}) -> true.
+remove_job_int(#job{} = Job) ->
+    ets:delete(?MODULE, Job#job.id).
+
+
+-spec running_job_count() -> non_neg_integer().
+running_job_count() ->
+    ets:info(?MODULE, size) - pending_job_count().
+
+
+-spec running_jobs() -> [#job{}].
+running_jobs() ->
+    ets:select(?MODULE, [{#job{pid = '$1', _='_'}, [{is_pid, '$1'}], ['$_']}]).
+
+
+-spec pending_job_count() -> non_neg_integer().
+pending_job_count() ->
+    ets:select_count(?MODULE, [{#job{pid=undefined, _='_'}, [], [true]}]).
+
+
+-spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}.
+job_by_pid(Pid) when is_pid(Pid) ->
+    case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of
+        [] ->
+            {error, not_found};
+        [#job{}=Job] ->
+            {ok, Job}
+    end.
+
+-spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}.
+job_by_id(Id) ->
+    case ets:lookup(?MODULE, Id) of
+        [] ->
+            {error, not_found};
+        [#job{}=Job] ->
+            {ok, Job}
+    end.
+
+
+-spec update_state_stopped(#job{}, #state{}) -> ok.
+update_state_stopped(Job, State) ->
+    Job1 = reset_job_process(Job),
+    Job2 = update_history(Job1, stopped, os:timestamp(), State),
+    true = ets:insert(?MODULE, Job2),
+    couch_stats:increment_counter([couch_replicator, jobs, stops]),
+    ok.
+
+
+-spec update_state_started(#job{}, pid(), reference(), #state{}) -> ok.
+update_state_started(Job, Pid, Ref, State) ->
+    Job1 = set_job_process(Job, Pid, Ref),
+    Job2 = update_history(Job1, started, os:timestamp(), State),
+    true = ets:insert(?MODULE, Job2),
+    couch_stats:increment_counter([couch_replicator, jobs, starts]),
+    ok.
+
+
+-spec update_state_crashed(#job{}, any(), #state{}) -> ok.
+update_state_crashed(Job, Reason, State) ->
+    Job1 = reset_job_process(Job),
+    Job2 = update_history(Job1, {crashed, Reason}, os:timestamp(), State),
+    true = ets:insert(?MODULE, Job2),
+    couch_stats:increment_counter([couch_replicator, jobs, crashes]),
+    ok.
+
+
+-spec set_job_process(#job{}, pid(), reference()) -> #job{}.
+set_job_process(#job{} = Job, Pid, Ref) when is_pid(Pid), is_reference(Ref) ->
+    Job#job{pid = Pid, monitor = Ref}.
+
+
+-spec reset_job_process(#job{}) -> #job{}.
+reset_job_process(#job{} = Job) ->
+    Job#job{pid = undefined, monitor = undefined}.
+
+
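+% One scheduler cycle: first stop any jobs running over the max_jobs limit,
+% then fill free slots from the pending queue, and finally rotate jobs
+% (stop the oldest running ones and start the oldest pending ones), with
+% rotation bounded by max_churn.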
+-spec reschedule(#state{}) -> ok.
+reschedule(State) ->
+    Running = running_job_count(),
+    Pending = pending_job_count(),
+    stop_excess_jobs(State, Running),
+    start_pending_jobs(State, Running, Pending),
+    rotate_jobs(State, Running, Pending),
+    update_running_jobs_stats(),
+    ok.
+
+
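+% Stop running jobs over the max_jobs limit, stopping continuous jobs first
+% and resorting to one-shot jobs only if stopping continuous ones was not
+% enough.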
+-spec stop_excess_jobs(#state{}, non_neg_integer()) -> ok.
+stop_excess_jobs(State, Running) ->
+    #state{max_jobs=MaxJobs} = State,
+    StopCount = Running - MaxJobs,
+    if StopCount > 0 ->
+        Stopped = stop_jobs(StopCount, true, State),
+        OneshotLeft = StopCount - Stopped,
+        if OneshotLeft > 0 ->
+            stop_jobs(OneshotLeft, false, State),
+            ok;
+        true ->
+            ok
+        end;
+    true ->
+        ok
+    end.
+
+
+start_pending_jobs(State) ->
+    start_pending_jobs(State, running_job_count(), pending_job_count()).
+
+
+start_pending_jobs(State, Running, Pending) ->
+    #state{max_jobs=MaxJobs} = State,
+    if Running < MaxJobs, Pending > 0 ->
+        start_jobs(MaxJobs - Running, State);
+    true ->
+        ok
+    end.
+
+-spec rotate_jobs(#state{}, non_neg_integer(), non_neg_integer()) -> ok.
+rotate_jobs(State, Running, Pending) ->
+    #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
+    if Running == MaxJobs, Pending > 0 ->
+        RotateCount = lists:min([Pending, Running, MaxChurn]),
+        StopCount = stop_jobs(RotateCount, true, State),
+        start_jobs(StopCount, State);
+    true ->
+        ok
+    end.
+
+
+-spec last_started(#job{}) -> erlang:timestamp().
+last_started(#job{} = Job) ->
+    case lists:keyfind(started, 1, Job#job.history) of
+        false ->
+            {0, 0, 0};
+        {started, When} ->
+            When
+    end.
+
+
+-spec update_history(#job{}, event_type(), erlang:timestamp(), #state{}) -> #job{}.
+update_history(Job, Type, When, State) ->
+    History0 = [{Type, When} | Job#job.history],
+    History1 = lists:sublist(History0, State#state.max_history),
+    Job#job{history = History1}.
+
+
+-spec update_running_jobs_stats() -> ok.
+update_running_jobs_stats() ->
+    Acc0 = #stats_acc{now = os:timestamp()},
+    AccR = ets:foldl(fun stats_fold/2, Acc0, ?MODULE),
+    #stats_acc{
+        pending_t = PendingSum,
+        running_t = RunningSum,
+        crashed_t = CrashedSum,
+        pending_n = PendingN,
+        running_n = RunningN,
+        crashed_n = CrashedN
+    } = AccR,
+    PendingAvg = avg(PendingSum, PendingN),
+    RunningAvg = avg(RunningSum, RunningN),
+    CrashedAvg = avg(CrashedSum, CrashedN),
+    couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN),
+    couch_stats:update_gauge([couch_replicator, jobs, running], RunningN),
+    couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN),
+    couch_stats:update_gauge([couch_replicator, jobs, avg_pending], PendingAvg),
+    couch_stats:update_gauge([couch_replicator, jobs, avg_running], RunningAvg),
+    couch_stats:update_gauge([couch_replicator, jobs, avg_crashed], CrashedAvg),
+    ok.
+
+
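+% Classify each job as pending, running or crashed from its pid and the
+% most recent history event, accumulating both a count and the total time
+% (in seconds) spent in that state since the event.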
+-spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}.
+stats_fold(#job{pid = undefined, history = [{added, T}]}, Acc) ->
+    #stats_acc{now = Now, pending_t = SumT, pending_n = Cnt} = Acc,
+    Dt = round(timer:now_diff(Now, T) / 1000000),
+    Acc#stats_acc{pending_t = SumT + Dt, pending_n = Cnt + 1};
+
+stats_fold(#job{pid = undefined, history = [{stopped, T} | _]}, Acc) ->
+    #stats_acc{now = Now, pending_t = SumT, pending_n = Cnt} = Acc,
+    Dt = round(timer:now_diff(Now, T) / 1000000),
+    Acc#stats_acc{pending_t = SumT + Dt, pending_n = Cnt + 1};
+
+stats_fold(#job{pid = undefined, history = [{{crashed, _}, T} | _]}, Acc) ->
+    #stats_acc{now = Now, crashed_t = SumT, crashed_n = Cnt} = Acc,
+    Dt = round(timer:now_diff(Now, T) / 1000000),
+    Acc#stats_acc{crashed_t = SumT + Dt, crashed_n = Cnt + 1};
+
+stats_fold(#job{pid = P, history = [{started, T} | _]}, Acc) when is_pid(P) ->
+    #stats_acc{now = Now, running_t = SumT, running_n = Cnt} = Acc,
+    Dt = round(timer:now_diff(Now, T) / 1000000),
+    Acc#stats_acc{running_t = SumT + Dt, running_n = Cnt + 1}.
+
+
+
+-spec avg(Sum :: non_neg_integer(), N :: non_neg_integer()) -> non_neg_integer().
+avg(_Sum, 0) ->
+    0;
+
+avg(Sum, N) when N > 0 ->
+    round(Sum / N).
+
+
+-spec ejson_url(#httpdb{} | binary()) -> binary().
+ejson_url(#httpdb{}=Httpdb) ->
+    couch_util:url_strip_password(Httpdb#httpdb.url);
+ejson_url(DbName) when is_binary(DbName) ->
+    DbName.
+
+
+-spec job_ejson(#job{}) -> {[_ | _]}.
+job_ejson(Job) ->
+    Rep = Job#job.rep,
+    Source = ejson_url(Rep#rep.source),
+    Target = ejson_url(Rep#rep.target),
+    History = lists:map(fun({Type, When}) ->
+        EventProps  = case Type of
+            {crashed, Reason} ->
+                [{type, crashed}, {reason, crash_reason_json(Reason)}];
+            Type ->
+                [{type, Type}]
+        end,
+        {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]}
+    end, Job#job.history),
+    {BaseID, Ext} = Job#job.id,
+    Pid = case Job#job.pid of
+        undefined ->
+            null;
+        P when is_pid(P) ->
+            ?l2b(pid_to_list(P))
+    end,
+    {[
+        {id, iolist_to_binary([BaseID, Ext])},
+        {pid, Pid},
+        {source, iolist_to_binary(Source)},
+        {target, iolist_to_binary(Target)},
+        {database, Rep#rep.db_name},
+        {user, (Rep#rep.user_ctx)#user_ctx.name},
+        {doc_id, Rep#rep.doc_id},
+        {history, History},
+        {node, node()},
+        {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
+    ]}.
+
+
+-spec jobs() -> [[tuple()]].
+jobs() ->
+    ets:foldl(fun(Job, Acc) ->
+        [job_ejson(Job) | Acc]
+    end, [], couch_replicator_scheduler).
+
+
+-spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
+job(JobId) ->
+    case job_by_id(JobId) of
+        {ok, Job} ->
+            {ok, job_ejson(Job)};
+        Error ->
+            Error
+    end.
+
+
+crash_reason_json({_CrashType, Info}) when is_binary(Info) ->
+    Info;
+crash_reason_json(Reason) when is_binary(Reason) ->
+    Reason;
+crash_reason_json(Error) ->
+    couch_replicator_utils:rep_error_to_binary(Error).
+
+
+-spec last_updated([_]) -> binary().
+last_updated([{_Type, When} | _]) ->
+    couch_replicator_utils:iso8601(When).
+
+
+-spec is_continuous(#job{}) -> boolean().
+is_continuous(#job{rep = Rep}) ->
+    couch_util:get_value(continuous, Rep#rep.options, false).
+
+
+% If job crashed last time because it was rate limited, try to
+% optimize some options to help the job make progress.
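+% Note the options are only ever lowered, never raised: optimize_int_option/2
+% replaces a value only when the current one is larger than the suggested
+% one.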
+-spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
+maybe_optimize_job_for_rate_limiting(Job = #job{history =
+    [{{crashed, {shutdown, max_backoff}}, _} | _]}) ->
+    Opts = [
+        {checkpoint_interval, 5000},
+        {worker_processes, 2},
+        {worker_batch_size, 100},
+        {http_connections, 5}
+    ],
+    Rep = lists:foldl(fun optimize_int_option/2, Job#job.rep, Opts),
+    Job#job{rep = Rep};
+
+maybe_optimize_job_for_rate_limiting(Job) ->
+    Job.
+
+
+-spec optimize_int_option({atom(), any()}, #rep{}) -> #rep{}.
+optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
+    case couch_util:get_value(Key, Options) of
+        CurVal when is_integer(CurVal), CurVal > Val ->
+            Msg = "~p replication ~p : setting ~p = ~p due to rate limiting",
+            couch_log:warning(Msg, [?MODULE, Rep#rep.id, Key, Val]),
+            Options1 = lists:keyreplace(Key, 1, Options, {Key, Val}),
+            Rep#rep{options = Options1};
+        _ ->
+            Rep
+    end.
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+backoff_micros_test_() ->
+    BaseInterval = ?BACKOFF_INTERVAL_MICROS,
+    [?_assertEqual(R * BaseInterval, backoff_micros(N)) || {R, N} <- [
+        {1, 1}, {2, 2}, {4, 3}, {8, 4}, {16, 5}, {32, 6}, {64, 7}, {128, 8},
+        {256, 9}, {512, 10}, {1024, 11}, {1024, 12}
+    ]].
+
+
+consecutive_crashes_test_() ->
+    Threshold = ?DEFAULT_HEALTH_THRESHOLD_SEC,
+    [?_assertEqual(R, consecutive_crashes(H, Threshold)) || {R, H} <- [
+        {0, []},
+        {0, [added()]},
+        {0, [stopped()]},
+        {0, [crashed()]},
+        {1, [crashed(), added()]},
+        {1, [crashed(), crashed()]},
+        {1, [crashed(), stopped()]},
+        {3, [crashed(), crashed(), crashed(), added()]},
+        {2, [crashed(), crashed(), stopped()]},
+        {1, [crashed(), started(), added()]},
+        {2, [crashed(3), started(2), crashed(1), started(0)]},
+        {0, [stopped(3), started(2), crashed(1), started(0)]},
+        {1, [crashed(3), started(2), stopped(1), started(0)]},
+        {0, [crashed(999), started(0)]},
+        {1, [crashed(999), started(998), crashed(997), started(0)]}
+    ]].
+
+
+consecutive_crashes_non_default_threshold_test_() ->
+    [?_assertEqual(R, consecutive_crashes(H, T)) || {R, H, T} <- [
+        {0, [crashed(11), started(0)], 10},
+        {1, [crashed(10), started(0)], 10}
+    ]].
+
+
+latest_crash_timestamp_test_() ->
+    [?_assertEqual({0, R, 0}, latest_crash_timestamp(H)) || {R, H} <- [
+         {0, [added()]},
+         {1, [crashed(1)]},
+         {3, [crashed(3), started(2), crashed(1), started(0)]},
+         {1, [started(3), stopped(2), crashed(1), started(0)]}
+    ]].
+
+
+last_started_test_() ->
+    [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [
+         {0, [added()]},
+         {0, [crashed(1)]},
+         {1, [started(1)]},
+         {1, [added(), started(1)]},
+         {2, [started(2), started(1)]},
+         {2, [crashed(3), started(2), started(1)]}
+    ]].
+
+
+oldest_job_first_test() ->
+    J0 = testjob([crashed()]),
+    J1 = testjob([started(1)]),
+    J2 = testjob([started(2)]),
+    Sort = fun(Jobs) -> lists:sort(fun oldest_job_first/2, Jobs) end,
+    ?assertEqual([], Sort([])),
+    ?assertEqual([J1], Sort([J1])),
+    ?assertEqual([J1, J2], Sort([J2, J1])),
+    ?assertEqual([J0, J1, J2], Sort([J2, J1, J0])).
+
+
+scheduler_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            t_pending_jobs_simple(),
+            t_pending_jobs_skip_crashed(),
+            t_one_job_starts(),
+            t_no_jobs_start_if_max_is_0(),
+            t_one_job_starts_if_max_is_1(),
+            t_max_churn_does_not_throttle_initial_start(),
+            t_excess_oneshot_only_jobs(),
+            t_excess_continuous_only_jobs(),
+            t_excess_prefer_continuous_first(),
+            t_stop_oldest_first(),
+            t_start_oldest_first(),
+            t_dont_stop_if_nothing_pending(),
+            t_max_churn_limits_number_of_rotated_jobs(),
+            t_if_pending_less_than_running_start_all_pending(),
+            t_running_less_than_pending_swap_all_running(),
+            t_oneshot_dont_get_rotated(),
+            t_rotate_continuous_only_if_mixed(),
+            t_oneshot_dont_get_starting_priority(),
+            t_oneshot_will_hog_the_scheduler(),
+            t_if_excess_is_trimmed_rotation_doesnt_happen(),
+            t_if_transient_job_crashes_it_gets_removed(),
+            t_if_permanent_job_crashes_it_stays_in_ets()
+         ]
+    }.
+
+
+t_pending_jobs_simple() ->
+   ?_test(begin
+        Job1 = oneshot(1),
+        Job2 = oneshot(2),
+        setup_jobs([Job2, Job1]),
+        ?assertEqual([], pending_jobs(0)),
+        ?assertEqual([Job1], pending_jobs(1)),
+        ?assertEqual([Job1, Job2], pending_jobs(2)),
+        ?assertEqual([Job1, Job2], pending_jobs(3))
+    end).
+
+
+t_pending_jobs_skip_crashed() ->
+   ?_test(begin
+        Job = oneshot(1),
+        Ts = os:timestamp(),
+        History = [crashed(Ts), started(Ts) | Job#job.history],
+        Job1 = Job#job{history = History},
+        Job2 = oneshot(2),
+        Job3 = oneshot(3),
+        setup_jobs([Job2, Job1, Job3]),
+        ?assertEqual([Job2], pending_jobs(1)),
+        ?assertEqual([Job2, Job3], pending_jobs(2)),
+        ?assertEqual([Job2, Job3], pending_jobs(3))
+    end).
+
+
+t_one_job_starts() ->
+    ?_test(begin
+        setup_jobs([oneshot(1)]),
+        ?assertEqual({0, 1}, run_stop_count()),
+        reschedule(mock_state(?DEFAULT_MAX_JOBS)),
+        ?assertEqual({1, 0}, run_stop_count())
+    end).
+
+
+t_no_jobs_start_if_max_is_0() ->
+    ?_test(begin
+        setup_jobs([oneshot(1)]),
+        reschedule(mock_state(0)),
+        ?assertEqual({0, 1}, run_stop_count())
+    end).
+
+
+t_one_job_starts_if_max_is_1() ->
+    ?_test(begin
+        setup_jobs([oneshot(1), oneshot(2)]),
+        reschedule(mock_state(1)),
+        ?assertEqual({1, 1}, run_stop_count())
+    end).
+
+
+t_max_churn_does_not_throttle_initial_start() ->
+    ?_test(begin
+        setup_jobs([oneshot(1), oneshot(2)]),
+        reschedule(mock_state(?DEFAULT_MAX_JOBS, 0)),
+        ?assertEqual({2, 0}, run_stop_count())
+    end).
+
+
+t_excess_oneshot_only_jobs() ->
+    ?_test(begin
+        setup_jobs([oneshot_running(1), oneshot_running(2)]),
+        ?assertEqual({2, 0}, run_stop_count()),
+        reschedule(mock_state(1)),
+        ?assertEqual({1, 1}, run_stop_count()),
+        reschedule(mock_state(0)),
+        ?assertEqual({0, 2}, run_stop_count())
+    end).
+
+
+t_excess_continuous_only_jobs() ->
+    ?_test(begin
+        setup_jobs([continuous_running(1), continuous_running(2)]),
+        ?assertEqual({2, 0}, run_stop_count()),
+        reschedule(mock_state(1)),
+        ?assertEqual({1, 1}, run_stop_count()),
+        reschedule(mock_state(0)),
+        ?assertEqual({0, 2}, run_stop_count())
+    end).
+
+
+t_excess_prefer_continuous_first() ->
+    ?_test(begin
+        Jobs = [
+            continuous_running(1),
+            oneshot_running(2),
+            continuous_running(3)
+        ],
+        setup_jobs(Jobs),
+        ?assertEqual({3, 0}, run_stop_count()),
+        ?assertEqual({1, 0}, oneshot_run_stop_count()),
+        reschedule(mock_state(2)),
+        ?assertEqual({2, 1}, run_stop_count()),
+        ?assertEqual({1, 0}, oneshot_run_stop_count()),
+        reschedule(mock_state(1)),
+        ?assertEqual({1, 0}, oneshot_run_stop_count()),
+        reschedule(mock_state(0)),
+        ?assertEqual({0, 1}, oneshot_run_stop_count())
+    end).
+
+
+t_stop_oldest_first() ->
+    ?_test(begin
+        Jobs = [
+            continuous_running(7),
+            continuous_running(4),
+            continuous_running(5)
+        ],
+        setup_jobs(Jobs),
+        reschedule(mock_state(2)),
+        ?assertEqual({2, 1}, run_stop_count()),
+        ?assertEqual([4], jobs_stopped()),
+        reschedule(mock_state(1)),
+        ?assertEqual([7], jobs_running())
+    end).
+
+
+t_start_oldest_first() ->
+    ?_test(begin
+        setup_jobs([continuous(7), continuous(2), continuous(5)]),
+        reschedule(mock_state(1)),
+        ?assertEqual({1, 2}, run_stop_count()),
+        ?assertEqual([2], jobs_running()),
+        reschedule(mock_state(2)),
+        ?assertEqual([7], jobs_stopped())
+    end).
+
+
+t_dont_stop_if_nothing_pending() ->
+    ?_test(begin
+        setup_jobs([continuous_running(1), continuous_running(2)]),
+        reschedule(mock_state(2)),
+        ?assertEqual({2, 0}, run_stop_count())
+    end).
+
+
+t_max_churn_limits_number_of_rotated_jobs() ->
+    ?_test(begin
+        Jobs = [
+            continuous(1),
+            continuous_running(2),
+            continuous(3),
+            continuous_running(4)
+        ],
+        setup_jobs(Jobs),
+        reschedule(mock_state(2, 1)),
+        ?assertEqual([2, 3], jobs_stopped())
+    end).
+
+
+t_if_pending_less_than_running_start_all_pending() ->
+    ?_test(begin
+        Jobs = [
+            continuous(1),
+            continuous_running(2),
+            continuous(3),
+            continuous_running(4),
+            continuous_running(5)
+        ],
+        setup_jobs(Jobs),
+        reschedule(mock_state(3)),
+        ?assertEqual([1, 2, 5], jobs_running())
+    end).
+
+
+t_running_less_than_pending_swap_all_running() ->
+    ?_test(begin
+        Jobs = [
+            continuous(1),
+            continuous(2),
+            continuous(3),
+            continuous_running(4),
+            continuous_running(5)
+        ],
+        setup_jobs(Jobs),
+        reschedule(mock_state(2)),
+        ?assertEqual([3, 4, 5], jobs_stopped())
+    end).
+
+
+t_oneshot_dont_get_rotated() ->
+    ?_test(begin
+        setup_jobs([oneshot_running(1), continuous(2)]),
+        reschedule(mock_state(1)),
+        ?assertEqual([1], jobs_running())
+    end).
+
+
+t_rotate_continuous_only_if_mixed() ->
+    ?_test(begin
+        setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]),
+        reschedule(mock_state(2)),
+        ?assertEqual([1, 2], jobs_running())
+    end).
+
+
+t_oneshot_dont_get_starting_priority() ->
+    ?_test(begin
+        setup_jobs([continuous(1), oneshot(2), continuous_running(3)]),
+        reschedule(mock_state(1)),
+        ?assertEqual([1], jobs_running())
+    end).
+
+
+% This is tested in other test cases; it is here mainly to make explicit a
+% property of one-shot replications: they can starve other jobs if they
+% "take control" of all the available scheduler slots.
+t_oneshot_will_hog_the_scheduler() ->
+    ?_test(begin
+        Jobs = [
+            oneshot_running(1),
+            oneshot_running(2),
+            oneshot(3),
+            continuous(4)
+        ],
+        setup_jobs(Jobs),
+        reschedule(mock_state(2)),
+        ?assertEqual([1, 2], jobs_running())
+    end).
+
+
+t_if_excess_is_trimmed_rotation_doesnt_happen() ->
+    ?_test(begin
+        Jobs = [
+            continuous(1),
+            continuous_running(2),
+            continuous_running(3)
+        ],
+        setup_jobs(Jobs),
+        reschedule(mock_state(1)),
+        ?assertEqual([3], jobs_running())
+    end).
+
+
+t_if_transient_job_crashes_it_gets_removed() ->
+    ?_test(begin
+        Pid = mock_pid(),
+        Job =  #job{
+            id = job1,
+            pid = Pid,
+            history = [added()],
+            rep = #rep{db_name = null, options = [{continuous, true}]}
+        },
+        setup_jobs([Job]),
+        ?assertEqual(1, ets:info(?MODULE, size)),
+        State = #state{max_history = 3},
+        {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed}, State),
+        ?assertEqual(0, ets:info(?MODULE, size))
+   end).
+
+
+t_if_permanent_job_crashes_it_stays_in_ets() ->
+    ?_test(begin
+        Pid = mock_pid(),
+        Job =  #job{
+            id = job1,
+            pid = Pid,
+            history = [added()],
+            rep = #rep{db_name = <<"db1">>, options = [{continuous, true}]}
+        },
+        setup_jobs([Job]),
+        ?assertEqual(1, ets:info(?MODULE, size)),
+        State = #state{max_jobs = 1, max_history = 3},
+        {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed}, State),
+        ?assertEqual(1, ets:info(?MODULE, size)),
+        [Job1] = ets:lookup(?MODULE, job1),
+        [Latest | _] = Job1#job.history,
+        ?assertMatch({{crashed, failed}, _}, Latest)
+   end).
+
+
+% Test helper functions
+
+setup() ->
+    catch ets:delete(?MODULE),
+    meck:expect(couch_log, notice, 2, ok),
+    meck:expect(couch_log, warning, 2, ok),
+    meck:expect(couch_log, error, 2, ok),
+    meck:expect(couch_replicator_scheduler_sup, terminate_child, 1, ok),
+    meck:expect(couch_stats, increment_counter, 1, ok),
+    meck:expect(couch_stats, update_gauge, 2, ok),
+    Pid = mock_pid(),
+    meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}).
+
+
+teardown(_) ->
+    catch ets:delete(?MODULE),
+    meck:unload().
+
+
+setup_jobs(Jobs) when is_list(Jobs) ->
+    ?MODULE = ets:new(?MODULE, [named_table, {keypos, #job.id}]),
+    ets:insert(?MODULE, Jobs).
+
+
+all_jobs() ->
+    lists:usort(ets:tab2list(?MODULE)).
+
+
+jobs_stopped() ->
+    [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined].
+
+
+jobs_running() ->
+    [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined].
+
+
+run_stop_count() ->
+    {length(jobs_running()), length(jobs_stopped())}.
+
+
+oneshot_run_stop_count() ->
+    Running = [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined,
+        not is_continuous(Job)],
+    Stopped = [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined,
+        not is_continuous(Job)],
+    {length(Running), length(Stopped)}.
+
+
+mock_state(MaxJobs) ->
+    #state{
+        max_jobs = MaxJobs,
+        max_churn = ?DEFAULT_MAX_CHURN,
+        max_history = ?DEFAULT_MAX_HISTORY
+    }.
+
+mock_state(MaxJobs, MaxChurn) ->
+    #state{
+        max_jobs = MaxJobs,
+        max_churn = MaxChurn,
+        max_history = ?DEFAULT_MAX_HISTORY
+    }.
+
+
+continuous(Id) when is_integer(Id) ->
+    Started = Id,
+    Hist = [stopped(Started+1), started(Started), added()],
+    #job{
+        id = Id,
+        history = Hist,
+        rep = #rep{options = [{continuous, true}]}
+    }.
+
+
+continuous_running(Id) when is_integer(Id) ->
+    Started = Id,
+    Pid = mock_pid(),
+    #job{
+        id = Id,
+        history = [started(Started), added()],
+        rep = #rep{options = [{continuous, true}]},
+        pid = Pid,
+        monitor = monitor(process, Pid)
+    }.
+
+
+oneshot(Id) when is_integer(Id) ->
+    Started = Id,
+    Hist = [stopped(Started + 1), started(Started), added()],
+    #job{id = Id, history = Hist, rep = #rep{options = []}}.
+
+
+oneshot_running(Id) when is_integer(Id) ->
+    Started = Id,
+    Pid = mock_pid(),
+    #job{
+        id = Id,
+        history = [started(Started), added()],
+        rep = #rep{options = []},
+        pid = Pid,
+        monitor = monitor(process, Pid)
+    }.
+
+
+testjob(Hist) when is_list(Hist) ->
+    #job{history = Hist}.
+
+
+mock_pid() ->
+   list_to_pid("<0.999.999>").
+
+crashed() ->
+    crashed(0).
+
+
+crashed(WhenSec) when is_integer(WhenSec) ->
+    {{crashed, some_reason}, {0, WhenSec, 0}};
+crashed({MSec, Sec, USec}) ->
+    {{crashed, some_reason}, {MSec, Sec, USec}}.
+
+
+started() ->
+    started(0).
+
+
+started(WhenSec) when is_integer(WhenSec) ->
+    {started, {0, WhenSec, 0}};
+
+started({MSec, Sec, USec}) ->
+    {started, {MSec, Sec, USec}}.
+
+
+stopped() ->
+    stopped(0).
+
+
+stopped(WhenSec) ->
+    {stopped, {0, WhenSec, 0}}.
+
+added() ->
+    {added, {0, 0, 0}}.
+
+-endif.
+
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.hrl b/src/couch_replicator/src/couch_replicator_scheduler.hrl
new file mode 100644
index 0000000..5203b0c
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_scheduler.hrl
@@ -0,0 +1,15 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-type job_id() :: term().
+-type job_args() :: term().
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
new file mode 100644
index 0000000..1c9faaf
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -0,0 +1,945 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler_job).
+-behaviour(gen_server).
+-vsn(1).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator_scheduler.hrl").
+-include("couch_replicator.hrl").
+
+%% public api
+-export([start_link/1]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+-export([format_status/2]).
+
+
+%% imports
+-import(couch_util, [
+    get_value/2,
+    get_value/3,
+    to_binary/1
+]).
+
+-import(couch_replicator_utils, [
+    start_db_compaction_notifier/2,
+    stop_db_compaction_notifier/1,
+    pp_rep_id/1
+]).
+
+
+%% definitions
+-define(LOWEST_SEQ, 0).
+-define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
+-record(rep_state, {
+    rep_details,
+    source_name,
+    target_name,
+    source,
+    target,
+    history,
+    checkpoint_history,
+    start_seq,
+    committed_seq,
+    current_through_seq,
+    seqs_in_progress = [],
+    highest_seq_done = {0, ?LOWEST_SEQ},
+    source_log,
+    target_log,
+    rep_starttime,
+    src_starttime,
+    tgt_starttime,
+    timer, % checkpoint timer
+    changes_queue,
+    changes_manager,
+    changes_reader,
+    workers,
+    stats = couch_replicator_stats:new(),
+    session_id,
+    source_db_compaction_notifier = nil,
+    target_db_compaction_notifier = nil,
+    source_monitor = nil,
+    target_monitor = nil,
+    source_seq = nil,
+    use_checkpoints = true,
+    checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
+    type = db,
+    view = nil
+}).
+
+start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
+    RepChildId = BaseId ++ Ext,
+    Source = couch_replicator_api_wrap:db_uri(Src),
+    Target = couch_replicator_api_wrap:db_uri(Tgt),
+    ServerName = {global, {?MODULE, Rep#rep.id}},
+
+    case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
+        {ok, Pid} ->
+            couch_log:notice("starting new replication `~s` at ~p (`~s` -> `~s`)",
+                             [RepChildId, Pid, Source, Target]),
+            {ok, Pid};
+        {error, Reason} ->
+            couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)",
+                           [RepChildId, Source, Target]),
+            {error, Reason}
+    end.
+
+init(InitArgs) ->
+    {ok, InitArgs, 0}.
+
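+% Initialization is deferred: init/1 returns a zero timeout, so the
+% expensive setup in do_init/1 runs from handle_info(timeout, InitArgs)
+% without blocking the process that called start_link/1.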
+do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
+    process_flag(trap_exit, true),
+
+    #rep_state{
+        source = Source,
+        target = Target,
+        source_name = SourceName,
+        target_name = TargetName,
+        start_seq = {_Ts, StartSeq},
+        committed_seq = {_, CommittedSeq},
+        highest_seq_done = {_, HighestSeq},
+        checkpoint_interval = CheckpointInterval
+    } = State = init_state(Rep),
+
+    NumWorkers = get_value(worker_processes, Options),
+    BatchSize = get_value(worker_batch_size, Options),
+    {ok, ChangesQueue} = couch_work_queue:new([
+        {max_items, BatchSize * NumWorkers * 2},
+        {max_size, 100 * 1024 * NumWorkers}
+    ]),
+    % This starts the _changes reader process. It adds the changes from
+    % the source db to the ChangesQueue.
+    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
+        StartSeq, Source, ChangesQueue, Options
+    ),
+    % Changes manager - responsible for dequeuing batches from the changes
+    % queue and delivering them to the worker processes.
+    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
+    % This starts the worker processes. They ask the changes manager for a
+    % batch of _changes rows to process, check which revs are missing on the
+    % target, and copy the missing ones from the source to the target.
+    MaxConns = get_value(http_connections, Options),
+    Workers = lists:map(
+        fun(_) ->
+            couch_stats:increment_counter([couch_replicator, workers_started]),
+            {ok, Pid} = couch_replicator_worker:start_link(
+                self(), Source, Target, ChangesManager, MaxConns),
+            Pid
+        end,
+        lists:seq(1, NumWorkers)),
+
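+    % The spawned processes form a pipeline: the changes reader feeds the
+    % changes queue, the changes manager hands batches from the queue to the
+    % workers, and this gen_server coordinates checkpoints.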
+    couch_task_status:add_task([
+        {type, replication},
+        {user, UserCtx#user_ctx.name},
+        {replication_id, ?l2b(BaseId ++ Ext)},
+        {database, Rep#rep.db_name},
+        {doc_id, Rep#rep.doc_id},
+        {source, ?l2b(SourceName)},
+        {target, ?l2b(TargetName)},
+        {continuous, get_value(continuous, Options, false)},
+        {revisions_checked, 0},
+        {missing_revisions_found, 0},
+        {docs_read, 0},
+        {docs_written, 0},
+        {changes_pending, get_pending_count(State)},
+        {doc_write_failures, 0},
+        {source_seq, HighestSeq},
+        {checkpointed_source_seq, CommittedSeq},
+        {checkpoint_interval, CheckpointInterval}
+    ]),
+    couch_task_status:set_update_frequency(1000),
+
+    % Until OTP R14B03:
+    %
+    % Restarting a temporary supervised child implies that the original arguments
+    % (#rep{} record) specified in the MFA component of the supervisor
+    % child spec will always be used whenever the child is restarted.
+    % This implies the same replication performance tuning parameters will
+    % always be used. The solution is to delete the child spec (see
+    % cancel_replication/1) and then start the replication again, but this is
+    % unfortunately not immune to race conditions.
+
+    couch_log:notice("Replication `~p` is using:~n"
+        "~c~p worker processes~n"
+        "~ca worker batch size of ~p~n"
+        "~c~p HTTP connections~n"
+        "~ca connection timeout of ~p milliseconds~n"
+        "~c~p retries per request~n"
+        "~csocket options are: ~s~s",
+        [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
+            MaxConns, $\t, get_value(connection_timeout, Options),
+            $\t, get_value(retries, Options),
+            $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
+            case StartSeq of
+            ?LOWEST_SEQ ->
+                "";
+            _ ->
+                io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
+            end]),
+
+    couch_log:debug("Worker pids are: ~p", [Workers]),
+
+    doc_update_triggered(Rep),
+
+    {ok, State#rep_state{
+            changes_queue = ChangesQueue,
+            changes_manager = ChangesManager,
+            changes_reader = ChangesReader,
+            workers = Workers
+        }
+    }.
+
+adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
+    Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
+    couch_log:notice(Msg, [RepId]),
+    Src#httpdb{http_connections = 2};
+
+adjust_maxconn(Src, _RepId) ->
+    Src.
+
+handle_info(shutdown, St) ->
+    {stop, shutdown, St};
+
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
+    couch_log:error("Source database is down. Reason: ~p", [Why]),
+    {stop, source_db_down, St};
+
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
+    couch_log:error("Target database is down. Reason: ~p", [Why]),
+    {stop, target_db_down, St};
+
+handle_info({'EXIT', Pid, max_backoff}, State) ->
+    couch_log:error("Max backoff reached child process ~p", [Pid]),
+    {stop, {shutdown, max_backoff}, State};
+
+handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
+    couch_log:error("Max backoff reached child process ~p", [Pid]),
+    {stop, {shutdown, max_backoff}, State};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+    couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
+    {stop, changes_reader_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
+    couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
+    {stop, changes_manager_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
+    couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
+    {stop, changes_queue_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
+    case Workers -- [Pid] of
+    Workers ->
+        couch_log:error("unknown pid bit the dust ~p ~n",[Pid]),
+        {noreply, State#rep_state{workers = Workers}};
+        %% not clear why a stop was here before
+        %%{stop, {unknown_process_died, Pid, normal}, State};
+    [] ->
+        catch unlink(State#rep_state.changes_manager),
+        catch exit(State#rep_state.changes_manager, kill),
+        do_last_checkpoint(State);
+    Workers2 ->
+        {noreply, State#rep_state{workers = Workers2}}
+    end;
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
+    State2 = cancel_timer(State),
+    case lists:member(Pid, Workers) of
+    false ->
+        {stop, {unknown_process_died, Pid, Reason}, State2};
+    true ->
+        couch_stats:increment_counter([couch_replicator, worker_deaths]),
+        couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
+        {stop, {worker_died, Pid, Reason}, State2}
+    end;
+
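+% Deferred initialization: this timeout message presumably comes from init/1
+% returning {ok, InitArgs, 0}, letting the potentially slow do_init/1 run
+% after the gen_server has already started.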
+handle_info(timeout, InitArgs) ->
+    try do_init(InitArgs) of
+        {ok, State} ->
+            {noreply, State}
+    catch
+        exit:{http_request_failed, _, _, max_backoff} ->
+            {stop, {shutdown, max_backoff}, {error, InitArgs}};
+        Class:Error ->
+            ShutdownReason = {error, replication_start_error(Error)},
+            % Shutdown state is a hack as it is not really the state of the
+            % gen_server (it failed to initialize, so it doesn't have one).
+            % Shutdown state is used to pass extra info about why start failed.
+            ShutdownState = {error, Class, erlang:get_stacktrace(), InitArgs},
+            {stop, {shutdown, ShutdownReason}, ShutdownState}
+    end.
+
+handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
+    {reply, {ok, Rep}, State};
+
+handle_call({add_stats, Stats}, From, State) ->
+    gen_server:reply(From, ok),
+    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
+    {noreply, State#rep_state{stats = NewStats}};
+
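+% A worker has finished the batch ending at Seq. Reply before updating any
+% state so the worker is not blocked. current_through_seq advances only
+% when the completed Seq was the lowest sequence in progress, while
+% highest_seq_done tracks the highest sequence any worker has completed.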
+handle_call({report_seq_done, Seq, StatsInc}, From,
+    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
+        current_through_seq = ThroughSeq, stats = Stats} = State) ->
+    gen_server:reply(From, ok),
+    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+    [] ->
+        {Seq, []};
+    [Seq | Rest] ->
+        {Seq, Rest};
+    [_ | _] ->
+        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+    end,
+    NewHighestDone = lists:max([HighestDone, Seq]),
+    NewThroughSeq = case NewSeqsInProgress of
+    [] ->
+        lists:max([NewThroughSeq0, NewHighestDone]);
+    _ ->
+        NewThroughSeq0
+    end,
+    couch_log:debug("Worker reported seq ~p, through seq was ~p, "
+        "new through seq is ~p, highest seq done was ~p, "
+        "new highest seq done is ~p~n"
+        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+    NewState = State#rep_state{
+        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
+        current_through_seq = NewThroughSeq,
+        seqs_in_progress = NewSeqsInProgress,
+        highest_seq_done = NewHighestDone
+    },
+    update_task(NewState),
+    {noreply, NewState}.
+
+
+handle_cast({db_compacted, DbName},
+    #rep_state{source = #db{name = DbName} = Source} = State) ->
+    {ok, NewSource} = couch_db:reopen(Source),
+    {noreply, State#rep_state{source = NewSource}};
+
+handle_cast({db_compacted, DbName},
+    #rep_state{target = #db{name = DbName} = Target} = State) ->
+    {ok, NewTarget} = couch_db:reopen(Target),
+    {noreply, State#rep_state{target = NewTarget}};
+
+handle_cast(checkpoint, State) ->
+    case do_checkpoint(State) of
+    {ok, NewState} ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+        {noreply, NewState#rep_state{timer = start_timer(State)}};
+    Error ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+        {stop, Error, State}
+    end;
+
+handle_cast({report_seq, Seq},
+    #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
+    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
+    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
+
+
+code_change(_OldVsn, #rep_state{}=State, _Extra) ->
+    {ok, State}.
+
+
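+% Replace the value of any Authorization header with "****" so credentials
+% never leak into logs or into the formatted gen_server state.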
+headers_strip_creds([], Acc) ->
+    lists:reverse(Acc);
+headers_strip_creds([{Key, Value0} | Rest], Acc) ->
+    Value = case string:to_lower(Key) of
+    "authorization" ->
+        "****";
+    _ ->
+        Value0
+    end,
+    headers_strip_creds(Rest, [{Key, Value} | Acc]).
+
+
+httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
+    HttpDb#httpdb{
+        url = couch_util:url_strip_password(Url),
+        headers = headers_strip_creds(Headers, [])
+    };
+httpdb_strip_creds(LocalDb) ->
+    LocalDb.
+
+
+rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
+    Rep#rep{
+        source = httpdb_strip_creds(Source),
+        target = httpdb_strip_creds(Target)
+    }.
+
+
+state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
+    % #rep_state contains the source and target at the top level and also
+    % in the nested #rep_details record
+    State#rep_state{
+        rep_details = rep_strip_creds(Rep),
+        source = httpdb_strip_creds(Source),
+        target = httpdb_strip_creds(Target)
+    }.
+
+
+terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
+    checkpoint_history = CheckpointHistory} = State) ->
+    terminate_cleanup(State),
+    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
+    doc_update_completed(Rep, rep_stats(State));
+
+terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+    % Replication stopped via couch_replicator_scheduler_sup:terminate_child/1,
+    % which can occur during regular scheduler operation or when a job is
+    % removed from the scheduler.
+    State1 = case do_checkpoint(State) of
+        {ok, NewState} ->
+            NewState;
+        Error ->
+            LogMsg = "~p : Failed last checkpoint. Job: ~p Error: ~p",
+            couch_log:error(LogMsg, [?MODULE, RepId, Error]),
+            State
+    end,
+    couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
+    terminate_cleanup(State1);
+
+terminate({shutdown, max_backoff}, {error, InitArgs}) ->
+    #rep{id = {BaseId, Ext} = RepId} = InitArgs,
+    couch_stats:increment_counter([couch_replicator, failed_starts]),
+    couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
+    couch_replicator_notifier:notify({error, RepId, max_backoff});
+
+terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
+    #rep{id=RepId} = InitArgs,
+    couch_stats:increment_counter([couch_replicator, failed_starts]),
+    CleanInitArgs = rep_strip_creds(InitArgs),
+    couch_log:error("~p:~p: Replication failed to start for args ~p: ~p",
+             [Class, Error, CleanInitArgs, Stack]),
+    couch_replicator_notifier:notify({error, RepId, Error});
+
+terminate({shutdown, max_backoff}, State) ->
+    #rep_state{
+        source_name = Source,
+        target_name = Target,
+        rep_details = #rep{id = {BaseId, Ext} = RepId}
+    } = State,
+    couch_log:error("Replication `~s` (`~s` -> `~s`) reached max backoff",
+        [BaseId ++ Ext, Source, Target]),
+    terminate_cleanup(State),
+    couch_replicator_notifier:notify({error, RepId, max_backoff});
+
+terminate(Reason, State) ->
+    #rep_state{
+        source_name = Source,
+        target_name = Target,
+        rep_details = #rep{id = {BaseId, Ext} = RepId}
+    } = State,
+    couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
+        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
+    terminate_cleanup(State),
+    couch_replicator_notifier:notify({error, RepId, Reason}).
+
+terminate_cleanup(State) ->
+    update_task(State),
+    stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
+    stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
+    couch_replicator_api_wrap:db_close(State#rep_state.source),
+    couch_replicator_api_wrap:db_close(State#rep_state.target).
+
+
+format_status(_Opt, [_PDict, State]) ->
+    [{data, [{"State", state_strip_creds(State)}]}].
+
+
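+% For replications backed by a _replicator db document (db_name =/= null),
+% reflect the triggered/completed state transitions back into that document
+% when couch_replicator_doc_processor:update_docs() allows it.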
+-spec doc_update_triggered(#rep{}) -> ok.
+doc_update_triggered(#rep{db_name = null}) ->
+    ok;
+doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
+    case couch_replicator_doc_processor:update_docs() of
+        true ->
+            couch_replicator_docs:update_triggered(Rep, RepId);
+        false ->
+            ok
+    end,
+    couch_log:notice("Document `~s` triggered replication `~s`",
+        [DocId, pp_rep_id(RepId)]),
+    ok.
+
+-spec doc_update_completed(#rep{}, list()) -> ok.
+doc_update_completed(#rep{db_name = null}, _Stats) ->
+    ok;
+doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName,
+    start_time = StartTime}, Stats) ->
+    couch_replicator_docs:update_doc_completed(DbName, DocId, Stats, StartTime),
+    couch_log:notice("Replication `~s` completed (triggered by `~s`)",
+        [pp_rep_id(RepId), DocId]),
+    ok.
+
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+    highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
+    {stop, normal, cancel_timer(State)};
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+    highest_seq_done = Seq} = State) ->
+    case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
+    {ok, NewState} ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+        {stop, normal, cancel_timer(NewState)};
+    Error ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+        {stop, Error, State}
+    end.
+
+
+start_timer(State) ->
+    After = State#rep_state.checkpoint_interval,
+    case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
+    {ok, Ref} ->
+        Ref;
+    Error ->
+        couch_log:error("Replicator, error scheduling checkpoint:  ~p", [Error]),
+        nil
+    end.
+
+
+cancel_timer(#rep_state{timer = nil} = State) ->
+    State;
+cancel_timer(#rep_state{timer = Timer} = State) ->
+    {ok, cancel} = timer:cancel(Timer),
+    State#rep_state{timer = nil}.
+
+
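+% Build the initial #rep_state{}: open source and target, fetch the _local
+% replication logs from both sides, and compare them to decide which
+% sequence to start replicating from.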
+init_state(Rep) ->
+    #rep{
+        id = {BaseId, _Ext},
+        source = Src0, target = Tgt,
+        options = Options, user_ctx = UserCtx,
+        type = Type, view = View,
+        start_time = StartTime
+    } = Rep,
+    % Adjust minimum number of http source connections to 2 to avoid deadlock
+    Src = adjust_maxconn(Src0, BaseId),
+    {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
+    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
+        get_value(create_target, Options, false)),
+
+    {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
+    {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
+
+    [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
+
+    {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+    StartSeq1 = get_value(since_seq, Options, StartSeq0),
+    StartSeq = {0, StartSeq1},
+
+    SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
+
+    #doc{body={CheckpointHistory}} = SourceLog,
+    State = #rep_state{
+        rep_details = Rep,
+        source_name = couch_replicator_api_wrap:db_uri(Source),
+        target_name = couch_replicator_api_wrap:db_uri(Target),
+        source = Source,
+        target = Target,
+        history = History,
+        checkpoint_history = {[{<<"no_changes">>, true} | CheckpointHistory]},
+        start_seq = StartSeq,
+        current_through_seq = StartSeq,
+        committed_seq = StartSeq,
+        source_log = SourceLog,
+        target_log = TargetLog,
+        rep_starttime = StartTime,
+        src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
+        tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
+        session_id = couch_uuids:random(),
+        source_db_compaction_notifier =
+            start_db_compaction_notifier(Source, self()),
+        target_db_compaction_notifier =
+            start_db_compaction_notifier(Target, self()),
+        source_monitor = db_monitor(Source),
+        target_monitor = db_monitor(Target),
+        source_seq = SourceSeq,
+        use_checkpoints = get_value(use_checkpoints, Options, true),
+        checkpoint_interval = get_value(checkpoint_interval, Options,
+                                        ?DEFAULT_CHECKPOINT_INTERVAL),
+        type = Type,
+        view = View
+    },
+    State#rep_state{timer = start_timer(State)}.
+
+
+find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
+    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
+    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
+
+
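+% Fetch the _local replication log from each database. If it is not found
+% under the current replication id, retry with ids produced by older
+% ?REP_ID_VERSION algorithms; when an old log is found it is migrated to
+% the new id.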
+fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
+    lists:reverse(Acc);
+
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
+    case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
+    {error, <<"not_found">>} when Vsn > 1 ->
+        OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
+        fold_replication_logs(Dbs, Vsn - 1,
+            ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
+    {error, <<"not_found">>} ->
+        fold_replication_logs(
+            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
+    {ok, Doc} when LogId =:= NewId ->
+        fold_replication_logs(
+            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
+    {ok, Doc} ->
+        MigratedLog = #doc{id = NewId, body = Doc#doc.body},
+        fold_replication_logs(
+            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
+    end.
+
+
+spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
+    spawn_link(fun() ->
+        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
+    end).
+
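+% The changes manager hands batches of changes to workers on demand. Each
+% batch is tagged with an incrementing counter Ts, so reported sequences
+% {Ts, Seq} compare in the order the batches were dequeued.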
+changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
+    receive
+    {get_changes, From} ->
+        case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
+        closed ->
+            From ! {closed, self()};
+        {ok, Changes} ->
+            #doc_info{high_seq = Seq} = lists:last(Changes),
+            ReportSeq = {Ts, Seq},
+            ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
+            From ! {changes, self(), Changes, ReportSeq}
+        end,
+        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
+    end.
+
+
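+% Record a checkpoint: flush both databases, then check that neither
+% instance_start_time changed since the replication started. If one did,
+% the flush cannot be trusted and the checkpoint fails; otherwise a new
+% session entry is appended to the replication log on source and target.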
+do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
+    NewState = State#rep_state{
+        checkpoint_history = {[{<<"use_checkpoints">>, false}]}
+    },
+    {ok, NewState};
+do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
+    update_task(State),
+    {ok, State};
+do_checkpoint(State) ->
+    #rep_state{
+        source_name=SourceName,
+        target_name=TargetName,
+        source = Source,
+        target = Target,
+        history = OldHistory,
+        start_seq = {_, StartSeq},
+        current_through_seq = {_Ts, NewSeq} = NewTsSeq,
+        source_log = SourceLog,
+        target_log = TargetLog,
+        rep_starttime = ReplicationStartTime,
+        src_starttime = SrcInstanceStartTime,
+        tgt_starttime = TgtInstanceStartTime,
+        stats = Stats,
+        rep_details = #rep{options = Options},
+        session_id = SessionId
+    } = State,
+    case commit_to_both(Source, Target) of
+    {source_error, Reason} ->
+         {checkpoint_commit_failure,
+             <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
+    {target_error, Reason} ->
+         {checkpoint_commit_failure,
+             <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
+    {SrcInstanceStartTime, TgtInstanceStartTime} ->
+        couch_log:notice("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
+            [SourceName, TargetName, NewSeq]),
+        UniversalStartTime = calendar:now_to_universal_time(ReplicationStartTime),
+        StartTime = ?l2b(httpd_util:rfc1123_date(UniversalStartTime)),
+        EndTime = ?l2b(httpd_util:rfc1123_date()),
+        NewHistoryEntry = {[
+            {<<"session_id">>, SessionId},
+            {<<"start_time">>, StartTime},
+            {<<"end_time">>, EndTime},
+            {<<"start_last_seq">>, StartSeq},
+            {<<"end_last_seq">>, NewSeq},
+            {<<"recorded_seq">>, NewSeq},
+            {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
+            {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
+            {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+            {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+            {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
+        ]},
+        BaseHistory = [
+            {<<"session_id">>, SessionId},
+            {<<"source_last_seq">>, NewSeq},
+            {<<"replication_id_version">>, ?REP_ID_VERSION}
+        ] ++ case get_value(doc_ids, Options) of
+        undefined ->
+            [];
+        _DocIds ->
+            % backwards compatibility with the result of a replication by
+            % doc IDs in versions 0.11.x and 1.0.x
+            % TODO: deprecate (use same history format, simplify code)
+            [
+                {<<"start_time">>, StartTime},
+                {<<"end_time">>, EndTime},
+                {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+                {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+                {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
+            ]
+        end,
+        % limit history to 50 entries
+        NewRepHistory = {
+            BaseHistory ++
+            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+        },
+
+        try
+            {SrcRevPos, SrcRevId} = update_checkpoint(
+                Source, SourceLog#doc{body = NewRepHistory}, source),
+            {TgtRevPos, TgtRevId} = update_checkpoint(
+                Target, TargetLog#doc{body = NewRepHistory}, target),
+            NewState = State#rep_state{
+                checkpoint_history = NewRepHistory,
+                committed_seq = NewTsSeq,
+                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+            },
+            update_task(NewState),
+            {ok, NewState}
+        catch throw:{checkpoint_commit_failure, _} = Failure ->
+            Failure
+        end;
+    {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+        {checkpoint_commit_failure, <<"Target database out of sync. "
+            "Try to increase max_dbs_open at the target's server.">>};
+    {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
+        {checkpoint_commit_failure, <<"Source database out of sync. "
+            "Try to increase max_dbs_open at the source's server.">>};
+    {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+        {checkpoint_commit_failure, <<"Source and target databases out of "
+            "sync. Try to increase max_dbs_open at both servers.">>}
+    end.
+
+
+update_checkpoint(Db, Doc, DbType) ->
+    try
+        update_checkpoint(Db, Doc)
+    catch throw:{checkpoint_commit_failure, Reason} ->
+        throw({checkpoint_commit_failure,
+            <<"Error updating the ", (to_binary(DbType))/binary,
+                " checkpoint document: ", (to_binary(Reason))/binary>>})
+    end.
+
+update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
+    try
+        case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
+        {ok, PosRevId} ->
+            PosRevId;
+        {error, Reason} ->
+            throw({checkpoint_commit_failure, Reason})
+        end
+    catch throw:conflict ->
+        case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
+        {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
+            % This means we successfully updated the checkpoint doc in a
+            % previous attempt, but a connection error (e.g. a timeout)
+            % occurred before we received the success response. The retried
+            % request then produced a conflict, since the revision we sent is
+            % no longer the current one. We confirm this by verifying that
+            % the doc body we just read matches the one we just sent.
+            {Pos, RevId};
+        _ ->
+            throw({checkpoint_commit_failure, conflict})
+        end
+    end.
+
+
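+% Flush source and target concurrently: the source commit runs in a spawned
+% process while the target commit runs inline. Returns a pair of instance
+% start times on success, or a {source_error, _} / {target_error, _} tuple.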
+commit_to_both(Source, Target) ->
+    % commit the src async
+    ParentPid = self(),
+    SrcCommitPid = spawn_link(
+        fun() ->
+            Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
+            ParentPid ! {self(), Result}
+        end),
+
+    % commit tgt sync
+    TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
+
+    SourceResult = receive
+    {SrcCommitPid, Result} ->
+        unlink(SrcCommitPid),
+        receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
+        Result;
+    {'EXIT', SrcCommitPid, Reason} ->
+        {error, Reason}
+    end,
+    case TargetResult of
+    {ok, TargetStartTime} ->
+        case SourceResult of
+        {ok, SourceStartTime} ->
+            {SourceStartTime, TargetStartTime};
+        SourceError ->
+            {source_error, SourceError}
+        end;
+    TargetError ->
+        {target_error, TargetError}
+    end.
+
+
+compare_replication_logs(SrcDoc, TgtDoc) ->
+    #doc{body={RepRecProps}} = SrcDoc,
+    #doc{body={RepRecPropsTgt}} = TgtDoc,
+    case get_value(<<"session_id">>, RepRecProps) ==
+            get_value(<<"session_id">>, RepRecPropsTgt) of
+    true ->
+        % if the records have the same session id,
+        % then we have a valid replication history
+        OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
+        OldHistory = get_value(<<"history">>, RepRecProps, []),
+        {OldSeqNum, OldHistory};
+    false ->
+        SourceHistory = get_value(<<"history">>, RepRecProps, []),
+        TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
+        couch_log:notice("Replication records differ. "
+                "Scanning histories to find a common ancestor.", []),
+        couch_log:debug("Record on source:~p~nRecord on target:~p~n",
+                [RepRecProps, RepRecPropsTgt]),
+        compare_rep_history(SourceHistory, TargetHistory)
+    end.
+
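+% Walk both histories looking for the most recent session the two sides
+% share. A match yields the recorded_seq to restart from; running out of
+% history on either side means there is no common ancestor and replication
+% restarts from ?LOWEST_SEQ.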
+compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
+    couch_log:notice("no common ancestry -- performing full replication", []),
+    {?LOWEST_SEQ, []};
+compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
+    SourceId = get_value(<<"session_id">>, S),
+    case has_session_id(SourceId, Target) of
+    true ->
+        RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
+        couch_log:notice("found a common replication record with source_seq ~p",
+            [RecordSeqNum]),
+        {RecordSeqNum, SourceRest};
+    false ->
+        TargetId = get_value(<<"session_id">>, T),
+        case has_session_id(TargetId, SourceRest) of
+        true ->
+            RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
+            couch_log:notice("found a common replication record with source_seq ~p",
+                [RecordSeqNum]),
+            {RecordSeqNum, TargetRest};
+        false ->
+            compare_rep_history(SourceRest, TargetRest)
+        end
+    end.
+
+
+has_session_id(_SessionId, []) ->
+    false;
+has_session_id(SessionId, [{Props} | Rest]) ->
+    case get_value(<<"session_id">>, Props, nil) of
+    SessionId ->
+        true;
+    _Else ->
+        has_session_id(SessionId, Rest)
+    end.
+
+
+db_monitor(#db{} = Db) ->
+    couch_db:monitor(Db);
+db_monitor(_HttpDb) ->
+    nil.
+
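+% The last pending count is cached in the process dictionary and only
+% recomputed once the cached value is older than the connection timeout.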
+get_pending_count(St) ->
+    Rep = St#rep_state.rep_details,
+    Timeout = get_value(connection_timeout, Rep#rep.options),
+    TimeoutMicro = Timeout * 1000,
+    case get(pending_count_state) of
+        {LastUpdate, PendingCount} ->
+            case timer:now_diff(os:timestamp(), LastUpdate) > TimeoutMicro of
+                true ->
+                    NewPendingCount = get_pending_count_int(St),
+                    put(pending_count_state, {os:timestamp(), NewPendingCount}),
+                    NewPendingCount;
+                false ->
+                    PendingCount
+            end;
+        undefined ->
+            NewPendingCount = get_pending_count_int(St),
+            put(pending_count_state, {os:timestamp(), NewPendingCount}),
+            NewPendingCount
+    end.
+
+
+get_pending_count_int(#rep_state{source = #httpdb{} = Db0}=St) ->
+    {_, Seq} = St#rep_state.highest_seq_done,
+    Db = Db0#httpdb{retries = 3},
+    case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
+    {ok, Pending} ->
+        Pending;
+    _ ->
+        null
+    end;
+get_pending_count_int(#rep_state{source = Db}=St) ->
+    {_, Seq} = St#rep_state.highest_seq_done,
+    {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
+    Pending.
+
+
+update_task(State) ->
+    #rep_state{
+        current_through_seq = {_, ThroughSeq},
+        highest_seq_done = {_, HighestSeq}
+    } = State,
+    couch_task_status:update(
+        rep_stats(State) ++ [
+        {source_seq, HighestSeq},
+        {through_seq, ThroughSeq}
+    ]).
+
+
+rep_stats(State) ->
+    #rep_state{
+        committed_seq = {_, CommittedSeq},
+        stats = Stats
+    } = State,
+    [
+        {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
+        {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
+        {docs_read, couch_replicator_stats:docs_read(Stats)},
+        {docs_written, couch_replicator_stats:docs_written(Stats)},
+        {changes_pending, get_pending_count(State)},
+        {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
+        {checkpointed_source_seq, CommittedSeq}
+    ].
+
+
+replication_start_error({unauthorized, DbUri}) ->
+    {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
+
+replication_start_error({db_not_found, DbUri}) ->
+    {db_not_found, <<"could not open ", DbUri/binary>>};
+
+replication_start_error(Error) ->
+    Error.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
new file mode 100644
index 0000000..a2cf9bd
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
@@ -0,0 +1,54 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler_sup).
+-behaviour(supervisor).
+-vsn(1).
+
+%% includes
+-include("couch_replicator.hrl").
+
+%% public api
+-export([start_link/0, start_child/1, terminate_child/1]).
+
+%% supervisor api
+-export([init/1]).
+
+%% public functions
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_child(#rep{} = Rep) ->
+    supervisor:start_child(?MODULE, [Rep]).
+
+
+terminate_child(Pid) ->
+    supervisor:terminate_child(?MODULE, Pid).
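+
+% Illustrative use, roughly how the scheduler drives this supervisor:
+%
+%     {ok, Pid} = couch_replicator_scheduler_sup:start_child(Rep),
+%     ...
+%     couch_replicator_scheduler_sup:terminate_child(Pid).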
+
+%% supervisor functions
+
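+% simple_one_for_one with temporary children: job processes are started on
+% demand via start_child/1 and are never restarted by this supervisor;
+% rescheduling a failed job is left to the scheduler.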
+init(_Args) ->
+    Start = {couch_replicator_scheduler_job, start_link, []},
+    Restart = temporary, % A crashed job is not entitled to immediate restart.
+    Shutdown = 5000,
+    Type = worker,
+    Modules = [couch_replicator_scheduler_job],
+
+    RestartStrategy = simple_one_for_one,
+    MaxR = 10,
+    MaxT = 3,
+
+    ChildSpec =
+        {undefined, Start, Restart, Shutdown, Type, Modules},
+    {ok, {{RestartStrategy, MaxR, MaxT}, [ChildSpec]}}.
