couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [05/30] couch-replicator commit: updated refs/heads/63012-scheduler-dont-start-immediately to 6a913dc
Date Fri, 03 Jun 2016 15:17:29 GMT
Refactor replication manager, utils before switching to scheduler

Cleaned up the code and prepared it for the switch to the scheduler. This is a
pure refactor: replications still work and the eunit tests still pass.

Changes:

 * Code from replication manager which deals with doc updating moved to
   `couch_replicator_docs.erl`

 * Code dealing with calculating replication ids moved to `couch_replicator_ids.erl`

 * A few functions are left in `_utils.erl`; its externally exported functions
   are kept as proxies to avoid modifying various unrelated modules which call them.

 * Added TODO: comments to functions which will need attention later:
    - Still using old code
    - Unused functions
    - `utils` function but used from only one place, etc.

 * Removed code which handles detection of failed starts. That will be
   handled in scheduler.

EUnit tests still pass:
`make eunit skip_deps+=couch_epi,couch_log apps=couch_replicator`
```
=======================================================
  All 166 tests passed.
==> rel (eunit)
==> couchdb (eunit)
```

The replication utility from dyno still shows that docs trigger and replicate:

```
rep.replicate_1_to_n_then_check_replication(1)
 > created  1 dbs with prefix rdyno_src_
 > created  1 dbs with prefix rdyno_tgt_
updating documents
 > _replicator rdyno_0001_0001 : 7-1aac3a47cacc5dd2f5cf27cc3d603e49
waiting for replication documents to trigger
 > retrying function wait_to_trigger
 > function wait_to_trigger succeded after 10.029 +/- 10.0  sec.
all replication documents triggered
>>> update cycle 0  <<<
 > waiting for target  1
 > function wait_till_dbs_equal succeded after 0.024 +/- 1.0  sec.
 > waiting to propagate changes from  1 to 1  : 0.042 sec.
 ```


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/5218edf8
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/5218edf8
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/5218edf8

Branch: refs/heads/63012-scheduler-dont-start-immediately
Commit: 5218edf82644854843ba7be3c173a2845c8f11ad
Parents: fbadd8e
Author: Nick Vatamaniuc <vatamane@gmail.com>
Authored: Fri May 13 18:12:52 2016 -0400
Committer: Nick Vatamaniuc <vatamane@gmail.com>
Committed: Fri May 13 18:43:00 2016 -0400

----------------------------------------------------------------------
 src/couch_multidb_changes.erl          |  18 +-
 src/couch_replicator_docs.erl          | 582 ++++++++++++++++++++++
 src/couch_replicator_ids.erl           | 193 ++++++++
 src/couch_replicator_manager.erl       | 740 +++++-----------------------
 src/couch_replicator_scheduler_job.erl |   2 +-
 src/couch_replicator_utils.erl         | 513 ++-----------------
 6 files changed, 940 insertions(+), 1108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/5218edf8/src/couch_multidb_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_multidb_changes.erl b/src/couch_multidb_changes.erl
index b831202..16e12d4 100644
--- a/src/couch_multidb_changes.erl
+++ b/src/couch_multidb_changes.erl
@@ -93,7 +93,7 @@ handle_call({change, DbName, Change}, _From,
     case {SkipDDocs, is_design_doc(Change)} of
         {true, true} ->
             {reply, ok, State};
-        {false, _} ->
+        {_, _} ->
             {reply, ok, State#state{ctx=Mod:db_change(DbName, Change, Ctx)}}
     end;
 
@@ -220,7 +220,7 @@ start_event_listener(DbSuffix) ->
     Pid.
 
 handle_db_event(DbName, created, {Server, DbSuffix}) ->
-    case suffix_match(DbName, DbSuffix) of
+    case DbSuffix =:= couch_db:dbname_suffix(DbName) of
 	true ->
 	    ok = gen_server:call(Server, {created, DbName});
 	_ ->
@@ -229,7 +229,7 @@ handle_db_event(DbName, created, {Server, DbSuffix}) ->
     {ok, {Server, DbSuffix}};
 
 handle_db_event(DbName, deleted, {Server, DbSuffix}) ->
-    case suffix_match(DbName, DbSuffix) of
+    case DbSuffix =:= couch_db:dbname_suffix(DbName) of
         true ->
             ok = gen_server:call(Server, {deleted, DbName});
         _ ->
@@ -238,7 +238,7 @@ handle_db_event(DbName, deleted, {Server, DbSuffix}) ->
     {ok, {Server, DbSuffix}};
 
 handle_db_event(DbName, updated, {Server, DbSuffix}) ->
-    case suffix_match(DbName, DbSuffix) of
+    case DbSuffix =:= couch_db:dbname_suffix(DbName) of
         true ->
 	    ok = gen_server:cast(Server, {resume_scan, DbName});
         _ ->
@@ -268,19 +268,11 @@ scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
 	end, ok).
 
 
-suffix_match(DbName, DbSuffix) ->
-    case lists:last(binary:split(mem3:dbname(DbName), <<"/">>, [global])) of
-        DbSuffix ->
-            true;
-        _ ->
-            false
-    end.
-
 is_design_doc({Change}) ->
     case lists:keyfind(<<"id">>, 1, Change) of
         false ->
             false;
-        {Id, _} ->
+        {_, Id} ->
             is_design_doc_id(Id)
     end.
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/5218edf8/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
new file mode 100644
index 0000000..8f68a28
--- /dev/null
+++ b/src/couch_replicator_docs.erl
@@ -0,0 +1,582 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_docs).
+
+-export([parse_rep_doc/1, parse_rep_doc/2]).
+-export([before_doc_update/2, after_doc_read/2]).
+-export([ensure_rep_db_exists/0, ensure_rep_ddoc_exists/1]).
+-export([
+    update_doc_triggered/3,
+    update_doc_completed/3,
+    update_doc_error/4,
+    update_doc_replication_id/3,
+    update_doc_process_error/3
+]).
+
+-define(REP_DB_NAME, <<"_replicator">>).
+-define(REP_DESIGN_DOC, <<"_design/_replicator">>).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+-include("couch_replicator_js_functions.hrl").
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3,
+    to_binary/1
+]).
+
+-import(couch_replicator_utils, [
+    get_json_value/2,
+    get_json_value/3
+]).
+
+
+-define(OWNER, <<"owner">>).
+-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
+
+-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
+
+
+update_doc_triggered(DbName, DocId, {BaseId, _}) ->
+    update_rep_doc(DbName, DocId, [
+            {<<"_replication_state">>, <<"triggered">>},
+            {<<"_replication_state_reason">>, undefined},
+            {<<"_replication_id">>, ?l2b(BaseId)},
+            {<<"_replication_stats">>, undefined}]).
+
+
+update_doc_completed(DbName, DocId, Stats) ->
+    update_rep_doc(DbName, DocId, [
+        {<<"_replication_state">>, <<"completed">>},
+        {<<"_replication_state_reason">>, undefined},
+        {<<"_replication_stats">>, {Stats}}]).
+
+
+update_doc_error(DbName, DocId, {BaseId, _}, Error) ->
+    ErrorBinary = couch_replicator_utils:rep_error_to_binary(Error),
+    update_rep_doc(DbName, DocId, [
+            {<<"_replication_state">>, <<"error">>},
+            {<<"_replication_state_reason">>, ErrorBinary},
+            {<<"_replication_id">>, ?l2b(BaseId)}]).
+
+
+update_doc_process_error(DbName, DocId, Error) ->
+    Reason = case Error of
+        {bad_rep_doc, Reas} ->
+            Reas;
+        _ ->
+            to_binary(Error)
+    end,
+    couch_log:error("Error processing replication doc `~s`: ~s", [DocId, Reason]),
+    update_rep_doc(DbName, DocId, [
+        {<<"_replication_state">>, <<"error">>},
+        {<<"_replication_state_reason">>, Reason}]).
+
+
+update_doc_replication_id(DbName, DocId, RepId) ->
+    update_rep_doc(DbName, DocId, [{<<"_replication_id">>, RepId}]).
+
+
+ensure_rep_db_exists() ->
+    Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db, nologifmissing]) of
+        {ok, Db0} ->
+            Db0;
+        _Error ->
+            {ok, Db0} = couch_db:create(?REP_DB_NAME, [?CTX, sys_db]),
+            Db0
+    end,
+    ensure_rep_ddoc_exists(?REP_DB_NAME),
+    {ok, Db}.
+
+
+ensure_rep_ddoc_exists(RepDb) ->
+    case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
+	true ->
+	    ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
+	false ->
+	    ok
+    end.
+
+
+ensure_rep_ddoc_exists(RepDb, DDocId) ->
+    case open_rep_doc(RepDb, DDocId) of
+        {ok, _Doc} ->
+            ok;
+        _ ->
+            DDoc = couch_doc:from_json_obj({[
+                {<<"_id">>, DDocId},
+                {<<"language">>, <<"javascript">>},
+                {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+            ]}),
+            try
+                {ok, _} = save_rep_doc(RepDb, DDoc)
+            catch
+                throw:conflict ->
+                    % NFC what to do about this other than
+                    % not kill the process.
+                    ok
+            end
+    end.
+
+
+parse_rep_doc(RepDoc) ->
+    {ok, Rep} = try
+        parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
+    catch
+    throw:{error, Reason} ->
+        throw({bad_rep_doc, Reason});
+    Tag:Err ->
+        throw({bad_rep_doc, to_binary({Tag, Err})})
+    end,
+    Rep.
+
+parse_rep_doc({Props}, UserCtx) ->
+    ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
+    Options = make_options(Props),
+    case get_value(cancel, Options, false) andalso
+        (get_value(id, Options, nil) =/= nil) of
+    true ->
+        {ok, #rep{options = Options, user_ctx = UserCtx}};
+    false ->
+        Source = parse_rep_db(get_value(<<"source">>, Props),
+                              ProxyParams, Options),
+        Target = parse_rep_db(get_value(<<"target">>, Props),
+                              ProxyParams, Options),
+
+
+        {RepType, View} = case get_value(<<"filter">>, Props) of
+                <<"_view">> ->
+                    {QP}  = get_value(query_params, Options, {[]}),
+                    ViewParam = get_value(<<"view">>, QP),
+                    View1 = case re:split(ViewParam, <<"/">>) of
+                        [DName, ViewName] ->
+                            {<< "_design/", DName/binary >>, ViewName};
+                        _ ->
+                            throw({bad_request, "Invalid `view` parameter."})
+                    end,
+                    {view, View1};
+                _ ->
+                    {db, nil}
+            end,
+
+        Rep = #rep{
+            source = Source,
+            target = Target,
+            options = Options,
+            user_ctx = UserCtx,
+            type = RepType,
+            view = View,
+            doc_id = get_value(<<"_id">>, Props, null)
+        },
+        {ok, Rep#rep{id = couch_replicator_ids:replication_id(Rep)}}
+    end.
+
+
+update_rep_doc(RepDbName, RepDocId, KVs) ->
+    update_rep_doc(RepDbName, RepDocId, KVs, 1).
+
+update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
+    try
+        case open_rep_doc(RepDbName, RepDocId) of
+            {ok, LastRepDoc} ->
+                update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
+            _ ->
+                ok
+        end
+    catch
+        throw:conflict ->
+            Msg = "Conflict when updating replication document `~s`. Retrying.",
+            couch_log:error(Msg, [RepDocId]),
+            ok = timer:sleep(random:uniform(erlang:min(128, Wait)) * 100),
+            update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
+    end;
+update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
+    NewRepDocBody = lists:foldl(
+        fun({K, undefined}, Body) ->
+                lists:keydelete(K, 1, Body);
+           ({<<"_replication_state">> = K, State} = KV, Body) ->
+                case get_json_value(K, Body) of
+                State ->
+                    Body;
+                _ ->
+                    Body1 = lists:keystore(K, 1, Body, KV),
+                    lists:keystore(
+                        <<"_replication_state_time">>, 1, Body1,
+                        {<<"_replication_state_time">>, timestamp()})
+                end;
+            ({K, _V} = KV, Body) ->
+                lists:keystore(K, 1, Body, KV)
+        end,
+        RepDocBody, KVs),
+    case NewRepDocBody of
+    RepDocBody ->
+        ok;
+    _ ->
+        % Might not succeed - when the replication doc is deleted right
+        % before this update (not an error, ignore).
+        save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
+    end.
+
+open_rep_doc(DbName, DocId) ->
+    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+    try
+        couch_db:open_doc(Db, DocId, [ejson_body])
+    after
+        couch_db:close(Db)
+    end.
+
+save_rep_doc(DbName, Doc) ->
+    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+    try
+        couch_db:update_doc(Db, Doc, [])
+    after
+        couch_db:close(Db)
+    end.
+
+
+% RFC3339 timestamps.
+% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
+timestamp() ->
+    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(os:timestamp()),
+    UTime = erlang:universaltime(),
+    LocalTime = calendar:universal_time_to_local_time(UTime),
+    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
+        calendar:datetime_to_gregorian_seconds(UTime),
+    zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
+    iolist_to_binary(
+        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
+            [Year, Month, Day, Hour, Min, Sec,
+                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
+
+
+zone(Hr, Min) when Hr >= 0, Min >= 0 ->
+    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
+zone(Hr, Min) ->
+    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
+
+
+
+rep_user_ctx({RepDoc}) ->
+    case get_json_value(<<"user_ctx">>, RepDoc) of
+    undefined ->
+        #user_ctx{};
+    {UserCtx} ->
+        #user_ctx{
+            name = get_json_value(<<"name">>, UserCtx, null),
+            roles = get_json_value(<<"roles">>, UserCtx, [])
+        }
+    end.
+
+
+parse_rep_db({Props}, ProxyParams, Options) ->
+    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
+    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
+    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
+    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
+    DefaultHeaders = (#httpdb{})#httpdb.headers,
+    OAuth = case get_value(<<"oauth">>, AuthProps) of
+    undefined ->
+        nil;
+    {OauthProps} ->
+        #oauth{
+            consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
+            token = ?b2l(get_value(<<"token">>, OauthProps)),
+            token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
+            consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)),
+            signature_method =
+                case get_value(<<"signature_method">>, OauthProps) of
+                undefined ->        hmac_sha1;
+                <<"PLAINTEXT">> ->  plaintext;
+                <<"HMAC-SHA1">> ->  hmac_sha1;
+                <<"RSA-SHA1">> ->   rsa_sha1
+                end
+        }
+    end,
+    #httpdb{
+        url = Url,
+        oauth = OAuth,
+        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
+        ibrowse_options = lists:keysort(1,
+            [{socket_options, get_value(socket_options, Options)} |
+                ProxyParams ++ ssl_params(Url)]),
+        timeout = get_value(connection_timeout, Options),
+        http_connections = get_value(http_connections, Options),
+        retries = get_value(retries, Options)
+    };
+parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
+    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
+parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
+    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
+parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
+    DbName.
+
+
+maybe_add_trailing_slash(Url) when is_binary(Url) ->
+    maybe_add_trailing_slash(?b2l(Url));
+maybe_add_trailing_slash(Url) ->
+    case lists:last(Url) of
+    $/ ->
+        Url;
+    _ ->
+        Url ++ "/"
+    end.
+
+
+make_options(Props) ->
+    Options0 = lists:ukeysort(1, convert_options(Props)),
+    Options = check_options(Options0),
+    DefWorkers = config:get("replicator", "worker_processes", "4"),
+    DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
+    DefConns = config:get("replicator", "http_connections", "20"),
+    DefTimeout = config:get("replicator", "connection_timeout", "30000"),
+    DefRetries = config:get("replicator", "retries_per_request", "10"),
+    UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
+    DefCheckpointInterval = config:get("replicator", "checkpoint_interval", "30000"),
+    {ok, DefSocketOptions} = couch_util:parse_term(
+        config:get("replicator", "socket_options",
+            "[{keepalive, true}, {nodelay, false}]")),
+    lists:ukeymerge(1, Options, lists:keysort(1, [
+        {connection_timeout, list_to_integer(DefTimeout)},
+        {retries, list_to_integer(DefRetries)},
+        {http_connections, list_to_integer(DefConns)},
+        {socket_options, DefSocketOptions},
+        {worker_batch_size, list_to_integer(DefBatchSize)},
+        {worker_processes, list_to_integer(DefWorkers)},
+        {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
+        {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
+    ])).
+
+
+convert_options([])->
+    [];
+convert_options([{<<"cancel">>, V} | R]) ->
+    [{cancel, V} | convert_options(R)];
+convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
+        IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
+    Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)),
+    [{id, Id} | convert_options(R)];
+convert_options([{<<"create_target">>, V} | R]) ->
+    [{create_target, V} | convert_options(R)];
+convert_options([{<<"continuous">>, V} | R]) ->
+    [{continuous, V} | convert_options(R)];
+convert_options([{<<"filter">>, V} | R]) ->
+    [{filter, V} | convert_options(R)];
+convert_options([{<<"query_params">>, V} | R]) ->
+    [{query_params, V} | convert_options(R)];
+convert_options([{<<"doc_ids">>, null} | R]) ->
+    convert_options(R);
+convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
+    throw({bad_request, <<"parameter `doc_ids` must be an array">>});
+convert_options([{<<"doc_ids">>, V} | R]) ->
+    % Ensure same behaviour as old replicator: accept a list of percent
+    % encoded doc IDs.
+    DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
+    [{doc_ids, DocIds} | convert_options(R)];
+convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
+    throw({bad_request, <<"parameter `selector` must be a JSON object">>});
+convert_options([{<<"selector">>, V} | R]) ->
+    [{selector, V} | convert_options(R)];
+convert_options([{<<"worker_processes">>, V} | R]) ->
+    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"worker_batch_size">>, V} | R]) ->
+    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"http_connections">>, V} | R]) ->
+    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"connection_timeout">>, V} | R]) ->
+    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"retries_per_request">>, V} | R]) ->
+    [{retries, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"socket_options">>, V} | R]) ->
+    {ok, SocketOptions} = couch_util:parse_term(V),
+    [{socket_options, SocketOptions} | convert_options(R)];
+convert_options([{<<"since_seq">>, V} | R]) ->
+    [{since_seq, V} | convert_options(R)];
+convert_options([{<<"use_checkpoints">>, V} | R]) ->
+    [{use_checkpoints, V} | convert_options(R)];
+convert_options([{<<"checkpoint_interval">>, V} | R]) ->
+    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([_ | R]) -> % skip unknown option
+    convert_options(R).
+
+check_options(Options) ->
+    DocIds = lists:keyfind(doc_ids, 1, Options),
+    Filter = lists:keyfind(filter, 1, Options),
+    Selector = lists:keyfind(selector, 1, Options),
+    case {DocIds, Filter, Selector} of
+        {false, false, false} -> Options;
+        {false, false, _} -> Options;
+        {false, _, false} -> Options;
+        {_, false, false} -> Options;
+        _ ->
+            throw({bad_request, "`doc_ids`, `filter`, `selector` are mutually exclusive options"})
+    end.
+
+
+parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
+    parse_proxy_params(?b2l(ProxyUrl));
+parse_proxy_params([]) ->
+    [];
+parse_proxy_params(ProxyUrl) ->
+    #url{
+        host = Host,
+        port = Port,
+        username = User,
+        password = Passwd,
+        protocol = Protocol
+    } = ibrowse_lib:parse_url(ProxyUrl),
+    [{proxy_protocol, Protocol}, {proxy_host, Host}, {proxy_port, Port}] ++
+        case is_list(User) andalso is_list(Passwd) of
+        false ->
+            [];
+        true ->
+            [{proxy_user, User}, {proxy_password, Passwd}]
+        end.
+
+
+ssl_params(Url) ->
+    case ibrowse_lib:parse_url(Url) of
+    #url{protocol = https} ->
+        Depth = list_to_integer(
+            config:get("replicator", "ssl_certificate_max_depth", "3")
+        ),
+        VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
+        CertFile = config:get("replicator", "cert_file", undefined),
+        KeyFile = config:get("replicator", "key_file", undefined),
+        Password = config:get("replicator", "password", undefined),
+        SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
+        SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
+            true ->
+                case Password of
+                    undefined ->
+                        [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
+                    _ ->
+                        [{certfile, CertFile}, {keyfile, KeyFile},
+                            {password, Password}] ++ SslOpts
+                end;
+            false -> SslOpts
+        end,
+        [{is_ssl, true}, {ssl_options, SslOpts1}];
+    #url{protocol = http} ->
+        []
+    end.
+
+ssl_verify_options(Value) ->
+    ssl_verify_options(Value, erlang:system_info(otp_release)).
+
+ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" ->
+    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
+    [{verify, verify_peer}, {cacertfile, CAFile}];
+ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" ->
+    [{verify, verify_none}];
+ssl_verify_options(true, _OTPVersion) ->
+    CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
+    [{verify, 2}, {cacertfile, CAFile}];
+ssl_verify_options(false, _OTPVersion) ->
+    [{verify, 0}].
+
+
+
+before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
+    Doc;
+before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
+    #user_ctx{roles = Roles, name = Name} = UserCtx,
+    case lists:member(<<"_replicator">>, Roles) of
+    true ->
+        Doc;
+    false ->
+        case couch_util:get_value(?OWNER, Body) of
+        undefined ->
+            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+        Name ->
+            Doc;
+        Other ->
+            case (catch couch_db:check_is_admin(Db)) of
+            ok when Other =:= null ->
+                Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+            ok ->
+                Doc;
+            _ ->
+                throw({forbidden, <<"Can't update replication documents",
+                    " from other users.">>})
+            end
+        end
+    end.
+
+
+after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
+    Doc;
+after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
+    #user_ctx{name = Name} = UserCtx,
+    case (catch couch_db:check_is_admin(Db)) of
+    ok ->
+        Doc;
+    _ ->
+        case couch_util:get_value(?OWNER, Body) of
+        Name ->
+            Doc;
+        _Other ->
+            Source = strip_credentials(couch_util:get_value(<<"source">>,
+Body)),
+            Target = strip_credentials(couch_util:get_value(<<"target">>,
+Body)),
+            NewBody0 = ?replace(Body, <<"source">>, Source),
+            NewBody = ?replace(NewBody0, <<"target">>, Target),
+            #doc{revs = {Pos, [_ | Revs]}} = Doc,
+            NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
+            NewRevId = couch_db:new_revid(NewDoc),
+            NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
+        end
+    end.
+
+
+
+strip_credentials(undefined) ->
+    undefined;
+strip_credentials(Url) when is_binary(Url) ->
+    re:replace(Url,
+        "http(s)?://(?:[^:]+):[^@]+@(.*)$",
+        "http\\1://\\2",
+        [{return, binary}]);
+strip_credentials({Props}) ->
+    {lists:keydelete(<<"oauth">>, 1, Props)}.
+
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+check_options_pass_values_test() ->
+    ?assertEqual(check_options([]), []),
+    ?assertEqual(check_options([baz, {other,fiz}]), [baz, {other, fiz}]),
+    ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]),
+    ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
+    ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
+
+check_options_fail_values_test() ->
+    ?assertThrow({bad_request, _},
+        check_options([{doc_ids, x}, {filter, y}])),
+    ?assertThrow({bad_request, _},
+        check_options([{doc_ids, x}, {selector, y}])),
+    ?assertThrow({bad_request, _},
+        check_options([{filter, x}, {selector, y}])),
+    ?assertThrow({bad_request, _},
+        check_options([{doc_ids, x}, {selector, y}, {filter, z}])).
+
+-endif.
+
+

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/5218edf8/src/couch_replicator_ids.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_ids.erl b/src/couch_replicator_ids.erl
new file mode 100644
index 0000000..4c9b987
--- /dev/null
+++ b/src/couch_replicator_ids.erl
@@ -0,0 +1,193 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_ids).
+
+-export([replication_id/1, replication_id/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
+
+replication_id(#rep{options = Options} = Rep) ->
+    BaseId = replication_id(Rep, ?REP_ID_VERSION),
+    {BaseId, maybe_append_options([continuous, create_target], Options)}.
+
+% Versioned clauses for generating replication IDs.
+% If a change is made to how replications are identified,
+% please add a new clause and increase ?REP_ID_VERSION.
+
+replication_id(#rep{user_ctx = UserCtx} = Rep, 3) ->
+    UUID = couch_server:get_uuid(),
+    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+    maybe_append_filters([UUID, Src, Tgt], Rep);
+
+replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
+    {ok, HostName} = inet:gethostname(),
+    Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
+    P when is_number(P) ->
+        P;
+    _ ->
+        % On restart we might be called before the couch_httpd process is
+        % started.
+        % TODO: we might be under an SSL socket server only, or both under
+        % SSL and a non-SSL socket.
+        % ... mochiweb_socket_server:get(https, port)
+        list_to_integer(config:get("httpd", "port", "5984"))
+    end,
+    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+    maybe_append_filters([HostName, Port, Src, Tgt], Rep);
+
+replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
+    {ok, HostName} = inet:gethostname(),
+    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+    maybe_append_filters([HostName, Src, Tgt], Rep).
+
+
+% Private functions
+
+maybe_append_filters(Base,
+        #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
+    Filter = get_value(filter, Options),
+    DocIds = get_value(doc_ids, Options),
+    Selector = get_value(selector, Options),
+    Base2 = Base ++
+        case {Filter, DocIds, Selector} of
+        {undefined, undefined, undefined} ->
+            [];
+        {<<"_", _/binary>>, undefined, undefined} ->
+            [Filter, get_value(query_params, Options, {[]})];
+        {_, undefined, undefined} ->
+            [filter_code(Filter, Source, UserCtx),
+                get_value(query_params, Options, {[]})];
+        {undefined, _, undefined} ->
+            [DocIds];
+        {undefined, undefined, _} ->
+            [ejsort(mango_selector:normalize(Selector))];
+        _ ->
+            throw({error, <<"`selector`, `filter` and `doc_ids` fields are mutually exclusive">>})
+        end,
+    couch_util:to_hex(couch_crypto:hash(md5, term_to_binary(Base2))).
+
+
+filter_code(Filter, Source, UserCtx) ->
+    {DDocName, FilterName} =
+    case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
+    {match, [DDocName0, FilterName0]} ->
+        {DDocName0, FilterName0};
+    _ ->
+        throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
+    end,
+    Db = case (catch couch_replicator_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of
+    {ok, Db0} ->
+        Db0;
+    DbError ->
+        DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
+           [couch_replicator_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]),
+        throw({error, iolist_to_binary(DbErrorMsg)})
+    end,
+    try
+        Body = case (catch couch_replicator_api_wrap:open_doc(
+            Db, <<"_design/", DDocName/binary>>, [ejson_body])) of
+        {ok, #doc{body = Body0}} ->
+            Body0;
+        DocError ->
+            DocErrorMsg = io_lib:format(
+                "Couldn't open document `_design/~s` from source "
+                "database `~s`: ~s", [DDocName, couch_replicator_api_wrap:db_uri(Source),
+                    couch_util:to_binary(DocError)]),
+            throw({error, iolist_to_binary(DocErrorMsg)})
+        end,
+        Code = couch_util:get_nested_json_value(
+            Body, [<<"filters">>, FilterName]),
+        re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
+    after
+        couch_replicator_api_wrap:db_close(Db)
+    end.
+
+
+maybe_append_options(Options, RepOptions) ->
+    lists:foldl(fun(Option, Acc) ->
+        Acc ++
+        case get_value(Option, RepOptions, false) of
+        true ->
+            "+" ++ atom_to_list(Option);
+        false ->
+            ""
+        end
+    end, [], Options).
+
+
+get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
+    DefaultHeaders = (#httpdb{})#httpdb.headers,
+    case OAuth of
+    nil ->
+        {remote, Url, Headers -- DefaultHeaders};
+    #oauth{} ->
+        {remote, Url, Headers -- DefaultHeaders, OAuth}
+    end;
+get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
+    {local, DbName, UserCtx}.
+
+
+% Sort an EJSON object's properties to attempt
+% to generate a unique representation. This is used
+% to reduce the chance of getting different
+% replication checkpoints for the same Mango selector
+ejsort({V})->
+    ejsort_props(V, []);
+ejsort(V) when is_list(V) ->
+    ejsort_array(V, []);
+ejsort(V) ->
+    V.
+
+ejsort_props([], Acc)->
+    {lists:keysort(1, Acc)};
+ejsort_props([{K, V}| R], Acc) ->
+    ejsort_props(R, [{K, ejsort(V)} | Acc]).
+
+ejsort_array([], Acc)->
+    lists:reverse(Acc);
+ejsort_array([V | R], Acc) ->
+    ejsort_array(R, [ejsort(V) | Acc]).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+ejsort_basic_values_test() ->
+    ?assertEqual(ejsort(0), 0),
+    ?assertEqual(ejsort(<<"a">>), <<"a">>),
+    ?assertEqual(ejsort(true), true),
+    ?assertEqual(ejsort([]), []),
+    ?assertEqual(ejsort({[]}), {[]}).
+
+ejsort_compound_values_test() ->
+    ?assertEqual(ejsort([2, 1, 3 ,<<"a">>]), [2, 1, 3, <<"a">>]),
+    Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0},  {<<"b">>, 0}]},
+    Ej1s =  {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]},
+    ?assertEqual(ejsort(Ej1), Ej1s),
+    Ej2 = {[{<<"x">>, Ej1}, {<<"z">>, Ej1}, {<<"y">>, [Ej1, Ej1]}]},
+    ?assertEqual(ejsort(Ej2),
+        {[{<<"x">>, Ej1s}, {<<"y">>, [Ej1s, Ej1s]}, {<<"z">>, Ej1s}]}).
+
+-endif.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/5218edf8/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 342dffb..37bbb82 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -13,97 +13,72 @@
 -module(couch_replicator_manager).
 -behaviour(gen_server).
 -vsn(2).
--behaviour(config_listener).
+-behaviour(couch_multidb_changes).
 
 % public API
 -export([replication_started/1, replication_completed/2, replication_error/2]).
 -export([continue/1, replication_usurped/2]).
 
+% NV: TODO: These functions were moved to couch_replicator_docs
+% but they are still called from fabric_doc_update. Keep them here for now;
+% later, update fabric to call couch_replicator_docs instead.
 -export([before_doc_update/2, after_doc_read/2]).
 
 % gen_server callbacks
 -export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
 -export([code_change/3, terminate/2]).
 
-% changes callbacks
--export([changes_reader/3, changes_reader_cb/3]).
-
-% config_listener callback
--export([handle_config_change/5, handle_config_terminate/3]).
-
--export([handle_db_event/3]).
+% multidb changes callback
+-export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
 
 %% exported but private
--export([start_replication/2]).
+-export([start_replication/1]).
+
+% imports
+-import(couch_replicator_utils, [
+    get_json_value/2,
+    get_json_value/3
+]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
+
 
 -define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
 -define(REP_TO_STATE, couch_rep_id_to_rep_state).
--define(INITIAL_WAIT, 2.5). % seconds
--define(MAX_WAIT, 600).     % seconds
--define(AVG_ERROR_DELAY_MSEC, 100).
--define(MAX_ERROR_DELAY_MSEC, 60000).
--define(OWNER, <<"owner">>).
-
--define(DB_TO_SEQ, db_to_seq).
--define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
-
--define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-
--record(rep_state, {
-    rep,
-    starting,
-    retries_left,
-    max_retries,
-    wait = ?INITIAL_WAIT
-}).
 
--import(couch_util, [
-    to_binary/1
-]).
 
 -record(state, {
-    event_listener = nil,
-    scan_pid = nil,
+    mdb_listener = nil,
     rep_start_pids = [],
-    max_retries,
-    live = [],
-    epoch = nil
+    live = []
 }).
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
-replication_started(#rep{id = {BaseId, _} = RepId}) ->
+replication_started(#rep{id = RepId}) ->
     case rep_state(RepId) of
     nil ->
         ok;
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"triggered">>},
-            {<<"_replication_state_reason">>, undefined},
-            {<<"_replication_id">>, ?l2b(BaseId)},
-            {<<"_replication_stats">>, undefined}]),
-        ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
+    #rep{db_name = DbName, doc_id = DocId} ->
+        couch_replicator_docs:update_doc_triggered(DbName, DocId, RepId),
+        % NV: TODO: This used to be
+        %   ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
+        % Now just write `triggered` for compatibility; in the future do something
+        % in the scheduler to handle repeated failed starts.
         couch_log:notice("Document `~s` triggered replication `~s`",
             [DocId, pp_rep_id(RepId)])
     end.
 
-
 replication_completed(#rep{id = RepId}, Stats) ->
     case rep_state(RepId) of
     nil ->
         ok;
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"completed">>},
-            {<<"_replication_state_reason">>, undefined},
-            {<<"_replication_stats">>, {Stats}}]),
+    #rep{db_name = DbName, doc_id = DocId} ->
+        couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
         ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
         couch_log:notice("Replication `~s` finished (triggered by document `~s`)",
             [pp_rep_id(RepId), DocId])
@@ -114,35 +89,25 @@ replication_usurped(#rep{id = RepId}, By) ->
     case rep_state(RepId) of
     nil ->
         ok;
-    #rep_state{rep = #rep{doc_id = DocId}} ->
+    #rep{doc_id = DocId} ->
         ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
         couch_log:notice("Replication `~s` usurped by ~s (triggered by document `~s`)",
             [pp_rep_id(RepId), By, DocId])
     end.
 
 
-replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
+replication_error(#rep{id = RepId}, Error) ->
     case rep_state(RepId) of
     nil ->
         ok;
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
-        ok = add_error_jitter(),
-        update_rep_doc(DbName, DocId, [
-            {<<"_replication_state">>, <<"error">>},
-            {<<"_replication_state_reason">>, to_binary(error_reason(Error))},
-            {<<"_replication_id">>, ?l2b(BaseId)}]),
+    #rep{db_name = DbName, doc_id = DocId} ->
+        % NV: TODO: We might want to do something else instead of update doc on every error
+         couch_replicator_docs:update_doc_error(DbName, DocId, RepId, Error),
         ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
     end.
 
-% Add random delay proportional to the number of replications
-% on current node, in order to prevent a stampede when a source
-% with multiple replication targets fails
-add_error_jitter() ->
-    RepCount = ets:info(?REP_TO_STATE, size),
-    Range = min(2 * RepCount * ?AVG_ERROR_DELAY_MSEC, ?MAX_ERROR_DELAY_MSEC),
-    timer:sleep(random:uniform(Range)).
-
 
+% NV: TODO: Need to use the new cluster ownership bit here.
 continue(#rep{doc_id = null}) ->
     {true, no_owner};
 continue(#rep{id = RepId}) ->
@@ -150,48 +115,22 @@ continue(#rep{id = RepId}) ->
     {node() == Owner, Owner}.
 
 
-handle_config_change("replicator", "max_replication_retry_count", V, _, S) ->
-    ok = gen_server:cast(S, {set_max_retries, retries_value(V)}),
-    {ok, S};
-handle_config_change(_, _, _, _, S) ->
-    {ok, S}.
-
-handle_config_terminate(_, stop, _) -> ok;
-handle_config_terminate(Self, _, _) ->
-    spawn(fun() ->
-        timer:sleep(5000),
-        config:listen_for_changes(?MODULE, Self)
-    end).
-
 init(_) ->
     process_flag(trap_exit, true),
     net_kernel:monitor_nodes(true),
     Live = [node() | nodes()],
     ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, public]),
     ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, public]),
-    ?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, public]),
-    Server = self(),
-    ok = config:listen_for_changes(?MODULE, Server),
-    Epoch = make_ref(),
-    ScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
-    % Automatically start node local changes feed loop
-    ensure_rep_db_exists(<<"_replicator">>),
-    Pid = start_changes_reader(<<"_replicator">>, 0, Epoch),
-    {ok, #state{
-        event_listener = start_event_listener(),
-        scan_pid = ScanPid,
-        max_retries = retries_value(
-            config:get("replicator", "max_replication_retry_count", "10")),
-        rep_start_pids = [Pid],
-        live = Live,
-        epoch = Epoch
-    }}.
+    couch_replicator_docs:ensure_rep_db_exists(),
+    {ok, #state{mdb_listener = start_mdb_listener(), live = Live}}.
 
+% NV: TODO: Use new cluster membership module here. A possible return value
+% could be 'unstable', in which case we should keep the old owner and possibly log it.
 handle_call({owner, RepId}, _From, State) ->
     case rep_state(RepId) of
     nil ->
         {reply, nonode, State};
-    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
+    #rep{db_name = DbName, doc_id = DocId} ->
         {reply, owner(DbName, DocId, State#state.live), State}
     end;
 
@@ -202,27 +141,12 @@ handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
     _Tag:Error ->
         {RepProps} = get_json_value(doc, ChangeProps),
         DocId = get_json_value(<<"_id">>, RepProps),
-        rep_db_update_error(Error, DbName, DocId),
+        couch_replicator_docs:update_doc_process_error(DbName, DocId, Error),
         State
     end,
     {reply, ok, NewState};
 
 
-handle_call({rep_started, RepId}, _From, State) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    RepState ->
-        NewRepState = RepState#rep_state{
-            starting = false,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries,
-            wait = ?INITIAL_WAIT
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
-    end,
-    {reply, ok, State};
-
 handle_call({rep_complete, RepId}, _From, State) ->
     true = ets:delete(?REP_TO_STATE, RepId),
     {reply, ok, State};
@@ -230,99 +154,40 @@ handle_call({rep_complete, RepId}, _From, State) ->
 handle_call({rep_error, RepId, Error}, _From, State) ->
     {reply, ok, replication_error(State, RepId, Error)};
 
-% Match changes epoch with the current epoch in the state.
-% New epoch ref is created on a full rescan. Change feeds have to
-% be replayed from the start to determine ownership in the new
-% cluster configuration and epoch is used to match & checkpoint
-% only changes from the current cluster configuration.
-handle_call({rep_db_checkpoint, DbName, EndSeq, Epoch}, _From,
-            #state{epoch = Epoch} = State) ->
-    Entry = case ets:lookup(?DB_TO_SEQ, DbName) of
-        [] ->
-            {DbName, EndSeq, false};
-        [{DbName, _OldSeq, Rescan}] ->
-            {DbName, EndSeq, Rescan}
-    end,
-    true = ets:insert(?DB_TO_SEQ, Entry),
-    {reply, ok, State};
-
-% Ignore checkpoints from previous epoch.
-handle_call({rep_db_checkpoint, _DbName, _EndSeq, _Epoch}, _From, State) ->
-    {reply, ok, State};
-
 handle_call(Msg, From, State) ->
     couch_log:error("Replication manager received unexpected call ~p from ~p",
         [Msg, From]),
     {stop, {error, {unexpected_call, Msg}}, State}.
 
-handle_cast({resume_scan, DbName}, State) ->
-    Pids = State#state.rep_start_pids,
-    NewPids = case lists:keyfind(DbName, 1, Pids) of
-        {DbName, _Pid} ->
-            Entry = case ets:lookup(?DB_TO_SEQ, DbName) of
-                [] ->
-                    {DbName, 0, true};
-                [{DbName, EndSeq, _Rescan}] ->
-                    {DbName, EndSeq, true}
-            end,
-            true = ets:insert(?DB_TO_SEQ, Entry),
-            Pids;
-        false ->
-            Since = case ets:lookup(?DB_TO_SEQ, DbName) of
-                [] -> 0;
-                [{DbName, EndSeq, _Rescan}] -> EndSeq
-            end,
-            true = ets:insert(?DB_TO_SEQ, {DbName, Since, false}),
-            ensure_rep_ddoc_exists(DbName),
-            Pid = start_changes_reader(DbName, Since, State#state.epoch),
-            couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
-            [{DbName, Pid} | Pids]
-    end,
-    {noreply, State#state{rep_start_pids = NewPids}};
-
-handle_cast({set_max_retries, MaxRetries}, State) ->
-    {noreply, State#state{max_retries = MaxRetries}};
 
 handle_cast(Msg, State) ->
     couch_log:error("Replication manager received unexpected cast ~p", [Msg]),
     {stop, {error, {unexpected_cast, Msg}}, State}.
 
+% NV: TODO: Remove when switching to new cluster membership module
 handle_info({nodeup, Node}, State) ->
     couch_log:notice("Rescanning replicator dbs as ~s came up.", [Node]),
     Live = lists:usort([Node | State#state.live]),
     {noreply, rescan(State#state{live=Live})};
 
+% NV: TODO: Remove when switching to new cluster membership module
 handle_info({nodedown, Node}, State) ->
     couch_log:notice("Rescanning replicator dbs ~s went down.", [Node]),
     Live = State#state.live -- [Node],
     {noreply, rescan(State#state{live=Live})};
 
-handle_info({'EXIT', From, normal}, #state{scan_pid = From} = State) ->
-    couch_log:debug("Background scan has completed.", []),
-    {noreply, State#state{scan_pid=nil}};
-
-handle_info({'EXIT', From, Reason}, #state{scan_pid = From} = State) ->
-    couch_log:error("Background scanner died. Reason: ~p", [Reason]),
-    {stop, {scanner_died, Reason}, State};
-
-handle_info({'EXIT', From, Reason}, #state{event_listener = From} = State) ->
+handle_info({'EXIT', From, Reason}, #state{mdb_listener = From} = State) ->
     couch_log:error("Database update notifier died. Reason: ~p", [Reason]),
     {stop, {db_update_notifier_died, Reason}, State};
 
 handle_info({'EXIT', From, Reason}, #state{rep_start_pids = Pids} = State) ->
     case lists:keytake(From, 2, Pids) of
-        {value, {DbName, From}, NewPids} ->
+        {value, {rep_start, From}, NewPids} ->
             if Reason == normal -> ok; true ->
-                Fmt = "~s : Known replication or change feed pid ~w died :: ~w",
+                Fmt = "~s : Known replication pid ~w died :: ~w",
                 couch_log:error(Fmt, [?MODULE, From, Reason])
             end,
-            NewState = State#state{rep_start_pids = NewPids},
-            case ets:lookup(?DB_TO_SEQ, DbName) of
-                [{DbName, _EndSeq, true}] ->
-                    handle_cast({resume_scan, DbName}, NewState);
-                _ ->
-                    {noreply, NewState}
-            end;
+            {noreply, State#state{rep_start_pids = NewPids}};
         false when Reason == normal ->
             {noreply, State};
         false ->
@@ -345,9 +210,8 @@ handle_info(Msg, State) ->
 
 terminate(_Reason, State) ->
     #state{
-        scan_pid = ScanPid,
         rep_start_pids = StartPids,
-        event_listener = Listener
+        mdb_listener = Listener
     } = State,
     stop_all_replications(),
     lists:foreach(
@@ -355,107 +219,53 @@ terminate(_Reason, State) ->
             catch unlink(Pid),
             catch exit(Pid, stop)
         end,
-        [{scanner, ScanPid} | StartPids]),
+        StartPids),
     true = ets:delete(?REP_TO_STATE),
     true = ets:delete(?DOC_TO_REP),
-    true = ets:delete(?DB_TO_SEQ),
-    couch_event:stop_listener(Listener).
+    catch unlink(Listener),
+    catch exit(Listener).
 
 
-code_change(1, State, _Extra) ->
-    {ok, erlang:append_element(State, [node() | nodes()])};
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-start_changes_reader(DbName, Since, Epoch) ->
-    spawn_link(?MODULE, changes_reader, [{self(), Epoch}, DbName, Since]).
-
-changes_reader({Server, Epoch}, DbName, Since) ->
-    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
-    DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
-    {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
-    ChangesFeedFun = couch_changes:handle_db_changes(
-        #changes_args{
-            include_docs = true,
-            since = Since,
-            feed = "longpoll",
-            timeout = infinity
-        },
-        {json_req, null},
-        Db
-    ),
-    ChangesFeedFun({fun ?MODULE:changes_reader_cb/3, {Server, DbName, Epoch}}).
-
-changes_reader_cb({change, Change, _}, _, {Server, DbName, Epoch}) ->
-    case has_valid_rep_id(Change) of
-        true ->
-            Msg = {rep_db_update, DbName, Change},
-            ok = gen_server:call(Server, Msg, infinity);
-        false ->
-            ok
-    end,
-    {Server, DbName, Epoch};
-changes_reader_cb({stop, EndSeq, _Pending}, _, {Server, DbName, Epoch}) ->
-    Msg = {rep_db_checkpoint, DbName, EndSeq},
-    ok = gen_server:call(Server, Msg, infinity),
-    {Server, DbName, Epoch};
-changes_reader_cb(_, _, Acc) ->
-    Acc.
-
-has_valid_rep_id({Change}) ->
-    has_valid_rep_id(get_json_value(<<"id">>, Change));
-has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
-    false;
-has_valid_rep_id(_Else) ->
-    true.
-
-
-start_event_listener() ->
-    {ok, Pid} = couch_event:link_listener(
-            ?MODULE, handle_db_event, self(), [all_dbs]
-        ),
+start_mdb_listener() ->
+    {ok, Pid} = couch_multidb_changes:start_link(
+        <<"_replicator">>, ?MODULE, self(), [skip_ddocs]),
     Pid.
 
 
-handle_db_event(DbName, created, Server) ->
-    case is_replicator_db(DbName) of
-	true ->
-	    ensure_rep_ddoc_exists(DbName);
-	_ ->
-	    ok
-    end,
-    {ok, Server};
-handle_db_event(DbName, updated, Server) ->
-    case is_replicator_db(DbName) of
-        true ->
-	    Msg = {resume_scan, DbName},
-	    ok = gen_server:cast(Server, Msg);
-        _ ->
-            ok
-    end,
-    {ok, Server};
-handle_db_event(DbName, deleted, Server) ->
-    case is_replicator_db(DbName) of
-        true ->
-            clean_up_replications(DbName);
-        _ ->
-            ok
-    end,
-    {ok, Server};
-handle_db_event(_DbName, _Event, Server) ->
-    {ok, Server}.
-
-rescan(#state{scan_pid = nil} = State) ->
-    true = ets:delete_all_objects(?DB_TO_SEQ),
-    Server = self(),
-    Epoch = make_ref(),
-    NewScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
-    State#state{scan_pid = NewScanPid, epoch = Epoch};
-rescan(#state{scan_pid = ScanPid} = State) ->
-    unlink(ScanPid),
-    exit(ScanPid, exit),
-    rescan(State#state{scan_pid = nil}).
+%%%%%% multidb changes callbacks
+
+db_created(DbName, Server) ->
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+db_deleted(DbName, Server) ->
+    clean_up_replications(DbName),
+    Server.
+
+db_found(DbName, Server) ->
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+db_change(DbName, Change, Server) ->
+    ok = gen_server:call(Server, {rep_db_update, DbName, Change}, infinity),
+    Server.
+
+
+% NV: TODO: This will change when using the new clustering module.
+% Rescan should happen when cluster membership changes; the clustering module
+% handles that with an appropriate back-off. mdb_listener should probably live
+% in a supervisor, and that supervisor should be asked to restart it.
+rescan(#state{mdb_listener = nil} = State) ->
+    State#state{mdb_listener = start_mdb_listener()};
+rescan(#state{mdb_listener = MPid} = State) ->
+    unlink(MPid),
+    exit(MPid, exit),
+    rescan(State#state{mdb_listener = nil}).
+
 
 process_update(State, DbName, {Change}) ->
     {RepProps} = JsonRepDoc = get_json_value(doc, Change),
@@ -487,6 +297,7 @@ process_update(State, DbName, {Change}) ->
         end
     end.
 
+
 owner(<<"shards/", _/binary>> = DbName, DocId, Live) ->
     Nodes = lists:sort([N || #shard{node=N} <- mem3:shards(mem3:dbname(DbName), DocId),
 			     lists:member(N, Live)]),
@@ -494,94 +305,45 @@ owner(<<"shards/", _/binary>> = DbName, DocId, Live) ->
 owner(_DbName, _DocId, _Live) ->
     node().
 
-rep_db_update_error(Error, DbName, DocId) ->
-    case Error of
-    {bad_rep_doc, Reason} ->
-        ok;
-    _ ->
-        Reason = to_binary(Error)
-    end,
-    couch_log:error("Replication manager, error processing document `~s`: ~s",
-        [DocId, Reason]),
-    update_rep_doc(DbName, DocId, [{<<"_replication_state">>, <<"error">>},
-                           {<<"_replication_state_reason">>, Reason}]).
-
-
-rep_user_ctx({RepDoc}) ->
-    case get_json_value(<<"user_ctx">>, RepDoc) of
-    undefined ->
-        #user_ctx{};
-    {UserCtx} ->
-        #user_ctx{
-            name = get_json_value(<<"name">>, UserCtx, null),
-            roles = get_json_value(<<"roles">>, UserCtx, [])
-        }
-    end.
 
 
 maybe_start_replication(State, DbName, DocId, RepDoc) ->
-    #rep{id = {BaseId, _} = RepId} = Rep0 = parse_rep_doc(RepDoc),
+    Rep0 = couch_replicator_docs:parse_rep_doc(RepDoc),
+    #rep{id = {BaseId, _} = RepId} = Rep0,
     Rep = Rep0#rep{db_name = DbName},
     case rep_state(RepId) of
     nil ->
-        RepState = #rep_state{
-            rep = Rep,
-            starting = true,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
+        true = ets:insert(?REP_TO_STATE, {RepId, Rep}),
         true = ets:insert(?DOC_TO_REP, {{DbName, DocId}, RepId}),
         couch_log:notice("Attempting to start replication `~s` (document `~s`).",
             [pp_rep_id(RepId), DocId]),
-        StartDelaySecs = erlang:max(0,
-            config:get_integer("replicator", "start_delay", 10)),
-        StartSplaySecs = erlang:max(1,
-            config:get_integer("replicator", "start_splay", 50)),
-        DelaySecs = StartDelaySecs + random:uniform(StartSplaySecs),
-        couch_log:notice("Delaying replication `~s` start by ~p seconds.",
-            [pp_rep_id(RepId), DelaySecs]),
-        Pid = spawn_link(?MODULE, start_replication, [Rep, DelaySecs]),
+        Pid = spawn_link(?MODULE, start_replication, [Rep]),
         State#state{
             rep_start_pids = [{rep_start, Pid} | State#state.rep_start_pids]
         };
-    #rep_state{rep = #rep{doc_id = DocId}} ->
+    #rep{doc_id = DocId} ->
         State;
-    #rep_state{starting = false, rep = #rep{db_name = DbName, doc_id = OtherDocId}} ->
-        couch_log:notice("The replication specified by the document `~s` was already"
+    #rep{db_name = DbName, doc_id = OtherDocId} ->
+        couch_log:notice("The replication specified by the document `~s` already started"
             " triggered by the document `~s`", [DocId, OtherDocId]),
         maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
-        State;
-    #rep_state{starting = true, rep = #rep{db_name = DbName, doc_id = OtherDocId}} ->
-        couch_log:notice("The replication specified by the document `~s` is already"
-            " being triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
         State
     end.
 
 
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} = try
-        couch_replicator_utils:parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-    catch
-    throw:{error, Reason} ->
-        throw({bad_rep_doc, Reason});
-    Tag:Err ->
-        throw({bad_rep_doc, to_binary({Tag, Err})})
-    end,
-    Rep.
-
-
 maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
     case get_json_value(<<"_replication_id">>, RepProps) of
     RepId ->
         ok;
     _ ->
-        update_rep_doc(DbName, DocId, [{<<"_replication_id">>, RepId}])
+        couch_replicator_docs:update_doc_replication_id(DbName, DocId, RepId)
     end.
 
-start_replication(Rep, Wait) ->
-    ok = timer:sleep(Wait * 1000),
+start_replication(Rep) ->
+    % NV: TODO: Removed splay and back-off sleep on error. To replace that
+    % temporarily, add some random sleep here to avoid repeated failed restarts
+    % in a loop if the source doc is broken.
+    timer:sleep(random:uniform(1000)),
     case (catch couch_replicator:async_replicate(Rep)) of
     {ok, _} ->
         ok;
@@ -591,31 +353,12 @@ start_replication(Rep, Wait) ->
 
 replication_complete(DbName, DocId) ->
     case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
-    [{{DbName, DocId}, {BaseId, Ext} = RepId}] ->
-        case rep_state(RepId) of
-        nil ->
-            % Prior to OTP R14B02, temporary child specs remain in
-            % in the supervisor after a worker finishes - remove them.
-            % We want to be able to start the same replication but with
-            % eventually different values for parameters that don't
-            % contribute to its ID calculation.
-            case erlang:system_info(otp_release) < "R14B02" of
-            true ->
-                spawn(fun() ->
-                    _ = supervisor:delete_child(couch_replicator_job_sup, BaseId ++ Ext)
-                end);
-            false ->
-                ok
-            end;
-        #rep_state{} ->
-            ok
-        end,
+    [{{DbName, DocId}, _RepId}] ->
         true = ets:delete(?DOC_TO_REP, {DbName, DocId});
     _ ->
         ok
     end.
 
-
 rep_doc_deleted(DbName, DocId) ->
     case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
     [{{DbName, DocId}, RepId}] ->
@@ -637,29 +380,13 @@ replication_error(State, RepId, Error) ->
         maybe_retry_replication(RepState, Error, State)
     end.
 
-maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
-    #rep_state{
-        rep = #rep{id = RepId, doc_id = DocId, db_name = DbName},
-        max_retries = MaxRetries
-    } = RepState,
-    couch_replicator:cancel_replication(RepId),
-    true = ets:delete(?REP_TO_STATE, RepId),
-    true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
-    couch_log:error("Error in replication `~s` (triggered by document `~s`): ~s"
-        "~nReached maximum retry attempts (~p).",
-        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
-    State;
-
-maybe_retry_replication(RepState, Error, State) ->
-    #rep_state{
-        rep = #rep{id = RepId, doc_id = DocId} = Rep
-    } = RepState,
-    #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
-    true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
-    couch_log:error("Error in replication `~s` (triggered by document `~s`): ~s"
-        "~nRestarting replication in ~p seconds.",
-        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
-    Pid = spawn_link(?MODULE, start_replication, [Rep, Wait]),
+maybe_retry_replication(#rep{id = RepId, doc_id = DocId} = Rep, Error, State) ->
+    ErrorBinary = couch_replicator_utils:rep_error_to_binary(Error),
+    couch_log:error("Error in replication `~s` (triggered by `~s`): ~s",
+        [pp_rep_id(RepId), DocId, ErrorBinary]),
+    % NV: TODO: Removed repeated failed restarts handling. Will do that some
+    % other way in scheduler code
+    Pid = spawn_link(?MODULE, start_replication, [Rep]),
     State#state{
         rep_start_pids = [{rep_start, Pid} | State#state.rep_start_pids]
     }.
@@ -673,8 +400,8 @@ stop_all_replications() ->
         end,
         ok, ?DOC_TO_REP),
     true = ets:delete_all_objects(?REP_TO_STATE),
-    true = ets:delete_all_objects(?DOC_TO_REP),
-    true = ets:delete_all_objects(?DB_TO_SEQ).
+    true = ets:delete_all_objects(?DOC_TO_REP).
+
 
 clean_up_replications(DbName) ->
     ets:foldl(
@@ -685,133 +412,8 @@ clean_up_replications(DbName) ->
            ({_,_}, _) ->
             ok
         end,
-        ok, ?DOC_TO_REP),
-    ets:delete(?DB_TO_SEQ,DbName).
-
-
-update_rep_doc(RepDbName, RepDocId, KVs) ->
-    update_rep_doc(RepDbName, RepDocId, KVs, 1).
-
-update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
-    try
-        case open_rep_doc(RepDbName, RepDocId) of
-            {ok, LastRepDoc} ->
-                update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
-            _ ->
-                ok
-        end
-    catch
-        throw:conflict ->
-            Msg = "Conflict when updating replication document `~s`. Retrying.",
-            couch_log:error(Msg, [RepDocId]),
-            ok = timer:sleep(random:uniform(erlang:min(128, Wait)) * 100),
-            update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
-    end;
-update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
-    NewRepDocBody = lists:foldl(
-        fun({K, undefined}, Body) ->
-                lists:keydelete(K, 1, Body);
-           ({<<"_replication_state">> = K, State} = KV, Body) ->
-                case get_json_value(K, Body) of
-                State ->
-                    Body;
-                _ ->
-                    Body1 = lists:keystore(K, 1, Body, KV),
-                    lists:keystore(
-                        <<"_replication_state_time">>, 1, Body1,
-                        {<<"_replication_state_time">>, timestamp()})
-                end;
-            ({K, _V} = KV, Body) ->
-                lists:keystore(K, 1, Body, KV)
-        end,
-        RepDocBody, KVs),
-    case NewRepDocBody of
-    RepDocBody ->
-        ok;
-    _ ->
-        % Might not succeed - when the replication doc is deleted right
-        % before this update (not an error, ignore).
-        save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
-    end.
-
-open_rep_doc(DbName, DocId) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
-    try
-        couch_db:open_doc(Db, DocId, [ejson_body])
-    after
-        couch_db:close(Db)
-    end.
-
-save_rep_doc(DbName, Doc) ->
-    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
-    try
-        couch_db:update_doc(Db, Doc, [])
-    after
-        couch_db:close(Db)
-    end.
-
-% RFC3339 timestamps.
-% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
-timestamp() ->
-    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
-    UTime = erlang:universaltime(),
-    LocalTime = calendar:universal_time_to_local_time(UTime),
-    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
-        calendar:datetime_to_gregorian_seconds(UTime),
-    zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
-    iolist_to_binary(
-        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
-            [Year, Month, Day, Hour, Min, Sec,
-                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
-
-zone(Hr, Min) when Hr >= 0, Min >= 0 ->
-    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
-zone(Hr, Min) ->
-    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
-
-
-ensure_rep_db_exists(<<"shards/", _/binary>>=DbName) ->
-    ensure_rep_ddoc_exists(DbName),
-    ok;
-ensure_rep_db_exists(DbName) ->
-    Db = case couch_db:open_int(DbName, [?CTX, sys_db, nologifmissing]) of
-        {ok, Db0} ->
-            Db0;
-        _Error ->
-            {ok, Db0} = couch_db:create(DbName, [?CTX, sys_db]),
-            Db0
-    end,
-    ensure_rep_ddoc_exists(DbName),
-    {ok, Db}.
-
-ensure_rep_ddoc_exists(RepDb) ->
-    DDocId = <<"_design/_replicator">>,
-    case mem3:belongs(RepDb, DDocId) of
-	true ->
-	    ensure_rep_ddoc_exists(RepDb, DDocId);
-	false ->
-	    ok
-    end.
+        ok, ?DOC_TO_REP).
 
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
-    case open_rep_doc(RepDb, DDocId) of
-        {ok, _Doc} ->
-            ok;
-        _ ->
-            DDoc = couch_doc:from_json_obj({[
-                {<<"_id">>, DDocId},
-                {<<"language">>, <<"javascript">>},
-                {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-            ]}),
-            try
-                {ok, _} = save_rep_doc(RepDb, DDoc)
-            catch
-                throw:conflict ->
-                    % NFC what to do about this other than
-                    % not kill the process.
-                    ok
-            end
-    end.
 
 % pretty-print replication id
 pp_rep_id(#rep{id = RepId}) ->
@@ -829,129 +431,11 @@ rep_state(RepId) ->
     end.
 
 
-error_reason({error, {Error, Reason}})
-  when is_atom(Error), is_binary(Reason) ->
-    io_lib:format("~s: ~s", [Error, Reason]);
-error_reason({error, Reason}) ->
-    Reason;
-error_reason(Reason) ->
-    Reason.
-
-
-retries_value("infinity") ->
-    infinity;
-retries_value(Value) ->
-    list_to_integer(Value).
-
-
-state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
-    Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
-    case Left of
-    infinity ->
-        State#rep_state{wait = Wait2};
-    _ ->
-        State#rep_state{retries_left = Left - 1, wait = Wait2}
-    end.
-
-
-before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
-    #user_ctx{roles = Roles, name = Name} = UserCtx,
-    case lists:member(<<"_replicator">>, Roles) of
-    true ->
-        Doc;
-    false ->
-        case couch_util:get_value(?OWNER, Body) of
-        undefined ->
-            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-        Name ->
-            Doc;
-        Other ->
-            case (catch couch_db:check_is_admin(Db)) of
-            ok when Other =:= null ->
-                Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
-            ok ->
-                Doc;
-            _ ->
-                throw({forbidden, <<"Can't update replication documents",
-                    " from other users.">>})
-            end
-        end
-    end.
-
-
-after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
-    Doc;
-after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
-    #user_ctx{name = Name} = UserCtx,
-    case (catch couch_db:check_is_admin(Db)) of
-    ok ->
-        Doc;
-    _ ->
-        case couch_util:get_value(?OWNER, Body) of
-        Name ->
-            Doc;
-        _Other ->
-            Source = strip_credentials(couch_util:get_value(<<"source">>,
-Body)),
-            Target = strip_credentials(couch_util:get_value(<<"target">>,
-Body)),
-            NewBody0 = ?replace(Body, <<"source">>, Source),
-            NewBody = ?replace(NewBody0, <<"target">>, Target),
-            #doc{revs = {Pos, [_ | Revs]}} = Doc,
-            NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
-            NewRevId = couch_db:new_revid(NewDoc),
-            NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
-        end
-    end.
-
+% NV: TODO: This function was moved to couch_replicator_docs but is still
+% called from fabric_doc_update, so keep this proxy here for now. Later,
+% update fabric to call couch_replicator_docs directly and remove it.
+before_doc_update(Doc, Db) ->
+    couch_replicator_docs:before_doc_update(Doc, Db).
 
-strip_credentials(undefined) ->
-    undefined;
-strip_credentials(Url) when is_binary(Url) ->
-    re:replace(Url,
-        "http(s)?://(?:[^:]+):[^@]+@(.*)$",
-        "http\\1://\\2",
-        [{return, binary}]);
-strip_credentials({Props}) ->
-    {lists:keydelete(<<"oauth">>, 1, Props)}.
-
-scan_all_dbs(Server) when is_pid(Server) ->
-    Root = config:get("couchdb", "database_dir", "."),
-    NormRoot = couch_util:normpath(Root),
-    filelib:fold_files(Root, "_replicator(\\.[0-9]{10,})?.couch$", true,
-        fun(Filename, _) ->
-	    % shamelessly stolen from couch_server.erl
-            NormFilename = couch_util:normpath(Filename),
-            case NormFilename -- NormRoot of
-                [$/ | RelativeFilename] -> ok;
-                RelativeFilename -> ok
-            end,
-            DbName = ?l2b(filename:rootname(RelativeFilename, ".couch")),
-	    gen_server:cast(Server, {resume_scan, DbName}),
-	    ok
-	end, ok).
-
-is_replicator_db(DbName) ->
-    <<"_replicator">> =:= couch_db:dbname_suffix(DbName).
-
-get_json_value(Key, Props) ->
-    get_json_value(Key, Props, undefined).
-
-get_json_value(Key, Props, Default) when is_atom(Key) ->
-    Ref = make_ref(),
-    case couch_util:get_value(Key, Props, Ref) of
-        Ref ->
-            couch_util:get_value(?l2b(atom_to_list(Key)), Props, Default);
-        Else ->
-            Else
-    end;
-get_json_value(Key, Props, Default) when is_binary(Key) ->
-    Ref = make_ref(),
-    case couch_util:get_value(Key, Props, Ref) of
-        Ref ->
-            couch_util:get_value(list_to_atom(?b2l(Key)), Props, Default);
-        Else ->
-            Else
-    end.
+after_doc_read(Doc, Db) -> % proxy for external callers; see couch_replicator_docs
+    couch_replicator_docs:after_doc_read(Doc, Db).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/5218edf8/src/couch_replicator_scheduler_job.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler_job.erl b/src/couch_replicator_scheduler_job.erl
index 5240ddd..ad600d1 100644
--- a/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator_scheduler_job.erl
@@ -91,7 +91,7 @@ start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
                              [RepChildId, Pid, Source, Target]),
             {ok, Pid};
         {error, Reason} ->
-            couch_log:warn("failed to start replication `~s` (`~s` -> `~s`)",
+            couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)",
                            [RepChildId, Source, Target]),
             {error, Reason}
     end.


Mime
View raw message