couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vatam...@apache.org
Subject [couchdb] 07/09: Implement replication document processor
Date Wed, 19 Apr 2017 01:53:51 GMT
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit df18550db5f79119c82079b8ea5e20f689f3483e
Author: Nick Vatamaniuc <vatamane@apache.org>
AuthorDate: Fri May 13 18:12:53 2016 -0400

    Implement replication document processor
    
    Document processor listens for `_replicator` db document updates, parses those
    changes then tries to add replication jobs to the scheduler.
    
    Listening for changes happens in `couch_multidb_changes module`. That module is
    generic and is set up to listen to shards with `_replicator` suffix by
    `couch_replicator_db_changes`. Updates are then passed to the document
    processor's `process_change/2` function.
    
    Document replication ID calculation, which can involve fetching filter code
    from the source DB, and addition to the scheduler, is done in a separate
    worker process: `couch_replicator_doc_processor_worker`.
    
    Before couch replicator manager did most of this work. There are a few
    improvement over previous implementation:
    
     * Invalid (malformed) replication documents are immediately failed and will
     not be continuously retried.
    
     * Replication manager message queue backups is unfortunately a common issue
     in production. This is because processing document updates is a serial
     (blocking)  operation. Most of that blocking code was moved to separate worker
     processes.
    
     * Failing filter fetches have an exponential backoff.
    
     * Replication documents don't have to be deleted first then re-added in order
     update the replication. Document processor on update will compare new and
     previous replication related document fields and update the replication job
     if those changed. Users can freely update unlrelated (custom) fields in their
     replication docs.
    
     * In case of filtered replications using custom functions, document processor
     will periodically check if filter code on the source has changed. Filter code
     contents is factored into replication ID calculation. If filter code changes
     replication ID will change as well.
    
    Jira: COUCHDB-3324
---
 .../src/couch_replicator_db_changes.erl            | 108 +++
 .../src/couch_replicator_doc_processor.erl         | 972 +++++++++++++++++++++
 .../src/couch_replicator_doc_processor_worker.erl  | 276 ++++++
 3 files changed, 1356 insertions(+)

diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl
new file mode 100644
index 0000000..92b0222
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_db_changes.erl
@@ -0,0 +1,108 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_db_changes).
+
+-behaviour(gen_server).
+
+-export([
+   start_link/0
+]).
+
+-export([
+   init/1,
+   terminate/2,
+   handle_call/3,
+   handle_info/2,
+   handle_cast/2,
+   code_change/3
+]).
+
+-export([
+   notify_cluster_event/2
+]).
+
+-record(state, {
+   event_listener :: pid(),
+   mdb_changes :: pid() | nil
+}).
+
+
+-spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
+notify_cluster_event(Server, {cluster, _} = Event) ->
+    gen_server:cast(Server, Event).
+
+
+-spec start_link() ->
+    {ok, pid()} | ignore | {error, any()}.
+start_link() ->
+    gen_server:start_link(?MODULE, [], []).
+
+
+init([]) ->
+    EvtPid = couch_replicator_clustering:link_cluster_event_listener(?MODULE,
+        notify_cluster_event, [self()]),
+    State = #state{event_listener = EvtPid, mdb_changes = nil},
+    case couch_replicator_clustering:is_stable() of
+        true ->
+            {ok, restart_mdb_changes(State)};
+        false ->
+            {ok, State}
+    end.
+
+
+terminate(_Reason, _State) ->
+    ok.
+
+
+handle_call(_Msg, _From, State) ->
+    {reply, {error, invalid_call}, State}.
+
+
+handle_cast({cluster, unstable}, State) ->
+    {noreply, stop_mdb_changes(State)};
+
+handle_cast({cluster, stable}, State) ->
+    {noreply, restart_mdb_changes(State)}.
+
+
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+-spec restart_mdb_changes(#state{}) -> #state{}.
+restart_mdb_changes(#state{mdb_changes = nil} = State) ->
+    Suffix = <<"_replicator">>,
+    CallbackMod = couch_replicator_doc_processor,
+    Options = [skip_ddocs],
+    {ok, Pid} = couch_multidb_changes:start_link(Suffix, CallbackMod, nil,
+        Options),
+    couch_stats:increment_counter([couch_replicator, db_scans]),
+    couch_log:notice("Started replicator db changes listener ~p", [Pid]),
+    State#state{mdb_changes = Pid};
+
+restart_mdb_changes(#state{mdb_changes = _Pid} = State) ->
+    restart_mdb_changes(stop_mdb_changes(State)).
+
+
+-spec stop_mdb_changes(#state{}) -> #state{}.
+stop_mdb_changes(#state{mdb_changes = nil} = State) ->
+    State;
+stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
+    couch_log:notice("Stopping replicator db changes listener ~p", [Pid]),
+    unlink(Pid),
+    exit(Pid, kill),
+    State#state{mdb_changes = nil}.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
new file mode 100644
index 0000000..5ca3246
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -0,0 +1,972 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_doc_processor).
+
+-behaviour(gen_server).
+-behaviour(couch_multidb_changes).
+
+-export([
+    start_link/0
+]).
+
+-export([
+   init/1,
+   terminate/2,
+   handle_call/3,
+   handle_info/2,
+   handle_cast/2,
+   code_change/3
+]).
+
+-export([
+    db_created/2,
+    db_deleted/2,
+    db_found/2,
+    db_change/3
+]).
+
+-export([
+    docs/1,
+    doc/2,
+    doc_lookup/3,
+    update_docs/0,
+    get_worker_ref/1,
+    notify_cluster_event/2
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-import(couch_replicator_utils, [
+    get_json_value/2,
+    get_json_value/3
+]).
+
+-define(DEFAULT_UPDATE_DOCS, false).
+-define(ERROR_MAX_BACKOFF_EXPONENT, 12).  % ~ 1 day on average
+-define(TS_DAY_SEC, 86400).
+-define(INITIAL_BACKOFF_EXPONENT, 64).
+-define(MIN_FILTER_DELAY_SEC, 60).
+
+-type filter_type() ::  nil | view | user | docids | mango.
+-type repstate() :: initializing | error | scheduled.
+
+
+-record(rdoc, {
+    id :: db_doc_id() | '_' | {any(), '_'},
+    state :: repstate() | '_',
+    rep :: #rep{} | nil | '_',
+    rid :: rep_id() | nil | '_',
+    filter :: filter_type() | '_',
+    info :: binary() | nil | '_',
+    errcnt :: non_neg_integer() | '_',
+    worker :: reference() | nil | '_',
+    last_updated :: erlang:timestamp() | '_'
+}).
+
+
+% couch_multidb_changes API callbacks
+
+db_created(DbName, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+
+db_deleted(DbName, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
+    ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity),
+    Server.
+
+
+db_found(DbName, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+
+db_change(DbName, {ChangeProps} = Change, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+    try
+        ok = process_change(DbName, Change)
+    catch
+    _Tag:Error ->
+        {RepProps} = get_json_value(doc, ChangeProps),
+        DocId = get_json_value(<<"_id">>, RepProps),
+        couch_replicator_docs:update_failed(DbName, DocId, Error)
+    end,
+    Server.
+
+
+-spec get_worker_ref(db_doc_id()) -> reference() | nil.
+get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
+    case ets:lookup(?MODULE, {DbName, DocId}) of
+        [#rdoc{worker = WRef}] when is_reference(WRef) ->
+            WRef;
+        [#rdoc{worker = nil}] ->
+            nil;
+        [] ->
+            nil
+    end.
+
+
+% Cluster membership change notification callback
+-spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
+notify_cluster_event(Server, {cluster, _} = Event) ->
+    gen_server:cast(Server, Event).
+
+
+process_change(DbName, {Change}) ->
+    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
+    DocId = get_json_value(<<"_id">>, RepProps),
+    Owner = couch_replicator_clustering:owner(DbName, DocId),
+    Id = {DbName, DocId},
+    case {Owner, get_json_value(deleted, Change, false)} of
+    {_, true} ->
+        ok = gen_server:call(?MODULE, {removed, Id}, infinity);
+    {unstable, false} ->
+        couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
+    {ThisNode, false} when ThisNode =:= node() ->
+        case get_json_value(<<"_replication_state">>, RepProps) of
+        undefined ->
+            ok = process_updated(Id, JsonRepDoc);
+        <<"triggered">> ->
+            maybe_remove_state_fields(DbName, DocId),
+            ok = process_updated(Id, JsonRepDoc);
+        <<"completed">> ->
+            ok = gen_server:call(?MODULE, {completed, Id}, infinity);
+        <<"error">> ->
+            % Handle replications started from older versions of replicator
+            % which wrote transient errors to replication docs
+            maybe_remove_state_fields(DbName, DocId),
+            ok = process_updated(Id, JsonRepDoc);
+        <<"failed">> ->
+            ok
+        end;
+    {Owner, false} ->
+        ok
+    end,
+    ok.
+
+
+maybe_remove_state_fields(DbName, DocId) ->
+    case update_docs() of
+        true ->
+            ok;
+        false ->
+            couch_replicator_docs:remove_state_fields(DbName, DocId)
+    end.
+
+
+process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
+    % Parsing replication doc (but not calculating the id) could throw an
+    % exception which would indicate this document is malformed. This exception
+    % should propagate to db_change function and will be recorded as permanent
+    % failure in the document. User will have to update the documet to fix the
+    % problem.
+    Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
+    Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
+    Filter = case couch_replicator_filters:parse(Rep#rep.options) of
+    {ok, nil} ->
+        nil;
+    {ok, {user, _FName, _QP}} ->
+        user;
+    {ok, {view, _FName, _QP}} ->
+        view;
+    {ok, {docids, _DocIds}} ->
+        docids;
+    {ok, {mango, _Selector}} ->
+        mango;
+    {error, FilterError} ->
+        throw(FilterError)
+    end,
+    gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity).
+
+
+% Doc processor gen_server API and callbacks
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [],  []).
+
+
+init([]) ->
+    ?MODULE = ets:new(?MODULE, [set, named_table, {keypos, #rdoc.id}]),
+    couch_replicator_clustering:link_cluster_event_listener(?MODULE,
+        notify_cluster_event, [self()]),
+    {ok, nil}.
+
+
+terminate(_Reason, _State) ->
+    ok.
+
+
+handle_call({updated, Id, Rep, Filter}, _From, State) ->
+    ok = updated_doc(Id, Rep, Filter),
+    {reply, ok, State};
+
+handle_call({removed, Id}, _From, State) ->
+    ok = removed_doc(Id),
+    {reply, ok, State};
+
+handle_call({completed, Id}, _From, State) ->
+    true = ets:delete(?MODULE, Id),
+    {reply, ok, State};
+
+handle_call({clean_up_replications, DbName}, _From, State) ->
+    ok = removed_db(DbName),
+    {reply, ok, State}.
+
+handle_cast({cluster, unstable}, State) ->
+    % Ignoring unstable state transition
+    {noreply, State};
+
+handle_cast({cluster, stable}, State) ->
+    % Membership changed recheck all the replication document ownership
+    nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE),
+    {noreply, State};
+
+handle_cast(Msg, State) ->
+    {stop, {error, unexpected_message, Msg}, State}.
+
+
+handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
+        result = Res}}, State) ->
+    ok = worker_returned(Ref, Id, Res),
+    {noreply, State};
+
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+% Doc processor gen_server private helper functions
+
+% Handle doc update -- add to ets, then start a worker to try to turn it into
+% a replication job. In most cases it will succeed quickly but for filtered
+% replications or if there are duplicates, it could take longer
+% (theoretically indefinitely) until a replication could be started. Before
+% adding replication job, make sure to delete all old jobs associated with
+% same document.
+-spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok.
+updated_doc(Id, Rep, Filter) ->
+    case normalize_rep(current_rep(Id)) == normalize_rep(Rep) of
+        false ->
+            removed_doc(Id),
+            Row = #rdoc{
+                id = Id,
+                state = initializing,
+                rep = Rep,
+                rid = nil,
+                filter = Filter,
+                info = nil,
+                errcnt = 0,
+                worker = nil,
+                last_updated = os:timestamp()
+            },
+            true = ets:insert(?MODULE, Row),
+            ok = maybe_start_worker(Id);
+        true ->
+            ok
+    end.
+
+
+% Return current #rep{} record if any. If replication hasn't been submitted
+% to the scheduler yet, #rep{} record will be in the document processor's
+% ETS table, otherwise query scheduler for the #rep{} record.
+-spec current_rep({binary(), binary()}) -> #rep{} | nil.
+current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
+    case ets:lookup(?MODULE, {DbName, DocId}) of
+        [] ->
+            nil;
+        [#rdoc{state = scheduled, rep = nil, rid = JobId}] ->
+            % When replication is scheduled, #rep{} record which can be quite
+            % large compared to other bits in #rdoc is removed in order to avoid
+            % having to keep 2 copies of it. So have to fetch it from the
+            % scheduler.
+            couch_replicator_scheduler:rep_state(JobId);
+        [#rdoc{rep = Rep}] ->
+            Rep
+    end.
+
+
+% Normalize a #rep{} record such that it doesn't contain time dependent fields
+% pids (like httpc pools), and options / props are sorted. This function would
+% used during comparisons.
+-spec normalize_rep(#rep{} | nil) -> #rep{} | nil.
+normalize_rep(nil) ->
+    nil;
+
+normalize_rep(#rep{} = Rep)->
+    #rep{
+        source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
+        target = couch_replicator_api_wrap:normalize_db(Rep#rep.target),
+        options = Rep#rep.options,  % already sorted in make_options/1
+        type = Rep#rep.type,
+        view = Rep#rep.view,
+        doc_id = Rep#rep.doc_id,
+        db_name = Rep#rep.db_name
+    }.
+
+
+-spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok.
+worker_returned(Ref, Id, {ok, RepId}) ->
+    case ets:lookup(?MODULE, Id) of
+    [#rdoc{worker = Ref} = Row] ->
+        Row0 = Row#rdoc{
+            state = scheduled,
+            errcnt = 0,
+            worker = nil,
+            last_updated = os:timestamp()
+        },
+        NewRow = case Row0 of
+            #rdoc{rid = RepId, filter = user} ->
+                % Filtered replication id didn't change.
+                Row0;
+            #rdoc{rid = nil, filter = user} ->
+                % Calculated new replication id for a filtered replication. Make
+                % sure to schedule another check as filter code could change.
+                % Replication starts could have been failing, so also clear
+                % error count.
+                Row0#rdoc{rid = RepId};
+            #rdoc{rid = OldRepId, filter = user} ->
+                % Replication id of existing replication job with filter has
+                % changed. Remove old replication job from scheduler and
+                % schedule check to check for future changes.
+                ok = couch_replicator_scheduler:remove_job(OldRepId),
+                Msg = io_lib:format("Replication id changed: ~p -> ~p", [
+                    OldRepId, RepId]),
+                Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)};
+            #rdoc{rid = nil} ->
+                % Calculated new replication id for non-filtered replication.
+                % Remove replication doc body, after this we won't need it
+                % anymore.
+                Row0#rdoc{rep=nil, rid=RepId, info=nil}
+        end,
+        true = ets:insert(?MODULE, NewRow),
+        ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId),
+        ok = maybe_start_worker(Id);
+    _ ->
+        ok  % doc could have been deleted, ignore
+    end,
+    ok;
+
+worker_returned(_Ref, _Id, ignore) ->
+    ok;
+
+worker_returned(Ref, Id, {temporary_error, Reason}) ->
+    case ets:lookup(?MODULE, Id) of
+    [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] ->
+        NewRow = Row#rdoc{
+            rid = nil,
+            state = error,
+            info = Reason,
+            errcnt = ErrCnt + 1,
+            worker = nil,
+            last_updated = os:timestamp()
+        },
+        true = ets:insert(?MODULE, NewRow),
+        ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason),
+        ok = maybe_start_worker(Id);
+    _ ->
+        ok  % doc could have been deleted, ignore
+    end,
+    ok;
+
+worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
+    case ets:lookup(?MODULE, Id) of
+    [#rdoc{worker = Ref}] ->
+        true = ets:delete(?MODULE, Id);
+    _ ->
+        ok  % doc could have been deleted, ignore
+    end,
+    ok.
+
+
+-spec maybe_update_doc_error(#rep{}, any()) -> ok.
+maybe_update_doc_error(Rep, Reason) ->
+    case update_docs() of
+        true ->
+            couch_replicator_docs:update_error(Rep, Reason);
+        false ->
+            ok
+    end.
+
+
+-spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
+maybe_update_doc_triggered(Rep, RepId) ->
+    case update_docs() of
+        true ->
+            couch_replicator_docs:update_triggered(Rep, RepId);
+        false ->
+            ok
+    end.
+
+
+-spec error_backoff(non_neg_integer()) -> seconds().
+error_backoff(ErrCnt) ->
+    Exp = min(ErrCnt, ?ERROR_MAX_BACKOFF_EXPONENT),
+    % ErrCnt is the exponent here. The reason 64 is used is to start at
+    % 64 (about a minute) max range. Then first backoff would be 30 sec
+    % on average. Then 1 minute and so on.
+    random:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp).
+
+
+-spec filter_backoff() -> seconds().
+filter_backoff() ->
+    Total = ets:info(?MODULE, size),
+    % This value scaled by the number of replications. If the are a lot of
+    % them wait is longer, but not more than a day (?TS_DAY_SEC). If there
+    % are just few, wait is shorter, starting at about 30 seconds. `2 *` is
+    % used since the expected wait would then be 0.5 * Range so it is easier
+    % to see the average wait. `1 +` is used because random:uniform only
+    % accepts >= 1 values and crashes otherwise.
+    Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC),
+    ?MIN_FILTER_DELAY_SEC + random:uniform(round(Range)).
+
+
+% Document removed from db -- clear ets table and remove all scheduled jobs
+-spec removed_doc(db_doc_id()) -> ok.
+removed_doc({DbName, DocId} = Id) ->
+    ets:delete(?MODULE, Id),
+    RepIds = couch_replicator_scheduler:find_jobs_by_doc(DbName, DocId),
+    lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
+
+
+% Whole db shard is gone -- remove all its ets rows and stop jobs
+-spec removed_db(binary()) -> ok.
+removed_db(DbName) ->
+    EtsPat = #rdoc{id = {DbName, '_'}, _ = '_'},
+    ets:match_delete(?MODULE, EtsPat),
+    RepIds = couch_replicator_scheduler:find_jobs_by_dbname(DbName),
+    lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
+
+
+% Spawn a worker process which will attempt to calculate a replication id, then
+% start a replication. Returns a process monitor reference. The worker is
+% guaranteed to exit with rep_start_result() type only.
+-spec maybe_start_worker(db_doc_id()) -> ok.
+maybe_start_worker(Id) ->
+    case ets:lookup(?MODULE, Id) of
+    [] ->
+        ok;
+    [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user ->
+        ok;
+    [#rdoc{rep = Rep} = Doc] ->
+        % For any replication with a user created filter function, periodically
+        % (every `filter_backoff/0` seconds) to try to see if the user filter
+        % has changed by using a worker to check for changes. When the worker
+        % returns check if replication ID has changed. If it hasn't keep
+        % checking (spawn another worker and so on). If it has stop the job
+        % with the old ID and continue checking.
+        Wait = get_worker_wait(Doc),
+        Ref = make_ref(),
+        true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
+        couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
+        ok
+    end.
+
+
+-spec get_worker_wait(#rdoc{}) -> seconds().
+get_worker_wait(#rdoc{state = scheduled, filter = user}) ->
+    filter_backoff();
+get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
+    error_backoff(ErrCnt);
+get_worker_wait(#rdoc{state = initializing}) ->
+    0.
+
+
+-spec update_docs() -> boolean().
+update_docs() ->
+    config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS).
+
+
+% _scheduler/docs HTTP endpoint helpers
+
+-spec docs([atom()]) -> [{[_]}] | [].
+docs(States) ->
+    HealthThreshold = couch_replicator_scheduler:health_threshold(),
+    ets:foldl(fun(RDoc, Acc) ->
+        case ejson_doc(RDoc, HealthThreshold) of
+            nil ->
+                Acc;  % Could have been deleted if job just completed
+            {Props} = EJson ->
+                {state, DocState} = lists:keyfind(state, 1, Props),
+                case ejson_doc_state_filter(DocState, States) of
+                    true ->
+                        [EJson | Acc];
+                    false ->
+                        Acc
+                end
+        end
+    end, [], ?MODULE).
+
+
+-spec doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
+doc(Db, DocId) ->
+    HealthThreshold = couch_replicator_scheduler:health_threshold(),
+    Res = (catch ets:foldl(fun(RDoc, nil) ->
+        {Shard, RDocId} = RDoc#rdoc.id,
+        case {mem3:dbname(Shard), RDocId} of
+            {Db, DocId} ->
+                throw({found, ejson_doc(RDoc, HealthThreshold)});
+            {_OtherDb, _OtherDocId} ->
+                nil
+        end
+    end, nil, ?MODULE)),
+    case Res of
+        {found, DocInfo} ->
+            {ok, DocInfo};
+        nil ->
+            {error, not_found}
+    end.
+
+
+-spec doc_lookup(binary(), binary(), integer()) ->
+    {ok, {[_]}} | {error, not_found}.
+doc_lookup(Db, DocId, HealthThreshold) ->
+    case ets:lookup(?MODULE, {Db, DocId}) of
+        [#rdoc{} = RDoc] ->
+            {ok, ejson_doc(RDoc, HealthThreshold)};
+        [] ->
+            {error, not_found}
+    end.
+
+
+-spec ejson_state_info(binary() | nil) -> binary() | null.
+ejson_state_info(nil) ->
+    null;
+ejson_state_info(Info) when is_binary(Info) ->
+    Info;
+ejson_state_info(Info) ->
+    couch_replicator_utils:rep_error_to_binary(Info).
+
+
+-spec ejson_rep_id(rep_id() | nil) -> binary() | null.
+ejson_rep_id(nil) ->
+    null;
+ejson_rep_id({BaseId, Ext}) ->
+    iolist_to_binary([BaseId, Ext]).
+
+
+-spec ejson_doc(#rdoc{}, non_neg_integer()) -> {[_]} | nil.
+ejson_doc(#rdoc{state = scheduled} = RDoc, HealthThreshold) ->
+    #rdoc{id = {DbName, DocId}, rid = RepId} = RDoc,
+    JobProps = couch_replicator_scheduler:job_summary(RepId, HealthThreshold),
+    case JobProps of
+        nil ->
+            nil;
+        [{_, _} | _] ->
+            {[
+                {doc_id, DocId},
+                {database, DbName},
+                {id, ejson_rep_id(RepId)},
+                {node, node()} | JobProps
+            ]}
+    end;
+
+ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
+    #rdoc{
+       id = {DbName, DocId},
+       info = StateInfo,
+       rid = RepId,
+       errcnt = ErrorCount,
+       last_updated = StateTime,
+       rep = Rep
+    } = RDoc,
+    {[
+        {doc_id, DocId},
+        {database, DbName},
+        {id, ejson_rep_id(RepId)},
+        {state, RepState},
+        {info, ejson_state_info(StateInfo)},
+        {error_count, ErrorCount},
+        {node, node()},
+        {last_updated, couch_replicator_utils:iso8601(StateTime)},
+        {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
+    ]}.
+
+
+-spec ejson_doc_state_filter(atom(), [atom()]) -> boolean().
+ejson_doc_state_filter(_DocState, []) ->
+    true;
+ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
+    lists:member(State, States).
+
+
+-spec cluster_membership_foldl(#rdoc{}, nil) -> nil.
+cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
+    case couch_replicator_clustering:owner(DbName, DocId) of
+        unstable ->
+            nil;
+        ThisNode when ThisNode =:= node() ->
+            nil;
+        OtherNode ->
+            Msg = "Replication doc ~p:~p with id ~p usurped by node ~p",
+            couch_log:notice(Msg, [DbName, DocId, RepId, OtherNode]),
+            removed_doc(Id),
+            nil
+    end.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(DB, <<"db">>).
+-define(DOC1, <<"doc1">>).
+-define(DOC2, <<"doc2">>).
+-define(R1, {"1", ""}).
+-define(R2, {"2", ""}).
+
+
+doc_processor_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            t_bad_change(),
+            t_regular_change(),
+            t_change_with_existing_job(),
+            t_deleted_change(),
+            t_triggered_change(),
+            t_completed_change(),
+            t_active_replication_completed(),
+            t_error_change(),
+            t_failed_change(),
+            t_change_for_different_node(),
+            t_change_when_cluster_unstable(),
+            t_ejson_docs(),
+            t_cluster_membership_foldl()
+        ]
+    }.
+
+
+% Can't parse replication doc, so should write failure state to document.
+t_bad_change() ->
+    ?_test(begin
+        ?assertEqual(acc, db_change(?DB, bad_change(), acc)),
+        ?assert(updated_doc_with_failed_state())
+    end).
+
+
+% Regular change, parse to a #rep{} and then add job.
+t_regular_change() ->
+    ?_test(begin
+        mock_existing_jobs_lookup([]),
+        ?assertEqual(ok, process_change(?DB, change())),
+        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
+        ?assert(started_worker({?DB, ?DOC1}))
+    end).
+
+
+% Regular change, parse to a #rep{} and then add job but there is already
+% a running job with same Id found.
+t_change_with_existing_job() ->
+    ?_test(begin
+        mock_existing_jobs_lookup([test_rep(?R2)]),
+        ?assertEqual(ok, process_change(?DB, change())),
+        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
+        ?assert(started_worker({?DB, ?DOC1}))
+    end).
+
+
+% Change is a deletion, and job is running, so remove job.
+t_deleted_change() ->
+    ?_test(begin
+        mock_existing_jobs_lookup([test_rep(?R2)]),
+        ?assertEqual(ok, process_change(?DB, deleted_change())),
+        ?assert(removed_job(?R2))
+    end).
+
+
+% Change is in `triggered` state. Remove legacy state and add job.
+t_triggered_change() ->
+    ?_test(begin
+        mock_existing_jobs_lookup([]),
+        ?assertEqual(ok, process_change(?DB, change(<<"triggered">>))),
+        ?assert(removed_state_fields()),
+        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
+        ?assert(started_worker({?DB, ?DOC1}))
+    end).
+
+
+% Change is in `completed` state, so skip over it.
+t_completed_change() ->
+    ?_test(begin
+        ?assertEqual(ok, process_change(?DB, change(<<"completed">>))),
+        ?assert(did_not_remove_state_fields()),
+        ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
+        ?assert(did_not_spawn_worker())
+    end).
+
+
+% Completed change comes for what used to be an active job. In this case
+% remove entry from doc_processor's ets (because there is no linkage or
+% callback mechanism for scheduler to tell doc_processsor a replication just
+% completed).
+t_active_replication_completed() ->
+    ?_test(begin
+        mock_existing_jobs_lookup([]),
+        ?assertEqual(ok, process_change(?DB, change())),
+        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
+        ?assertEqual(ok, process_change(?DB, change(<<"completed">>))),
+        ?assert(did_not_remove_state_fields()),
+        ?assertNot(ets:member(?MODULE, {?DB, ?DOC1}))
+    end).
+
+
+% Change is in `error` state. Remove legacy state and retry
+% running the job. This state was used for transient erorrs which are not
+% written to the document anymore.
+t_error_change() ->
+    ?_test(begin
+        mock_existing_jobs_lookup([]),
+        ?assertEqual(ok, process_change(?DB, change(<<"error">>))),
+        ?assert(removed_state_fields()),
+        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
+        ?assert(started_worker({?DB, ?DOC1}))
+    end).
+
+
+% Change is in `failed` state. This is a terminal state and it will not
+% be tried again, so skip over it.
+t_failed_change() ->
+    ?_test(begin
+        ?assertEqual(ok, process_change(?DB, change(<<"failed">>))),
+        ?assert(did_not_remove_state_fields()),
+        ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
+        ?assert(did_not_spawn_worker())
+    end).
+
+
+% Normal change, but according to cluster ownership algorithm, replication
+% belongs to a different node, so this node should skip it.
+t_change_for_different_node() ->
+   ?_test(begin
+        meck:expect(couch_replicator_clustering, owner, 2, different_node),
+        ?assertEqual(ok, process_change(?DB, change())),
+        ?assert(did_not_spawn_worker())
+   end).
+
+
+% Change handled when cluster is unstable (nodes are added or removed), so
+% job is not added. A rescan will be triggered soon and change will be
+% evaluated again.
+t_change_when_cluster_unstable() ->
+   ?_test(begin
+       meck:expect(couch_replicator_clustering, owner, 2, unstable),
+       ?assertEqual(ok, process_change(?DB, change())),
+       ?assert(did_not_spawn_worker())
+   end).
+
+
+% Check if docs/0 function produces expected ejson after adding a job
+t_ejson_docs() ->
+    ?_test(begin
+        mock_existing_jobs_lookup([]),
+        ?assertEqual(ok, process_change(?DB, change())),
+        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
+        EJsonDocs = docs([]),
+        ?assertMatch([{[_|_]}], EJsonDocs),
+        [{DocProps}] = EJsonDocs,
+        {value, StateTime, DocProps1} = lists:keytake(last_updated, 1,
+            DocProps),
+        ?assertMatch({last_updated, BinVal1} when is_binary(BinVal1),
+            StateTime),
+        {value, StartTime, DocProps2} = lists:keytake(start_time, 1, DocProps1),
+        ?assertMatch({start_time, BinVal2} when is_binary(BinVal2), StartTime),
+        ExpectedProps = [
+            {database, ?DB},
+            {doc_id, ?DOC1},
+            {error_count, 0},
+            {id, null},
+            {info, null},
+            {node, node()},
+            {state, initializing}
+        ],
+        ?assertEqual(ExpectedProps, lists:usort(DocProps2))
+    end).
+
+
+% Check that when cluster membership changes records from doc processor and job
+% scheduler get removed
+t_cluster_membership_foldl() ->
+   ?_test(begin
+        mock_existing_jobs_lookup([test_rep(?R1)]),
+        ?assertEqual(ok, process_change(?DB, change())),
+        meck:expect(couch_replicator_clustering, owner, 2, different_node),
+        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
+        gen_server:cast(?MODULE, {cluster, stable}),
+        meck:wait(2, couch_replicator_scheduler, find_jobs_by_doc, 2, 5000),
+        ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
+        ?assert(removed_job(?R1))
+   end).
+
+
+normalize_rep_test_() ->
+    {
+        setup,
+        fun() -> meck:expect(config, get,
+            fun(_, _, Default) -> Default end)
+        end,
+        fun(_) -> meck:unload() end,
+        ?_test(begin
+            EJson1 = {[
+                {<<"source">>, <<"http://host.com/source_db">>},
+                {<<"target">>, <<"local">>},
+                {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]},
+                {<<"other_field">>, <<"some_value">>}
+            ]},
+            Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1),
+            EJson2 = {[
+                {<<"other_field">>, <<"unrelated">>},
+                {<<"target">>, <<"local">>},
+                {<<"source">>, <<"http://host.com/source_db">>},
+                {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]},
+                {<<"other_field2">>, <<"unrelated2">>}
+            ]},
+            Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2),
+            ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2))
+        end)
+    }.
+
+
+get_worker_ref_test_() ->
+    {
+        setup,
+        fun() ->
+            ets:new(?MODULE, [named_table, public, {keypos, #rdoc.id}])
+        end,
+        fun(_) -> ets:delete(?MODULE) end,
+        ?_test(begin
+            Id = {<<"db">>, <<"doc">>},
+            ?assertEqual(nil, get_worker_ref(Id)),
+            ets:insert(?MODULE, #rdoc{id = Id, worker = nil}),
+            ?assertEqual(nil, get_worker_ref(Id)),
+            Ref = make_ref(),
+            ets:insert(?MODULE, #rdoc{id = Id, worker = Ref}),
+            ?assertEqual(Ref, get_worker_ref(Id))
+        end)
+    }.
+
+
+% Test helper functions
+
+
+setup() ->
+    meck:expect(couch_log, info, 2, ok),
+    meck:expect(couch_log, notice, 2, ok),
+    meck:expect(couch_log, warning, 2, ok),
+    meck:expect(couch_log, error, 2, ok),
+    meck:expect(config, get, fun(_, _, Default) -> Default end),
+    meck:expect(config, listen_for_changes, 2, ok),
+    meck:expect(couch_replicator_clustering, owner, 2, node()),
+    meck:expect(couch_replicator_clustering, link_cluster_event_listener, 3,
+        ok),
+    meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 4, pid),
+    meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
+    meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
+    meck:expect(couch_replicator_docs, update_failed, 3, ok),
+    {ok, Pid} = start_link(),
+    Pid.
+
+
+teardown(Pid) ->
+    unlink(Pid),
+    exit(Pid, kill),
+    meck:unload().
+
+
+removed_state_fields() ->
+    meck:called(couch_replicator_docs, remove_state_fields, [?DB, ?DOC1]).
+
+
+started_worker(_Id) ->
+    1 == meck:num_calls(couch_replicator_doc_processor_worker, spawn_worker, 4).
+
+
+removed_job(Id) ->
+    meck:called(couch_replicator_scheduler, remove_job, [test_rep(Id)]).
+
+
+did_not_remove_state_fields() ->
+    0 == meck:num_calls(couch_replicator_docs, remove_state_fields, '_').
+
+
+did_not_spawn_worker() ->
+    0 == meck:num_calls(couch_replicator_doc_processor_worker, spawn_worker,
+        '_').
+
+updated_doc_with_failed_state() ->
+    1 == meck:num_calls(couch_replicator_docs, update_failed, '_').
+
+
+mock_existing_jobs_lookup(ExistingJobs) ->
+    meck:expect(couch_replicator_scheduler, find_jobs_by_doc,
+        fun(?DB, ?DOC1) -> ExistingJobs end).
+
+
+test_rep(Id) ->
+  #rep{id = Id, start_time = {0, 0, 0}}.
+
+
+change() ->
+    {[
+        {<<"id">>, ?DOC1},
+        {doc, {[
+            {<<"_id">>, ?DOC1},
+            {<<"source">>, <<"src">>},
+            {<<"target">>, <<"tgt">>}
+        ]}}
+    ]}.
+
+
+change(State) ->
+    {[
+        {<<"id">>, ?DOC1},
+        {doc, {[
+            {<<"_id">>, ?DOC1},
+            {<<"source">>, <<"src">>},
+            {<<"target">>, <<"tgt">>},
+            {<<"_replication_state">>, State}
+        ]}}
+    ]}.
+
+
+deleted_change() ->
+    {[
+        {<<"id">>, ?DOC1},
+        {<<"deleted">>, true},
+        {doc, {[
+            {<<"_id">>, ?DOC1},
+            {<<"source">>, <<"src">>},
+            {<<"target">>, <<"tgt">>}
+        ]}}
+    ]}.
+
+
+bad_change() ->
+    {[
+        {<<"id">>, ?DOC2},
+        {doc, {[
+            {<<"_id">>, ?DOC2},
+            {<<"source">>, <<"src">>}
+        ]}}
+    ]}.
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
new file mode 100644
index 0000000..e623795
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
@@ -0,0 +1,276 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_doc_processor_worker).
+
+-export([
+    spawn_worker/4
+]).
+
+-include("couch_replicator.hrl").
+
+-import(couch_replicator_utils, [
+    pp_rep_id/1
+]).
+
+-define(WORKER_TIMEOUT_MSEC, 61000).
+
+
+% Spawn a worker which attempts to calculate replication id then add a
+% replication job to scheduler. This function create a monitor to the worker
+% a worker will then exit with the #doc_worker_result{} record within
+% ?WORKER_TIMEOUT_MSEC timeout period.A timeout is considered a
+%`temporary_error`. Result will be sent as the `Reason` in the {'DOWN',...}
+% message.
+-spec spawn_worker(db_doc_id(), #rep{}, seconds(), reference()) -> pid().
+spawn_worker(Id, Rep, WaitSec, WRef) ->
+    {Pid, _Ref} = spawn_monitor(fun() ->
+        worker_fun(Id, Rep, WaitSec, WRef)
+    end),
+    Pid.
+
+
+% Private functions
+
+-spec worker_fun(db_doc_id(), #rep{}, seconds(), reference()) -> no_return().
+worker_fun(Id, Rep, WaitSec, WRef) ->
+    timer:sleep(WaitSec * 1000),
+    Fun = fun() ->
+        try maybe_start_replication(Id, Rep, WRef) of
+            Res ->
+                exit(Res)
+        catch
+            throw:{filter_fetch_error, Reason} ->
+                exit({temporary_error, Reason});
+            _Tag:Reason ->
+                exit({temporary_error, Reason})
+        end
+    end,
+    {Pid, Ref} = spawn_monitor(Fun),
+    receive
+        {'DOWN', Ref, _, Pid, Result} ->
+            exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
+    after ?WORKER_TIMEOUT_MSEC ->
+        erlang:demonitor(Ref, [flush]),
+        exit(Pid, kill),
+        {DbName, DocId} = Id,
+        TimeoutSec = round(?WORKER_TIMEOUT_MSEC / 1000),
+        Msg = io_lib:format("Replication for db ~p doc ~p failed to start due "
+            "to timeout after ~B seconds", [DbName, DocId, TimeoutSec]),
+        Result = {temporary_error, couch_util:to_binary(Msg)},
+        exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
+    end.
+
+
+% Try to start a replication. Used by a worker. This function should return
+% rep_start_result(), also throws {filter_fetch_error, Reason} if cannot fetch
+% filter.It can also block for an indeterminate amount of time while fetching
+% filter.
+maybe_start_replication(Id, RepWithoutId, WRef) ->
+    Rep = couch_replicator_docs:update_rep_id(RepWithoutId),
+    case maybe_add_job_to_scheduler(Id, Rep, WRef) of
+    ignore ->
+        ignore;
+    {ok, RepId} ->
+        {ok, RepId};
+    {temporary_error, Reason} ->
+        {temporary_error, Reason};
+    {permanent_failure, Reason} ->
+        {DbName, DocId} = Id,
+        couch_replicator_docs:update_failed(DbName, DocId, Reason),
+        {permanent_failure, Reason}
+    end.
+
+
+-spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}, reference()) ->
+   rep_start_result().
+maybe_add_job_to_scheduler({DbName, DocId}, Rep, WRef) ->
+    RepId = Rep#rep.id,
+    case couch_replicator_scheduler:rep_state(RepId) of
+    nil ->
+        % Before adding a job check that this worker is still the current
+        % worker. This is to handle a race condition where a worker which was
+        % sleeping and then checking a replication filter may inadvertently
+        % re-add a replication which was already deleted.
+        case couch_replicator_doc_processor:get_worker_ref({DbName, DocId}) of
+        WRef ->
+            ok = couch_replicator_scheduler:add_job(Rep),
+            {ok, RepId};
+        _NilOrOtherWRef ->
+            ignore
+        end;
+    #rep{doc_id = DocId} ->
+        {ok, RepId};
+    #rep{doc_id = null} ->
+        Msg = io_lib:format("Replication `~s` specified by document `~s`"
+            " already running as a transient replication, started via"
+            " `_replicate` API endpoint", [pp_rep_id(RepId), DocId]),
+        {temporary_error, couch_util:to_binary(Msg)};
+    #rep{db_name = OtherDb, doc_id = OtherDocId} ->
+        Msg = io_lib:format("Replication `~s` specified by document `~s`"
+            " already started, triggered by document `~s` from db `~s`",
+            [pp_rep_id(RepId), DocId, OtherDocId, mem3:dbname(OtherDb)]),
+        {permanent_failure, couch_util:to_binary(Msg)}
+    end.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(DB, <<"db">>).
+-define(DOC1, <<"doc1">>).
+-define(R1, {"0b7831e9a41f9322a8600ccfa02245f2", ""}).
+
+
+doc_processor_worker_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            t_should_add_job(),
+            t_already_running_same_docid(),
+            t_already_running_transient(),
+            t_already_running_other_db_other_doc(),
+            t_spawn_worker(),
+            t_ignore_if_doc_deleted(),
+            t_ignore_if_worker_ref_does_not_match()
+        ]
+    }.
+
+
+% Replication is already running, with same doc id. Ignore change.
+t_should_add_job() ->
+   ?_test(begin
+       Id = {?DB, ?DOC1},
+       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
+       ?assert(added_job())
+   end).
+
+
+% Replication is already running, with same doc id. Ignore change.
+t_already_running_same_docid() ->
+   ?_test(begin
+       Id = {?DB, ?DOC1},
+       mock_already_running(?DB, ?DOC1),
+       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
+       ?assert(did_not_add_job())
+   end).
+
+
+% There is a transient replication with same replication id running. Ignore.
+t_already_running_transient() ->
+   ?_test(begin
+       Id = {?DB, ?DOC1},
+       mock_already_running(null, null),
+       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+       ?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep,
+           nil)),
+       ?assert(did_not_add_job())
+   end).
+
+
+% There is a duplicate replication potentially from a different db and doc.
+% Write permanent failure to doc.
+t_already_running_other_db_other_doc() ->
+   ?_test(begin
+       Id = {?DB, ?DOC1},
+       mock_already_running(<<"otherdb">>, <<"otherdoc">>),
+       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+       ?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep,
+           nil)),
+       ?assert(did_not_add_job()),
+       1 == meck:num_calls(couch_replicator_docs, update_failed, '_')
+   end).
+
+
+% Should spawn worker
+t_spawn_worker() ->
+   ?_test(begin
+       Id = {?DB, ?DOC1},
+       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+       WRef = make_ref(),
+       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
+       Pid = spawn_worker(Id, Rep, 0, WRef),
+       Res = receive  {'DOWN', _Ref, process, Pid, Reason} -> Reason
+           after 1000 -> timeout end,
+       Expect = #doc_worker_result{id = Id, wref = WRef, result = {ok, ?R1}},
+       ?assertEqual(Expect, Res),
+       ?assert(added_job())
+   end).
+
+
+% Should not add job if by the time worker got to fetching the filter
+% and getting a replication id, replication doc was deleted
+t_ignore_if_doc_deleted() ->
+   ?_test(begin
+       Id = {?DB, ?DOC1},
+       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
+       ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
+       ?assertNot(added_job())
+   end).
+
+
+% Should not add job if by the time worker got to fetchign the filter
+% and building a replication id, another worker was spawned.
+t_ignore_if_worker_ref_does_not_match() ->
+    ?_test(begin
+       Id = {?DB, ?DOC1},
+       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1,
+           make_ref()),
+       ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
+       ?assertNot(added_job())
+   end).
+
+
+% Test helper functions
+
+setup() ->
+    meck:expect(couch_replicator_scheduler, add_job, 1, ok),
+    meck:expect(config, get, fun(_, _, Default) -> Default end),
+    meck:expect(couch_server, get_uuid, 0, this_is_snek),
+    meck:expect(couch_replicator_docs, update_failed, 3, ok),
+    meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
+    meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
+    ok.
+
+
+teardown(_) ->
+    meck:unload().
+
+
+mock_already_running(DbName, DocId) ->
+    meck:expect(couch_replicator_scheduler, rep_state,
+         fun(RepId) -> #rep{id = RepId, doc_id = DocId, db_name = DbName} end).
+
+
+added_job() ->
+    1 == meck:num_calls(couch_replicator_scheduler, add_job, '_').
+
+
+did_not_add_job() ->
+    0 == meck:num_calls(couch_replicator_scheduler, add_job, '_').
+
+
+change() ->
+    {[
+         {<<"_id">>, ?DOC1},
+         {<<"source">>, <<"src">>},
+         {<<"target">>, <<"tgt">>}
+     ]}.
+
+-endif.

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <commits@couchdb.apache.org>.

Mime
View raw message