couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [13/44] couch-replicator commit: updated refs/heads/63012-defensive to 1afa5ea
Date Tue, 07 Jun 2016 11:05:17 GMT
Remove `couch_replicator_manager`

Its logic is now spread amongst a few modules:

 - `couch_multidb_changes`:
    Monitor `*/_replicator` dbs, run callbacks for db / doc changes.

 - `couch_replicator_clustering`:
    Keep track of cluster membership. Trigger rescans, by restarting
    `couch_multidb_changes`. Resolve replication ownership.

 - `couch_replicator`:
    Create local `_replicator` db on start-up. Add / remove replication jobs.
    Call `couch_replicator_docs` to write info back to replicator docs. Handle
    `_replicate` endpoint replications but use job scheduler instead.

Also:

* Dialyze more code, especially in couch_replicator and couch_replicator_docs.

* Keep the `couch_replicator_manager` module around for a while in order to
proxy calls made from other applications; otherwise we would have to clone the
other repos. Remove it and update code accordingly during the final cleanup.

* Update `owner` function from couch_replicator_clustering to have result which
makes more sense for replication jobs.

* Avoided touching the scheduler, as there are PRs pending for it. Created a few
job manipulation functions in couch_replicator.erl. These should eventually move
to the scheduler API, as they access internal job scheduler data in a hacky way.

* A major change to behavior is that errors are no longer written back to docs
(except errors in parsing the doc). Triggered updates are still written because
they also tag the doc with the replication_id.

Updated eunit tests, all pass (still):

```
$ make eunit apps=couch_replicator
...
 [done in 51.177 s]
=======================================================
  All 171 tests passed.
```

Replications running from dev cluster with `rep` tool still trigger documents
and replicate data.

```
>>> rep.replicate_1_to_n_then_check_replication(2)
creating rdyno_src_0001
 > created  1 dbs with prefix rdyno_src_
creating rdyno_tgt_0001
creating rdyno_tgt_0002
 > created  2 dbs with prefix rdyno_tgt_
updating documents
 > _replicator rdyno_0001_0001 : 1-34f09a6a62181f4d4631bf8544b582fc
 > _replicator rdyno_0001_0002 : 1-0f52a360f959fea67e6cb20aa1bb0606
waiting for replication documents to trigger
 > function wait_to_trigger succeeded after 0.011 +/- 10.0  sec.
all replication documents triggered
>>> update cycle 0  <<<
 > waiting for target  1
 > retrying function wait_till_dbs_equal
...
 > function wait_till_dbs_equal succeeded after 49.574 +/- 1.0  sec.
 > waiting for target  2
 > function wait_till_dbs_equal succeeded after 0.019 +/- 1.0  sec.
 > waiting to propagate changes from  1 to 2  : 49.623 sec.
```


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/72e1f1ed
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/72e1f1ed
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/72e1f1ed

Branch: refs/heads/63012-defensive
Commit: 72e1f1ed1a72def3840748ed529ef672e12c55ec
Parents: e69aaab
Author: Nick Vatamaniuc <vatamane@gmail.com>
Authored: Sun May 22 04:02:39 2016 -0400
Committer: Nick Vatamaniuc <vatamane@gmail.com>
Committed: Mon May 23 15:30:59 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator.app.src                    |   22 +-
 src/couch_replicator.erl                        | 1140 +++---------------
 src/couch_replicator.hrl                        |    2 +
 src/couch_replicator_clustering.erl             |   22 +-
 src/couch_replicator_docs.erl                   |   65 +-
 src/couch_replicator_manager.erl                |  303 +----
 src/couch_replicator_manager_sup.erl            |   56 -
 src/couch_replicator_scheduler_job.erl          |   47 +-
 src/couch_replicator_sup.erl                    |   58 +-
 src/couch_replicator_utils.erl                  |   10 +
 test/couch_replicator_compact_tests.erl         |   33 +-
 test/couch_replicator_many_leaves_tests.erl     |   24 +-
 test/couch_replicator_modules_load_tests.erl    |    9 +-
 test/couch_replicator_test_helper.erl           |   20 +-
 test/couch_replicator_use_checkpoints_tests.erl |   24 +-
 15 files changed, 382 insertions(+), 1453 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/src/couch_replicator.app.src
----------------------------------------------------------------------
diff --git a/src/couch_replicator.app.src b/src/couch_replicator.app.src
index 4f12195..f3d3dae 100644
--- a/src/couch_replicator.app.src
+++ b/src/couch_replicator.app.src
@@ -14,24 +14,12 @@
     {description, "CouchDB replicator"},
     {vsn, git},
     {mod, {couch_replicator_app, []}},
-    {modules, [
-        couch_replicator,
-        couch_replicator_api_wrap,
-        couch_replicator_app,
-        couch_replicator_httpc,
-        couch_replicator_httpd,
-        couch_replicator_job_sup,
-        couch_replicator_notifier,
-        couch_replicator_manager,
-        couch_replicator_httpc_pool,
-        couch_replicator_sup,
-        couch_replicator_utils,
-        couch_replicator_worker
-    ]},
     {registered, [
-        couch_replicator,
-        couch_replicator_manager,
-        couch_replicator_job_sup
+        couch_replicator_sup,
+        couch_replication,  % couch_replication_event gen_event
+        couch_replicator_clustering,
+        couch_replicator_scheduler,
+        couch_replicator_scheduler_sup
     ]},
     {applications, [
         kernel,

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 7f0c7ee..996bda2 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -11,79 +11,38 @@
 % the License.
 
 -module(couch_replicator).
--behaviour(gen_server).
--vsn(1).
+-behaviour(couch_multidb_changes).
 
 % public API
 -export([replicate/2]).
 
-% meant to be used only by the replicator database listener
--export([async_replicate/1]).
--export([cancel_replication/1]).
+% called from couch_replicator_sup supervisor
+-export([ensure_rep_db_exists/0]).
 
-% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
--export([format_status/2]).
-
--export([details/1]).
+% multidb changes callback
+-export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
 
 -include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
 
--define(LOWEST_SEQ, 0).
-
--define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
+-import(couch_replicator_utils, [
+    get_json_value/2,
+    get_json_value/3,
+    pp_rep_id/1
+]).
 
 -import(couch_util, [
     get_value/2,
-    get_value/3,
-    to_binary/1
+    get_value/3
 ]).
 
--import(couch_replicator_utils, [
-    start_db_compaction_notifier/2,
-    stop_db_compaction_notifier/1
-]).
-
--record(rep_state, {
-    rep_details,
-    source_name,
-    target_name,
-    source,
-    target,
-    history,
-    checkpoint_history,
-    start_seq,
-    committed_seq,
-    current_through_seq,
-    seqs_in_progress = [],
-    highest_seq_done = {0, ?LOWEST_SEQ},
-    source_log,
-    target_log,
-    rep_starttime,
-    src_starttime,
-    tgt_starttime,
-    timer, % checkpoint timer
-    changes_queue,
-    changes_manager,
-    changes_reader,
-    workers,
-    stats = couch_replicator_stats:new(),
-    session_id,
-    source_db_compaction_notifier = nil,
-    target_db_compaction_notifier = nil,
-    source_monitor = nil,
-    target_monitor = nil,
-    source_seq = nil,
-    use_checkpoints = true,
-    checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
-    type = db,
-    view = nil
-}).
 
 
+-spec replicate({[_]}, #user_ctx{}) ->
+    {ok, {continuous, binary()}} |
+    {ok, {[_]}} |
+    {ok, {cancelled, binary()}} |
+    {error, any()}.
 replicate(PostBody, Ctx) ->
     {ok, #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep} =
         couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
@@ -103,75 +62,59 @@ replicate(PostBody, Ctx) ->
     end.
 
 
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
-    case async_replicate(Rep) of
-    {ok, _Pid} ->
-        case get_value(continuous, Options, false) of
-        true ->
-            {ok, {continuous, ?l2b(BaseId ++ Ext)}};
-        false ->
-            wait_for_result(Id)
-        end;
-    Error ->
-        Error
-    end.
+% This is called from supervisor. Must respect supervisor protocol so
+% it returns `ignore`.
+ensure_rep_db_exists() ->
+    couch_log:notice("~p : created local _replicator database", [?MODULE]),
+    ignore.
 
 
-async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
-    RepChildId = BaseId ++ Ext,
-    Source = couch_replicator_api_wrap:db_uri(Src),
-    Target = couch_replicator_api_wrap:db_uri(Tgt),
-    Timeout = get_value(connection_timeout, Rep#rep.options),
-    ChildSpec = {
-        RepChildId,
-        {gen_server, start_link, [?MODULE, Rep, [{timeout, Timeout}]]},
-        temporary,
-        250,
-        worker,
-        [?MODULE]
-    },
-    % All these nested cases to attempt starting/restarting a replication child
-    % are ugly and not 100% race condition free. The following patch submission
-    % is a solution:
-    %
-    % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
-    %
-    case supervisor:start_child(couch_replicator_job_sup, ChildSpec) of
-    {ok, Pid} ->
-        couch_log:notice("starting new replication `~s` at ~p (`~s` -> `~s`)",
-            [RepChildId, Pid, Source, Target]),
-        {ok, Pid};
-    {error, already_present} ->
-        case supervisor:restart_child(couch_replicator_job_sup, RepChildId) of
-        {ok, Pid} ->
-            couch_log:notice("restarting replication `~s` at ~p (`~s` -> `~s`)",
-                [RepChildId, Pid, Source, Target]),
-            {ok, Pid};
-        {error, running} ->
-            %% this error occurs if multiple replicators are racing
-            %% each other to start and somebody else won. Just grab
-            %% the Pid by calling start_child again.
-            timer:sleep(50 + random:uniform(100)),
-            async_replicate(Rep);
-        {error, {'EXIT', {badarg,
-            [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
-            % Clause to deal with a change in the supervisor module introduced
-            % in R14B02. For more details consult the thread at:
-            %     http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
-            _ = supervisor:delete_child(couch_replicator_job_sup, RepChildId),
-            async_replicate(Rep);
-        {error, _} = Error ->
-            Error
-        end;
-    {error, {already_started, Pid}} ->
-        couch_log:notice("replication `~s` already running at ~p (`~s` -> `~s`)",
-            [RepChildId, Pid, Source, Target]),
-        {ok, Pid};
-    {error, {Error, _}} ->
-        {error, Error}
-    end.
+%%%%%% Multidb changes callbacks
+
+db_created(DbName, Server) ->
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+db_deleted(DbName, Server) ->
+    clean_up_replications(DbName),
+    Server.
 
+db_found(DbName, Server) ->
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+db_change(DbName, {ChangeProps} = Change, Server) ->
+    try
+        ok = process_update(DbName, Change)
+    catch
+    _Tag:Error ->
+        {RepProps} = get_json_value(doc, ChangeProps),
+        DocId = get_json_value(<<"_id">>, RepProps),
+        couch_replicator_docs:update_doc_process_error(DbName, DocId, Error)
+    end,
+    Server.
 
+
+%%%%%%%%% Private helper functions
+
+-spec do_replication_loop(#rep{}) ->
+    {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
+do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
+    case couch_replicator_scheduler:add_job(Rep) of
+    ok ->
+        ok;
+    {error, already_added} ->
+        couch_log:notice("Replication '~s' already running", [BaseId ++ Ext]),
+        ok
+    end,
+    case get_value(continuous, Options, false) of
+    true ->
+        {ok, {continuous, ?l2b(BaseId ++ Ext)}};
+    false ->
+        wait_for_result(Id)
+    end.
+
+-spec rep_result_listener(rep_id()) -> {ok, pid()}.
 rep_result_listener(RepId) ->
     ReplyTo = self(),
     {ok, _Listener} = couch_replicator_notifier:start_link(
@@ -181,7 +124,8 @@ rep_result_listener(RepId) ->
                 ok
         end).
 
-
+-spec wait_for_result(rep_id()) ->
+    {ok, any()} | {error, any()}.
 wait_for_result(RepId) ->
     receive
     {finished, RepId, RepResult} ->
@@ -191,847 +135,145 @@ wait_for_result(RepId) ->
     end.
 
 
-cancel_replication({BaseId, Extension}) ->
-    FullRepId = BaseId ++ Extension,
-    couch_log:notice("Canceling replication `~s`...", [FullRepId]),
-    case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
-    ok ->
-        couch_log:notice("Replication `~s` canceled.", [FullRepId]),
-        case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
-            ok ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            {error, not_found} ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            Error ->
-                Error
-        end;
-    Error ->
-        couch_log:error("Error canceling replication `~s`: ~p", [FullRepId, Error]),
-        Error
+-spec cancel_replication(rep_id()) ->
+    {ok, {cancelled, binary()}} | {error, not_found}.
+cancel_replication({BasedId, Extension} = RepId) ->
+    FullRepId = BasedId ++ Extension,
+    couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
+    case rep_state(RepId) of
+    #rep{} ->
+        ok = couch_replicator_scheduler:remove_job(RepId),
+        couch_log:notice("Replication '~s' cancelled", [FullRepId]),
+        {ok, {cancelled, ?l2b(FullRepId)}};
+    nil ->
+        couch_log:notice("Replication '~s' not found", [FullRepId]),
+        {error, not_found}
     end.
 
+
+-spec cancel_replication(rep_id(), #user_ctx{}) ->
+    {ok, {cancelled, binary()}} | {error, not_found}.
 cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
     case lists:member(<<"_admin">>, Roles) of
     true ->
         cancel_replication(RepId);
     false ->
-        case find_replicator(RepId) of
-        {ok, Pid} ->
-            case details(Pid) of
-            {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
-                cancel_replication(RepId);
-            {ok, _} ->
-                throw({unauthorized,
-                    <<"Can't cancel a replication triggered by another user">>});
-            Error ->
-                Error
-            end;
-        Error ->
-            Error
-        end
-    end.
-
-find_replicator({BaseId, Ext} = _RepId) ->
-    case lists:keysearch(
-        BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
-    {value, {_, Pid, _, _}} when is_pid(Pid) ->
-            {ok, Pid};
-    _ ->
+        case rep_state(RepId) of
+        #rep{user_ctx = #user_ctx{name = Name}} ->
+            cancel_replication(RepId);
+        #rep{user_ctx = #user_ctx{name = _Other}} ->
+            throw({unauthorized,
+                <<"Can't cancel a replication triggered by another user">>});
+        nil ->
             {error, not_found}
-    end.
-
-details(Pid) ->
-    case (catch gen_server:call(Pid, get_details)) of
-    {ok, Rep} ->
-        {ok, Rep};
-    {'EXIT', {noproc, {gen_server, call, _}}} ->
-        {error, not_found};
-    Error ->
-        throw(Error)
-    end.
-
-init(InitArgs) ->
-    {ok, InitArgs, 0}.
-
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
-    process_flag(trap_exit, true),
-
-    random:seed(os:timestamp()),
-
-    #rep_state{
-        source = Source,
-        target = Target,
-        source_name = SourceName,
-        target_name = TargetName,
-        start_seq = {_Ts, StartSeq},
-        committed_seq = {_, CommittedSeq},
-        highest_seq_done = {_, HighestSeq},
-        checkpoint_interval = CheckpointInterval
-    } = State = init_state(Rep),
-
-    NumWorkers = get_value(worker_processes, Options),
-    BatchSize = get_value(worker_batch_size, Options),
-    {ok, ChangesQueue} = couch_work_queue:new([
-        {max_items, BatchSize * NumWorkers * 2},
-        {max_size, 100 * 1024 * NumWorkers}
-    ]),
-    % This starts the _changes reader process. It adds the changes from
-    % the source db to the ChangesQueue.
-    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
-        StartSeq, Source, ChangesQueue, Options
-    ),
-    % Changes manager - responsible for dequeing batches from the changes queue
-    % and deliver them to the worker processes.
-    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
-    % This starts the worker processes. They ask the changes queue manager for a
-    % a batch of _changes rows to process -> check which revs are missing in the
-    % target, and for the missing ones, it copies them from the source to the target.
-    MaxConns = get_value(http_connections, Options),
-    Workers = lists:map(
-        fun(_) ->
-            couch_stats:increment_counter([couch_replicator, workers_started]),
-            {ok, Pid} = couch_replicator_worker:start_link(
-                self(), Source, Target, ChangesManager, MaxConns),
-            Pid
-        end,
-        lists:seq(1, NumWorkers)),
-
-    couch_task_status:add_task([
-        {type, replication},
-        {user, UserCtx#user_ctx.name},
-        {replication_id, ?l2b(BaseId ++ Ext)},
-        {database, Rep#rep.db_name},
-        {doc_id, Rep#rep.doc_id},
-        {source, ?l2b(SourceName)},
-        {target, ?l2b(TargetName)},
-        {continuous, get_value(continuous, Options, false)},
-        {revisions_checked, 0},
-        {missing_revisions_found, 0},
-        {docs_read, 0},
-        {docs_written, 0},
-        {changes_pending, get_pending_count(State)},
-        {doc_write_failures, 0},
-        {source_seq, HighestSeq},
-        {checkpointed_source_seq, CommittedSeq},
-        {checkpoint_interval, CheckpointInterval}
-    ]),
-    couch_task_status:set_update_frequency(1000),
-
-    % Until OTP R14B03:
-    %
-    % Restarting a temporary supervised child implies that the original arguments
-    % (#rep{} record) specified in the MFA component of the supervisor
-    % child spec will always be used whenever the child is restarted.
-    % This implies the same replication performance tunning parameters will
-    % always be used. The solution is to delete the child spec (see
-    % cancel_replication/1) and then start the replication again, but this is
-    % unfortunately not immune to race conditions.
-
-    couch_log:notice("Replication `~p` is using:~n"
-        "~c~p worker processes~n"
-        "~ca worker batch size of ~p~n"
-        "~c~p HTTP connections~n"
-        "~ca connection timeout of ~p milliseconds~n"
-        "~c~p retries per request~n"
-        "~csocket options are: ~s~s",
-        [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
-            MaxConns, $\t, get_value(connection_timeout, Options),
-            $\t, get_value(retries, Options),
-            $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
-            case StartSeq of
-            ?LOWEST_SEQ ->
-                "";
-            _ ->
-                io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
-            end]),
-
-    couch_log:debug("Worker pids are: ~p", [Workers]),
-
-    couch_replicator_manager:replication_started(Rep),
-
-    {ok, State#rep_state{
-            changes_queue = ChangesQueue,
-            changes_manager = ChangesManager,
-            changes_reader = ChangesReader,
-            workers = Workers
-        }
-    }.
-
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
-    Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
-    couch_log:notice(Msg, [RepId]),
-    Src#httpdb{http_connections = 2};
-
-adjust_maxconn(Src, _RepId) ->
-    Src.
-
-handle_info(shutdown, St) ->
-    {stop, shutdown, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    couch_log:error("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    couch_log:error("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
-    couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
-    couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
-    {stop, changes_manager_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
-    couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
-    case Workers -- [Pid] of
-    Workers ->
-        couch_log:error("unknown pid bit the dust ~p ~n",[Pid]),
-        {noreply, State#rep_state{workers = Workers}};
-        %% not clear why a stop was here before
-        %%{stop, {unknown_process_died, Pid, normal}, State};
-    [] ->
-        catch unlink(State#rep_state.changes_manager),
-        catch exit(State#rep_state.changes_manager, kill),
-        do_last_checkpoint(State);
-    Workers2 ->
-        {noreply, State#rep_state{workers = Workers2}}
-    end;
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
-    State2 = cancel_timer(State),
-    case lists:member(Pid, Workers) of
-    false ->
-        {stop, {unknown_process_died, Pid, Reason}, State2};
-    true ->
-        couch_stats:increment_counter([couch_replicator, worker_deaths]),
-        couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {worker_died, Pid, Reason}, State2}
-    end;
-
-handle_info(timeout, InitArgs) ->
-    try do_init(InitArgs) of {ok, State} ->
-        {noreply, State}
-    catch Class:Error ->
-        Stack = erlang:get_stacktrace(),
-        {stop, shutdown, {error, Class, Error, Stack, InitArgs}}
-    end.
-
-handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
-    {reply, {ok, Rep}, State};
-
-handle_call({add_stats, Stats}, From, State) ->
-    gen_server:reply(From, ok),
-    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
-    {noreply, State#rep_state{stats = NewStats}};
-
-handle_call({report_seq_done, Seq, StatsInc}, From,
-    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
-        current_through_seq = ThroughSeq, stats = Stats} = State) ->
-    gen_server:reply(From, ok),
-    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
-    [] ->
-        {Seq, []};
-    [Seq | Rest] ->
-        {Seq, Rest};
-    [_ | _] ->
-        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
-    end,
-    NewHighestDone = lists:max([HighestDone, Seq]),
-    NewThroughSeq = case NewSeqsInProgress of
-    [] ->
-        lists:max([NewThroughSeq0, NewHighestDone]);
-    _ ->
-        NewThroughSeq0
-    end,
-    couch_log:debug("Worker reported seq ~p, through seq was ~p, "
-        "new through seq is ~p, highest seq done was ~p, "
-        "new highest seq done is ~p~n"
-        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
-        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
-            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
-    NewState = State#rep_state{
-        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
-        current_through_seq = NewThroughSeq,
-        seqs_in_progress = NewSeqsInProgress,
-        highest_seq_done = NewHighestDone
-    },
-    update_task(NewState),
-    {noreply, NewState}.
-
-
-handle_cast({db_compacted, DbName},
-    #rep_state{source = #db{name = DbName} = Source} = State) ->
-    {ok, NewSource} = couch_db:reopen(Source),
-    {noreply, State#rep_state{source = NewSource}};
-
-handle_cast({db_compacted, DbName},
-    #rep_state{target = #db{name = DbName} = Target} = State) ->
-    {ok, NewTarget} = couch_db:reopen(Target),
-    {noreply, State#rep_state{target = NewTarget}};
-
-handle_cast(checkpoint, State) ->
-    #rep_state{rep_details = #rep{} = Rep} = State,
-    case couch_replicator_manager:continue(Rep) of
-    {true, _} ->
-        case do_checkpoint(State) of
-        {ok, NewState} ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-            {noreply, NewState#rep_state{timer = start_timer(State)}};
-        Error ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
-            {stop, Error, State}
+        end
+     end.
+
+
+
+-spec process_update(binary(), {[_]}) -> ok.
+process_update(DbName, {Change}) ->
+    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
+    DocId = get_json_value(<<"_id">>, RepProps),
+    Owner = couch_replicator_clustering:owner(DbName, DocId),
+    case {Owner, get_json_value(deleted, Change, false)} of
+    {_, true} ->
+        remove_jobs(DbName, DocId);
+    {unstable, false} ->
+	couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
+    {ThisNode, false} when ThisNode =:= node() ->
+        couch_log:notice("Maybe starting '~s' as I'm the owner", [DocId]),
+        case get_json_value(<<"_replication_state">>, RepProps) of
+        undefined ->
+            maybe_start_replication(DbName, DocId, JsonRepDoc);
+        <<"triggered">> ->
+            maybe_start_replication(DbName, DocId, JsonRepDoc);
+        <<"completed">> ->
+            couch_log:notice("Replication '~s' marked as completed", [DocId])
         end;
-    {false, Owner} ->
-        couch_replicator_manager:replication_usurped(Rep, Owner),
-        {stop, shutdown, State}
-    end;
-
-handle_cast({report_seq, Seq},
-    #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
-    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
-    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
-
-
-code_change(_OldVsn, #rep_state{}=State, _Extra) ->
-    {ok, State}.
-
-
-headers_strip_creds([], Acc) ->
-    lists:reverse(Acc);
-headers_strip_creds([{Key, Value0} | Rest], Acc) ->
-    Value = case string:to_lower(Key) of
-    "authorization" ->
-        "****";
-    _ ->
-        Value0
+     {Owner, false} ->
+         couch_log:notice("Not starting '~s' as owner is ~s.", [DocId, Owner])
     end,
-    headers_strip_creds(Rest, [{Key, Value} | Acc]).
-
-
-httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
-    HttpDb#httpdb{
-        url = couch_util:url_strip_password(Url),
-        headers = headers_strip_creds(Headers, [])
-    };
-httpdb_strip_creds(LocalDb) ->
-    LocalDb.
-
-
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
-    Rep#rep{
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
-    % #rep_state contains the source and target at the top level and also
-    % in the nested #rep_details record
-    State#rep_state{
-        rep_details = rep_strip_creds(Rep),
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
-    checkpoint_history = CheckpointHistory} = State) ->
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
-    couch_replicator_manager:replication_completed(Rep, rep_stats(State));
-
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
-    % cancelled replication throught ?MODULE:cancel_replication/1
-    couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}),
-    terminate_cleanup(State);
-
-terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
-    #rep{id=RepId} = InitArgs,
-    couch_stats:increment_counter([couch_replicator, failed_starts]),
-    CleanInitArgs = rep_strip_creds(InitArgs),
-    couch_log:error("~p:~p: Replication failed to start for args ~p: ~p",
-             [Class, Error, CleanInitArgs, Stack]),
-    case Error of
-    {unauthorized, DbUri} ->
-        NotifyError = {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
-    {db_not_found, DbUri} ->
-        NotifyError = {db_not_found, <<"could not open ", DbUri/binary>>};
-    _ ->
-        NotifyError = Error
-    end,
-    couch_replicator_notifier:notify({error, RepId, NotifyError}),
-    couch_replicator_manager:replication_error(InitArgs, NotifyError);
-terminate(Reason, State) ->
-    #rep_state{
-        source_name = Source,
-        target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
-    } = State,
-    couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
-        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, Reason}),
-    couch_replicator_manager:replication_error(Rep, Reason).
-
-terminate_cleanup(State) ->
-    update_task(State),
-    stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
-    stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
-    couch_replicator_api_wrap:db_close(State#rep_state.source),
-    couch_replicator_api_wrap:db_close(State#rep_state.target).
-
-
-format_status(_Opt, [_PDict, State]) ->
-    [{data, [{"State", state_strip_creds(State)}]}].
-
-
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
-    {stop, normal, cancel_timer(State)};
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = Seq} = State) ->
-    case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
-    {ok, NewState} ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-        {stop, normal, cancel_timer(NewState)};
-    Error ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
-        {stop, Error, State}
-    end.
-
-
-start_timer(State) ->
-    After = State#rep_state.checkpoint_interval,
-    case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
-    {ok, Ref} ->
-        Ref;
-    Error ->
-        couch_log:error("Replicator, error scheduling checkpoint:  ~p", [Error]),
-        nil
-    end.
-
-
-cancel_timer(#rep_state{timer = nil} = State) ->
-    State;
-cancel_timer(#rep_state{timer = Timer} = State) ->
-    {ok, cancel} = timer:cancel(Timer),
-    State#rep_state{timer = nil}.
-
-
-init_state(Rep) ->
-    #rep{
-        id = {BaseId, _Ext},
-        source = Src0, target = Tgt,
-        options = Options, user_ctx = UserCtx,
-        type = Type, view = View
-    } = Rep,
-    % Adjust minimum number of http source connections to 2 to avoid deadlock
-    Src = adjust_maxconn(Src0, BaseId),
-    {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
-    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
-        get_value(create_target, Options, false)),
-
-    {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
-    {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
-
-    [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
-
-    {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
-    StartSeq1 = get_value(since_seq, Options, StartSeq0),
-    StartSeq = {0, StartSeq1},
-
-    SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
-
-    #doc{body={CheckpointHistory}} = SourceLog,
-    State = #rep_state{
-        rep_details = Rep,
-        source_name = couch_replicator_api_wrap:db_uri(Source),
-        target_name = couch_replicator_api_wrap:db_uri(Target),
-        source = Source,
-        target = Target,
-        history = History,
-        checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
-        start_seq = StartSeq,
-        current_through_seq = StartSeq,
-        committed_seq = StartSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = httpd_util:rfc1123_date(),
-        src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
-        tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
-        session_id = couch_uuids:random(),
-        source_db_compaction_notifier =
-            start_db_compaction_notifier(Source, self()),
-        target_db_compaction_notifier =
-            start_db_compaction_notifier(Target, self()),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
-        source_seq = SourceSeq,
-        use_checkpoints = get_value(use_checkpoints, Options, true),
-        checkpoint_interval = get_value(checkpoint_interval, Options,
-                                        ?DEFAULT_CHECKPOINT_INTERVAL),
-        type = Type,
-        view = View
-    },
-    State#rep_state{timer = start_timer(State)}.
-
-
-find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
-    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
-    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
-
-
-fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
-    lists:reverse(Acc);
-
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
-    case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
-    {error, <<"not_found">>} when Vsn > 1 ->
-        OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
-        fold_replication_logs(Dbs, Vsn - 1,
-            ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
-    {error, <<"not_found">>} ->
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
-    {ok, Doc} when LogId =:= NewId ->
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
-    {ok, Doc} ->
-        MigratedLog = #doc{id = NewId, body = Doc#doc.body},
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
-    end.
-
-
-spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
-    spawn_link(fun() ->
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
-    end).
-
-changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
-    receive
-    {get_changes, From} ->
-        case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
-        closed ->
-            From ! {closed, self()};
-        {ok, Changes} ->
-            #doc_info{high_seq = Seq} = lists:last(Changes),
-            ReportSeq = {Ts, Seq},
-            ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
-            From ! {changes, self(), Changes, ReportSeq}
-        end,
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
-    end.
-
-
-do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
-    NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
-    {ok, NewState};
-do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
-    update_task(State),
-    {ok, State};
-do_checkpoint(State) ->
-    #rep_state{
-        source_name=SourceName,
-        target_name=TargetName,
-        source = Source,
-        target = Target,
-        history = OldHistory,
-        start_seq = {_, StartSeq},
-        current_through_seq = {_Ts, NewSeq} = NewTsSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = ReplicationStartTime,
-        src_starttime = SrcInstanceStartTime,
-        tgt_starttime = TgtInstanceStartTime,
-        stats = Stats,
-        rep_details = #rep{options = Options},
-        session_id = SessionId
-    } = State,
-    case commit_to_both(Source, Target) of
-    {source_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
-    {target_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
-    {SrcInstanceStartTime, TgtInstanceStartTime} ->
-        couch_log:notice("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
-            [SourceName, TargetName, NewSeq]),
-        StartTime = ?l2b(ReplicationStartTime),
-        EndTime = ?l2b(httpd_util:rfc1123_date()),
-        NewHistoryEntry = {[
-            {<<"session_id">>, SessionId},
-            {<<"start_time">>, StartTime},
-            {<<"end_time">>, EndTime},
-            {<<"start_last_seq">>, StartSeq},
-            {<<"end_last_seq">>, NewSeq},
-            {<<"recorded_seq">>, NewSeq},
-            {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
-            {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
-            {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
-            {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
-            {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
-        ]},
-        BaseHistory = [
-            {<<"session_id">>, SessionId},
-            {<<"source_last_seq">>, NewSeq},
-            {<<"replication_id_version">>, ?REP_ID_VERSION}
-        ] ++ case get_value(doc_ids, Options) of
-        undefined ->
-            [];
-        _DocIds ->
-            % backwards compatibility with the result of a replication by
-            % doc IDs in versions 0.11.x and 1.0.x
-            % TODO: deprecate (use same history format, simplify code)
-            [
-                {<<"start_time">>, StartTime},
-                {<<"end_time">>, EndTime},
-                {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
-                {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
-                {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
-            ]
+    ok.
+
+
+-spec maybe_start_replication(binary(), binary(), {[_]}) -> ok.
+maybe_start_replication(DbName, DocId, RepDoc) ->
+    Rep0 = couch_replicator_docs:parse_rep_doc(RepDoc),
+    #rep{id = {BaseId, _} = RepId} = Rep0,
+    Rep = Rep0#rep{db_name = DbName},
+    case rep_state(RepId) of
+    nil ->
+        couch_log:notice("Attempting to start replication `~s` (document `~s`).",
+            [pp_rep_id(RepId), DocId]),
+        case couch_replicator_scheduler:add_job(Rep) of
+        ok ->
+            ok;
+        {error, already_added} ->
+            couch_log:warning("replicator scheduler: ~p was already added", [Rep])
         end,
-        % limit history to 50 entries
-        NewRepHistory = {
-            BaseHistory ++
-            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
-        },
-
-        try
-            {SrcRevPos, SrcRevId} = update_checkpoint(
-                Source, SourceLog#doc{body = NewRepHistory}, source),
-            {TgtRevPos, TgtRevId} = update_checkpoint(
-                Target, TargetLog#doc{body = NewRepHistory}, target),
-            NewState = State#rep_state{
-                checkpoint_history = NewRepHistory,
-                committed_seq = NewTsSeq,
-                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
-                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
-            },
-            update_task(NewState),
-            {ok, NewState}
-        catch throw:{checkpoint_commit_failure, _} = Failure ->
-            Failure
-        end;
-    {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Target database out of sync. "
-            "Try to increase max_dbs_open at the target's server.">>};
-    {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source database out of sync. "
-            "Try to increase max_dbs_open at the source's server.">>};
-    {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source and target databases out of "
-            "sync. Try to increase max_dbs_open at both servers.">>}
-    end.
-
-
-update_checkpoint(Db, Doc, DbType) ->
-    try
-        update_checkpoint(Db, Doc)
-    catch throw:{checkpoint_commit_failure, Reason} ->
-        throw({checkpoint_commit_failure,
-            <<"Error updating the ", (to_binary(DbType))/binary,
-                " checkpoint document: ", (to_binary(Reason))/binary>>})
-    end.
-
-update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
-    try
-        case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
-        {ok, PosRevId} ->
-            PosRevId;
-        {error, Reason} ->
-            throw({checkpoint_commit_failure, Reason})
-        end
-    catch throw:conflict ->
-        case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
-        {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
-            % This means that we were able to update successfully the
-            % checkpoint doc in a previous attempt but we got a connection
-            % error (timeout for e.g.) before receiving the success response.
-            % Therefore the request was retried and we got a conflict, as the
-            % revision we sent is not the current one.
-            % We confirm this by verifying the doc body we just got is the same
-            % that we have just sent.
-            {Pos, RevId};
-        _ ->
-            throw({checkpoint_commit_failure, conflict})
-        end
-    end.
-
-
-commit_to_both(Source, Target) ->
-    % commit the src async
-    ParentPid = self(),
-    SrcCommitPid = spawn_link(
-        fun() ->
-            Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
-            ParentPid ! {self(), Result}
-        end),
-
-    % commit tgt sync
-    TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
-
-    SourceResult = receive
-    {SrcCommitPid, Result} ->
-        unlink(SrcCommitPid),
-        receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
-        Result;
-    {'EXIT', SrcCommitPid, Reason} ->
-        {error, Reason}
+        ok;
+    #rep{doc_id = DocId} ->
+        ok;
+    #rep{db_name = DbName, doc_id = OtherDocId} ->
+        couch_log:notice("The replication specified by the document `~s` already started"
+            " triggered by the document `~s`", [DocId, OtherDocId]),
+        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId))
     end,
-    case TargetResult of
-    {ok, TargetStartTime} ->
-        case SourceResult of
-        {ok, SourceStartTime} ->
-            {SourceStartTime, TargetStartTime};
-        SourceError ->
-            {source_error, SourceError}
-        end;
-    TargetError ->
-        {target_error, TargetError}
-    end.
+    ok.
 
 
-compare_replication_logs(SrcDoc, TgtDoc) ->
-    #doc{body={RepRecProps}} = SrcDoc,
-    #doc{body={RepRecPropsTgt}} = TgtDoc,
-    case get_value(<<"session_id">>, RepRecProps) ==
-            get_value(<<"session_id">>, RepRecPropsTgt) of
-    true ->
-        % if the records have the same session id,
-        % then we have a valid replication history
-        OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
-        OldHistory = get_value(<<"history">>, RepRecProps, []),
-        {OldSeqNum, OldHistory};
-    false ->
-        SourceHistory = get_value(<<"history">>, RepRecProps, []),
-        TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
-        couch_log:notice("Replication records differ. "
-                "Scanning histories to find a common ancestor.", []),
-        couch_log:debug("Record on source:~p~nRecord on target:~p~n",
-                [RepRecProps, RepRecPropsTgt]),
-        compare_rep_history(SourceHistory, TargetHistory)
-    end.
-
-compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
-    couch_log:notice("no common ancestry -- performing full replication", []),
-    {?LOWEST_SEQ, []};
-compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
-    SourceId = get_value(<<"session_id">>, S),
-    case has_session_id(SourceId, Target) of
-    true ->
-        RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
-        couch_log:notice("found a common replication record with source_seq ~p",
-            [RecordSeqNum]),
-        {RecordSeqNum, SourceRest};
-    false ->
-        TargetId = get_value(<<"session_id">>, T),
-        case has_session_id(TargetId, SourceRest) of
-        true ->
-            RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
-            couch_log:notice("found a common replication record with source_seq ~p",
-                [RecordSeqNum]),
-            {RecordSeqNum, TargetRest};
-        false ->
-            compare_rep_history(SourceRest, TargetRest)
-        end
-    end.
-
-
-has_session_id(_SessionId, []) ->
-    false;
-has_session_id(SessionId, [{Props} | Rest]) ->
-    case get_value(<<"session_id">>, Props, nil) of
-    SessionId ->
-        true;
-    _Else ->
-        has_session_id(SessionId, Rest)
-    end.
-
-
-db_monitor(#db{} = Db) ->
-    couch_db:monitor(Db);
-db_monitor(_HttpDb) ->
-    nil.
-
-get_pending_count(St) ->
-    Rep = St#rep_state.rep_details,
-    Timeout = get_value(connection_timeout, Rep#rep.options),
-    TimeoutMicro = Timeout * 1000,
-    case get(pending_count_state) of
-        {LastUpdate, PendingCount} ->
-            case timer:now_diff(os:timestamp(), LastUpdate) > TimeoutMicro of
-                true ->
-                    NewPendingCount = get_pending_count_int(St),
-                    put(pending_count_state, {os:timestamp(), NewPendingCount}),
-                    NewPendingCount;
-                false ->
-                    PendingCount
-            end;
-        undefined ->
-            NewPendingCount = get_pending_count_int(St),
-            put(pending_count_state, {os:timestamp(), NewPendingCount}),
-            NewPendingCount
-    end.
-
-
-get_pending_count_int(#rep_state{source = #httpdb{} = Db0}=St) ->
-    {_, Seq} = St#rep_state.highest_seq_done,
-    Db = Db0#httpdb{retries = 3},
-    case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
-    {ok, Pending} ->
-        Pending;
+-spec maybe_tag_rep_doc(binary(), binary(), {[_]}, binary()) -> ok.
+maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
+    case get_json_value(<<"_replication_id">>, RepProps) of
+    RepId ->
+        ok;
     _ ->
-        null
-    end;
-get_pending_count_int(#rep_state{source = Db}=St) ->
-    {_, Seq} = St#rep_state.highest_seq_done,
-    {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
-    Pending.
-
-
-update_task(State) ->
-    #rep_state{
-        current_through_seq = {_, ThroughSeq},
-        highest_seq_done = {_, HighestSeq}
-    } = State,
-    couch_task_status:update(
-        rep_stats(State) ++ [
-        {source_seq, HighestSeq},
-        {through_seq, ThroughSeq}
-    ]).
+        couch_replicator_docs:update_doc_replication_id(DbName, DocId, RepId)
+    end.
 
 
-rep_stats(State) ->
-    #rep_state{
-        committed_seq = {_, CommittedSeq},
-        stats = Stats
-    } = State,
+-spec remove_jobs(binary(), binary()) -> ok.
+remove_jobs(DbName, DocId) ->
+    LogMsg = "Stopped replication `~s` , replication document `~s`",
     [
-        {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
-        {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
-        {docs_read, couch_replicator_stats:docs_read(Stats)},
-        {docs_written, couch_replicator_stats:docs_written(Stats)},
-        {changes_pending, get_pending_count(State)},
-        {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
-        {checkpointed_source_seq, CommittedSeq}
-    ].
+        begin
+            couch_replicator_scheduler:remove_job(RepId),
+            couch_log:notice(LogMsg, [pp_rep_id(RepId), DocId])
+        end || RepId <- find_jobs_by_doc(DbName, DocId)
+    ],
+    ok.
+
+
+% TODO: make this a function in couch_replicator_scheduler API
+-spec clean_up_replications(binary()) -> ok.
+clean_up_replications(DbName) ->
+    RepIds = find_jobs_by_dbname(DbName),
+    lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
+
+
+% TODO: make this a function in couch_replicator_scheduler API
+-spec find_jobs_by_dbname(binary()) -> list(#rep{}).
+find_jobs_by_dbname(DbName) ->
+    RepSpec = #rep{db_name = DbName, _ = '_'},
+    MatchSpec = {job, '$1', RepSpec, '_', '_'},
+    [RepId || [RepId] <- ets:match(couch_replicator_scheduler, MatchSpec)].
+
+
+% TODO: make this a function in couch_replicator_scheduler API
+-spec find_jobs_by_doc(binary(), binary()) -> list(#rep{}).
+find_jobs_by_doc(DbName, DocId) ->
+    RepSpec =  #rep{db_name = DbName, doc_id = DocId, _ = '_'},
+    MatchSpec = {job, '$1', RepSpec, '_', '_'},
+    [RepId || [RepId] <- ets:match(couch_replicator_scheduler, MatchSpec)].
+
+
+% TODO: make this a function in couch_replicator_scheduler API
+-spec rep_state(rep_id()) -> #rep{} | nil.
+rep_state(RepId) ->
+    case (catch ets:lookup_element(couch_replicator_scheduler, RepId, 3)) of
+        {'EXIT',{badarg, _}} ->
+            nil;
+        Rep ->
+            Rep
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl
index d3485c0..4fca58e 100644
--- a/src/couch_replicator.hrl
+++ b/src/couch_replicator.hrl
@@ -23,3 +23,5 @@
     doc_id,
     db_name = null
 }).
+
+-type rep_id() :: {string(), string()}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/src/couch_replicator_clustering.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_clustering.erl b/src/couch_replicator_clustering.erl
index edb00d6..6c8cb1f 100644
--- a/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator_clustering.erl
@@ -42,28 +42,24 @@ start_link() ->
 
 
 % owner/2 function computes ownership for a {DbName, DocId} tuple
-% Returns {ok no_owner} in case no DocId is null. That case
-% would happen in the old replicator_manager if replication was
-% posted from _replicate endpoint and not via a document in 
-% *_replicator db.
-%
-% {error, unstable} value is returned if cluster membership has
-% been changing recently. Recency is a configuration parameter.
+% Returns `no_owner` when DocId is null, `unstable` if the cluster
+% is considered to be unstable (i.e. its membership has changed recently),
+% or returns node() which is considered to be the owner.
 %
 -spec owner(Dbname :: binary(), DocId :: binary() | null) ->
-    {ok, node()} | {ok, no_owner} | {error, unstable}.
+    node() | no_owner | unstable.
 owner(_DbName, null) ->
-    {ok, no_owner};
+    no_owner;
 owner(<<"shards/", _/binary>> = DbName, DocId) ->
     IsStable = gen_server:call(?MODULE, is_stable, infinity),
     case IsStable of
         false ->
-            {error, unstable};
+            unstable;
         true ->
-            {ok, owner_int(DbName, DocId)}
+            owner_int(DbName, DocId)
     end;
 owner(_DbName, _DocId) ->
-    {ok, node()}.
+    node().
 
 
 
@@ -149,7 +145,7 @@ new_timer(Interval) ->
 
 trigger_rescan() ->
     couch_log:notice("Triggering replicator rescan from ~p", [?MODULE]),
-    couch_replicator_manager_sup:restart_mdb_listener(),
+    couch_replicator_sup:restart_mdb_listener(),
     ok.
 
 is_stable(State) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index 8f68a28..6c71f66 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -18,7 +18,6 @@
 -export([
     update_doc_triggered/3,
     update_doc_completed/3,
-    update_doc_error/4,
     update_doc_replication_id/3,
     update_doc_process_error/3
 ]).
@@ -50,7 +49,7 @@
 
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
 
-
+-spec update_doc_triggered(binary(), binary(), rep_id()) -> any().
 update_doc_triggered(DbName, DocId, {BaseId, _}) ->
     update_rep_doc(DbName, DocId, [
             {<<"_replication_state">>, <<"triggered">>},
@@ -59,6 +58,7 @@ update_doc_triggered(DbName, DocId, {BaseId, _}) ->
             {<<"_replication_stats">>, undefined}]).
 
 
+-spec update_doc_completed(binary(), binary(), [_]) -> any().
 update_doc_completed(DbName, DocId, Stats) ->
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"completed">>},
@@ -66,14 +66,7 @@ update_doc_completed(DbName, DocId, Stats) ->
         {<<"_replication_stats">>, {Stats}}]).
 
 
-update_doc_error(DbName, DocId, {BaseId, _}, Error) ->
-    ErrorBinary = couch_replicator_utils:rep_error_to_binary(Error),
-    update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"error">>},
-            {<<"_replication_state_reason">>, ErrorBinary},
-            {<<"_replication_id">>, ?l2b(BaseId)}]).
-
-
+-spec update_doc_process_error(binary(), binary(), any()) -> any().
 update_doc_process_error(DbName, DocId, Error) ->
     Reason = case Error of
         {bad_rep_doc, Reas} ->
@@ -87,10 +80,12 @@ update_doc_process_error(DbName, DocId, Error) ->
         {<<"_replication_state_reason">>, Reason}]).
 
 
+-spec update_doc_replication_id(binary(), binary(), binary()) -> any().
 update_doc_replication_id(DbName, DocId, RepId) ->
     update_rep_doc(DbName, DocId, [{<<"_replication_id">>, RepId}]).
 
 
+-spec ensure_rep_db_exists() -> {ok, #db{}}.
 ensure_rep_db_exists() ->
     Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db, nologifmissing]) of
         {ok, Db0} ->
@@ -99,10 +94,11 @@ ensure_rep_db_exists() ->
             {ok, Db0} = couch_db:create(?REP_DB_NAME, [?CTX, sys_db]),
             Db0
     end,
-    ensure_rep_ddoc_exists(?REP_DB_NAME),
+    ok = ensure_rep_ddoc_exists(?REP_DB_NAME),
     {ok, Db}.
 
 
+-spec ensure_rep_ddoc_exists(binary()) -> ok.
 ensure_rep_ddoc_exists(RepDb) ->
     case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
 	true ->
@@ -111,7 +107,7 @@ ensure_rep_ddoc_exists(RepDb) ->
 	    ok
     end.
 
-
+-spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
 ensure_rep_ddoc_exists(RepDb, DDocId) ->
     case open_rep_doc(RepDb, DDocId) of
         {ok, _Doc} ->
@@ -123,7 +119,8 @@ ensure_rep_ddoc_exists(RepDb, DDocId) ->
                 {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
             ]}),
             try
-                {ok, _} = save_rep_doc(RepDb, DDoc)
+                {ok, _} = save_rep_doc(RepDb, DDoc),
+                ok
             catch
                 throw:conflict ->
                     % NFC what to do about this other than
@@ -133,6 +130,7 @@ ensure_rep_ddoc_exists(RepDb, DDocId) ->
     end.
 
 
+-spec parse_rep_doc({[_]}) -> #rep{}.
 parse_rep_doc(RepDoc) ->
     {ok, Rep} = try
         parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
@@ -144,6 +142,8 @@ parse_rep_doc(RepDoc) ->
     end,
     Rep.
 
+
+-spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
 parse_rep_doc({Props}, UserCtx) ->
     ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
     Options = make_options(Props),
@@ -250,6 +250,7 @@ save_rep_doc(DbName, Doc) ->
 
 % RFC3339 timestamps.
 % Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
+-spec timestamp() -> binary().
 timestamp() ->
     {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(os:timestamp()),
     UTime = erlang:universaltime(),
@@ -262,14 +263,14 @@ timestamp() ->
             [Year, Month, Day, Hour, Min, Sec,
                 zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
 
-
+-spec zone(integer(), integer()) -> iolist().
 zone(Hr, Min) when Hr >= 0, Min >= 0 ->
     io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
 zone(Hr, Min) ->
     io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
 
 
-
+-spec rep_user_ctx({[_]}) -> #user_ctx{}.
 rep_user_ctx({RepDoc}) ->
     case get_json_value(<<"user_ctx">>, RepDoc) of
     undefined ->
@@ -281,7 +282,7 @@ rep_user_ctx({RepDoc}) ->
         }
     end.
 
-
+-spec parse_rep_db({[_]} | binary(), [_], [_]) -> #httpd{} | binary().
 parse_rep_db({Props}, ProxyParams, Options) ->
     Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
     {AuthProps} = get_value(<<"auth">>, Props, {[]}),
@@ -324,7 +325,7 @@ parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
 parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
     DbName.
 
-
+-spec maybe_add_trailing_slash(binary() | list()) -> list().
 maybe_add_trailing_slash(Url) when is_binary(Url) ->
     maybe_add_trailing_slash(?b2l(Url));
 maybe_add_trailing_slash(Url) ->
@@ -335,7 +336,7 @@ maybe_add_trailing_slash(Url) ->
         Url ++ "/"
     end.
 
-
+-spec make_options([_]) -> [_].
 make_options(Props) ->
     Options0 = lists:ukeysort(1, convert_options(Props)),
     Options = check_options(Options0),
@@ -361,6 +362,7 @@ make_options(Props) ->
     ])).
 
 
+-spec convert_options([_]) -> [_].
 convert_options([])->
     [];
 convert_options([{<<"cancel">>, V} | R]) ->
@@ -412,6 +414,7 @@ convert_options([{<<"checkpoint_interval">>, V} | R]) ->
 convert_options([_ | R]) -> % skip unknown option
     convert_options(R).
 
+-spec check_options([_]) -> [_].
 check_options(Options) ->
     DocIds = lists:keyfind(doc_ids, 1, Options),
     Filter = lists:keyfind(filter, 1, Options),
@@ -425,7 +428,7 @@ check_options(Options) ->
             throw({bad_request, "`doc_ids`, `filter`, `selector` are mutually exclusive options"})
     end.
 
-
+-spec parse_proxy_params(binary() | [_]) -> [_].
 parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
     parse_proxy_params(?b2l(ProxyUrl));
 parse_proxy_params([]) ->
@@ -446,7 +449,7 @@ parse_proxy_params(ProxyUrl) ->
             [{proxy_user, User}, {proxy_password, Passwd}]
         end.
 
-
+-spec ssl_params([_]) -> [_].
 ssl_params(Url) ->
     case ibrowse_lib:parse_url(Url) of
     #url{protocol = https} ->
@@ -474,22 +477,15 @@ ssl_params(Url) ->
         []
     end.
 
-ssl_verify_options(Value) ->
-    ssl_verify_options(Value, erlang:system_info(otp_release)).
-
-ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" ->
+-spec ssl_verify_options(true | false) -> [_].
+ssl_verify_options(true) ->
     CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
     [{verify, verify_peer}, {cacertfile, CAFile}];
-ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" ->
-    [{verify, verify_none}];
-ssl_verify_options(true, _OTPVersion) ->
-    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, 2}, {cacertfile, CAFile}];
-ssl_verify_options(false, _OTPVersion) ->
-    [{verify, 0}].
-
+ssl_verify_options(false) ->
+    [{verify, verify_none}].
 
 
+-spec before_doc_update(#doc{}, #db{}) -> #doc{}.
 before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
     Doc;
 before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
@@ -517,6 +513,7 @@ before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
     end.
 
 
+-spec after_doc_read(#doc{}, #db{}) -> #doc{}.
 after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
     Doc;
 after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
@@ -543,7 +540,9 @@ Body)),
     end.
 
 
-
+-spec strip_credentials(undefined) -> undefined;
+    (binary()) -> binary();
+    ({[_]}) -> {[_]}.
 strip_credentials(undefined) ->
     undefined;
 strip_credentials(Url) when is_binary(Url) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 08d659e..6a79ce6 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -11,315 +11,16 @@
 % the License.
 
 -module(couch_replicator_manager).
--behaviour(gen_server).
--vsn(2).
--behaviour(couch_multidb_changes).
 
-% public API
--export([replication_started/1, replication_completed/2, replication_error/2]).
--export([continue/1, replication_usurped/2]).
 +% TODO: This is a temporary proxy module for external calls (from outside the replicator)
 +% to other replicator modules. This is done to avoid juggling multiple repos during development.
 
 % NV: TODO: These functions were moved to couch_replicator_docs
 % but it is still called from fabric_doc_update. Keep it here for now
 % later, update fabric to call couch_replicator_docs instead
 -export([before_doc_update/2, after_doc_read/2]).
 
-% gen_server callbacks
--export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
--export([code_change/3, terminate/2]).
 
-% multidb changes callback
--export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
-
-%% exported but private
--export([start_replication/1]).
-
-% imports
--import(couch_replicator_utils, [
-    get_json_value/2,
-    get_json_value/3
-]).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("mem3/include/mem3.hrl").
--include("couch_replicator.hrl").
-
-
--define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
--define(REP_TO_STATE, couch_rep_id_to_rep_state).
-
-
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-
-%%%%%% Multidb changes callbacks
-
-db_created(DbName, Server) ->
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
-
-db_deleted(DbName, Server) ->
-    clean_up_replications(DbName),
-    Server.
-
-db_found(DbName, Server) ->
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
-
-db_change(DbName, Change, Server) ->
-    ok = gen_server:call(Server, {rep_db_update, DbName, Change}, infinity),
-    Server.
-
-
--spec replication_started(#rep{}) -> ok.
-replication_started(#rep{id = RepId}) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep{db_name = DbName, doc_id = DocId} ->
-        couch_replicator_docs:update_doc_triggered(DbName, DocId, RepId),
-        %NV: TODO: This used to be
-        % ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
-        % now just write triggered for compatibility, in the future do something
-        % in the scheduler to handle repeated failed starts
-        couch_log:notice("Document `~s` triggered replication `~s`",
-            [DocId, pp_rep_id(RepId)]),
-        ok
-    end.
-
--spec replication_completed(#rep{}, list()) -> ok.
-replication_completed(#rep{id = RepId}, Stats) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep{db_name = DbName, doc_id = DocId} ->
-        couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
-        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
-        couch_log:notice("Replication `~s` finished (triggered by document `~s`)",
-            [pp_rep_id(RepId), DocId]),
-        ok
-    end.
-
-
--spec replication_usurped(#rep{}, node()) -> ok.
-replication_usurped(#rep{id = RepId}, By) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep{doc_id = DocId} ->
-        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
-        couch_log:notice("Replication `~s` usurped by ~s (triggered by document `~s`)",
-            [pp_rep_id(RepId), By, DocId]),
-        ok
-    end.
-
--spec replication_error(#rep{}, any()) -> ok.
-replication_error(#rep{id = RepId}, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep{db_name = DbName, doc_id = DocId} ->
-        % NV: TODO: later, perhaps don't update doc on each error
-        couch_replicator_docs:update_doc_error(DbName, DocId, RepId, Error)
-    end.
-
-
-% NV: TODO: Here need to use the new cluster ownership bit.
--spec continue(#rep{}) -> {true, no_owner | unstable | node()} |
-    {false, node()}.
-continue(#rep{doc_id = null}) ->
-    {true, no_owner};
-continue(#rep{id = RepId}) ->
-    case rep_state(RepId) of
-    nil ->
-        {false, nonode};
-    #rep{db_name = DbName, doc_id = DocId} ->
-	case couch_replicator_clustering:owner(DbName, DocId) of
-        {ok, no_owner} ->
-	    {true, no_owner};
-	{ok, Owner} ->
-	    {node() == Owner, Owner};
-	{error, unstable} ->
-	    {true, unstable}
-        end
-    end.
-
-
-init(_) ->
-    ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, public]),
-    ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, public]),
-    couch_replicator_docs:ensure_rep_db_exists(),
-    {ok, nil}.
-
-
-handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
-    try
-        process_update(DbName, Change)
-    catch
-    _Tag:Error ->
-        {RepProps} = get_json_value(doc, ChangeProps),
-        DocId = get_json_value(<<"_id">>, RepProps),
-        couch_replicator_docs:update_doc_process_error(DbName, DocId, Error)
-    end,
-    {reply, ok, State};
-
-
-handle_call({rep_complete, RepId}, _From, State) ->
-    true = ets:delete(?REP_TO_STATE, RepId),
-    {reply, ok, State}.
-
-
-handle_cast(Msg, State) ->
-    couch_log:error("Replication manager received unexpected cast ~p", [Msg]),
-    {stop, {error, {unexpected_cast, Msg}}, State}.
-
-
-handle_info(Msg, State) ->
-    couch_log:error("Replication manager received unexpected message ~p", [Msg]),
-    {stop, {unexpected_msg, Msg}, State}.
-
-
-
-terminate(_Reason, _State) ->
-    ok.
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
--spec process_update(binary(), tuple()) -> ok.
-process_update(DbName, {Change}) ->
-    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
-    DocId = get_json_value(<<"_id">>, RepProps),
-    OwnerRes = couch_replicator_clustering:owner(DbName, DocId),
-    case {OwnerRes, get_json_value(deleted, Change, false)} of
-    {_, true} ->
-        rep_doc_deleted(DbName, DocId);
-    {{ok, Owner}, false} when Owner /= node() ->
-        couch_log:notice("Not starting '~s' as owner is ~s.", [DocId, Owner]);
-    {{error, unstable}, false} ->
-	couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
-    {{ok,_Owner}, false} ->
-        couch_log:notice("Maybe starting '~s' as I'm the owner", [DocId]),
-        case get_json_value(<<"_replication_state">>, RepProps) of
-        undefined ->
-            maybe_start_replication(DbName, DocId, JsonRepDoc);
-        <<"triggered">> ->
-            maybe_start_replication(DbName, DocId, JsonRepDoc);
-        <<"completed">> ->
-            replication_complete(DbName, DocId);
-        <<"error">> ->
-            case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-            [] ->
-                maybe_start_replication(DbName, DocId, JsonRepDoc);
-            _ ->
-                ok
-            end
-        end
-    end,
-    ok.
-
--spec maybe_start_replication(binary(), binary(), tuple()) -> ok.
-maybe_start_replication(DbName, DocId, RepDoc) ->
-    Rep0 = couch_replicator_docs:parse_rep_doc(RepDoc),
-    #rep{id = {BaseId, _} = RepId} = Rep0,
-    Rep = Rep0#rep{db_name = DbName},
-    case rep_state(RepId) of
-    nil ->
-        true = ets:insert(?REP_TO_STATE, {RepId, Rep}),
-        true = ets:insert(?DOC_TO_REP, {{DbName, DocId}, RepId}),
-        couch_log:notice("Attempting to start replication `~s` (document `~s`).",
-            [pp_rep_id(RepId), DocId]),
-        ok = start_replication(Rep);
-    #rep{doc_id = DocId} ->
-        ok;
-    #rep{db_name = DbName, doc_id = OtherDocId} ->
-        couch_log:notice("The replication specified by the document `~s` already started"
-            " triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId))
-    end,
-    ok.
-
--spec maybe_tag_rep_doc(binary(), binary(), tuple(), binary()) -> ok.
-maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
-    case get_json_value(<<"_replication_id">>, RepProps) of
-    RepId ->
-        ok;
-    _ ->
-        couch_replicator_docs:update_doc_replication_id(DbName, DocId, RepId)
-    end.
-
--spec start_replication(#rep{}) -> ok.
-start_replication(Rep) ->
-    case couch_replicator_scheduler:add_job(Rep) of
-    ok ->
-        ok;
-    {error, already_added} ->
-        couch_log:error("replicator scheduler add_job ~p was already added", [Rep])
-    end.
-
--spec replication_complete(binary(), binary()) -> ok.
-replication_complete(DbName, DocId) ->
-    case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-    [{{DbName, DocId}, _RepId}] ->
-        true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
-        ok;
-    _ ->
-        ok
-    end.
-
--spec rep_doc_deleted(binary(), binary()) -> ok.
-rep_doc_deleted(DbName, DocId) ->
-    case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-    [{{DbName, DocId}, RepId}] ->
-        couch_replicator_scheduler:remove_job(RepId),
-        true = ets:delete(?REP_TO_STATE, RepId),
-        true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
-        couch_log:notice("Stopped replication `~s` because replication document `~s`"
-            " was deleted", [pp_rep_id(RepId), DocId]),
-        ok;
-    [] ->
-        ok
-    end.
-
-
--spec clean_up_replications(binary()) -> ok.
-clean_up_replications(DbName) ->
-    ets:foldl(
-        fun({{Name, DocId}, RepId}, _) when Name =:= DbName ->
-            couch_replicator_scheduler:remove_job(RepId),
-            ets:delete(?DOC_TO_REP,{Name, DocId}),
-            ets:delete(?REP_TO_STATE, RepId);
-           ({_,_}, _) ->
-            ok
-        end,
-        ok, ?DOC_TO_REP),
-    ok.
-
-
-% pretty-print replication id
--spec pp_rep_id(#rep{}) -> string().
-pp_rep_id(#rep{id = RepId}) ->
-    pp_rep_id(RepId);
-pp_rep_id({Base, Extension}) ->
-    Base ++ Extension.
-
-
-rep_state(RepId) ->
-    case ets:lookup(?REP_TO_STATE, RepId) of
-    [{RepId, RepState}] ->
-        RepState;
-    [] ->
-        nil
-    end.
-
-
-% NV: TODO: This function was moved to couch_replicator_docs
-% but it is still called from fabric_doc_update. Keep it here for now
-% later, update fabric to call couch_replicator_docs instead
 before_doc_update(Doc, Db) ->
     couch_replicator_docs:before_doc_update(Doc, Db).
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/src/couch_replicator_manager_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager_sup.erl b/src/couch_replicator_manager_sup.erl
deleted file mode 100644
index 6564962..0000000
--- a/src/couch_replicator_manager_sup.erl
+++ /dev/null
@@ -1,56 +0,0 @@
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_manager_sup).
--behaviour(supervisor).
--export([start_link/0, init/1]).
--export([restart_mdb_listener/0]).
-
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-init(_Args) ->
-    MdbChangesArgs = [
-        <<"_replicator">>,        % DbSuffix
-	couch_replicator_manager, % Module
-	couch_replicator_manager, % Context(Server)
-        [skip_ddocs]              % Options
-    ],
-    Children = [
-        {couch_replicator_clustering,
-            {couch_replicator_clustering, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_clustering]},
-        {couch_replicator_manager,
-            {couch_replicator_manager, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_manager]},
-        {couch_multidb_changes,
-            {couch_multidb_changes, start_link, MdbChangesArgs},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_multidb_changes]}
-    ],
-    {ok, {{rest_for_one,10,1}, Children}}.
-
-
-restart_mdb_listener() ->
-    ok = supervisor:terminate_child(?MODULE, couch_multidb_changes),
-    {ok, ChildPid} = supervisor:restart_child(?MODULE, couch_multidb_changes),
-    ChildPid.
-

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/src/couch_replicator_scheduler_job.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler_job.erl b/src/couch_replicator_scheduler_job.erl
index ad600d1..dd9d8f3 100644
--- a/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator_scheduler_job.erl
@@ -37,9 +37,11 @@
 
 -import(couch_replicator_utils, [
     start_db_compaction_notifier/2,
-    stop_db_compaction_notifier/1
+    stop_db_compaction_notifier/1,
+    pp_rep_id/1
 ]).
 
+
 %% definitions
 -define(LOWEST_SEQ, 0).
 -define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
@@ -191,7 +193,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
 
     couch_log:debug("Worker pids are: ~p", [Workers]),
 
-    couch_replicator_manager:replication_started(Rep),
+    doc_update_triggered(Rep),
 
     {ok, State#rep_state{
             changes_queue = ChangesQueue,
@@ -332,9 +334,9 @@ handle_cast({db_compacted, DbName},
     {noreply, State#rep_state{target = NewTarget}};
 
 handle_cast(checkpoint, State) ->
-    #rep_state{rep_details = #rep{} = Rep} = State,
-    case couch_replicator_manager:continue(Rep) of
-    {true, _} ->
+    #rep_state{rep_details = #rep{db_name = DbName, doc_id = DocId} = Rep} = State,
+    case couch_replicator_clustering:owner(DbName, DocId) of
+    Owner when Owner =:= node(); Owner =:= no_owner; Owner =:= unstable ->
         case do_checkpoint(State) of
         {ok, NewState} ->
             couch_stats:increment_counter([couch_replicator, checkpoints, success]),
@@ -343,8 +345,9 @@ handle_cast(checkpoint, State) ->
             couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
             {stop, Error, State}
         end;
-    {false, Owner} ->
-        couch_replicator_manager:replication_usurped(Rep, Owner),
+    Other when Other =/= node() ->
+        couch_log:notice("Replication `~s` usurped by ~s (triggered by `~s`)",
+            [Rep#rep.id, Other, DocId]),
         {stop, shutdown, State}
     end;
 
@@ -400,7 +403,7 @@ terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
     checkpoint_history = CheckpointHistory} = State) ->
     terminate_cleanup(State),
     couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
-    couch_replicator_manager:replication_completed(Rep, rep_stats(State));
+    doc_update_completed(Rep, rep_stats(State));
 
 terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
     % cancelled replication throught ?MODULE:cancel_replication/1
@@ -422,7 +425,7 @@ terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
         NotifyError = Error
     end,
     couch_replicator_notifier:notify({error, RepId, NotifyError}),
-    couch_replicator_manager:replication_error(InitArgs, NotifyError);
+    report_job_error(InitArgs, NotifyError);
 terminate(Reason, State) ->
     #rep_state{
         source_name = Source,
@@ -433,7 +436,7 @@ terminate(Reason, State) ->
         [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
     terminate_cleanup(State),
     couch_replicator_notifier:notify({error, RepId, Reason}),
-    couch_replicator_manager:replication_error(Rep, Reason).
+    report_job_error(Rep, Reason).
 
 terminate_cleanup(State) ->
     update_task(State),
@@ -447,6 +450,30 @@ format_status(_Opt, [_PDict, State]) ->
     [{data, [{"State", state_strip_creds(State)}]}].
 
 
+report_job_error(_Rep, _Error) ->
+    % TODO: handle errors back to job scheduler to let it
+    % decide if needed to backoff and retry and what the back-off schedule
+    % should be
+    ok.
+
+-spec doc_update_triggered(#rep{}) -> ok.
+doc_update_triggered(#rep{db_name = null}) ->
+    ok;
+doc_update_triggered(#rep{id = RepId, db_name = DbName, doc_id = DocId}) ->
+    couch_replicator_docs:update_doc_triggered(DbName, DocId, RepId),
+    couch_log:notice("Document `~s` triggered replication `~s`",
+        [DocId, pp_rep_id(RepId)]),
+    ok.
+
+-spec doc_update_completed(#rep{}, list()) -> ok.
+doc_update_completed(#rep{db_name = null}, _Stats) ->
+    ok;
+doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName}, Stats) ->
+    couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
+    couch_log:notice("Replication `~s` completed (triggered by `~s`)",
+        [pp_rep_id(RepId), DocId]),
+    ok.
+
 do_last_checkpoint(#rep_state{seqs_in_progress = [],
     highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
     {stop, normal, cancel_timer(State)};

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/src/couch_replicator_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_sup.erl b/src/couch_replicator_sup.erl
index e67f20b..8ed65a8 100644
--- a/src/couch_replicator_sup.erl
+++ b/src/couch_replicator_sup.erl
@@ -13,43 +13,63 @@
 
 -module(couch_replicator_sup).
 -behaviour(supervisor).
--export([start_link/0, init/1]).
+-export([start_link/0, init/1, restart_mdb_listener/0]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init(_Args) ->
+    MdbChangesArgs = [
+        <<"_replicator">>,  % DbSuffix
+        couch_replicator,   % Module
+        nil,                % Callback context
+        [skip_ddocs]        % Options
+    ],
     Children = [
-        {couch_replicator_scheduler,
-            {couch_replicator_scheduler, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_scheduler]},
         {couch_replicator_scheduler_sup,
             {couch_replicator_scheduler_sup, start_link, []},
             permanent,
             infinity,
             supervisor,
             [couch_replicator_scheduler_sup]},
+        {couch_replicator_scheduler,
+            {couch_replicator_scheduler, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_scheduler]},
         {couch_replication_event,
             {gen_event, start_link, [{local, couch_replication}]},
             permanent,
             brutal_kill,
             worker,
             dynamic},
-        {couch_replicator_manager_sup,
-	    {couch_replicator_manager_sup, start_link, []},
-	    permanent,
-            infinity,
-	    supervisor,
-            [couch_replicator_manager_sup]},
-        {couch_replicator_job_sup,
-            {couch_replicator_job_sup, start_link, []},
+        {couch_replicator_clustering,
+            {couch_replicator_clustering, start_link, []},
             permanent,
-            infinity,
-            supervisor,
-            [couch_replicator_job_sup]}
+            brutal_kill,
+            worker,
+            [couch_replicator_clustering]},
+        {couch_replicator,
 +            % This is a simple function call which does not create a process
 +            % but returns `ignore`. It is used to make sure each node
 +            % has a local `_replicator` database.
+            {couch_replicator, ensure_rep_db_exists, []},
+            transient,
+            brutal_kill,
+            worker,
+            [couch_replicator]},
+        {couch_multidb_changes,
+            {couch_multidb_changes, start_link, MdbChangesArgs},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_multidb_changes]}
     ],
-    {ok, {{one_for_one,10,1}, Children}}.
+    {ok, {{rest_for_one,10,1}, Children}}.
+
 
+restart_mdb_listener() ->
+    ok = supervisor:terminate_child(?MODULE, couch_multidb_changes),
+    {ok, ChildPid} = supervisor:restart_child(?MODULE, couch_multidb_changes),
+    ChildPid.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index 8bf350d..43ce5fe 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -19,10 +19,12 @@
 -export([sum_stats/2, is_deleted/1]).
 -export([rep_error_to_binary/1]).
 -export([get_json_value/2, get_json_value/3]).
+-export([pp_rep_id/1]).
 
 -export([handle_db_event/3]).
 
 -include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
 
 -import(couch_util, [
     get_value/2,
@@ -102,6 +104,14 @@ get_json_value(Key, Props, Default) when is_binary(Key) ->
     end.
 
 
+% pretty-print replication id
+-spec pp_rep_id(#rep{} | rep_id()) -> string().
+pp_rep_id(#rep{id = RepId}) ->
+    pp_rep_id(RepId);
+pp_rep_id({Base, Extension}) ->
+    Base ++ Extension.
+
+
 % NV: TODO: this function is not used outside api wrap module
 % consider moving it there during final cleanup
 is_deleted(Change) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/test/couch_replicator_compact_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_replicator_compact_tests.erl b/test/couch_replicator_compact_tests.erl
index 7a5a25a..0cf1196 100644
--- a/test/couch_replicator_compact_tests.erl
+++ b/test/couch_replicator_compact_tests.erl
@@ -16,6 +16,10 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_replicator/src/couch_replicator.hrl").
 
+-import(couch_replicator_test_helper, [
+    db_url/1
+]).
+
 -define(ATTFILE, filename:join([?FIXTURESDIR, "logo.png"])).
 -define(DELAY, 100).
 -define(TIMEOUT, 30000).
@@ -92,7 +96,7 @@ should_run_replication(RepPid, RepId, Source, Target) ->
 should_ensure_replication_still_running(RepPid, RepId, Source, Target) ->
     ?_test(check_active_tasks(RepPid, RepId, Source, Target)).
 
-check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
+check_active_tasks(RepPid, {BaseId, Ext} = RepId, Src, Tgt) ->
     Source = case Src of
         {remote, NameSrc} ->
             <<(db_url(NameSrc))/binary, $/>>;
@@ -107,7 +111,7 @@ check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
     end,
     FullRepId = ?l2b(BaseId ++ Ext),
     Pid = ?l2b(pid_to_list(RepPid)),
-    ok = wait_for_replicator(RepPid),
+    ok = wait_for_replicator(RepId),
     [RepTask] = couch_task_status:all(),
     ?assertEqual(Pid, couch_util:get_value(pid, RepTask)),
     ?assertEqual(FullRepId, couch_util:get_value(replication_id, RepTask)),
@@ -124,16 +128,25 @@ check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
     Pending = couch_util:get_value(changes_pending, RepTask),
     ?assert(is_integer(Pending)).
 
-wait_for_replicator(Pid) ->
+
+get_pid(RepId) ->
+    Pid = global:whereis_name({couch_replicator_scheduler_job,RepId}),
+    ?assert(is_pid(Pid)),
+    Pid.
+
+rep_details(RepId) ->
+    gen_server:call(get_pid(RepId), get_details).
+
+wait_for_replicator(RepId) ->
     %% since replicator started asynchronously
     %% we need to wait when it would be in couch_task_status
     %% we query replicator:details to ensure that do_init happen
-    ?assertMatch({ok, _}, couch_replicator:details(Pid)),
+    ?assertMatch({ok, _}, rep_details(RepId)),
     ok.
 
 should_cancel_replication(RepId, RepPid) ->
     ?_assertNot(begin
-        {ok, _} = couch_replicator:cancel_replication(RepId),
+        ok = couch_replicator_scheduler:remove_job(RepId),
         is_process_alive(RepPid)
     end).
 
@@ -295,13 +308,6 @@ wait_for_compaction(Type, Db) ->
                                          " database failed with: ", Reason])}]})
     end.
 
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
 replicate({remote, Db}, Target) ->
     replicate(db_url(Db), Target);
 
@@ -315,7 +321,8 @@ replicate(Source, Target) ->
         {<<"continuous">>, true}
     ]},
     {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
+    ok = couch_replicator_scheduler:add_job(Rep),
+    Pid = get_pid(Rep#rep.id),
     {ok, Pid, Rep#rep.id}.
 
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/test/couch_replicator_many_leaves_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_replicator_many_leaves_tests.erl b/test/couch_replicator_many_leaves_tests.erl
index bde0e2c..a6999bd 100644
--- a/test/couch_replicator_many_leaves_tests.erl
+++ b/test/couch_replicator_many_leaves_tests.erl
@@ -15,6 +15,11 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+-import(couch_replicator_test_helper, [
+    db_url/1,
+    replicate/2
+]).
+
 -define(DOCS_CONFLICTS, [
     {<<"doc1">>, 10},
     {<<"doc2">>, 100},
@@ -199,22 +204,3 @@ add_attachments(SourceDb, NumAtts,  [{DocId, NumConflicts} | Rest]) ->
     ?assertEqual(length(NewDocs), length(NewRevs)),
     add_attachments(SourceDb, NumAtts, Rest).
 
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
-replicate(Source, Target) ->
-    RepObject = {[
-        {<<"source">>, Source},
-        {<<"target">>, Target}
-    ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
-    MonRef = erlang:monitor(process, Pid),
-    receive
-        {'DOWN', MonRef, process, Pid, _} ->
-            ok
-    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/72e1f1ed/test/couch_replicator_modules_load_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_replicator_modules_load_tests.erl b/test/couch_replicator_modules_load_tests.erl
index 96a9346..ff0401f 100644
--- a/test/couch_replicator_modules_load_tests.erl
+++ b/test/couch_replicator_modules_load_tests.erl
@@ -28,11 +28,16 @@ should_load_modules() ->
         couch_replicator_httpc,
         couch_replicator_httpd,
         couch_replicator_manager,
+        couch_replicator_scheduler,
+        couch_replicator_scheduler_job,
+        couch_replicator_docs,
+        couch_replicator_clustering,
+        couch_replicator_changes_reader,
+        couch_replicator_ids,
         couch_replicator_notifier,
         couch_replicator,
         couch_replicator_worker,
-        couch_replicator_utils,
-        couch_replicator_job_sup
+        couch_replicator_utils
     ],
     [should_load_module(Mod) || Mod <- Modules].
 


Mime
View raw message