couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vatam...@apache.org
Subject [couchdb] 01/02: Stitch scheduling replicator together.
Date Tue, 04 Apr 2017 19:06:16 GMT
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 382d4880514bc63e1c07cfa810a76485917c0bec
Author: Nick Vatamaniuc <vatamane@apache.org>
AuthorDate: Fri Nov 25 11:47:56 2016 -0500

    Stitch scheduling replicator together.
    
    Glue together all the scheduling replicator pieces.
    
    Scheduler is the main component. It can run a large number of replication jobs
    by switching between them, stopping and starting some periodically. Jobs
    which fail are backed off exponentially. Normal (non-continuous) jobs will be
    allowed to run to completion to preserve their current semantics.
    
    Scheduler behavior can be configured by these configuration options in
    `[replicator]` sections:
    
     * `max_jobs` : Number of actively running replications. Making this too high
     could cause performance issues. Making it too low could mean replication jobs
     might not have enough time to make progress before getting unscheduled again.
     This parameter can be adjusted at runtime and will take effect during the next
     rescheduling cycle.
    
     * `interval` : Scheduling interval in milliseconds. During each reschedule
     cycle scheduler might start or stop up to "max_churn" number of jobs.
    
     * `max_churn` : Maximum number of replications to start and stop during
     rescheduling. This parameter along with "interval" defines the rate of job
     replacement. During startup, however, a much larger number of jobs could be
     started (up to max_jobs) in a short period of time.
    
    Replication jobs are added to the scheduler by the document processor or from
    the `couch_replicator:replicate/2` function when called from `_replicate` HTTP
    endpoint handler.
    
    Document processor listens for updates via the couch_multidb_changes module,
    then tries to add replication jobs to the scheduler. Sometimes translating a
    document update to a replication job could fail, either permanently (if the
    document is malformed and missing some expected fields, for example) or
    temporarily if it is a filtered replication and the filter cannot be fetched.
    A failed filter fetch will be retried with an exponential backoff.
    
    couch_replicator_clustering is in charge of monitoring cluster membership
    changes. When membership changes, after a configurable quiet period, a rescan
    will be initiated. Rescan will shuffle replication jobs to make sure a
    replication job is running on only one node.
    
    A new set of stats was added to introspect scheduler and doc processor
    internals.
    
    The top replication supervisor structure is `rest_for_one`. This means if
    a child crashes, all children to the "right" of it will be restarted (if
    the supervisor hierarchy is visualized as an upside-down tree). Clustering,
    connection pool and rate limiter are towards the "left" as they are more
    fundamental; if the clustering child crashes, most other components will be
    restarted. Doc processor and multi-db changes children are towards the
    "right". If they crash, they can be safely restarted without affecting
    already running replications or components like clustering or connection pool.
    
    Jira: COUCHDB-3324
---
 src/couch/src/couch_doc.erl                        |    3 +
 src/couch_replicator/priv/stats_descriptions.cfg   |  108 ++
 src/couch_replicator/src/couch_replicator.app.src  |   22 +-
 src/couch_replicator/src/couch_replicator.erl      | 1229 +++++---------------
 src/couch_replicator/src/couch_replicator.hrl      |   35 +-
 .../src/couch_replicator_api_wrap.erl              |   50 +-
 .../src/couch_replicator_api_wrap.hrl              |    3 +-
 .../src/couch_replicator_httpc.erl                 |  123 +-
 .../src/couch_replicator_js_functions.hrl          |   30 +-
 .../src/couch_replicator_manager.erl               | 1033 +---------------
 src/couch_replicator/src/couch_replicator_sup.erl  |   54 +-
 .../src/couch_replicator_utils.erl                 |  541 ++-------
 .../src/couch_replicator_worker.erl                |    3 +
 .../test/couch_replicator_compact_tests.erl        |   34 +-
 .../test/couch_replicator_httpc_pool_tests.erl     |    2 +-
 .../test/couch_replicator_many_leaves_tests.erl    |   24 +-
 .../test/couch_replicator_modules_load_tests.erl   |   11 +-
 .../test/couch_replicator_proxy_tests.erl          |   69 ++
 .../test/couch_replicator_test_helper.erl          |   14 +-
 .../couch_replicator_use_checkpoints_tests.erl     |   24 +-
 20 files changed, 856 insertions(+), 2556 deletions(-)

diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
index 381ad4b..5e6d2b7 100644
--- a/src/couch/src/couch_doc.erl
+++ b/src/couch/src/couch_doc.erl
@@ -270,6 +270,9 @@ transfer_fields([{<<"_replication_state">>, _} = Field | Rest],
 transfer_fields([{<<"_replication_state_time">>, _} = Field | Rest],
     #doc{body=Fields} = Doc) ->
     transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
+transfer_fields([{<<"_replication_start_time">>, _} = Field | Rest],
+    #doc{body=Fields} = Doc) ->
+    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
 transfer_fields([{<<"_replication_state_reason">>, _} = Field | Rest],
     #doc{body=Fields} = Doc) ->
     transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
diff --git a/src/couch_replicator/priv/stats_descriptions.cfg b/src/couch_replicator/priv/stats_descriptions.cfg
index 2564f92..c009cc9 100644
--- a/src/couch_replicator/priv/stats_descriptions.cfg
+++ b/src/couch_replicator/priv/stats_descriptions.cfg
@@ -54,3 +54,111 @@
     {type, counter},
     {desc, <<"number of replicator workers started">>}
 ]}.
+{[couch_replicator, cluster_is_stable], [
+    {type, gauge},
+    {desc, <<"1 if cluster is stable, 0 if unstable">>}
+]}.
+{[couch_replicator, db_scans], [
+    {type, counter},
+    {desc, <<"number of time replicator db scans have been started">>}
+]}.
+{[couch_replicator, docs, dbs_created], [
+    {type, counter},
+    {desc, <<"number of db shard creations seen by replicator doc processor">>}
+]}.
+{[couch_replicator, docs, dbs_deleted], [
+    {type, counter},
+    {desc, <<"number of db shard deletions seen by replicator doc processor">>}
+]}.
+{[couch_replicator, docs, dbs_found], [
+    {type, counter},
+    {desc, <<"number of db shard found by replicator doc processor">>}
+]}.
+{[couch_replicator, docs, db_changes], [
+    {type, counter},
+    {desc, <<"number of db changes processed by replicator doc processor">>}
+]}.
+{[couch_replicator, docs, failed_state_updates], [
+    {type, counter},
+    {desc, <<"number of 'failed' state document updates">>}
+]}.
+{[couch_replicator, docs, completed_state_updates], [
+    {type, counter},
+    {desc, <<"number of 'completed' state document updates">>}
+]}.
+{[couch_replicator, jobs, adds], [
+    {type, counter},
+    {desc, <<"number of jobs added to replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, duplicate_adds], [
+    {type, counter},
+    {desc, <<"number of duplicate jobs added to replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, removes], [
+    {type, counter},
+    {desc, <<"number of jobs removed from replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, starts], [
+    {type, counter},
+    {desc, <<"number of jobs started by replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, stops], [
+    {type, counter},
+    {desc, <<"number of jobs stopped to replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, crashes], [
+    {type, counter},
+    {desc, <<"number of job crashed noticed by replicator scheduler">>}
+]}.
+{[couch_replicator, jobs, running], [
+    {type, gauge},
+    {desc, <<"replicator scheduler running jobs">>}
+]}.
+{[couch_replicator, jobs, pending], [
+    {type, gauge},
+    {desc, <<"replicator scheduler pending jobs">>}
+]}.
+{[couch_replicator, jobs, crashed], [
+    {type, gauge},
+    {desc, <<"replicator scheduler crashed jobs">>}
+]}.
+{[couch_replicator, jobs, total], [
+    {type, gauge},
+    {desc, <<"total number of replicator scheduler jobs">>}
+]}.
+{[couch_replicator, jobs, avg_running], [
+    {type, gauge},
+    {desc, <<"average of length of time current jobs have been running">>}
+]}.
+{[couch_replicator, jobs, avg_pending], [
+    {type, gauge},
+    {desc, <<"average length of time spent waiting to run">>}
+]}.
+{[couch_replicator, jobs, avg_crashed], [
+    {type, gauge},
+    {desc, <<"average of length of time since last crash">>}
+]}.
+{[couch_replicator, connection, acquires], [
+    {type, counter},
+    {desc, <<"number of times connections are shared">>}
+]}.
+{[couch_replicator, connection, creates], [
+    {type, counter},
+    {desc, <<"number of connections created">>}
+]}.
+{[couch_replicator, connection, relinquishes], [
+    {type, counter},
+    {desc, <<"number of times ownership of a connection is relinquished">>}
+]}.
+{[couch_replicator, connection, owner_crashes], [
+    {type, counter},
+    {desc, <<"number of times a connection owner crashes while owning at least one connection">>}
+]}.
+{[couch_replicator, connection, worker_crashes], [
+    {type, counter},
+    {desc, <<"number of times a worker unexpectedly terminates">>}
+]}.
+{[couch_replicator, connection, closes], [
+    {type, counter},
+    {desc, <<"number of times a worker is gracefully shut down">>}
+]}.
diff --git a/src/couch_replicator/src/couch_replicator.app.src b/src/couch_replicator/src/couch_replicator.app.src
index 4f12195..f3d3dae 100644
--- a/src/couch_replicator/src/couch_replicator.app.src
+++ b/src/couch_replicator/src/couch_replicator.app.src
@@ -14,24 +14,12 @@
     {description, "CouchDB replicator"},
     {vsn, git},
     {mod, {couch_replicator_app, []}},
-    {modules, [
-        couch_replicator,
-        couch_replicator_api_wrap,
-        couch_replicator_app,
-        couch_replicator_httpc,
-        couch_replicator_httpd,
-        couch_replicator_job_sup,
-        couch_replicator_notifier,
-        couch_replicator_manager,
-        couch_replicator_httpc_pool,
-        couch_replicator_sup,
-        couch_replicator_utils,
-        couch_replicator_worker
-    ]},
     {registered, [
-        couch_replicator,
-        couch_replicator_manager,
-        couch_replicator_job_sup
+        couch_replicator_sup,
+        couch_replication,  % couch_replication_event gen_event
+        couch_replicator_clustering,
+        couch_replicator_scheduler,
+        couch_replicator_scheduler_sup
     ]},
     {applications, [
         kernel,
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 7f0c7ee..f2b0d02 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -11,91 +11,62 @@
 % the License.
 
 -module(couch_replicator).
--behaviour(gen_server).
--vsn(1).
 
-% public API
--export([replicate/2]).
-
-% meant to be used only by the replicator database listener
--export([async_replicate/1]).
--export([cancel_replication/1]).
-
-% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
--export([format_status/2]).
-
--export([details/1]).
+-export([replicate/2, ensure_rep_db_exists/0]).
+-export([stream_active_docs_info/3, stream_terminal_docs_info/4]).
+-export([replication_states/0]).
+-export([job/1, doc/3]).
 
 -include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
-
--define(LOWEST_SEQ, 0).
-
--define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
+-include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-define(REPLICATION_STATES, [
+    initializing,  % Just added to scheduler
+    error,         % Could not be turned into a replication job
+    running,       % Scheduled and running
+    pending,       % Scheduled and waiting to run
+    crashing,      % Scheduled but crashing, possibly backed off by the scheduler
+    completed,     % Non-continuous (normal) completed replication
+    failed         % Terminal failure, will not be retried anymore
+]).
 
 -import(couch_util, [
     get_value/2,
-    get_value/3,
-    to_binary/1
+    get_value/3
 ]).
 
--import(couch_replicator_utils, [
-    start_db_compaction_notifier/2,
-    stop_db_compaction_notifier/1
-]).
 
--record(rep_state, {
-    rep_details,
-    source_name,
-    target_name,
-    source,
-    target,
-    history,
-    checkpoint_history,
-    start_seq,
-    committed_seq,
-    current_through_seq,
-    seqs_in_progress = [],
-    highest_seq_done = {0, ?LOWEST_SEQ},
-    source_log,
-    target_log,
-    rep_starttime,
-    src_starttime,
-    tgt_starttime,
-    timer, % checkpoint timer
-    changes_queue,
-    changes_manager,
-    changes_reader,
-    workers,
-    stats = couch_replicator_stats:new(),
-    session_id,
-    source_db_compaction_notifier = nil,
-    target_db_compaction_notifier = nil,
-    source_monitor = nil,
-    target_monitor = nil,
-    source_seq = nil,
-    use_checkpoints = true,
-    checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
-    type = db,
-    view = nil
-}).
+-type user_doc_cb() :: fun(({[_]}, any()) -> any()).
+-type query_acc() :: {binary(), user_doc_cb(), any()}.
 
 
+-spec replicate({[_]}, #user_ctx{}) ->
+    {ok, {continuous, binary()}} |
+    {ok, {[_]}} |
+    {ok, {cancelled, binary()}} |
+    {error, any()}.
 replicate(PostBody, Ctx) ->
-    {ok, #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep} =
-        couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+    Rep = Rep0#rep{start_time = os:timestamp()},
+    #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
     case get_value(cancel, Options, false) of
     true ->
-        case get_value(id, Options, nil) of
+        CancelRepId = case get_value(id, Options, nil) of
         nil ->
-            cancel_replication(RepId);
+            RepId;
         RepId2 ->
-            cancel_replication(RepId2, UserCtx)
+            RepId2
+        end,
+        case check_authorization(CancelRepId, UserCtx) of
+        ok ->
+            cancel_replication(CancelRepId);
+        not_found ->
+            {error, not_found}
         end;
     false ->
+        check_authorization(RepId, UserCtx),
         {ok, Listener} = rep_result_listener(RepId),
         Result = do_replication_loop(Rep),
         couch_replicator_notifier:stop(Listener),
@@ -103,75 +74,34 @@ replicate(PostBody, Ctx) ->
     end.
 
 
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
-    case async_replicate(Rep) of
-    {ok, _Pid} ->
-        case get_value(continuous, Options, false) of
-        true ->
-            {ok, {continuous, ?l2b(BaseId ++ Ext)}};
-        false ->
-            wait_for_result(Id)
-        end;
-    Error ->
-        Error
-    end.
+% This is called from supervisor. Must respect supervisor protocol so
+% it returns `ignore`.
+-spec ensure_rep_db_exists() -> ignore.
+ensure_rep_db_exists() ->
+    {ok, _Db} = couch_replicator_docs:ensure_rep_db_exists(),
+    couch_log:notice("~p : created local _replicator database", [?MODULE]),
+    ignore.
 
 
-async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
-    RepChildId = BaseId ++ Ext,
-    Source = couch_replicator_api_wrap:db_uri(Src),
-    Target = couch_replicator_api_wrap:db_uri(Tgt),
-    Timeout = get_value(connection_timeout, Rep#rep.options),
-    ChildSpec = {
-        RepChildId,
-        {gen_server, start_link, [?MODULE, Rep, [{timeout, Timeout}]]},
-        temporary,
-        250,
-        worker,
-        [?MODULE]
-    },
-    % All these nested cases to attempt starting/restarting a replication child
-    % are ugly and not 100% race condition free. The following patch submission
-    % is a solution:
-    %
-    % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
-    %
-    case supervisor:start_child(couch_replicator_job_sup, ChildSpec) of
-    {ok, Pid} ->
-        couch_log:notice("starting new replication `~s` at ~p (`~s` -> `~s`)",
-            [RepChildId, Pid, Source, Target]),
-        {ok, Pid};
-    {error, already_present} ->
-        case supervisor:restart_child(couch_replicator_job_sup, RepChildId) of
-        {ok, Pid} ->
-            couch_log:notice("restarting replication `~s` at ~p (`~s` -> `~s`)",
-                [RepChildId, Pid, Source, Target]),
-            {ok, Pid};
-        {error, running} ->
-            %% this error occurs if multiple replicators are racing
-            %% each other to start and somebody else won. Just grab
-            %% the Pid by calling start_child again.
-            timer:sleep(50 + random:uniform(100)),
-            async_replicate(Rep);
-        {error, {'EXIT', {badarg,
-            [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
-            % Clause to deal with a change in the supervisor module introduced
-            % in R14B02. For more details consult the thread at:
-            %     http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
-            _ = supervisor:delete_child(couch_replicator_job_sup, RepChildId),
-            async_replicate(Rep);
-        {error, _} = Error ->
-            Error
-        end;
-    {error, {already_started, Pid}} ->
-        couch_log:notice("replication `~s` already running at ~p (`~s` -> `~s`)",
-            [RepChildId, Pid, Source, Target]),
-        {ok, Pid};
-    {error, {Error, _}} ->
-        {error, Error}
+-spec do_replication_loop(#rep{}) ->
+    {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
+do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
+    case couch_replicator_scheduler:add_job(Rep) of
+    ok ->
+        ok;
+    {error, already_added} ->
+        couch_log:notice("Replication '~s' already running", [BaseId ++ Ext]),
+        ok
+    end,
+    case get_value(continuous, Options, false) of
+    true ->
+        {ok, {continuous, ?l2b(BaseId ++ Ext)}};
+    false ->
+        wait_for_result(Id)
     end.
 
 
+-spec rep_result_listener(rep_id()) -> {ok, pid()}.
 rep_result_listener(RepId) ->
     ReplyTo = self(),
     {ok, _Listener} = couch_replicator_notifier:start_link(
@@ -182,6 +112,8 @@ rep_result_listener(RepId) ->
         end).
 
 
+-spec wait_for_result(rep_id()) ->
+    {ok, any()} | {error, any()}.
 wait_for_result(RepId) ->
     receive
     {finished, RepId, RepResult} ->
@@ -191,847 +123,294 @@ wait_for_result(RepId) ->
     end.
 
 
-cancel_replication({BaseId, Extension}) ->
-    FullRepId = BaseId ++ Extension,
-    couch_log:notice("Canceling replication `~s`...", [FullRepId]),
-    case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
-    ok ->
-        couch_log:notice("Replication `~s` canceled.", [FullRepId]),
-        case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
-            ok ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            {error, not_found} ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            Error ->
-                Error
-        end;
-    Error ->
-        couch_log:error("Error canceling replication `~s`: ~p", [FullRepId, Error]),
-        Error
+-spec cancel_replication(rep_id()) ->
+    {ok, {cancelled, binary()}} | {error, not_found}.
+cancel_replication({BasedId, Extension} = RepId) ->
+    FullRepId = BasedId ++ Extension,
+    couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
+    case couch_replicator_scheduler:rep_state(RepId) of
+    #rep{} ->
+        ok = couch_replicator_scheduler:remove_job(RepId),
+        couch_log:notice("Replication '~s' cancelled", [FullRepId]),
+        {ok, {cancelled, ?l2b(FullRepId)}};
+    nil ->
+        couch_log:notice("Replication '~s' not found", [FullRepId]),
+        {error, not_found}
     end.
 
-cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
-    case lists:member(<<"_admin">>, Roles) of
-    true ->
-        cancel_replication(RepId);
-    false ->
-        case find_replicator(RepId) of
-        {ok, Pid} ->
-            case details(Pid) of
-            {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
-                cancel_replication(RepId);
-            {ok, _} ->
-                throw({unauthorized,
-                    <<"Can't cancel a replication triggered by another user">>});
-            Error ->
-                Error
-            end;
-        Error ->
-            Error
-        end
-    end.
 
-find_replicator({BaseId, Ext} = _RepId) ->
-    case lists:keysearch(
-        BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
-    {value, {_, Pid, _, _}} when is_pid(Pid) ->
-            {ok, Pid};
-    _ ->
-            {error, not_found}
-    end.
-
-details(Pid) ->
-    case (catch gen_server:call(Pid, get_details)) of
-    {ok, Rep} ->
-        {ok, Rep};
-    {'EXIT', {noproc, {gen_server, call, _}}} ->
-        {error, not_found};
-    Error ->
-        throw(Error)
+-spec replication_states() -> [atom()].
+replication_states() ->
+    ?REPLICATION_STATES.
+
+
+-spec stream_terminal_docs_info(binary(), user_doc_cb(), any(), [atom()]) -> any().
+stream_terminal_docs_info(Db, Cb, UserAcc, States) ->
+    DDoc = <<"_replicator">>,
+    View = <<"terminal_states">>,
+    QueryCb = fun handle_replicator_doc_query/2,
+    Args = #mrargs{view_type = map, reduce = false},
+    Acc = {Db, Cb, UserAcc, States},
+    try fabric:query_view(Db, DDoc, View, QueryCb, Acc, Args) of
+    {ok, {Db, Cb, UserAcc1, States}} ->
+        UserAcc1
+    catch
+        error:database_does_not_exist ->
+            UserAcc;
+        error:{badmatch, {not_found, Reason}} ->
+            Msg = "Could not find _design/~s ~s view in replicator db ~s : ~p",
+            couch_log:error(Msg, [DDoc, View, Db, Reason]),
+            couch_replicator_docs:ensure_cluster_rep_ddoc_exists(Db),
+            timer:sleep(1000),
+            stream_terminal_docs_info(Db, Cb, UserAcc, States)
     end.
 
-init(InitArgs) ->
-    {ok, InitArgs, 0}.
-
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
-    process_flag(trap_exit, true),
-
-    random:seed(os:timestamp()),
-
-    #rep_state{
-        source = Source,
-        target = Target,
-        source_name = SourceName,
-        target_name = TargetName,
-        start_seq = {_Ts, StartSeq},
-        committed_seq = {_, CommittedSeq},
-        highest_seq_done = {_, HighestSeq},
-        checkpoint_interval = CheckpointInterval
-    } = State = init_state(Rep),
-
-    NumWorkers = get_value(worker_processes, Options),
-    BatchSize = get_value(worker_batch_size, Options),
-    {ok, ChangesQueue} = couch_work_queue:new([
-        {max_items, BatchSize * NumWorkers * 2},
-        {max_size, 100 * 1024 * NumWorkers}
-    ]),
-    % This starts the _changes reader process. It adds the changes from
-    % the source db to the ChangesQueue.
-    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
-        StartSeq, Source, ChangesQueue, Options
-    ),
-    % Changes manager - responsible for dequeing batches from the changes queue
-    % and deliver them to the worker processes.
-    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
-    % This starts the worker processes. They ask the changes queue manager for a
-    % a batch of _changes rows to process -> check which revs are missing in the
-    % target, and for the missing ones, it copies them from the source to the target.
-    MaxConns = get_value(http_connections, Options),
-    Workers = lists:map(
-        fun(_) ->
-            couch_stats:increment_counter([couch_replicator, workers_started]),
-            {ok, Pid} = couch_replicator_worker:start_link(
-                self(), Source, Target, ChangesManager, MaxConns),
-            Pid
-        end,
-        lists:seq(1, NumWorkers)),
 
-    couch_task_status:add_task([
-        {type, replication},
-        {user, UserCtx#user_ctx.name},
-        {replication_id, ?l2b(BaseId ++ Ext)},
-        {database, Rep#rep.db_name},
-        {doc_id, Rep#rep.doc_id},
-        {source, ?l2b(SourceName)},
-        {target, ?l2b(TargetName)},
-        {continuous, get_value(continuous, Options, false)},
-        {revisions_checked, 0},
-        {missing_revisions_found, 0},
-        {docs_read, 0},
-        {docs_written, 0},
-        {changes_pending, get_pending_count(State)},
-        {doc_write_failures, 0},
-        {source_seq, HighestSeq},
-        {checkpointed_source_seq, CommittedSeq},
-        {checkpoint_interval, CheckpointInterval}
-    ]),
-    couch_task_status:set_update_frequency(1000),
+-spec stream_active_docs_info(user_doc_cb(), any(), [atom()]) -> any().
+stream_active_docs_info(Cb, UserAcc, States) ->
+    Nodes = lists:sort([node() | nodes()]),
+    stream_active_docs_info(Nodes, Cb, UserAcc, States).
 
-    % Until OTP R14B03:
-    %
-    % Restarting a temporary supervised child implies that the original arguments
-    % (#rep{} record) specified in the MFA component of the supervisor
-    % child spec will always be used whenever the child is restarted.
-    % This implies the same replication performance tunning parameters will
-    % always be used. The solution is to delete the child spec (see
-    % cancel_replication/1) and then start the replication again, but this is
-    % unfortunately not immune to race conditions.
 
-    couch_log:notice("Replication `~p` is using:~n"
-        "~c~p worker processes~n"
-        "~ca worker batch size of ~p~n"
-        "~c~p HTTP connections~n"
-        "~ca connection timeout of ~p milliseconds~n"
-        "~c~p retries per request~n"
-        "~csocket options are: ~s~s",
-        [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
-            MaxConns, $\t, get_value(connection_timeout, Options),
-            $\t, get_value(retries, Options),
-            $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
-            case StartSeq of
-            ?LOWEST_SEQ ->
-                "";
-            _ ->
-                io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
-            end]),
+-spec stream_active_docs_info([node()], user_doc_cb(), any(), [atom()]) -> any().
+stream_active_docs_info([], _Cb, UserAcc, _States) ->
+    UserAcc;
 
-    couch_log:debug("Worker pids are: ~p", [Workers]),
-
-    couch_replicator_manager:replication_started(Rep),
-
-    {ok, State#rep_state{
-            changes_queue = ChangesQueue,
-            changes_manager = ChangesManager,
-            changes_reader = ChangesReader,
-            workers = Workers
-        }
-    }.
-
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
-    Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
-    couch_log:notice(Msg, [RepId]),
-    Src#httpdb{http_connections = 2};
-
-adjust_maxconn(Src, _RepId) ->
-    Src.
-
-handle_info(shutdown, St) ->
-    {stop, shutdown, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    couch_log:error("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    couch_log:error("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
-    couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
-    couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
-    {stop, changes_manager_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
-    couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
-    case Workers -- [Pid] of
-    Workers ->
-        couch_log:error("unknown pid bit the dust ~p ~n",[Pid]),
-        {noreply, State#rep_state{workers = Workers}};
-        %% not clear why a stop was here before
-        %%{stop, {unknown_process_died, Pid, normal}, State};
-    [] ->
-        catch unlink(State#rep_state.changes_manager),
-        catch exit(State#rep_state.changes_manager, kill),
-        do_last_checkpoint(State);
-    Workers2 ->
-        {noreply, State#rep_state{workers = Workers2}}
-    end;
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
-    State2 = cancel_timer(State),
-    case lists:member(Pid, Workers) of
-    false ->
-        {stop, {unknown_process_died, Pid, Reason}, State2};
-    true ->
-        couch_stats:increment_counter([couch_replicator, worker_deaths]),
-        couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {worker_died, Pid, Reason}, State2}
-    end;
-
-handle_info(timeout, InitArgs) ->
-    try do_init(InitArgs) of {ok, State} ->
-        {noreply, State}
-    catch Class:Error ->
-        Stack = erlang:get_stacktrace(),
-        {stop, shutdown, {error, Class, Error, Stack, InitArgs}}
+stream_active_docs_info([Node | Nodes], Cb, UserAcc, States) ->
+    case rpc:call(Node, couch_replicator_doc_processor, docs, [States]) of
+        {badrpc, Reason} ->
+            ErrMsg = "Could not get replicator docs from ~p. Error: ~p",
+            couch_log:error(ErrMsg, [Node, Reason]),
+            stream_active_docs_info(Nodes, Cb, UserAcc, States);
+        Results ->
+            UserAcc1 = lists:foldl(Cb, UserAcc, Results),
+            stream_active_docs_info(Nodes, Cb, UserAcc1, States)
     end.
 
-handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
-    {reply, {ok, Rep}, State};
-
-handle_call({add_stats, Stats}, From, State) ->
-    gen_server:reply(From, ok),
-    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
-    {noreply, State#rep_state{stats = NewStats}};
 
-handle_call({report_seq_done, Seq, StatsInc}, From,
-    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
-        current_through_seq = ThroughSeq, stats = Stats} = State) ->
-    gen_server:reply(From, ok),
-    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
-    [] ->
-        {Seq, []};
-    [Seq | Rest] ->
-        {Seq, Rest};
-    [_ | _] ->
-        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+-spec handle_replicator_doc_query
+    ({row, [_]} , query_acc()) -> {ok, query_acc()};
+    ({error, any()}, query_acc()) -> {error, any()};
+    ({meta, any()}, query_acc()) -> {ok,  query_acc()};
+    (complete, query_acc()) -> {ok, query_acc()}.
+handle_replicator_doc_query({row, Props}, {Db, Cb, UserAcc, States}) ->
+    DocId = couch_util:get_value(id, Props),
+    DocStateBin = couch_util:get_value(key, Props),
+    DocState = erlang:binary_to_existing_atom(DocStateBin, utf8),
+    MapValue = couch_util:get_value(value, Props),
+    {Source, Target, StartTime, StateTime, StateInfo} = case MapValue of
+        [Src, Tgt, StartT, StateT, Info] ->
+            {Src, Tgt, StartT, StateT, Info};
+        _Other ->
+            % Handle the case where the view code was upgraded but new view code
+            % wasn't updated yet (before a _scheduler/docs request was made)
+            {null, null, null, null, null}
     end,
-    NewHighestDone = lists:max([HighestDone, Seq]),
-    NewThroughSeq = case NewSeqsInProgress of
-    [] ->
-        lists:max([NewThroughSeq0, NewHighestDone]);
-    _ ->
-        NewThroughSeq0
-    end,
-    couch_log:debug("Worker reported seq ~p, through seq was ~p, "
-        "new through seq is ~p, highest seq done was ~p, "
-        "new highest seq done is ~p~n"
-        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
-        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
-            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
-    NewState = State#rep_state{
-        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
-        current_through_seq = NewThroughSeq,
-        seqs_in_progress = NewSeqsInProgress,
-        highest_seq_done = NewHighestDone
-    },
-    update_task(NewState),
-    {noreply, NewState}.
-
-
-handle_cast({db_compacted, DbName},
-    #rep_state{source = #db{name = DbName} = Source} = State) ->
-    {ok, NewSource} = couch_db:reopen(Source),
-    {noreply, State#rep_state{source = NewSource}};
-
-handle_cast({db_compacted, DbName},
-    #rep_state{target = #db{name = DbName} = Target} = State) ->
-    {ok, NewTarget} = couch_db:reopen(Target),
-    {noreply, State#rep_state{target = NewTarget}};
-
-handle_cast(checkpoint, State) ->
-    #rep_state{rep_details = #rep{} = Rep} = State,
-    case couch_replicator_manager:continue(Rep) of
-    {true, _} ->
-        case do_checkpoint(State) of
-        {ok, NewState} ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-            {noreply, NewState#rep_state{timer = start_timer(State)}};
-        Error ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
-            {stop, Error, State}
-        end;
-    {false, Owner} ->
-        couch_replicator_manager:replication_usurped(Rep, Owner),
-        {stop, shutdown, State}
+    % Set the error_count to 1 if failed. This is mainly for consistency as
+    % jobs from doc_processor and scheduler will have that value set
+    ErrorCount = case DocState of failed -> 1; _ -> 0 end,
+    case filter_replicator_doc_query(DocState, States) of
+        true ->
+            EjsonInfo = {[
+                {doc_id, DocId},
+                {database, Db},
+                {id, null},
+                {source, strip_url_creds(Source)},
+                {target, strip_url_creds(Target)},
+                {state, DocState},
+                {error_count, ErrorCount},
+                {info, StateInfo},
+                {last_updated, StateTime},
+                {start_time, StartTime}
+            ]},
+            {ok, {Db, Cb, Cb(EjsonInfo, UserAcc), States}};
+        false ->
+            {ok, {Db, Cb, UserAcc, States}}
     end;
-
-handle_cast({report_seq, Seq},
-    #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
-    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
-    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
-
-
-code_change(_OldVsn, #rep_state{}=State, _Extra) ->
-    {ok, State}.
-
-
-headers_strip_creds([], Acc) ->
-    lists:reverse(Acc);
-headers_strip_creds([{Key, Value0} | Rest], Acc) ->
-    Value = case string:to_lower(Key) of
-    "authorization" ->
-        "****";
-    _ ->
-        Value0
-    end,
-    headers_strip_creds(Rest, [{Key, Value} | Acc]).
-
-
-httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
-    HttpDb#httpdb{
-        url = couch_util:url_strip_password(Url),
-        headers = headers_strip_creds(Headers, [])
-    };
-httpdb_strip_creds(LocalDb) ->
-    LocalDb.
-
-
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
-    Rep#rep{
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
-    % #rep_state contains the source and target at the top level and also
-    % in the nested #rep_details record
-    State#rep_state{
-        rep_details = rep_strip_creds(Rep),
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
-    checkpoint_history = CheckpointHistory} = State) ->
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
-    couch_replicator_manager:replication_completed(Rep, rep_stats(State));
-
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
-    % cancelled replication throught ?MODULE:cancel_replication/1
-    couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}),
-    terminate_cleanup(State);
-
-terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
-    #rep{id=RepId} = InitArgs,
-    couch_stats:increment_counter([couch_replicator, failed_starts]),
-    CleanInitArgs = rep_strip_creds(InitArgs),
-    couch_log:error("~p:~p: Replication failed to start for args ~p: ~p",
-             [Class, Error, CleanInitArgs, Stack]),
-    case Error of
-    {unauthorized, DbUri} ->
-        NotifyError = {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
-    {db_not_found, DbUri} ->
-        NotifyError = {db_not_found, <<"could not open ", DbUri/binary>>};
-    _ ->
-        NotifyError = Error
-    end,
-    couch_replicator_notifier:notify({error, RepId, NotifyError}),
-    couch_replicator_manager:replication_error(InitArgs, NotifyError);
-terminate(Reason, State) ->
-    #rep_state{
-        source_name = Source,
-        target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
-    } = State,
-    couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
-        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, Reason}),
-    couch_replicator_manager:replication_error(Rep, Reason).
-
-terminate_cleanup(State) ->
-    update_task(State),
-    stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
-    stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
-    couch_replicator_api_wrap:db_close(State#rep_state.source),
-    couch_replicator_api_wrap:db_close(State#rep_state.target).
-
-
-format_status(_Opt, [_PDict, State]) ->
-    [{data, [{"State", state_strip_creds(State)}]}].
-
-
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
-    {stop, normal, cancel_timer(State)};
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = Seq} = State) ->
-    case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
-    {ok, NewState} ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-        {stop, normal, cancel_timer(NewState)};
-    Error ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
-        {stop, Error, State}
+handle_replicator_doc_query({error, Reason}, _Acc) ->
+    {error, Reason};
+handle_replicator_doc_query({meta, _Meta}, Acc) ->
+    {ok, Acc};
+handle_replicator_doc_query(complete, Acc) ->
+    {stop, Acc}.
+
+
+% Returns a printable endpoint with the password removed. The endpoint is
+% first parsed with couch_replicator_docs:parse_rep_db/3; an #httpdb{} result
+% has its url passed through couch_util:url_strip_password/1, while a binary
+% result (local db name) is returned unchanged.
+-spec strip_url_creds(binary() | {[_]}) -> binary().
+strip_url_creds(Endpoint) ->
+    Parsed = couch_replicator_docs:parse_rep_db(Endpoint, [], []),
+    case Parsed of
+        #httpdb{url = RawUrl} ->
+            Stripped = couch_util:url_strip_password(RawUrl),
+            iolist_to_binary(Stripped);
+        DbName when is_binary(DbName) ->
+            DbName
     end.
 
 
-start_timer(State) ->
-    After = State#rep_state.checkpoint_interval,
-    case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
-    {ok, Ref} ->
-        Ref;
-    Error ->
-        couch_log:error("Replicator, error scheduling checkpoint:  ~p", [Error]),
-        nil
-    end.
-
-
-cancel_timer(#rep_state{timer = nil} = State) ->
-    State;
-cancel_timer(#rep_state{timer = Timer} = State) ->
-    {ok, cancel} = timer:cancel(Timer),
-    State#rep_state{timer = nil}.
-
-
-init_state(Rep) ->
-    #rep{
-        id = {BaseId, _Ext},
-        source = Src0, target = Tgt,
-        options = Options, user_ctx = UserCtx,
-        type = Type, view = View
-    } = Rep,
-    % Adjust minimum number of http source connections to 2 to avoid deadlock
-    Src = adjust_maxconn(Src0, BaseId),
-    {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
-    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
-        get_value(create_target, Options, false)),
-
-    {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
-    {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
+% Decides whether a document in state `State` passes the requested state
+% filter. An empty filter list accepts every state; otherwise the state has
+% to be a member of the list.
+-spec filter_replicator_doc_query(atom(), [atom()]) -> boolean().
+filter_replicator_doc_query(State, States) when is_list(States) ->
+    States =:= [] orelse lists:member(State, States).
 
-    [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
-
-    {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
-    StartSeq1 = get_value(since_seq, Options, StartSeq0),
-    StartSeq = {0, StartSeq1},
-
-    SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
-
-    #doc{body={CheckpointHistory}} = SourceLog,
-    State = #rep_state{
-        rep_details = Rep,
-        source_name = couch_replicator_api_wrap:db_uri(Source),
-        target_name = couch_replicator_api_wrap:db_uri(Target),
-        source = Source,
-        target = Target,
-        history = History,
-        checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
-        start_seq = StartSeq,
-        current_through_seq = StartSeq,
-        committed_seq = StartSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = httpd_util:rfc1123_date(),
-        src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
-        tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
-        session_id = couch_uuids:random(),
-        source_db_compaction_notifier =
-            start_db_compaction_notifier(Source, self()),
-        target_db_compaction_notifier =
-            start_db_compaction_notifier(Target, self()),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
-        source_seq = SourceSeq,
-        use_checkpoints = get_value(use_checkpoints, Options, true),
-        checkpoint_interval = get_value(checkpoint_interval, Options,
-                                        ?DEFAULT_CHECKPOINT_INTERVAL),
-        type = Type,
-        view = View
-    },
-    State#rep_state{timer = start_timer(State)}.
-
-
-find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
-    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
-    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
 
+% Looks up a replication job by id on all nodes. The id is converted with
+% couch_replicator_ids:convert/1, every node is asked via rpc:multicall/3 and
+% the first `{ok, JobInfo}` reply wins; nodes that failed to answer are
+% ignored. With no positive reply the result is `{error, not_found}`.
+-spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
+job(JobId0) when is_binary(JobId0) ->
+    Args = [couch_replicator_ids:convert(JobId0)],
+    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, job, Args),
+    Found = [Info || {ok, Info} <- Replies],
+    case Found of
+        [] ->
+            {error, not_found};
+        [First | _] ->
+            {ok, First}
+    end.
 
-fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
-    lists:reverse(Acc);
 
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
-    case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
-    {error, <<"not_found">>} when Vsn > 1 ->
-        OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
-        fold_replication_logs(Dbs, Vsn - 1,
-            ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
-    {error, <<"not_found">>} ->
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
-    {ok, Doc} when LogId =:= NewId ->
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
-    {ok, Doc} ->
-        MigratedLog = #doc{id = NewId, body = Doc#doc.body},
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
+% Fetches the state of a replication document. Every node's doc processor is
+% asked via rpc:multicall/3 and the first `{ok, DocInfo}` reply is returned;
+% if no node knows the document, the answer is built by doc_from_db/3.
+-spec doc(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
+doc(RepDb, DocId, UserCtx) ->
+    Args = [RepDb, DocId],
+    {Replies, _BadNodes} = rpc:multicall(couch_replicator_doc_processor, doc, Args),
+    case [Info || {ok, Info} <- Replies] of
+        [] ->
+            doc_from_db(RepDb, DocId, UserCtx);
+        [First | _] ->
+            {ok, First}
     end.
 
 
-spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
-    spawn_link(fun() ->
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
-    end).
-
-changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
-    receive
-    {get_changes, From} ->
-        case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
-        closed ->
-            From ! {closed, self()};
-        {ok, Changes} ->
-            #doc_info{high_seq = Seq} = lists:last(Changes),
-            ReportSeq = {Ts, Seq},
-            ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
-            From ! {changes, self(), Changes, ReportSeq}
-        end,
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
+% Builds the summary object for a replication document by opening it with
+% fabric:open_doc/3 and reading its `_replication_*` fields. Source and target
+% are passed through strip_url_creds/1. A `completed` document reports its
+% `_replication_stats` as info, a `failed` one reports its
+% `_replication_state_reason` with an error count of 1; every other state
+% gets `null` info and an error count of 0.
+-spec doc_from_db(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
+doc_from_db(RepDb, DocId, UserCtx) ->
+    OpenOpts = [UserCtx, ejson_body],
+    case fabric:open_doc(RepDb, DocId, OpenOpts) of
+        {not_found, _Reason} ->
+            {error, not_found};
+        {ok, Doc} ->
+            {Fields} = couch_doc:to_json_obj(Doc, []),
+            Src = get_value(<<"source">>, Fields),
+            Tgt = get_value(<<"target">>, Fields),
+            RepState = get_value(<<"_replication_state">>, Fields, null),
+            Started = get_value(<<"_replication_start_time">>, Fields, null),
+            Updated = get_value(<<"_replication_state_time">>, Fields, null),
+            {Details, Errors} = case RepState of
+                <<"completed">> ->
+                    {get_value(<<"_replication_stats">>, Fields, null), 0};
+                <<"failed">> ->
+                    FailKey = <<"_replication_state_reason">>,
+                    {get_value(FailKey, Fields, null), 1};
+                _AnyOther ->
+                    {null, 0}
+            end,
+            {ok, {[
+                {doc_id, DocId},
+                {database, RepDb},
+                {id, null},
+                {source, strip_url_creds(Src)},
+                {target, strip_url_creds(Tgt)},
+                {state, RepState},
+                {error_count, Errors},
+                {info, Details},
+                {start_time, Started},
+                {last_updated, Updated}
+            ]}}
     end.
 
 
-do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
-    NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
-    {ok, NewState};
-do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
-    update_task(State),
-    {ok, State};
-do_checkpoint(State) ->
-    #rep_state{
-        source_name=SourceName,
-        target_name=TargetName,
-        source = Source,
-        target = Target,
-        history = OldHistory,
-        start_seq = {_, StartSeq},
-        current_through_seq = {_Ts, NewSeq} = NewTsSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = ReplicationStartTime,
-        src_starttime = SrcInstanceStartTime,
-        tgt_starttime = TgtInstanceStartTime,
-        stats = Stats,
-        rep_details = #rep{options = Options},
-        session_id = SessionId
-    } = State,
-    case commit_to_both(Source, Target) of
-    {source_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
-    {target_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
-    {SrcInstanceStartTime, TgtInstanceStartTime} ->
-        couch_log:notice("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
-            [SourceName, TargetName, NewSeq]),
-        StartTime = ?l2b(ReplicationStartTime),
-        EndTime = ?l2b(httpd_util:rfc1123_date()),
-        NewHistoryEntry = {[
-            {<<"session_id">>, SessionId},
-            {<<"start_time">>, StartTime},
-            {<<"end_time">>, EndTime},
-            {<<"start_last_seq">>, StartSeq},
-            {<<"end_last_seq">>, NewSeq},
-            {<<"recorded_seq">>, NewSeq},
-            {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
-            {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
-            {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
-            {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
-            {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
-        ]},
-        BaseHistory = [
-            {<<"session_id">>, SessionId},
-            {<<"source_last_seq">>, NewSeq},
-            {<<"replication_id_version">>, ?REP_ID_VERSION}
-        ] ++ case get_value(doc_ids, Options) of
-        undefined ->
-            [];
-        _DocIds ->
-            % backwards compatibility with the result of a replication by
-            % doc IDs in versions 0.11.x and 1.0.x
-            % TODO: deprecate (use same history format, simplify code)
-            [
-                {<<"start_time">>, StartTime},
-                {<<"end_time">>, EndTime},
-                {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
-                {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
-                {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
-            ]
-        end,
-        % limit history to 50 entries
-        NewRepHistory = {
-            BaseHistory ++
-            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
-        },
-
-        try
-            {SrcRevPos, SrcRevId} = update_checkpoint(
-                Source, SourceLog#doc{body = NewRepHistory}, source),
-            {TgtRevPos, TgtRevId} = update_checkpoint(
-                Target, TargetLog#doc{body = NewRepHistory}, target),
-            NewState = State#rep_state{
-                checkpoint_history = NewRepHistory,
-                committed_seq = NewTsSeq,
-                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
-                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
-            },
-            update_task(NewState),
-            {ok, NewState}
-        catch throw:{checkpoint_commit_failure, _} = Failure ->
-            Failure
-        end;
-    {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Target database out of sync. "
-            "Try to increase max_dbs_open at the target's server.">>};
-    {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source database out of sync. "
-            "Try to increase max_dbs_open at the source's server.">>};
-    {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source and target databases out of "
-            "sync. Try to increase max_dbs_open at both servers.">>}
+% Checks whether the caller may act on replication `RepId`. Returns
+% `not_found` when the scheduler has no such job and `ok` when the job's user
+% name equals the caller's. For any other job the decision is delegated to
+% couch_httpd:verify_is_server_admin/1 (the tests in this module expect a
+% `{unauthorized, _}` throw from it for a non-admin caller).
+-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
+check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
+    case couch_replicator_scheduler:rep_state(RepId) of
+        nil ->
+            not_found;
+        #rep{user_ctx = #user_ctx{name = Name}} ->
+            ok;
+        #rep{} ->
+            couch_httpd:verify_is_server_admin(Ctx)
     end.
 
 
-update_checkpoint(Db, Doc, DbType) ->
-    try
-        update_checkpoint(Db, Doc)
-    catch throw:{checkpoint_commit_failure, Reason} ->
-        throw({checkpoint_commit_failure,
-            <<"Error updating the ", (to_binary(DbType))/binary,
-                " checkpoint document: ", (to_binary(Reason))/binary>>})
-    end.
+-ifdef(TEST).
 
-update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
-    try
-        case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
-        {ok, PosRevId} ->
-            PosRevId;
-        {error, Reason} ->
-            throw({checkpoint_commit_failure, Reason})
-        end
-    catch throw:conflict ->
-        case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
-        {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
-            % This means that we were able to update successfully the
-            % checkpoint doc in a previous attempt but we got a connection
-            % error (timeout for e.g.) before receiving the success response.
-            % Therefore the request was retried and we got a conflict, as the
-            % revision we sent is not the current one.
-            % We confirm this by verifying the doc body we just got is the same
-            % that we have just sent.
-            {Pos, RevId};
-        _ ->
-            throw({checkpoint_commit_failure, conflict})
-        end
-    end.
+-include_lib("eunit/include/eunit.hrl").
 
+% EUnit `foreach` fixture for the check_authorization/2 cases. Setup does
+% nothing; cleanup unloads all meck mocks after each case.
+authorization_test_() ->
+    Setup = fun () -> ok end,
+    Cleanup = fun (_) -> meck:unload() end,
+    Cases = [
+        t_admin_is_always_authorized(),
+        t_username_must_match(),
+        t_replication_not_found()
+    ],
+    {foreach, Setup, Cleanup, Cases}.
 
-commit_to_both(Source, Target) ->
-    % commit the src async
-    ParentPid = self(),
-    SrcCommitPid = spawn_link(
-        fun() ->
-            Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
-            ParentPid ! {self(), Result}
-        end),
 
-    % commit tgt sync
-    TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
+% The mocked job belongs to a different user name, so the check can only
+% succeed through the caller's `_admin` role.
+t_admin_is_always_authorized() ->
+    ?_test(begin
+        expect_rep_user_ctx(<<"someuser">>, <<"_admin">>),
+        AdminCtx = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
+        Outcome = check_authorization(<<"RepId">>, AdminCtx),
+        ?assertEqual(ok, Outcome)
+    end).
 
-    SourceResult = receive
-    {SrcCommitPid, Result} ->
-        unlink(SrcCommitPid),
-        receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
-        Result;
-    {'EXIT', SrcCommitPid, Reason} ->
-        {error, Reason}
-    end,
-    case TargetResult of
-    {ok, TargetStartTime} ->
-        case SourceResult of
-        {ok, SourceStartTime} ->
-            {SourceStartTime, TargetStartTime};
-        SourceError ->
-            {source_error, SourceError}
-        end;
-    TargetError ->
-        {target_error, TargetError}
-    end.
 
+% A caller with the job's user name is authorized; a caller with the same
+% role but a different name gets an `unauthorized` throw.
+t_username_must_match() ->
+    ?_test(begin
+        expect_rep_user_ctx(<<"user">>, <<"somerole">>),
+        Roles = [<<"somerole">>],
+        Owner = #user_ctx{name = <<"user">>, roles = Roles},
+        ?assertEqual(ok, check_authorization(<<"RepId">>, Owner)),
+        Stranger = #user_ctx{name = <<"other">>, roles = Roles},
+        ?assertThrow({unauthorized, _}, check_authorization(<<"RepId">>, Stranger))
+    end).
 
-compare_replication_logs(SrcDoc, TgtDoc) ->
-    #doc{body={RepRecProps}} = SrcDoc,
-    #doc{body={RepRecPropsTgt}} = TgtDoc,
-    case get_value(<<"session_id">>, RepRecProps) ==
-            get_value(<<"session_id">>, RepRecPropsTgt) of
-    true ->
-        % if the records have the same session id,
-        % then we have a valid replication history
-        OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
-        OldHistory = get_value(<<"history">>, RepRecProps, []),
-        {OldSeqNum, OldHistory};
-    false ->
-        SourceHistory = get_value(<<"history">>, RepRecProps, []),
-        TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
-        couch_log:notice("Replication records differ. "
-                "Scanning histories to find a common ancestor.", []),
-        couch_log:debug("Record on source:~p~nRecord on target:~p~n",
-                [RepRecProps, RepRecPropsTgt]),
-        compare_rep_history(SourceHistory, TargetHistory)
-    end.
 
-compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
-    couch_log:notice("no common ancestry -- performing full replication", []),
-    {?LOWEST_SEQ, []};
-compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
-    SourceId = get_value(<<"session_id">>, S),
-    case has_session_id(SourceId, Target) of
-    true ->
-        RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
-        couch_log:notice("found a common replication record with source_seq ~p",
-            [RecordSeqNum]),
-        {RecordSeqNum, SourceRest};
-    false ->
-        TargetId = get_value(<<"session_id">>, T),
-        case has_session_id(TargetId, SourceRest) of
-        true ->
-            RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
-            couch_log:notice("found a common replication record with source_seq ~p",
-                [RecordSeqNum]),
-            {RecordSeqNum, TargetRest};
-        false ->
-            compare_rep_history(SourceRest, TargetRest)
-        end
-    end.
+% When the scheduler reports `nil` for the job, both a regular user and an
+% admin get `not_found`.
+t_replication_not_found() ->
+    ?_test(begin
+        meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
+        RegularCtx = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
+        ?assertEqual(not_found, check_authorization(<<"RepId">>, RegularCtx)),
+        AdminCtx = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
+        ?assertEqual(not_found, check_authorization(<<"RepId">>, AdminCtx))
+    end).
 
 
-has_session_id(_SessionId, []) ->
-    false;
-has_session_id(SessionId, [{Props} | Rest]) ->
-    case get_value(<<"session_id">>, Props, nil) of
-    SessionId ->
-        true;
-    _Else ->
-        has_session_id(SessionId, Rest)
-    end.
+% Mocks couch_replicator_scheduler:rep_state/1 so that any id resolves to a
+% #rep{} owned by `Name` with the single role `Role`.
+expect_rep_user_ctx(Name, Role) ->
+    Owner = #user_ctx{name = Name, roles = [Role]},
+    Rep = #rep{user_ctx = Owner},
+    meck:expect(couch_replicator_scheduler, rep_state, fun(_Id) -> Rep end).
 
 
-db_monitor(#db{} = Db) ->
-    couch_db:monitor(Db);
-db_monitor(_HttpDb) ->
-    nil.
+% EUnit `foreach` fixture for the strip_url_creds/1 cases. Setup mocks
+% config:get/3 to always return the supplied default; cleanup unloads all
+% meck mocks after each case.
+strip_url_creds_test_() ->
+    Setup = fun () ->
+        meck:expect(config, get, fun(_, _, Default) -> Default end)
+    end,
+    Cleanup = fun (_) -> meck:unload() end,
+    Cases = [
+        t_strip_local_db_creds(),
+        t_strip_http_basic_creds(),
+        t_strip_http_props_creds()
+    ],
+    {foreach, Setup, Cleanup, Cases}.
 
-get_pending_count(St) ->
-    Rep = St#rep_state.rep_details,
-    Timeout = get_value(connection_timeout, Rep#rep.options),
-    TimeoutMicro = Timeout * 1000,
-    case get(pending_count_state) of
-        {LastUpdate, PendingCount} ->
-            case timer:now_diff(os:timestamp(), LastUpdate) > TimeoutMicro of
-                true ->
-                    NewPendingCount = get_pending_count_int(St),
-                    put(pending_count_state, {os:timestamp(), NewPendingCount}),
-                    NewPendingCount;
-                false ->
-                    PendingCount
-            end;
-        undefined ->
-            NewPendingCount = get_pending_count_int(St),
-            put(pending_count_state, {os:timestamp(), NewPendingCount}),
-            NewPendingCount
-    end.
 
+% A local database name comes back unchanged.
+t_strip_local_db_creds() ->
+    ?_assertEqual(<<"localdb">>, strip_url_creds(<<"localdb">>)).
 
-get_pending_count_int(#rep_state{source = #httpdb{} = Db0}=St) ->
-    {_, Seq} = St#rep_state.highest_seq_done,
-    Db = Db0#httpdb{retries = 3},
-    case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
-    {ok, Pending} ->
-        Pending;
-    _ ->
-        null
-    end;
-get_pending_count_int(#rep_state{source = Db}=St) ->
-    {_, Seq} = St#rep_state.highest_seq_done,
-    {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
-    Pending.
 
+% Passwords embedded in http and https URLs are masked, and the result
+% carries a trailing slash.
+t_strip_http_basic_creds() ->
+    ?_test(begin
+        Plain = strip_url_creds(<<"http://adm:pass@host/db">>),
+        ?assertEqual(<<"http://adm:*****@host/db/">>, Plain),
+        Secure = strip_url_creds(<<"https://adm:pass@host/db">>),
+        ?assertEqual(<<"https://adm:*****@host/db/">>, Secure)
+    end).
 
-update_task(State) ->
-    #rep_state{
-        current_through_seq = {_, ThroughSeq},
-        highest_seq_done = {_, HighestSeq}
-    } = State,
-    couch_task_status:update(
-        rep_stats(State) ++ [
-        {source_seq, HighestSeq},
-        {through_seq, ThroughSeq}
-    ]).
 
+% Endpoints given as EJSON objects: a password inside the url is masked, and
+% an endpoint carrying an Authorization header yields only its url.
+t_strip_http_props_creds() ->
+    ?_test(begin
+        WithUserInfo = {[{<<"url">>, <<"http://adm:pass@host/db">>}]},
+        ?assertEqual(<<"http://adm:*****@host/db/">>, strip_url_creds(WithUserInfo)),
+        AuthHeader = {<<"Authorization">>, <<"Basic pa55">>},
+        WithHeaders = {[
+            {<<"url">>, <<"http://host/db">>},
+            {<<"headers">>, {[AuthHeader]}}
+        ]},
+        ?assertEqual(<<"http://host/db/">>, strip_url_creds(WithHeaders))
+    end).
 
-rep_stats(State) ->
-    #rep_state{
-        committed_seq = {_, CommittedSeq},
-        stats = Stats
-    } = State,
-    [
-        {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
-        {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
-        {docs_read, couch_replicator_stats:docs_read(Stats)},
-        {docs_written, couch_replicator_stats:docs_written(Stats)},
-        {changes_pending, get_pending_count(State)},
-        {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
-        {checkpointed_source_seq, CommittedSeq}
-    ].
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index d3485c0..b8669e8 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -13,13 +13,30 @@
 -define(REP_ID_VERSION, 3).
 
 -record(rep, {
-    id,
-    source,
-    target,
-    options,
-    user_ctx,
-    type = db,
-    view = nil,
-    doc_id,
-    db_name = null
+    % Replication job description. Every field type also admits the atom '_'
+    % - presumably so a #rep{} can double as a match pattern (e.g. for ets);
+    % TODO confirm against callers. `start_time` is new in this change.
+    id :: rep_id() | '_',
+    source :: any() | '_',
+    target :: any() | '_',
+    options :: [_] | '_',
+    user_ctx :: any() | '_',
+    type = db :: atom() | '_',
+    view = nil :: any() | '_',
+    doc_id :: any() | '_',
+    db_name = null :: null | binary() | '_',
+    start_time :: erlang:timestamp() | '_'
+}).
+
+% Shared replicator types.
+% rep_id(): a pair of strings (the replication id held in #rep.id).
+% db_doc_id(): a pair of binaries - presumably {DbName, DocId}, with '_'
+%   allowed in the second position for pattern use; verify against callers.
+% seconds(): a non-negative integer count of seconds.
+% rep_start_result(): outcome of trying to start a replication - success
+%   with the replication id, `ignore`, or a temporary / permanent failure
+%   carrying a binary message.
+-type rep_id() :: {string(), string()}.
+-type db_doc_id() :: {binary(), binary() | '_'}.
+-type seconds() :: non_neg_integer().
+-type rep_start_result() ::
+    {ok, rep_id()} |
+    ignore |
+    {temporary_error, binary()} |
+    {permanent_failure, binary()}.
+
+
+% Result of a doc worker: the document it handled (`id`), a reference
+% (`wref` - presumably identifying the worker; TODO confirm) and the outcome
+% of starting the replication (`result`).
+-record(doc_worker_result, {
+    id :: db_doc_id(),
+    wref :: reference(),
+    result :: rep_start_result()
 }).
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index e5f6253..a0d08d7 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -38,7 +38,8 @@
     open_doc/3,
     open_doc_revs/6,
     changes_since/5,
-    db_uri/1
+    db_uri/1,
+    normalize_db/1
     ]).
 
 -import(couch_replicator_httpc, [
@@ -290,6 +291,8 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
             throw(missing_doc);
         {'DOWN', Ref, process, Pid, {{nocatch, {missing_stub,_} = Stub}, _}} ->
             throw(Stub);
+        {'DOWN', Ref, process, Pid, {http_request_failed, _, _, max_backoff}} ->
+            exit(max_backoff);
         {'DOWN', Ref, process, Pid, request_uri_too_long} ->
             NewMaxLen = get_value(max_url_len, Options, ?MAX_URL_LEN) div 2,
             case NewMaxLen < ?MIN_URL_LEN of
@@ -517,6 +520,8 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
                         end)
             end)
     catch
+        exit:{http_request_failed, _, _, max_backoff} ->
+            exit(max_backoff);
         exit:{http_request_failed, _, _, {error, {connection_closed,
                 mid_stream}}} ->
             throw(retry_no_limit);
@@ -985,3 +990,46 @@ header_value(Key, Headers, Default) ->
         _ ->
             Default
     end.
+
+
+% Normalize an #httpdb{} record or a database name binary such that it can be used
+% for comparisons. This means removing things like pids and sorting options / props.
+normalize_db(#httpdb{} = HttpDb) ->
+    #httpdb{
+        url = HttpDb#httpdb.url,
+        oauth = HttpDb#httpdb.oauth,
+        headers = lists:keysort(1, HttpDb#httpdb.headers),
+        timeout = HttpDb#httpdb.timeout,
+        ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
+        retries = HttpDb#httpdb.retries,
+        http_connections = HttpDb#httpdb.http_connections
+    };
+
+normalize_db(<<DbName/binary>>) ->
+    DbName.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+normalize_http_db_test() ->
+    HttpDb =  #httpdb{
+        url = "http://host/db",
+        oauth = #oauth{},
+        headers = [{"k2","v2"}, {"k1","v1"}],
+        timeout = 30000,
+        ibrowse_options = [{k2, v2}, {k1, v1}],
+        retries = 10,
+        http_connections = 20
+    },
+    Expected = HttpDb#httpdb{
+        headers = [{"k1","v1"}, {"k2","v2"}],
+        ibrowse_options = [{k1, v1}, {k2, v2}]
+    },
+    ?assertEqual(Expected, normalize_db(HttpDb)),
+    ?assertEqual(<<"local">>, normalize_db(<<"local">>)).
+
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.hrl b/src/couch_replicator/src/couch_replicator_api_wrap.hrl
index 24e204b..fc94054 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.hrl
@@ -25,7 +25,8 @@
     wait = 250,         % milliseconds
     httpc_pool = nil,
     http_connections,
-    backoff = 25
+    first_error_timestamp = nil,
+    proxy_url
 }).
 
 -record(oauth, {
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index 309a230..58fb0e1 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -40,8 +40,18 @@
 -define(MAX_DISCARDED_MESSAGES, 16).
 
 
-setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
-    {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
+setup(Db) ->
+    #httpdb{
+        httpc_pool = nil,
+        url = Url,
+        http_connections = MaxConns,
+        proxy_url = ProxyURL
+    } = Db,
+    HttpcURL = case ProxyURL of
+        undefined -> Url;
+        _ when is_list(ProxyURL) -> ProxyURL
+    end,
+    {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, [{max_connections, MaxConns}]),
     {ok, Db#httpdb{httpc_pool = Pid}}.
 
 
@@ -98,6 +108,7 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
         lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
             HttpDb#httpdb.ibrowse_options)
     ],
+    backoff_before_request(Worker, HttpDb, Params),
     Response = ibrowse:send_req_direct(
         Worker, Url, Headers2, Method, Body, IbrowseOptions, Timeout),
     {Worker, Response}.
@@ -139,8 +150,9 @@ process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
 process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
     case list_to_integer(Code) of
     429 ->
-        backoff(Worker, HttpDb, Params);
+        backoff(HttpDb, Params);
     Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
+        backoff_success(HttpDb, Params),
         couch_stats:increment_counter([couch_replicator, responses, success]),
         EJson = case Body of
         <<>> ->
@@ -150,6 +162,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
         end,
         Callback(Ok, Headers, EJson);
     R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+        backoff_success(HttpDb, Params),
         do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
     Error ->
         couch_stats:increment_counter([couch_replicator, responses, failure]),
@@ -165,9 +178,10 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
     {ibrowse_async_headers, ReqId, Code, Headers} ->
         case list_to_integer(Code) of
         429 ->
-            backoff(Worker, HttpDb#httpdb{timeout = get_max_back_off()},
-                Params);
+            Timeout = couch_replicator_rate_limiter:max_interval(),
+            backoff(HttpDb#httpdb{timeout = Timeout}, Params);
         Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
+            backoff_success(HttpDb, Params),
             StreamDataFun = fun() ->
                 stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
             end,
@@ -184,6 +198,7 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
                     maybe_retry(Err, Worker, HttpDb, Params)
             end;
         R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+            backoff_success(HttpDb, Params),
             do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
         Error ->
             couch_stats:increment_counter(
@@ -266,37 +281,48 @@ discard_message(ReqId, Worker, Count) ->
     end.
 
 
-%% For 429 errors, we perform an exponential backoff up to 2.17 hours.
-%% We use Backoff time as a timeout/failure end.
-backoff(Worker, #httpdb{backoff = Backoff} = HttpDb, Params) ->
-    MaxBackOff = get_max_back_off(),
-    MaxBackOffLog = get_back_off_log_threshold(),
-    ok = timer:sleep(random:uniform(Backoff)),
-    Backoff2 = round(Backoff*get_back_off_exp()),
-    NewBackoff = erlang:min(Backoff2, MaxBackOff),
-    NewHttpDb = HttpDb#httpdb{backoff = NewBackoff},
-    case Backoff2 of
-        W0 when W0 > MaxBackOff ->
-            report_error(Worker, HttpDb, Params, {error,
-                "Long 429-induced Retry Time Out"});
-        W1 when W1 >=  MaxBackOffLog -> % Past 8 min, we log retries
-            log_retry_error(Params, HttpDb, Backoff2, "429 Retry"),
-            throw({retry, NewHttpDb, Params});
-        _ ->
-            throw({retry, NewHttpDb, Params})
-    end.
-
-
 maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params) ->
     report_error(Worker, HttpDb, Params, {error, Error});
 
-maybe_retry(Error, _Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
+maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
     Params) ->
-    ok = timer:sleep(Wait),
-    log_retry_error(Params, HttpDb, Wait, Error),
-    Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
-    NewHttpDb = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
-    throw({retry, NewHttpDb, Params}).
+    case total_error_time_exceeded(HttpDb) of
+        true ->
+            report_error(Worker, HttpDb, Params, {error, Error});
+        false ->
+            ok = timer:sleep(Wait),
+            log_retry_error(Params, HttpDb, Wait, Error),
+            Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
+            HttpDb1 = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
+            HttpDb2 = update_first_error_timestamp(HttpDb1),
+            throw({retry, HttpDb2, Params})
+    end.
+
+
+% When retrying, check to make sure the total time spent retrying a request is
+% below the current scheduler health threshold. The goal is to not exceed the
+% threshold, otherwise a job which keeps retrying too long will still be
+% considered healthy.
+total_error_time_exceeded(#httpdb{first_error_timestamp = nil}) ->
+    false;
+
+total_error_time_exceeded(#httpdb{first_error_timestamp = ErrorTimestamp}) ->
+    HealthThresholdSec = couch_replicator_scheduler:health_threshold(),
+    % Threshold value is halved because in the calling code the next step
+    % is a doubling. Not halving here could mean sleeping too long and
+    % exceeding the health threshold.
+    ThresholdUSec = (HealthThresholdSec / 2) * 1000000,
+    timer:now_diff(os:timestamp(), ErrorTimestamp) > ThresholdUSec.
+
+
+% Remember the first time an error occurs. This value is used later to check
+% the total time spent retrying a request. Because retrying is recursive, on a
+% successful result the #httpdb{} record is reset back to its original value.
+update_first_error_timestamp(#httpdb{first_error_timestamp = nil} = HttpDb) ->
+    HttpDb#httpdb{first_error_timestamp = os:timestamp()};
+
+update_first_error_timestamp(HttpDb) ->
+    HttpDb.
 
 
 log_retry_error(Params, HttpDb, Wait, Error) ->
@@ -440,11 +466,32 @@ after_redirect(RedirectUrl, HttpDb, Params) ->
     Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
     {HttpDb#httpdb{url = RedirectUrl}, Params2}.
 
-get_max_back_off() ->
-    config:get_integer("replicator", "max_backoff_wait", 250 * 32768).
 
-get_back_off_log_threshold() ->
-    config:get_integer("replicator", "max_backoff_log_threshold", 512000).
+backoff_key(HttpDb, Params) ->
+    Method = get_value(method, Params, get),
+    Url = HttpDb#httpdb.url,
+    {Url, Method}.
+
+
+backoff(HttpDb, Params) ->
+    Key = backoff_key(HttpDb, Params),
+    couch_replicator_rate_limiter:failure(Key),
+    throw({retry, HttpDb, Params}).
 
-get_back_off_exp() ->
-    config:get_float("replicator", "backoff_exp", 1.5).
+
+backoff_success(HttpDb, Params) ->
+    Key = backoff_key(HttpDb, Params),
+    couch_replicator_rate_limiter:success(Key).
+
+
+backoff_before_request(Worker, HttpDb, Params) ->
+    Key = backoff_key(HttpDb, Params),
+    Limit = couch_replicator_rate_limiter:max_interval(),
+    case couch_replicator_rate_limiter:interval(Key) of
+        Sleep when Sleep >= Limit ->
+            report_error(Worker, HttpDb, Params, max_backoff);
+        Sleep when Sleep >= 1 ->
+            timer:sleep(Sleep);
+        Sleep when Sleep == 0 ->
+            ok
+    end.
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
index eba1973..dbad050 100644
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ b/src/couch_replicator/src/couch_replicator_js_functions.hrl
@@ -53,7 +53,7 @@
         var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
         var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
 
-        if (newDoc._replication_state === 'error') {
+        if (newDoc._replication_state === 'failed') {
             // Skip validation in case when we update the document with the
             // failed state. In this case it might be malformed. However,
             // replicator will not pay attention to failed documents so this
@@ -61,12 +61,6 @@
             return;
         }
 
-        if (oldDoc && !newDoc._deleted && !isReplicator &&
-            (oldDoc._replication_state === 'triggered')) {
-            reportError('Only the replicator can edit replication documents ' +
-                'that are in the triggered state.');
-        }
-
         if (!newDoc._deleted) {
             validateEndpoint(newDoc.source, 'source');
             validateEndpoint(newDoc.target, 'target');
@@ -176,3 +170,25 @@
         }
     }
 ">>).
+
+
+-define(REP_DB_TERMINAL_STATE_VIEW_MAP_FUN, <<"
+    function(doc) {
+        state = doc._replication_state;
+        if (state === 'failed') {
+            source = doc.source;
+            target = doc.target;
+            start_time = doc._replication_start_time;
+            last_updated = doc._replication_state_time;
+            state_reason = doc._replication_state_reason;
+            emit('failed', [source, target, start_time, last_updated, state_reason]);
+        } else if (state === 'completed') {
+            source = doc.source;
+            target = doc.target;
+            start_time = doc._replication_start_time;
+            last_updated = doc._replication_state_time;
+            stats = doc._replication_stats;
+            emit('completed', [source, target, start_time, last_updated, stats]);
+        }
+    }
+">>).
diff --git a/src/couch_replicator/src/couch_replicator_manager.erl b/src/couch_replicator/src/couch_replicator_manager.erl
index 85dd428..6a79ce6 100644
--- a/src/couch_replicator/src/couch_replicator_manager.erl
+++ b/src/couch_replicator/src/couch_replicator_manager.erl
@@ -11,1033 +11,18 @@
 % the License.
 
 -module(couch_replicator_manager).
--behaviour(gen_server).
--vsn(3).
--behaviour(config_listener).
 
-% public API
--export([replication_started/1, replication_completed/2, replication_error/2]).
--export([continue/1, replication_usurped/2]).
+% TODO: This is a temporary proxy module for calls made from outside the replicator to
+% other replicator modules. This is done to avoid juggling multiple repos during development.
 
+% NV: TODO: These functions were moved to couch_replicator_docs
+% but they are still called from fabric_doc_update. Keep them here for now;
+% later, update fabric to call couch_replicator_docs instead.
 -export([before_doc_update/2, after_doc_read/2]).
 
-% gen_server callbacks
--export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
--export([code_change/3, terminate/2]).
 
-% changes callbacks
--export([changes_reader/3, changes_reader_cb/3]).
+before_doc_update(Doc, Db) ->
+    couch_replicator_docs:before_doc_update(Doc, Db).
 
-% config_listener callback
--export([handle_config_change/5, handle_config_terminate/3]).
-
--export([handle_db_event/3]).
-
-%% exported but private
--export([start_replication/2]).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("mem3/include/mem3.hrl").
--include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
-
--define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
--define(REP_TO_STATE, couch_rep_id_to_rep_state).
--define(INITIAL_WAIT, 2.5). % seconds
--define(MAX_WAIT, 600).     % seconds
--define(AVG_DELAY_MSEC, 100).
--define(MAX_DELAY_MSEC, 60000).
--define(OWNER, <<"owner">>).
--define(REPLICATOR_DB, <<"_replicator">>).
-
--define(DB_TO_SEQ, db_to_seq).
--define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
-
--define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-
--define(RELISTEN_DELAY, 5000).
-
--record(rep_state, {
-    rep,
-    starting,
-    retries_left,
-    max_retries,
-    wait = ?INITIAL_WAIT
-}).
-
--import(couch_util, [
-    to_binary/1
-]).
-
--record(state, {
-    event_listener = nil,
-    scan_pid = nil,
-    rep_start_pids = [],
-    max_retries,
-    live = [],
-    epoch = nil
-}).
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-replication_started(#rep{id = {BaseId, _} = RepId}) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"triggered">>},
-            {<<"_replication_state_reason">>, undefined},
-            {<<"_replication_id">>, ?l2b(BaseId)},
-            {<<"_replication_stats">>, undefined}]),
-        ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
-        couch_log:notice("Document `~s` from `~s` triggered replication `~s`",
-            [DocId, DbName, pp_rep_id(RepId)])
-    end.
-
-
-replication_completed(#rep{id = RepId}, Stats) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"completed">>},
-            {<<"_replication_state_reason">>, undefined},
-            {<<"_replication_stats">>, {Stats}}]),
-        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
-        couch_log:notice("Replication `~s` finished (triggered by document `~s`"
-            " from `~s`)", [pp_rep_id(RepId), DocId, DbName])
-    end.
-
-
-replication_usurped(#rep{id = RepId}, By) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
-        couch_log:notice("Replication `~s` usurped by ~s (triggered by document"
-            " `~s` from `~s`)", [pp_rep_id(RepId), By, DocId, DbName])
-    end.
-
-
-replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        ok = timer:sleep(jitter(ets:info(?REP_TO_STATE, size))),
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"error">>},
-            {<<"_replication_state_reason">>, to_binary(error_reason(Error))},
-            {<<"_replication_id">>, ?l2b(BaseId)}]),
-        ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
-    end.
-
-continue(#rep{doc_id = null}) ->
-    {true, no_owner};
-continue(#rep{id = RepId}) ->
-    Owner = gen_server:call(?MODULE, {owner, RepId}, infinity),
-    {node() == Owner, Owner}.
-
-
-handle_config_change("replicator", "max_replication_retry_count", V, _, S) ->
-    ok = gen_server:cast(?MODULE, {set_max_retries, retries_value(V)}),
-    {ok, S};
-handle_config_change(_, _, _, _, S) ->
-    {ok, S}.
-
-handle_config_terminate(_, stop, _) ->
-    ok;
-handle_config_terminate(_Server, _Reason, _State) ->
-    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
-
-init(_) ->
-    process_flag(trap_exit, true),
-    net_kernel:monitor_nodes(true),
-    Live = [node() | nodes()],
-    ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, public]),
-    ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, public]),
-    ?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, public]),
-    Server = self(),
-    ok = config:listen_for_changes(?MODULE, nil),
-    Epoch = make_ref(),
-    ScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
-    % Automatically start node local changes feed loop
-    ensure_rep_db_exists(?REPLICATOR_DB),
-    Pid = start_changes_reader(?REPLICATOR_DB, 0, Epoch),
-    {ok, #state{
-        event_listener = start_event_listener(),
-        scan_pid = ScanPid,
-        max_retries = retries_value(
-            config:get("replicator", "max_replication_retry_count", "10")),
-        rep_start_pids = [{?REPLICATOR_DB, Pid}],
-        live = Live,
-        epoch = Epoch
-    }}.
-
-handle_call({owner, RepId}, _From, State) ->
-    case rep_state(RepId) of
-    nil ->
-        {reply, nonode, State};
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        {reply, owner(DbName, DocId, State#state.live), State}
-    end;
-
-handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
-    NewState = try
-        process_update(State, DbName, Change)
-    catch
-    _Tag:Error ->
-        {RepProps} = get_json_value(doc, ChangeProps),
-        DocId = get_json_value(<<"_id">>, RepProps),
-        rep_db_update_error(Error, DbName, DocId),
-        State
-    end,
-    {reply, ok, NewState};
-
-
-handle_call({rep_started, RepId}, _From, State) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    RepState ->
-        NewRepState = RepState#rep_state{
-            starting = false,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries,
-            wait = ?INITIAL_WAIT
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
-    end,
-    {reply, ok, State};
-
-handle_call({rep_complete, RepId}, _From, State) ->
-    true = ets:delete(?REP_TO_STATE, RepId),
-    {reply, ok, State};
-
-handle_call({rep_error, RepId, Error}, _From, State) ->
-    {reply, ok, replication_error(State, RepId, Error)};
-
-% Match changes epoch with the current epoch in the state.
-% New epoch ref is created on a full rescan. Change feeds have to
-% be replayed from the start to determine ownership in the new
-% cluster configuration and epoch is used to match & checkpoint
-% only changes from the current cluster configuration.
-handle_call({rep_db_checkpoint, DbName, EndSeq, Epoch}, _From,
-            #state{epoch = Epoch} = State) ->
-    Entry = case ets:lookup(?DB_TO_SEQ, DbName) of
-        [] ->
-            {DbName, EndSeq, false};
-        [{DbName, _OldSeq, Rescan}] ->
-            {DbName, EndSeq, Rescan}
-    end,
-    true = ets:insert(?DB_TO_SEQ, Entry),
-    {reply, ok, State};
-
-% Ignore checkpoints from previous epoch.
-handle_call({rep_db_checkpoint, _DbName, _EndSeq, _Epoch}, _From, State) ->
-    {reply, ok, State};
-
-handle_call(Msg, From, State) ->
-    couch_log:error("Replication manager received unexpected call ~p from ~p",
-        [Msg, From]),
-    {stop, {error, {unexpected_call, Msg}}, State}.
-
-handle_cast({resume_scan, DbName}, State) ->
-    Pids = State#state.rep_start_pids,
-    NewPids = case lists:keyfind(DbName, 1, Pids) of
-        {DbName, _Pid} ->
-            Entry = case ets:lookup(?DB_TO_SEQ, DbName) of
-                [] ->
-                    {DbName, 0, true};
-                [{DbName, EndSeq, _Rescan}] ->
-                    {DbName, EndSeq, true}
-            end,
-            true = ets:insert(?DB_TO_SEQ, Entry),
-            Pids;
-        false ->
-            Since = case ets:lookup(?DB_TO_SEQ, DbName) of
-                [] -> 0;
-                [{DbName, EndSeq, _Rescan}] -> EndSeq
-            end,
-            true = ets:insert(?DB_TO_SEQ, {DbName, Since, false}),
-            ensure_rep_ddoc_exists(DbName),
-            Pid = start_changes_reader(DbName, Since, State#state.epoch),
-            couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
-            [{DbName, Pid} | Pids]
-    end,
-    {noreply, State#state{rep_start_pids = NewPids}};
-
-handle_cast({set_max_retries, MaxRetries}, State) ->
-    {noreply, State#state{max_retries = MaxRetries}};
-
-handle_cast(Msg, State) ->
-    couch_log:error("Replication manager received unexpected cast ~p", [Msg]),
-    {stop, {error, {unexpected_cast, Msg}}, State}.
-
-handle_info({nodeup, Node}, State) ->
-    couch_log:notice("Rescanning replicator dbs as ~s came up.", [Node]),
-    Live = lists:usort([Node | State#state.live]),
-    {noreply, rescan(State#state{live=Live})};
-
-handle_info({nodedown, Node}, State) ->
-    couch_log:notice("Rescanning replicator dbs ~s went down.", [Node]),
-    Live = State#state.live -- [Node],
-    {noreply, rescan(State#state{live=Live})};
-
-handle_info({'EXIT', From, normal}, #state{scan_pid = From} = State) ->
-    couch_log:debug("Background scan has completed.", []),
-    {noreply, State#state{scan_pid=nil}};
-
-handle_info({'EXIT', From, Reason}, #state{scan_pid = From} = State) ->
-    couch_log:error("Background scanner died. Reason: ~p", [Reason]),
-    {stop, {scanner_died, Reason}, State};
-
-handle_info({'EXIT', From, Reason}, #state{event_listener = From} = State) ->
-    couch_log:error("Database update notifier died. Reason: ~p", [Reason]),
-    {stop, {db_update_notifier_died, Reason}, State};
-
-handle_info({'EXIT', From, Reason}, #state{rep_start_pids = Pids} = State) ->
-    case lists:keytake(From, 2, Pids) of
-        {value, {DbName, From}, NewPids} ->
-            if Reason == normal -> ok; true ->
-                Fmt = "~s : Known replication or change feed pid ~w died :: ~w",
-                couch_log:error(Fmt, [?MODULE, From, Reason])
-            end,
-            NewState = State#state{rep_start_pids = NewPids},
-            case ets:lookup(?DB_TO_SEQ, DbName) of
-                [{DbName, _EndSeq, true}] ->
-                    handle_cast({resume_scan, DbName}, NewState);
-                _ ->
-                    {noreply, NewState}
-            end;
-        false when Reason == normal ->
-            {noreply, State};
-        false ->
-            Fmt = "~s : Unknown pid ~w died :: ~w",
-            couch_log:error(Fmt, [?MODULE, From, Reason]),
-            {stop, {unexpected_exit, From, Reason}, State}
-    end;
-
-handle_info({'DOWN', _Ref, _, _, _}, State) ->
-    % From a db monitor created by a replication process. Ignore.
-    {noreply, State};
-
-handle_info(shutdown, State) ->
-    {stop, shutdown, State};
-
-handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    {noreply, State};
-
-handle_info(Msg, State) ->
-    couch_log:error("Replication manager received unexpected message ~p", [Msg]),
-    {stop, {unexpected_msg, Msg}, State}.
-
-
-terminate(_Reason, State) ->
-    #state{
-        scan_pid = ScanPid,
-        rep_start_pids = StartPids,
-        event_listener = Listener
-    } = State,
-    stop_all_replications(),
-    lists:foreach(
-        fun({_Tag, Pid}) ->
-            catch unlink(Pid),
-            catch exit(Pid, stop)
-        end,
-        [{scanner, ScanPid} | StartPids]),
-    true = ets:delete(?REP_TO_STATE),
-    true = ets:delete(?DOC_TO_REP),
-    true = ets:delete(?DB_TO_SEQ),
-    couch_event:stop_listener(Listener).
-
-
-code_change(1, State, _Extra) ->
-    {ok, erlang:append_element(State, [node() | nodes()])};
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
-start_changes_reader(DbName, Since, Epoch) ->
-    spawn_link(?MODULE, changes_reader, [{self(), Epoch}, DbName, Since]).
-
-changes_reader({Server, Epoch}, DbName, Since) ->
-    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
-    DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
-    {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
-    ChangesFeedFun = couch_changes:handle_db_changes(
-        #changes_args{
-            include_docs = true,
-            since = Since,
-            feed = "normal",
-            timeout = infinity
-        },
-        {json_req, null},
-        Db
-    ),
-    ChangesFeedFun({fun ?MODULE:changes_reader_cb/3, {Server, DbName, Epoch}}).
-
-changes_reader_cb({change, Change, _}, _, {Server, DbName, Epoch}) ->
-    case has_valid_rep_id(Change) of
-        true ->
-            Msg = {rep_db_update, DbName, Change},
-            ok = gen_server:call(Server, Msg, infinity);
-        false ->
-            ok
-    end,
-    {Server, DbName, Epoch};
-changes_reader_cb({stop, EndSeq}, _, {Server, DbName, Epoch}) ->
-    Msg = {rep_db_checkpoint, DbName, EndSeq, Epoch},
-    ok = gen_server:call(Server, Msg, infinity),
-    {Server, DbName, Epoch};
-changes_reader_cb(_, _, Acc) ->
-    Acc.
-
-has_valid_rep_id({Change}) ->
-    has_valid_rep_id(get_json_value(<<"id">>, Change));
-has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
-    false;
-has_valid_rep_id(_Else) ->
-    true.
-
-
-start_event_listener() ->
-    {ok, Pid} = couch_event:link_listener(
-            ?MODULE, handle_db_event, self(), [all_dbs]
-        ),
-    Pid.
-
-
-handle_db_event(DbName, created, Server) ->
-    case is_replicator_db(DbName) of
-	true ->
-	    ensure_rep_ddoc_exists(DbName);
-	_ ->
-	    ok
-    end,
-    {ok, Server};
-handle_db_event(DbName, updated, Server) ->
-    case is_replicator_db(DbName) of
-        true ->
-	    Msg = {resume_scan, DbName},
-	    ok = gen_server:cast(Server, Msg);
-        _ ->
-            ok
-    end,
-    {ok, Server};
-handle_db_event(DbName, deleted, Server) ->
-    case is_replicator_db(DbName) of
-        true ->
-            clean_up_replications(DbName);
-        _ ->
-            ok
-    end,
-    {ok, Server};
-handle_db_event(_DbName, _Event, Server) ->
-    {ok, Server}.
-
-rescan(#state{scan_pid = nil} = State) ->
-    true = ets:delete_all_objects(?DB_TO_SEQ),
-    Server = self(),
-    Epoch = make_ref(),
-    NewScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
-    State#state{scan_pid = NewScanPid, epoch = Epoch};
-rescan(#state{scan_pid = ScanPid} = State) ->
-    unlink(ScanPid),
-    exit(ScanPid, exit),
-    rescan(State#state{scan_pid = nil}).
-
-process_update(State, DbName, {Change}) ->
-    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
-    DocId = get_json_value(<<"_id">>, RepProps),
-    case {owner(DbName, DocId, State#state.live), get_json_value(deleted, Change, false)} of
-    {_, true} ->
-        rep_doc_deleted(DbName, DocId),
-        State;
-    {Owner, false} when Owner /= node() ->
-        couch_log:notice("Not starting '~s' from '~s' as owner is ~s.",
-            [DocId, DbName, Owner]),
-        State;
-    {_Owner, false} ->
-        couch_log:notice("Maybe starting '~s' from '~s' as I'm the owner", [DocId, DbName]),
-        case get_json_value(<<"_replication_state">>, RepProps) of
-        undefined ->
-            maybe_start_replication(State, DbName, DocId, JsonRepDoc);
-        <<"triggered">> ->
-            maybe_start_replication(State, DbName, DocId, JsonRepDoc);
-        <<"completed">> ->
-            replication_complete(DbName, DocId),
-            State;
-        <<"error">> ->
-            case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-            [] ->
-                maybe_start_replication(State, DbName, DocId, JsonRepDoc);
-            _ ->
-                State
-            end
-        end
-    end.
-
-owner(<<"shards/", _/binary>> = DbName, DocId, Live) ->
-    Nodes = lists:sort([N || #shard{node=N} <- mem3:shards(mem3:dbname(DbName), DocId),
-			     lists:member(N, Live)]),
-    hd(mem3_util:rotate_list({DbName, DocId}, Nodes));
-owner(_DbName, _DocId, _Live) ->
-    node().
-
-rep_db_update_error(Error, DbName, DocId) ->
-    case Error of
-    {bad_rep_doc, Reason} ->
-        ok;
-    _ ->
-        Reason = to_binary(Error)
-    end,
-    couch_log:error("Replication manager, error processing document `~s`"
-        " from `~s`: ~s", [DocId, DbName, Reason]),
-    update_rep_doc(DbName, DocId, [{<<"_replication_state">>, <<"error">>},
-                           {<<"_replication_state_reason">>, Reason}]).
-
-
-rep_user_ctx({RepDoc}) ->
-    case get_json_value(<<"user_ctx">>, RepDoc) of
-    undefined ->
-        #user_ctx{};
-    {UserCtx} ->
-        #user_ctx{
-            name = get_json_value(<<"name">>, UserCtx, null),
-            roles = get_json_value(<<"roles">>, UserCtx, [])
-        }
-    end.
-
-
-maybe_start_replication(State, DbName, DocId, RepDoc) ->
-    #rep{id = {BaseId, _} = RepId} = Rep0 = parse_rep_doc(RepDoc),
-    Rep = Rep0#rep{db_name = DbName},
-    case rep_state(RepId) of
-    nil ->
-        RepState = #rep_state{
-            rep = Rep,
-            starting = true,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
-        true = ets:insert(?DOC_TO_REP, {{DbName, DocId}, RepId}),
-        couch_log:notice("Attempting to start replication `~s` (document `~s`"
-            " from `~s`).", [pp_rep_id(RepId), DocId, DbName]),
-        StartDelaySecs = erlang:max(0,
-            config:get_integer("replicator", "start_delay", 10)),
-        StartSplaySecs = erlang:max(1,
-            config:get_integer("replicator", "start_splay", 50)),
-        DelaySecs = StartDelaySecs + random:uniform(StartSplaySecs),
-        couch_log:notice("Delaying replication `~s` start by ~p seconds.",
-            [pp_rep_id(RepId), DelaySecs]),
-        Pid = spawn_link(?MODULE, start_replication, [Rep, DelaySecs]),
-        State#state{
-            rep_start_pids = [{rep_start, Pid} | State#state.rep_start_pids]
-        };
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        State;
-    #rep_state{starting = false, rep = #rep{db_name = DbName, doc_id = OtherDocId}} ->
-        couch_log:notice("The replication specified by the document `~s` from"
-            " `~s` was already triggered by the document `~s`",
-            [DocId, DbName, OtherDocId]),
-        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
-        State;
-    #rep_state{starting = true, rep = #rep{db_name = DbName, doc_id = OtherDocId}} ->
-        couch_log:notice("The replication specified by the document `~s` from"
-            " `~s` is already being triggered by the document `~s`",
-            [DocId, DbName, OtherDocId]),
-        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
-        State
-    end.
-
-
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} = try
-        couch_replicator_utils:parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-    catch
-    throw:{error, Reason} ->
-        throw({bad_rep_doc, Reason});
-    Tag:Err ->
-        throw({bad_rep_doc, to_binary({Tag, Err})})
-    end,
-    Rep.
-
-
-maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
-    case get_json_value(<<"_replication_id">>, RepProps) of
-    RepId ->
-        ok;
-    _ ->
-        update_rep_doc(DbName, DocId, [{<<"_replication_id">>, RepId}])
-    end.
-
-start_replication(Rep, Wait) ->
-    ok = timer:sleep(Wait * 1000),
-    case (catch couch_replicator:async_replicate(Rep)) of
-    {ok, _} ->
-        ok;
-    Error ->
-        replication_error(Rep, Error)
-    end.
-
-replication_complete(DbName, DocId) ->
-    case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-    [{{DbName, DocId}, {BaseId, Ext} = RepId}] ->
-        case rep_state(RepId) of
-        nil ->
-            % Prior to OTP R14B02, temporary child specs remain in
-            % in the supervisor after a worker finishes - remove them.
-            % We want to be able to start the same replication but with
-            % eventually different values for parameters that don't
-            % contribute to its ID calculation.
-            case erlang:system_info(otp_release) < "R14B02" of
-            true ->
-                spawn(fun() ->
-                    _ = supervisor:delete_child(couch_replicator_job_sup, BaseId ++ Ext)
-                end);
-            false ->
-                ok
-            end;
-        #rep_state{} ->
-            ok
-        end,
-        true = ets:delete(?DOC_TO_REP, {DbName, DocId});
-    _ ->
-        ok
-    end.
-
-
-rep_doc_deleted(DbName, DocId) ->
-    case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-    [{{DbName, DocId}, RepId}] ->
-        couch_replicator:cancel_replication(RepId),
-        true = ets:delete(?REP_TO_STATE, RepId),
-        true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
-        couch_log:notice("Stopped replication `~s` because replication document"
-            " `~s` from `~s` was deleted", [pp_rep_id(RepId), DocId, DbName]);
-    [] ->
-        ok
-    end.
-
-
-replication_error(State, RepId, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        State;
-    RepState ->
-        maybe_retry_replication(RepState, Error, State)
-    end.
-
-maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
-    #rep_state{
-        rep = #rep{id = RepId, doc_id = DocId, db_name = DbName},
-        max_retries = MaxRetries
-    } = RepState,
-    couch_replicator:cancel_replication(RepId),
-    true = ets:delete(?REP_TO_STATE, RepId),
-    true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
-    couch_log:error("Error in replication `~s` (triggered by document `~s` from"
-        " `~s` ): ~s~nReached maximum retry attempts (~p).", [pp_rep_id(RepId),
-        DocId, DbName, to_binary(error_reason(Error)), MaxRetries]),
-    State;
-
-maybe_retry_replication(RepState, Error, State) ->
-    #rep_state{
-        rep = #rep{id = RepId, doc_id = DocId, db_name = DbName} = Rep
-    } = RepState,
-    #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
-    true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
-    couch_log:error("Error in replication `~s` (triggered by document `~s` from"
-        " `~s` ): ~s~nRestarting replication in ~p seconds.", [pp_rep_id(RepId),
-        DocId, DbName, to_binary(error_reason(Error)), Wait]),
-    Pid = spawn_link(?MODULE, start_replication, [Rep, Wait]),
-    State#state{
-        rep_start_pids = [{rep_start, Pid} | State#state.rep_start_pids]
-    }.
-
-
-stop_all_replications() ->
-    couch_log:notice("Stopping all ongoing replications", []),
-    ets:foldl(
-        fun({_, RepId}, _) ->
-            couch_replicator:cancel_replication(RepId)
-        end,
-        ok, ?DOC_TO_REP),
-    true = ets:delete_all_objects(?REP_TO_STATE),
-    true = ets:delete_all_objects(?DOC_TO_REP),
-    true = ets:delete_all_objects(?DB_TO_SEQ).
-
-clean_up_replications(DbName) ->
-    ets:foldl(
-        fun({{Name, DocId}, RepId}, _) when Name =:= DbName ->
-            couch_replicator:cancel_replication(RepId),
-            ets:delete(?DOC_TO_REP,{Name, DocId}),
-            ets:delete(?REP_TO_STATE, RepId);
-           ({_,_}, _) ->
-            ok
-        end,
-        ok, ?DOC_TO_REP),
-    ets:delete(?DB_TO_SEQ,DbName).
-
-
-update_rep_doc(RepDbName, RepDocId, KVs) ->
-    update_rep_doc(RepDbName, RepDocId, KVs, 1).
-
-update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
-    try
-        case open_rep_doc(RepDbName, RepDocId) of
-            {ok, LastRepDoc} ->
-                update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
-            _ ->
-                ok
-        end
-    catch
-        throw:conflict ->
-            Msg = "Conflict when updating replication document `~s`. Retrying.",
-            couch_log:error(Msg, [RepDocId]),
-            ok = timer:sleep(random:uniform(erlang:min(128, Wait)) * 100),
-            update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
-    end;
-update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
-    NewRepDocBody = lists:foldl(
-        fun({K, undefined}, Body) ->
-                lists:keydelete(K, 1, Body);
-           ({<<"_replication_state">> = K, State} = KV, Body) ->
-                case get_json_value(K, Body) of
-                State ->
-                    Body;
-                _ ->
-                    Body1 = lists:keystore(K, 1, Body, KV),
-                    lists:keystore(
-                        <<"_replication_state_time">>, 1, Body1,
-                        {<<"_replication_state_time">>, timestamp()})
-                end;
-            ({K, _V} = KV, Body) ->
-                lists:keystore(K, 1, Body, KV)
-        end,
-        RepDocBody, KVs),
-    case NewRepDocBody of
-    RepDocBody ->
-        ok;
-    _ ->
-        % Might not succeed - when the replication doc is deleted right
-        % before this update (not an error, ignore).
-        save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
-    end.
-
-open_rep_doc(DbName, DocId) ->
-    case couch_db:open_int(DbName, [?CTX, sys_db]) of
-        {ok, Db} ->
-            try
-                couch_db:open_doc(Db, DocId, [ejson_body])
-            after
-                couch_db:close(Db)
-            end;
-        Else ->
-            Else
-    end.
-
-save_rep_doc(DbName, Doc) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
-    try
-        couch_db:update_doc(Db, Doc, [])
-    after
-        couch_db:close(Db)
-    end.
-
-% RFC3339 timestamps.
-% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
-timestamp() ->
-    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
-    UTime = erlang:universaltime(),
-    LocalTime = calendar:universal_time_to_local_time(UTime),
-    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
-        calendar:datetime_to_gregorian_seconds(UTime),
-    zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
-    iolist_to_binary(
-        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
-            [Year, Month, Day, Hour, Min, Sec,
-                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
-
-zone(Hr, Min) when Hr >= 0, Min >= 0 ->
-    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
-zone(Hr, Min) ->
-    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
-
-
-ensure_rep_db_exists(<<"shards/", _/binary>>=DbName) ->
-    ensure_rep_ddoc_exists(DbName),
-    ok;
-ensure_rep_db_exists(DbName) ->
-    Db = case couch_db:open_int(DbName, [?CTX, sys_db, nologifmissing]) of
-        {ok, Db0} ->
-            Db0;
-        _Error ->
-            {ok, Db0} = couch_db:create(DbName, [?CTX, sys_db]),
-            Db0
-    end,
-    ensure_rep_ddoc_exists(DbName),
-    {ok, Db}.
-
-ensure_rep_ddoc_exists(RepDb) ->
-    DDocId = <<"_design/_replicator">>,
-    case mem3:belongs(RepDb, DDocId) of
-	true ->
-	    ensure_rep_ddoc_exists(RepDb, DDocId);
-	false ->
-	    ok
-    end.
-
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
-    case open_rep_doc(RepDb, DDocId) of
-        {not_found, no_db_file} ->
-            %% database was deleted.
-            ok;
-        {not_found, _Reason} ->
-            {ok, DDoc} = replication_design_doc(DDocId),
-            couch_log:notice("creating replicator ddoc", []),
-            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
-        {ok, Doc} ->
-            {Props} = couch_doc:to_json_obj(Doc, []),
-            case couch_util:get_value(<<"validate_doc_update">>, Props, []) of
-                ?REP_DB_DOC_VALIDATE_FUN ->
-                    ok;
-                _ ->
-                    Props1 = lists:keyreplace(<<"validate_doc_update">>, 1, Props,
-                         {<<"validate_doc_update">>,
-                        ?REP_DB_DOC_VALIDATE_FUN}),
-                    DDoc = couch_doc:from_json_obj({Props1}),
-                    couch_log:notice("updating replicator ddoc", []),
-                    try
-                        {ok, _} = save_rep_doc(RepDb, DDoc)
-                    catch
-                        throw:conflict ->
-                            %% ignore, we'll retry next time
-                            ok
-                    end
-            end
-    end,
-    ok.
-
-replication_design_doc(DDocId) ->
-    DocProps = [
-        {<<"_id">>, DDocId},
-        {<<"language">>, <<"javascript">>},
-        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-   ],
-   {ok, couch_doc:from_json_obj({DocProps})}.
-
-
-% pretty-print replication id
-pp_rep_id(#rep{id = RepId}) ->
-    pp_rep_id(RepId);
-pp_rep_id({Base, Extension}) ->
-    Base ++ Extension.
-
-
-rep_state(RepId) ->
-    case ets:lookup(?REP_TO_STATE, RepId) of
-    [{RepId, RepState}] ->
-        RepState;
-    [] ->
-        nil
-    end.
-
-
-error_reason({error, {Error, Reason}})
-  when is_atom(Error), is_binary(Reason) ->
-    io_lib:format("~s: ~s", [Error, Reason]);
-error_reason({error, Reason}) ->
-    Reason;
-error_reason(Reason) ->
-    Reason.
-
-
-retries_value("infinity") ->
-    infinity;
-retries_value(Value) ->
-    list_to_integer(Value).
-
-
-state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
-    Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
-    case Left of
-    infinity ->
-        State#rep_state{wait = Wait2};
-    _ ->
-        State#rep_state{retries_left = Left - 1, wait = Wait2}
-    end.
-
-
-before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
-    #user_ctx{roles = Roles, name = Name} = UserCtx,
-    case lists:member(<<"_replicator">>, Roles) of
-    true ->
-        Doc;
-    false ->
-        case couch_util:get_value(?OWNER, Body) of
-        undefined ->
-            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-        Name ->
-            Doc;
-        Other ->
-            case (catch couch_db:check_is_admin(Db)) of
-            ok when Other =:= null ->
-                Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-            ok ->
-                Doc;
-            _ ->
-                throw({forbidden, <<"Can't update replication documents",
-                    " from other users.">>})
-            end
-        end
-    end.
-
-
-after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
-    #user_ctx{name = Name} = UserCtx,
-    case (catch couch_db:check_is_admin(Db)) of
-    ok ->
-        Doc;
-    _ ->
-        case couch_util:get_value(?OWNER, Body) of
-        Name ->
-            Doc;
-        _Other ->
-            Source = strip_credentials(couch_util:get_value(<<"source">>, Body)),
-            Target = strip_credentials(couch_util:get_value(<<"target">>, Body)),
-            NewBody0 = ?replace(Body, <<"source">>, Source),
-            NewBody = ?replace(NewBody0, <<"target">>, Target),
-            #doc{revs = {Pos, [_ | Revs]}} = Doc,
-            NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-            NewRevId = couch_db:new_revid(NewDoc),
-            NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
-        end
-    end.
-
-
-strip_credentials(undefined) ->
-    undefined;
-strip_credentials(Url) when is_binary(Url) ->
-    re:replace(Url,
-        "http(s)?://(?:[^:]+):[^@]+@(.*)$",
-        "http\\1://\\2",
-        [{return, binary}]);
-strip_credentials({Props}) ->
-    {lists:keydelete(<<"oauth">>, 1, Props)}.
-
-scan_all_dbs(Server) when is_pid(Server) ->
-    {ok, Db} = mem3_util:ensure_exists(
-        config:get("mem3", "shards_db", "_dbs")),
-    ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
-    ChangesFun(fun({change, {Change}, _}, _) ->
-        DbName = couch_util:get_value(<<"id">>, Change),
-        case DbName of <<"_design/", _/binary>> -> ok; _Else ->
-            case couch_replicator_utils:is_deleted(Change) of
-            true ->
-                ok;
-            false ->
-                try
-                    [gen_server:cast(Server, {resume_scan, ShardName})
-                        || ShardName <- replicator_shards(DbName)]
-                catch error:database_does_not_exist ->
-                    ok
-                end
-            end
-        end;
-        (_, _) -> ok
-    end),
-    couch_db:close(Db).
-
-
-replicator_shards(DbName) ->
-    case is_replicator_db(DbName) of
-    false ->
-        [];
-    true ->
-        [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
-    end.
-
-
-% calculate random delay proportional to the number of replications
-% on current node, in order to prevent a stampede:
-%   - when a source with multiple replication targets fails
-%   - when we restart couch_replication_manager
-jitter(N) ->
-    Range = min(2 * N * ?AVG_DELAY_MSEC, ?MAX_DELAY_MSEC),
-    random:uniform(Range).
-
-is_replicator_db(DbName) ->
-    ?REPLICATOR_DB =:= couch_db:dbname_suffix(DbName).
-
-get_json_value(Key, Props) ->
-    get_json_value(Key, Props, undefined).
-
-get_json_value(Key, Props, Default) when is_atom(Key) ->
-    Ref = make_ref(),
-    case couch_util:get_value(Key, Props, Ref) of
-        Ref ->
-            couch_util:get_value(?l2b(atom_to_list(Key)), Props, Default);
-        Else ->
-            Else
-    end;
-get_json_value(Key, Props, Default) when is_binary(Key) ->
-    Ref = make_ref(),
-    case couch_util:get_value(Key, Props, Ref) of
-        Ref ->
-            couch_util:get_value(list_to_atom(?b2l(Key)), Props, Default);
-        Else ->
-            Else
-    end.
-
-
--ifdef(TEST).
-
--include_lib("couch/include/couch_eunit.hrl").
-
-replicator_shards_test_() ->
-{
-      foreach,
-      fun() -> test_util:start_couch([mem3, fabric]) end,
-      fun(Ctx) -> test_util:stop_couch(Ctx) end,
-      [
-          t_pass_replicator_shard(),
-          t_fail_non_replicator_shard()
-     ]
-}.
-
-
-t_pass_replicator_shard() ->
-    ?_test(begin
-        DbName0 = ?tempdb(),
-        DbName = <<DbName0/binary, "/_replicator">>,
-        ok = fabric:create_db(DbName, [?CTX]),
-        ?assertEqual(8, length(replicator_shards(DbName))),
-        fabric:delete_db(DbName, [?CTX])
-    end).
-
-
-t_fail_non_replicator_shard() ->
-    ?_test(begin
-        DbName = ?tempdb(),
-        ok = fabric:create_db(DbName, [?CTX]),
-        ?assertEqual([], replicator_shards(DbName)),
-        fabric:delete_db(DbName, [?CTX])
-    end).
-
-
--endif.
+after_doc_read(Doc, Db) ->
+    couch_replicator_docs:after_doc_read(Doc, Db).
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index 57ad63b..b50c0a1 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -26,18 +26,56 @@ init(_Args) ->
             brutal_kill,
             worker,
             dynamic},
-        {couch_replicator_manager,
-            {couch_replicator_manager, start_link, []},
+       {couch_replicator_clustering,
+            {couch_replicator_clustering, start_link, []},
             permanent,
             brutal_kill,
             worker,
-            [couch_replicator_manager]},
-        {couch_replicator_job_sup,
-            {couch_replicator_job_sup, start_link, []},
+            [couch_replicator_clustering]},
+       {couch_replicator_connection,
+            {couch_replicator_connection, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_connection]},
+       {couch_replicator_rate_limiter,
+            {couch_replicator_rate_limiter, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_rate_limiter]},
+        {couch_replicator_scheduler_sup,
+            {couch_replicator_scheduler_sup, start_link, []},
             permanent,
             infinity,
             supervisor,
-            [couch_replicator_job_sup]}
+            [couch_replicator_scheduler_sup]},
+        {couch_replicator_scheduler,
+            {couch_replicator_scheduler, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_scheduler]},
+        {couch_replicator_doc_processor,
+            {couch_replicator_doc_processor, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_doc_processor]},
+        {couch_replicator,
+            % This is a simple function call which does not create a process
+            % but returns `ignore`. It is used to make sure each node
+            % has a local `_replicator` database.
+            {couch_replicator, ensure_rep_db_exists, []},
+            transient,
+            brutal_kill,
+            worker,
+            [couch_replicator]},
+        {couch_replicator_db_changes,
+            {couch_replicator_db_changes, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_multidb_changes]}
     ],
-    {ok, {{one_for_one,10,1}, Children}}.
-
+    {ok, {{rest_for_one,10,1}, Children}}.
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index e96d52a..3fd98a3 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -17,12 +17,14 @@
 -export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
 -export([replication_id/2]).
 -export([sum_stats/2, is_deleted/1]).
+-export([rep_error_to_binary/1]).
+-export([get_json_value/2, get_json_value/3]).
+-export([pp_rep_id/1]).
+-export([iso8601/1]).
 
 -export([handle_db_event/3]).
 
 -include_lib("couch/include/couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
--include("couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
 
 -import(couch_util, [
@@ -31,385 +33,7 @@
 ]).
 
 
-parse_rep_doc({Props}, UserCtx) ->
-    ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
-    Options = make_options(Props),
-    case get_value(cancel, Options, false) andalso
-        (get_value(id, Options, nil) =/= nil) of
-    true ->
-        {ok, #rep{options = Options, user_ctx = UserCtx}};
-    false ->
-        Source = parse_rep_db(get_value(<<"source">>, Props),
-                              ProxyParams, Options),
-        Target = parse_rep_db(get_value(<<"target">>, Props),
-                              ProxyParams, Options),
-
-
-        {RepType, View} = case get_value(<<"filter">>, Props) of
-                <<"_view">> ->
-                    {QP}  = get_value(query_params, Options, {[]}),
-                    ViewParam = get_value(<<"view">>, QP),
-                    View1 = case re:split(ViewParam, <<"/">>) of
-                        [DName, ViewName] ->
-                            {<< "_design/", DName/binary >>, ViewName};
-                        _ ->
-                            throw({bad_request, "Invalid `view` parameter."})
-                    end,
-                    {view, View1};
-                _ ->
-                    {db, nil}
-            end,
-
-        Rep = #rep{
-            source = Source,
-            target = Target,
-            options = Options,
-            user_ctx = UserCtx,
-            type = RepType,
-            view = View,
-            doc_id = get_value(<<"_id">>, Props, null)
-        },
-        {ok, Rep#rep{id = replication_id(Rep)}}
-    end.
-
-
-replication_id(#rep{options = Options} = Rep) ->
-    BaseId = replication_id(Rep, ?REP_ID_VERSION),
-    {BaseId, maybe_append_options([continuous, create_target], Options)}.
-
-
-% Versioned clauses for generating replication IDs.
-% If a change is made to how replications are identified,
-% please add a new clause and increase ?REP_ID_VERSION.
-
-replication_id(#rep{user_ctx = UserCtx} = Rep, 3) ->
-    UUID = couch_server:get_uuid(),
-    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
-    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
-    maybe_append_filters([UUID, Src, Tgt], Rep);
-
-replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
-    {ok, HostName} = inet:gethostname(),
-    Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
-    P when is_number(P) ->
-        P;
-    _ ->
-        % On restart we might be called before the couch_httpd process is
-        % started.
-        % TODO: we might be under an SSL socket server only, or both under
-        % SSL and a non-SSL socket.
-        % ... mochiweb_socket_server:get(https, port)
-        list_to_integer(config:get("httpd", "port", "5984"))
-    end,
-    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
-    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
-    maybe_append_filters([HostName, Port, Src, Tgt], Rep);
-
-replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
-    {ok, HostName} = inet:gethostname(),
-    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
-    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
-    maybe_append_filters([HostName, Src, Tgt], Rep).
-
-
-maybe_append_filters(Base,
-        #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
-    Filter = get_value(filter, Options),
-    DocIds = get_value(doc_ids, Options),
-    Selector = get_value(selector, Options),
-    Base2 = Base ++
-        case {Filter, DocIds, Selector} of
-        {undefined, undefined, undefined} ->
-            [];
-        {<<"_", _/binary>>, undefined, undefined} ->
-            [Filter, get_value(query_params, Options, {[]})];
-        {_, undefined, undefined} ->
-            [filter_code(Filter, Source, UserCtx),
-                get_value(query_params, Options, {[]})];
-        {undefined, _, undefined} ->
-            [DocIds];
-        {undefined, undefined, _} ->
-            [ejsort(mango_selector:normalize(Selector))];
-        _ ->
-            throw({error, <<"`selector`, `filter` and `doc_ids` fields are mutually exclusive">>})
-        end,
-    couch_util:to_hex(couch_crypto:hash(md5, term_to_binary(Base2))).
-
-
-filter_code(Filter, Source, UserCtx) ->
-    {DDocName, FilterName} =
-    case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
-    {match, [DDocName0, FilterName0]} ->
-        {DDocName0, FilterName0};
-    _ ->
-        throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
-    end,
-    Db = case (catch couch_replicator_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of
-    {ok, Db0} ->
-        Db0;
-    DbError ->
-        DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
-           [couch_replicator_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]),
-        throw({error, iolist_to_binary(DbErrorMsg)})
-    end,
-    try
-        Body = case (catch couch_replicator_api_wrap:open_doc(
-            Db, <<"_design/", DDocName/binary>>, [ejson_body])) of
-        {ok, #doc{body = Body0}} ->
-            Body0;
-        DocError ->
-            DocErrorMsg = io_lib:format(
-                "Couldn't open document `_design/~s` from source "
-                "database `~s`: ~s", [DDocName, couch_replicator_api_wrap:db_uri(Source),
-                    couch_util:to_binary(DocError)]),
-            throw({error, iolist_to_binary(DocErrorMsg)})
-        end,
-        Code = couch_util:get_nested_json_value(
-            Body, [<<"filters">>, FilterName]),
-        re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
-    after
-        couch_replicator_api_wrap:db_close(Db)
-    end.
-
-
-maybe_append_options(Options, RepOptions) ->
-    lists:foldl(fun(Option, Acc) ->
-        Acc ++
-        case get_value(Option, RepOptions, false) of
-        true ->
-            "+" ++ atom_to_list(Option);
-        false ->
-            ""
-        end
-    end, [], Options).
-
-
-get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    case OAuth of
-    nil ->
-        {remote, Url, Headers -- DefaultHeaders};
-    #oauth{} ->
-        {remote, Url, Headers -- DefaultHeaders, OAuth}
-    end;
-get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
-    {local, DbName, UserCtx}.
-
-
-parse_rep_db({Props}, ProxyParams, Options) ->
-    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
-    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
-    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
-    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    OAuth = case get_value(<<"oauth">>, AuthProps) of
-    undefined ->
-        nil;
-    {OauthProps} ->
-        #oauth{
-            consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
-            token = ?b2l(get_value(<<"token">>, OauthProps)),
-            token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
-            consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)),
-            signature_method =
-                case get_value(<<"signature_method">>, OauthProps) of
-                undefined ->        hmac_sha1;
-                <<"PLAINTEXT">> ->  plaintext;
-                <<"HMAC-SHA1">> ->  hmac_sha1;
-                <<"RSA-SHA1">> ->   rsa_sha1
-                end
-        }
-    end,
-    #httpdb{
-        url = Url,
-        oauth = OAuth,
-        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
-        ibrowse_options = lists:keysort(1,
-            [{socket_options, get_value(socket_options, Options)} |
-                ProxyParams ++ ssl_params(Url)]),
-        timeout = get_value(connection_timeout, Options),
-        http_connections = get_value(http_connections, Options),
-        retries = get_value(retries, Options)
-    };
-parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
-parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
-parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
-    DbName.
-
-
-maybe_add_trailing_slash(Url) when is_binary(Url) ->
-    maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
-    case lists:last(Url) of
-    $/ ->
-        Url;
-    _ ->
-        Url ++ "/"
-    end.
-
-
-make_options(Props) ->
-    Options0 = lists:ukeysort(1, convert_options(Props)),
-    Options = check_options(Options0),
-    DefWorkers = config:get("replicator", "worker_processes", "4"),
-    DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
-    DefConns = config:get("replicator", "http_connections", "20"),
-    DefTimeout = config:get("replicator", "connection_timeout", "30000"),
-    DefRetries = config:get("replicator", "retries_per_request", "10"),
-    UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
-    DefCheckpointInterval = config:get("replicator", "checkpoint_interval", "30000"),
-    {ok, DefSocketOptions} = couch_util:parse_term(
-        config:get("replicator", "socket_options",
-            "[{keepalive, true}, {nodelay, false}]")),
-    lists:ukeymerge(1, Options, lists:keysort(1, [
-        {connection_timeout, list_to_integer(DefTimeout)},
-        {retries, list_to_integer(DefRetries)},
-        {http_connections, list_to_integer(DefConns)},
-        {socket_options, DefSocketOptions},
-        {worker_batch_size, list_to_integer(DefBatchSize)},
-        {worker_processes, list_to_integer(DefWorkers)},
-        {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
-        {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
-    ])).
-
-
-convert_options([])->
-    [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V)->
-    throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
-    [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
-        IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
-    Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)),
-    [{id, Id} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)->
-    throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
-    [{create_target, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V)->
-    throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
-    [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
-    [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
-    [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
-    convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
-    throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
-    % Ensure same behaviour as old replicator: accept a list of percent
-    % encoded doc IDs.
-    DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
-    [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
-    throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
-    [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
-    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
-    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
-    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
-    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
-    [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
-    {ok, SocketOptions} = couch_util:parse_term(V),
-    [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
-    [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
-    [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
-    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([_ | R]) -> % skip unknown option
-    convert_options(R).
-
-check_options(Options) ->
-    DocIds = lists:keyfind(doc_ids, 1, Options),
-    Filter = lists:keyfind(filter, 1, Options),
-    Selector = lists:keyfind(selector, 1, Options),
-    case {DocIds, Filter, Selector} of
-        {false, false, false} -> Options;
-        {false, false, _} -> Options;
-        {false, _, false} -> Options;
-        {_, false, false} -> Options;
-        _ ->
-            throw({bad_request, "`doc_ids`, `filter`, `selector` are mutually exclusive options"})
-    end.
-
-
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
-    parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
-    [];
-parse_proxy_params(ProxyUrl) ->
-    #url{
-        host = Host,
-        port = Port,
-        username = User,
-        password = Passwd,
-        protocol = Protocol
-    } = ibrowse_lib:parse_url(ProxyUrl),
-    [{proxy_protocol, Protocol}, {proxy_host, Host}, {proxy_port, Port}] ++
-        case is_list(User) andalso is_list(Passwd) of
-        false ->
-            [];
-        true ->
-            [{proxy_user, User}, {proxy_password, Passwd}]
-        end.
-
-
-ssl_params(Url) ->
-    case ibrowse_lib:parse_url(Url) of
-    #url{protocol = https} ->
-        Depth = list_to_integer(
-            config:get("replicator", "ssl_certificate_max_depth", "3")
-        ),
-        VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
-        CertFile = config:get("replicator", "cert_file", undefined),
-        KeyFile = config:get("replicator", "key_file", undefined),
-        Password = config:get("replicator", "password", undefined),
-        SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
-        SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
-            true ->
-                case Password of
-                    undefined ->
-                        [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
-                    _ ->
-                        [{certfile, CertFile}, {keyfile, KeyFile},
-                            {password, Password}] ++ SslOpts
-                end;
-            false -> SslOpts
-        end,
-        [{is_ssl, true}, {ssl_options, SslOpts1}];
-    #url{protocol = http} ->
-        []
-    end.
-
-ssl_verify_options(Value) ->
-    ssl_verify_options(Value, erlang:system_info(otp_release)).
-
-ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, verify_peer}, {cacertfile, CAFile}];
-ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" ->
-    [{verify, verify_none}];
-ssl_verify_options(true, _OTPVersion) ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, 2}, {cacertfile, CAFile}];
-ssl_verify_options(false, _OTPVersion) ->
-    [{verify, 0}].
-
 
-%% New db record has Options field removed here to enable smoother dbcore migration
 open_db(#db{name = Name, user_ctx = UserCtx}) ->
     {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx} | []]),
     Db;
@@ -444,103 +68,78 @@ handle_db_event(DbName, compacted, Server) ->
 handle_db_event(_DbName, _Event, Server) ->
     {ok, Server}.
 
-% Obsolete - remove in next release
-sum_stats(S1, S2) ->
-    couch_replicator_stats:sum_stats(S1, S2).
 
+
+rep_error_to_binary(Error) ->
+    couch_util:to_binary(error_reason(Error)).
+
+
+error_reason({shutdown, Error}) ->
+    error_reason(Error);
+error_reason({error, {Error, Reason}})
+  when is_atom(Error), is_binary(Reason) ->
+    io_lib:format("~s: ~s", [Error, Reason]);
+error_reason({error, Reason}) ->
+    Reason;
+error_reason(Reason) ->
+    Reason.
+
+
+
+get_json_value(Key, Props) ->
+    get_json_value(Key, Props, undefined).
+
+get_json_value(Key, Props, Default) when is_atom(Key) ->
+    Ref = make_ref(),
+    case get_value(Key, Props, Ref) of
+        Ref ->
+            get_value(?l2b(atom_to_list(Key)), Props, Default);
+        Else ->
+            Else
+    end;
+get_json_value(Key, Props, Default) when is_binary(Key) ->
+    Ref = make_ref(),
+    case get_value(Key, Props, Ref) of
+        Ref ->
+            get_value(list_to_atom(?b2l(Key)), Props, Default);
+        Else ->
+            Else
+    end.
+
+
+% pretty-print replication id
+-spec pp_rep_id(#rep{} | rep_id()) -> string().
+pp_rep_id(#rep{id = RepId}) ->
+    pp_rep_id(RepId);
+pp_rep_id({Base, Extension}) ->
+    Base ++ Extension.
+
+
+% NV: TODO: this function is not used outside api wrap module
+% consider moving it there during final cleanup
 is_deleted(Change) ->
-    case couch_util:get_value(<<"deleted">>, Change) of
+    case get_value(<<"deleted">>, Change) of
     undefined ->
         % keep backwards compatibility for a while
-        couch_util:get_value(deleted, Change, false);
+        get_value(deleted, Change, false);
     Else ->
         Else
     end.
 
+% NV: TODO: these are proxies for functions which used to live here; later
+% remove them and have callers call the respective modules directly
+replication_id(Rep, Version) ->
+    couch_replicator_ids:replication_id(Rep, Version).
+
+sum_stats(S1, S2) ->
+    couch_replicator_stats:sum_stats(S1, S2).
+
+parse_rep_doc(Props, UserCtx) ->
+    couch_replicator_docs:parse_rep_doc(Props, UserCtx).
+
 
-% Sort an EJSON object's properties to attempt
-% to generate a unique representation. This is used
-% to reduce the chance of getting different
-% replication checkpoints for the same Mango selector
-ejsort({V})->
-    ejsort_props(V, []);
-ejsort(V) when is_list(V) ->
-    ejsort_array(V, []);
-ejsort(V) ->
-    V.
-
-ejsort_props([], Acc)->
-    {lists:keysort(1, Acc)};
-ejsort_props([{K, V}| R], Acc) ->
-    ejsort_props(R, [{K, ejsort(V)} | Acc]).
-
-ejsort_array([], Acc)->
-    lists:reverse(Acc);
-ejsort_array([V | R], Acc) ->
-    ejsort_array(R, [ejsort(V) | Acc]).
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-ejsort_basic_values_test() ->
-    ?assertEqual(ejsort(0), 0),
-    ?assertEqual(ejsort(<<"a">>), <<"a">>),
-    ?assertEqual(ejsort(true), true),
-    ?assertEqual(ejsort([]), []),
-    ?assertEqual(ejsort({[]}), {[]}).
-
-ejsort_compound_values_test() ->
-    ?assertEqual(ejsort([2, 1, 3 ,<<"a">>]), [2, 1, 3, <<"a">>]),
-    Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0},  {<<"b">>, 0}]},
-    Ej1s =  {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]},
-    ?assertEqual(ejsort(Ej1), Ej1s),
-    Ej2 = {[{<<"x">>, Ej1}, {<<"z">>, Ej1}, {<<"y">>, [Ej1, Ej1]}]},
-    ?assertEqual(ejsort(Ej2),
-        {[{<<"x">>, Ej1s}, {<<"y">>, [Ej1s, Ej1s]}, {<<"z">>, Ej1s}]}).
-
-check_options_pass_values_test() ->
-    ?assertEqual(check_options([]), []),
-    ?assertEqual(check_options([baz, {other,fiz}]), [baz, {other, fiz}]),
-    ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]),
-    ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
-    ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
-
-check_options_fail_values_test() ->
-    ?assertThrow({bad_request, _},
-        check_options([{doc_ids, x}, {filter, y}])),
-    ?assertThrow({bad_request, _},
-        check_options([{doc_ids, x}, {selector, y}])),
-    ?assertThrow({bad_request, _},
-        check_options([{filter, x}, {selector, y}])),
-    ?assertThrow({bad_request, _},
-        check_options([{doc_ids, x}, {selector, y}, {filter, z}])).
-
-check_convert_options_pass_test() ->
-    ?assertEqual([], convert_options([])),
-    ?assertEqual([], convert_options([{<<"random">>, 42}])),
-    ?assertEqual([{cancel, true}],
-        convert_options([{<<"cancel">>, true}])),
-    ?assertEqual([{create_target, true}],
-        convert_options([{<<"create_target">>, true}])),
-    ?assertEqual([{continuous, true}],
-        convert_options([{<<"continuous">>, true}])),
-    ?assertEqual([{doc_ids, [<<"id">>]}],
-        convert_options([{<<"doc_ids">>, [<<"id">>]}])),
-    ?assertEqual([{selector, {key, value}}],
-        convert_options([{<<"selector">>, {key, value}}])).
-
-check_convert_options_fail_test() ->
-    ?assertThrow({bad_request, _},
-        convert_options([{<<"cancel">>, <<"true">>}])),
-    ?assertThrow({bad_request, _},
-        convert_options([{<<"create_target">>, <<"true">>}])),
-    ?assertThrow({bad_request, _},
-        convert_options([{<<"continuous">>, <<"true">>}])),
-    ?assertThrow({bad_request, _},
-        convert_options([{<<"doc_ids">>, not_a_list}])),
-    ?assertThrow({bad_request, _},
-        convert_options([{<<"selector">>, [{key, value}]}])).
-
--endif.
+-spec iso8601(erlang:timestamp()) -> binary().
+iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
+    {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
+    Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
+    iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index ee0c455..1907879 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -196,6 +196,9 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
         {noreply, State2}
     end;
 
+handle_info({'EXIT', _Pid, max_backoff}, State) ->
+    {stop, {shutdown, max_backoff}, State};
+
 handle_info({'EXIT', Pid, Reason}, State) ->
    {stop, {process_died, Pid, Reason}, State}.
 
diff --git a/src/couch_replicator/test/couch_replicator_compact_tests.erl b/src/couch_replicator/test/couch_replicator_compact_tests.erl
index 7a5a25a..4dce69b 100644
--- a/src/couch_replicator/test/couch_replicator_compact_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_compact_tests.erl
@@ -16,6 +16,10 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_replicator/src/couch_replicator.hrl").
 
+-import(couch_replicator_test_helper, [
+    db_url/1
+]).
+
 -define(ATTFILE, filename:join([?FIXTURESDIR, "logo.png"])).
 -define(DELAY, 100).
 -define(TIMEOUT, 30000).
@@ -92,7 +96,7 @@ should_run_replication(RepPid, RepId, Source, Target) ->
 should_ensure_replication_still_running(RepPid, RepId, Source, Target) ->
     ?_test(check_active_tasks(RepPid, RepId, Source, Target)).
 
-check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
+check_active_tasks(RepPid, {BaseId, Ext} = RepId, Src, Tgt) ->
     Source = case Src of
         {remote, NameSrc} ->
             <<(db_url(NameSrc))/binary, $/>>;
@@ -107,7 +111,7 @@ check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
     end,
     FullRepId = ?l2b(BaseId ++ Ext),
     Pid = ?l2b(pid_to_list(RepPid)),
-    ok = wait_for_replicator(RepPid),
+    ok = wait_for_replicator(RepId),
     [RepTask] = couch_task_status:all(),
     ?assertEqual(Pid, couch_util:get_value(pid, RepTask)),
     ?assertEqual(FullRepId, couch_util:get_value(replication_id, RepTask)),
@@ -124,16 +128,25 @@ check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
     Pending = couch_util:get_value(changes_pending, RepTask),
     ?assert(is_integer(Pending)).
 
-wait_for_replicator(Pid) ->
+
+get_pid(RepId) ->
+    Pid = global:whereis_name({couch_replicator_scheduler_job,RepId}),
+    ?assert(is_pid(Pid)),
+    Pid.
+
+rep_details(RepId) ->
+    gen_server:call(get_pid(RepId), get_details).
+
+wait_for_replicator(RepId) ->
     %% since replicator started asynchronously
     %% we need to wait when it would be in couch_task_status
     %% we query replicator:details to ensure that do_init happen
-    ?assertMatch({ok, _}, couch_replicator:details(Pid)),
+    ?assertMatch({ok, _}, rep_details(RepId)),
     ok.
 
 should_cancel_replication(RepId, RepPid) ->
     ?_assertNot(begin
-        {ok, _} = couch_replicator:cancel_replication(RepId),
+        ok = couch_replicator_scheduler:remove_job(RepId),
         is_process_alive(RepPid)
     end).
 
@@ -295,13 +308,6 @@ wait_for_compaction(Type, Db) ->
                                          " database failed with: ", Reason])}]})
     end.
 
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
 replicate({remote, Db}, Target) ->
     replicate(db_url(Db), Target);
 
@@ -315,7 +321,9 @@ replicate(Source, Target) ->
         {<<"continuous">>, true}
     ]},
     {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
+    ok = couch_replicator_scheduler:add_job(Rep),
+    couch_replicator_scheduler:reschedule(),
+    Pid = get_pid(Rep#rep.id),
     {ok, Pid, Rep#rep.id}.
 
 
diff --git a/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl b/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl
index ea36f7f..c4ad4e9 100644
--- a/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_httpc_pool_tests.erl
@@ -30,7 +30,7 @@ httpc_pool_test_() ->
         "httpc pool tests",
         {
             setup,
-            fun test_util:start_couch/0, fun test_util:stop_couch/1,
+            fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1,
             {
                 foreach,
                 fun setup/0, fun teardown/1,
diff --git a/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl b/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl
index bde0e2c..a6999bd 100644
--- a/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_many_leaves_tests.erl
@@ -15,6 +15,11 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+-import(couch_replicator_test_helper, [
+    db_url/1,
+    replicate/2
+]).
+
 -define(DOCS_CONFLICTS, [
     {<<"doc1">>, 10},
     {<<"doc2">>, 100},
@@ -199,22 +204,3 @@ add_attachments(SourceDb, NumAtts,  [{DocId, NumConflicts} | Rest]) ->
     ?assertEqual(length(NewDocs), length(NewRevs)),
     add_attachments(SourceDb, NumAtts, Rest).
 
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
-replicate(Source, Target) ->
-    RepObject = {[
-        {<<"source">>, Source},
-        {<<"target">>, Target}
-    ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
-    MonRef = erlang:monitor(process, Pid),
-    receive
-        {'DOWN', MonRef, process, Pid, _} ->
-            ok
-    end.
diff --git a/src/couch_replicator/test/couch_replicator_modules_load_tests.erl b/src/couch_replicator/test/couch_replicator_modules_load_tests.erl
index 96a9346..a552d14 100644
--- a/src/couch_replicator/test/couch_replicator_modules_load_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_modules_load_tests.erl
@@ -28,13 +28,18 @@ should_load_modules() ->
         couch_replicator_httpc,
         couch_replicator_httpd,
         couch_replicator_manager,
+        couch_replicator_scheduler,
+        couch_replicator_scheduler_job,
+        couch_replicator_docs,
+        couch_replicator_clustering,
+        couch_replicator_changes_reader,
+        couch_replicator_ids,
         couch_replicator_notifier,
         couch_replicator,
         couch_replicator_worker,
-        couch_replicator_utils,
-        couch_replicator_job_sup
+        couch_replicator_utils
     ],
     [should_load_module(Mod) || Mod <- Modules].
 
 should_load_module(Mod) ->
-    {atom_to_list(Mod), ?_assertMatch({module, _}, code:load_file(Mod))}.
+    {atom_to_list(Mod), ?_assertMatch({module, _}, code:ensure_loaded(Mod))}.
diff --git a/src/couch_replicator/test/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/couch_replicator_proxy_tests.erl
new file mode 100644
index 0000000..a40e5b1
--- /dev/null
+++ b/src/couch_replicator/test/couch_replicator_proxy_tests.erl
@@ -0,0 +1,69 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_proxy_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+-include_lib("couch_replicator/src/couch_replicator_api_wrap.hrl").
+
+
+setup() ->
+    ok.
+
+
+teardown(_) ->
+    ok.
+
+
+replicator_proxy_test_() ->
+    {
+        "replicator proxy tests",
+        {
+            setup,
+            fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun parse_rep_doc_without_proxy/1,
+                    fun parse_rep_doc_with_proxy/1
+                ]
+            }
+        }
+    }.
+
+
+parse_rep_doc_without_proxy(_) ->
+    ?_test(begin
+        NoProxyDoc = {[
+            {<<"source">>, <<"http://unproxied.com">>},
+            {<<"target">>, <<"http://otherunproxied.com">>}
+        ]},
+        Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
+        ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
+        ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined)
+    end).
+
+
+parse_rep_doc_with_proxy(_) ->
+    ?_test(begin
+        ProxyURL = <<"http://myproxy.com">>,
+        ProxyDoc = {[
+            {<<"source">>, <<"http://unproxied.com">>},
+            {<<"target">>, <<"http://otherunproxied.com">>},
+            {<<"proxy">>, ProxyURL}
+        ]},
+        Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+        ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
+        ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL))
+    end).
diff --git a/src/couch_replicator/test/couch_replicator_test_helper.erl b/src/couch_replicator/test/couch_replicator_test_helper.erl
index 398b27b..f52ab11 100644
--- a/src/couch_replicator/test/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/couch_replicator_test_helper.erl
@@ -2,7 +2,7 @@
 
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
-
+-include_lib("couch_replicator/src/couch_replicator.hrl").
 -export([compare_dbs/2, compare_dbs/3, db_url/1, replicate/1, replicate/2]).
 
 
@@ -103,6 +103,11 @@ db_url(DbName) ->
         "/", DbName
     ]).
 
+get_pid(RepId) ->
+    Pid = global:whereis_name({couch_replicator_scheduler_job,RepId}),
+    ?assert(is_pid(Pid)),
+    Pid.
+
 replicate(Source, Target) ->
     replicate({[
         {<<"source">>, Source},
@@ -111,9 +116,12 @@ replicate(Source, Target) ->
 
 replicate({[_ | _]} = RepObject) ->
     {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
+    ok = couch_replicator_scheduler:add_job(Rep),
+    couch_replicator_scheduler:reschedule(),
+    Pid = get_pid(Rep#rep.id),
     MonRef = erlang:monitor(process, Pid),
     receive
         {'DOWN', MonRef, process, Pid, _} ->
             ok
-    end.
+    end,
+    ok = couch_replicator_scheduler:remove_job(Rep#rep.id).
diff --git a/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl b/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl
index e04488e..73ea7f1 100644
--- a/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl
@@ -15,6 +15,11 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+-import(couch_replicator_test_helper, [
+    db_url/1,
+    replicate/1
+]).
+
 -define(DOCS_COUNT, 100).
 -define(TIMEOUT_EUNIT, 30).
 -define(i2l(I), integer_to_list(I)).
@@ -167,23 +172,10 @@ compare_dbs(Source, Target) ->
     ok = couch_db:close(SourceDb),
     ok = couch_db:close(TargetDb).
 
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
 replicate(Source, Target, UseCheckpoints) ->
-    RepObject = {[
+    replicate({[
         {<<"source">>, Source},
         {<<"target">>, Target},
         {<<"use_checkpoints">>, UseCheckpoints}
-    ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
-    MonRef = erlang:monitor(process, Pid),
-    receive
-        {'DOWN', MonRef, process, Pid, _} ->
-            ok
-    end.
+    ]}).
+

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <commits@couchdb.apache.org>.

Mime
View raw message