couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From beno...@apache.org
Subject [7/8] create couch_replicator application.
Date Mon, 05 Dec 2011 09:33:30 GMT
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couchdb/couch_rep_sup.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_rep_sup.erl b/src/couchdb/couch_rep_sup.erl
deleted file mode 100644
index 1318c59..0000000
--- a/src/couchdb/couch_rep_sup.erl
+++ /dev/null
@@ -1,31 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_rep_sup).
--behaviour(supervisor).
--export([init/1, start_link/0]).
-
--include("couch_db.hrl").
-
-start_link() ->
-    supervisor:start_link({local,?MODULE}, ?MODULE, []).
-
-%%=============================================================================
-%% supervisor callbacks
-%%=============================================================================
-
-init([]) ->
-    {ok, {{one_for_one, 3, 10}, []}}.
-
-%%=============================================================================
-%% internal functions
-%%=============================================================================

http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couchdb/couch_replication_manager.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_replication_manager.erl b/src/couchdb/couch_replication_manager.erl
deleted file mode 100644
index 801af7c..0000000
--- a/src/couchdb/couch_replication_manager.erl
+++ /dev/null
@@ -1,626 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replication_manager).
--behaviour(gen_server).
-
-% public API
--export([replication_started/1, replication_completed/1, replication_error/2]).
-
-% gen_server callbacks
--export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
--export([code_change/3, terminate/2]).
-
--include("couch_db.hrl").
--include("couch_replicator.hrl").
--include("couch_js_functions.hrl").
-
--define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
--define(REP_TO_STATE, couch_rep_id_to_rep_state).
--define(INITIAL_WAIT, 2.5). % seconds
--define(MAX_WAIT, 600).     % seconds
-
--record(rep_state, {
-    rep,
-    starting,
-    retries_left,
-    max_retries,
-    wait = ?INITIAL_WAIT
-}).
-
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
-]).
-
--record(state, {
-    changes_feed_loop = nil,
-    db_notifier = nil,
-    rep_db_name = nil,
-    rep_start_pids = [],
-    max_retries
-}).
-
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-replication_started(#rep{id = {BaseId, _} = RepId}) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        update_rep_doc(DocId, [
-            {<<"_replication_state">>, <<"triggered">>},
-            {<<"_replication_id">>, ?l2b(BaseId)}]),
-        ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
-        ?LOG_INFO("Document `~s` triggered replication `~s`",
-            [DocId, pp_rep_id(RepId)])
-    end.
-
-
-replication_completed(#rep{id = RepId}) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]),
-        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
-        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
-            [pp_rep_id(RepId), DocId])
-    end.
-
-
-replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        % TODO: maybe add error reason to replication document
-        update_rep_doc(DocId, [
-            {<<"_replication_state">>, <<"error">>},
-            {<<"_replication_id">>, ?l2b(BaseId)}]),
-        ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
-    end.
-
-
-init(_) ->
-    process_flag(trap_exit, true),
-    ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
-    ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]),
-    Server = self(),
-    ok = couch_config:register(
-        fun("replicator", "db", NewName) ->
-            ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)});
-        ("replicator", "max_replication_retry_count", V) ->
-            ok = gen_server:cast(Server, {set_max_retries, retries_value(V)})
-        end
-    ),
-    {Loop, RepDbName} = changes_feed_loop(),
-    {ok, #state{
-        changes_feed_loop = Loop,
-        rep_db_name = RepDbName,
-        db_notifier = db_update_notifier(),
-        max_retries = retries_value(
-            couch_config:get("replicator", "max_replication_retry_count", "10"))
-    }}.
-
-
-handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) ->
-    NewState = try
-        process_update(State, Change)
-    catch
-    _Tag:Error ->
-        {RepProps} = get_value(doc, ChangeProps),
-        DocId = get_value(<<"_id">>, RepProps),
-        rep_db_update_error(Error, DocId),
-        State
-    end,
-    {reply, ok, NewState};
-
-
-handle_call({rep_started, RepId}, _From, State) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    RepState ->
-        NewRepState = RepState#rep_state{
-            starting = false,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries,
-            wait = ?INITIAL_WAIT
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
-    end,
-    {reply, ok, State};
-
-handle_call({rep_complete, RepId}, _From, State) ->
-    true = ets:delete(?REP_TO_STATE, RepId),
-    {reply, ok, State};
-
-handle_call({rep_error, RepId, Error}, _From, State) ->
-    {reply, ok, replication_error(State, RepId, Error)};
-
-handle_call(Msg, From, State) ->
-    ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
-        [Msg, From]),
-    {stop, {error, {unexpected_call, Msg}}, State}.
-
-
-handle_cast({rep_db_changed, NewName}, #state{rep_db_name = NewName} = State) ->
-    {noreply, State};
-
-handle_cast({rep_db_changed, _NewName}, State) ->
-    {noreply, restart(State)};
-
-handle_cast({rep_db_created, NewName}, #state{rep_db_name = NewName} = State) ->
-    {noreply, State};
-
-handle_cast({rep_db_created, _NewName}, State) ->
-    {noreply, restart(State)};
-
-handle_cast({set_max_retries, MaxRetries}, State) ->
-    {noreply, State#state{max_retries = MaxRetries}};
-
-handle_cast(Msg, State) ->
-    ?LOG_ERROR("Replication manager received unexpected cast ~p", [Msg]),
-    {stop, {error, {unexpected_cast, Msg}}, State}.
-
-
-handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
-    % replicator DB deleted
-    {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};
-
-handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
-    ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
-    {stop, {db_update_notifier_died, Reason}, State};
-
-handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
-    % one of the replication start processes terminated successfully
-    {noreply, State#state{rep_start_pids = Pids -- [From]}};
-
-handle_info({'DOWN', _Ref, _, _, _}, State) ->
-    % From a db monitor created by a replication process. Ignore.
-    {noreply, State};
-
-handle_info(Msg, State) ->
-    ?LOG_ERROR("Replication manager received unexpected message ~p", [Msg]),
-    {stop, {unexpected_msg, Msg}, State}.
-
-
-terminate(_Reason, State) ->
-    #state{
-        rep_start_pids = StartPids,
-        changes_feed_loop = Loop,
-        db_notifier = DbNotifier
-    } = State,
-    stop_all_replications(),
-    lists:foreach(
-        fun(Pid) ->
-            catch unlink(Pid),
-            catch exit(Pid, stop)
-        end,
-        [Loop | StartPids]),
-    true = ets:delete(?REP_TO_STATE),
-    true = ets:delete(?DOC_TO_REP),
-    couch_db_update_notifier:stop(DbNotifier).
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
-changes_feed_loop() ->
-    {ok, RepDb} = ensure_rep_db_exists(),
-    RepDbName = couch_db:name(RepDb),
-    couch_db:close(RepDb),
-    Server = self(),
-    Pid = spawn_link(
-        fun() ->
-            DbOpenOptions = [{user_ctx, RepDb#db.user_ctx}, sys_db],
-            {ok, Db} = couch_db:open_int(RepDbName, DbOpenOptions),
-            ChangesFeedFun = couch_changes:handle_changes(
-                #changes_args{
-                    include_docs = true,
-                    feed = "continuous",
-                    timeout = infinity,
-                    db_open_options = [sys_db]
-                },
-                {json_req, null},
-                Db
-            ),
-            ChangesFeedFun(
-                fun({change, Change, _}, _) ->
-                    case has_valid_rep_id(Change) of
-                    true ->
-                        ok = gen_server:call(
-                            Server, {rep_db_update, Change}, infinity);
-                    false ->
-                        ok
-                    end;
-                (_, _) ->
-                    ok
-                end
-            ),
-            couch_db:close(Db)
-        end
-    ),
-    {Pid, RepDbName}.
-
-
-has_valid_rep_id({Change}) ->
-    has_valid_rep_id(get_value(<<"id">>, Change));
-has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
-    false;
-has_valid_rep_id(_Else) ->
-    true.
-
-
-db_update_notifier() ->
-    Server = self(),
-    {ok, Notifier} = couch_db_update_notifier:start_link(
-        fun({created, DbName}) ->
-            case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
-            DbName ->
-                ok = gen_server:cast(Server, {rep_db_created, DbName});
-            _ ->
-                ok
-            end;
-        (_) ->
-            % no need to handle the 'deleted' event - the changes feed loop
-            % dies when the database is deleted
-            ok
-        end
-    ),
-    Notifier.
-
-
-restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
-    stop_all_replications(),
-    lists:foreach(
-        fun(Pid) ->
-            catch unlink(Pid),
-            catch exit(Pid, rep_db_changed)
-        end,
-        [Loop | StartPids]),
-    {NewLoop, NewRepDbName} = changes_feed_loop(),
-    State#state{
-        changes_feed_loop = NewLoop,
-        rep_db_name = NewRepDbName,
-        rep_start_pids = []
-    }.
-
-
-process_update(State, {Change}) ->
-    {RepProps} = JsonRepDoc = get_value(doc, Change),
-    DocId = get_value(<<"_id">>, RepProps),
-    case get_value(<<"deleted">>, Change, false) of
-    true ->
-        rep_doc_deleted(DocId),
-        State;
-    false ->
-        case get_value(<<"_replication_state">>, RepProps) of
-        undefined ->
-            maybe_start_replication(State, DocId, JsonRepDoc);
-        <<"triggered">> ->
-            maybe_start_replication(State, DocId, JsonRepDoc);
-        <<"completed">> ->
-            replication_complete(DocId),
-            State;
-        <<"error">> ->
-            case ets:lookup(?DOC_TO_REP, DocId) of
-            [] ->
-                maybe_start_replication(State, DocId, JsonRepDoc);
-            _ ->
-                State
-            end
-        end
-    end.
-
-
-rep_db_update_error(Error, DocId) ->
-    case Error of
-    {bad_rep_doc, Reason} ->
-        ok;
-    _ ->
-        Reason = to_binary(Error)
-    end,
-    ?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
-        [DocId, Reason]),
-    update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]).
-
-
-rep_user_ctx({RepDoc}) ->
-    case get_value(<<"user_ctx">>, RepDoc) of
-    undefined ->
-        #user_ctx{};
-    {UserCtx} ->
-        #user_ctx{
-            name = get_value(<<"name">>, UserCtx, null),
-            roles = get_value(<<"roles">>, UserCtx, [])
-        }
-    end.
-
-
-maybe_start_replication(State, DocId, RepDoc) ->
-    #rep{id = {BaseId, _} = RepId} = Rep = parse_rep_doc(RepDoc),
-    case rep_state(RepId) of
-    nil ->
-        RepState = #rep_state{
-            rep = Rep,
-            starting = true,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
-        true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
-        ?LOG_INFO("Attempting to start replication `~s` (document `~s`).",
-            [pp_rep_id(RepId), DocId]),
-        Pid = spawn_link(fun() -> start_replication(Rep, 0) end),
-        State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        State;
-    #rep_state{starting = false, rep = #rep{doc_id = OtherDocId}} ->
-        ?LOG_INFO("The replication specified by the document `~s` was already"
-            " triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
-        State;
-    #rep_state{starting = true, rep = #rep{doc_id = OtherDocId}} ->
-        ?LOG_INFO("The replication specified by the document `~s` is already"
-            " being triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
-        State
-    end.
-
-
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} = try
-        couch_replicator_utils:parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-    catch
-    throw:{error, Reason} ->
-        throw({bad_rep_doc, Reason});
-    Tag:Err ->
-        throw({bad_rep_doc, to_binary({Tag, Err})})
-    end,
-    Rep.
-
-
-maybe_tag_rep_doc(DocId, {RepProps}, RepId) ->
-    case get_value(<<"_replication_id">>, RepProps) of
-    RepId ->
-        ok;
-    _ ->
-        update_rep_doc(DocId, [{<<"_replication_id">>, RepId}])
-    end.
-
-
-start_replication(Rep, Wait) ->
-    ok = timer:sleep(Wait * 1000),
-    case (catch couch_replicator:async_replicate(Rep)) of
-    {ok, _} ->
-        ok;
-    Error ->
-        replication_error(Rep, Error)
-    end.
-
-
-replication_complete(DocId) ->
-    case ets:lookup(?DOC_TO_REP, DocId) of
-    [{DocId, {BaseId, Ext} = RepId}] ->
-        case rep_state(RepId) of
-        nil ->
-            % Prior to OTP R14B02, temporary child specs remain
-            % in the supervisor after a worker finishes - remove them.
-            % We want to be able to start the same replication but with
-            % possibly different values for parameters that don't
-            % contribute to its ID calculation.
-            _ = supervisor:delete_child(couch_rep_sup, BaseId ++ Ext);
-        #rep_state{} ->
-            ok
-        end,
-        true = ets:delete(?DOC_TO_REP, DocId);
-    _ ->
-        ok
-    end.
-
-
-rep_doc_deleted(DocId) ->
-    case ets:lookup(?DOC_TO_REP, DocId) of
-    [{DocId, RepId}] ->
-        couch_replicator:cancel_replication(RepId),
-        true = ets:delete(?REP_TO_STATE, RepId),
-        true = ets:delete(?DOC_TO_REP, DocId),
-        ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
-            " was deleted", [pp_rep_id(RepId), DocId]);
-    [] ->
-        ok
-    end.
-
-
-replication_error(State, RepId, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        State;
-    RepState ->
-        maybe_retry_replication(RepState, Error, State)
-    end.
-
-maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
-    #rep_state{
-        rep = #rep{id = RepId, doc_id = DocId},
-        max_retries = MaxRetries
-    } = RepState,
-    couch_replicator:cancel_replication(RepId),
-    true = ets:delete(?REP_TO_STATE, RepId),
-    true = ets:delete(?DOC_TO_REP, DocId),
-    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
-        "~nReached maximum retry attempts (~p).",
-        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
-    State;
-
-maybe_retry_replication(RepState, Error, State) ->
-    #rep_state{
-        rep = #rep{id = RepId, doc_id = DocId} = Rep
-    } = RepState,
-    #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
-    true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
-    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
-        "~nRestarting replication in ~p seconds.",
-        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
-    Pid = spawn_link(fun() -> start_replication(Rep, Wait) end),
-    State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
-
-
-stop_all_replications() ->
-    ?LOG_INFO("Stopping all ongoing replications because the replicator"
-        " database was deleted or changed", []),
-    ets:foldl(
-        fun({_, RepId}, _) ->
-            couch_replicator:cancel_replication(RepId)
-        end,
-        ok, ?DOC_TO_REP),
-    true = ets:delete_all_objects(?REP_TO_STATE),
-    true = ets:delete_all_objects(?DOC_TO_REP).
-
-
-update_rep_doc(RepDocId, KVs) ->
-    {ok, RepDb} = ensure_rep_db_exists(),
-    try
-        case couch_db:open_doc(RepDb, RepDocId, [ejson_body]) of
-        {ok, LatestRepDoc} ->
-            update_rep_doc(RepDb, LatestRepDoc, KVs);
-        _ ->
-            ok
-        end
-    catch throw:conflict ->
-        % Shouldn't happen, as by default only the role _replicator can
-        % update replication documents.
-        ?LOG_ERROR("Conflict error when updating replication document `~s`."
-            " Retrying.", [RepDocId]),
-        ok = timer:sleep(5),
-        update_rep_doc(RepDocId, KVs)
-    after
-        couch_db:close(RepDb)
-    end.
-
-update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
-    NewRepDocBody = lists:foldl(
-        fun({<<"_replication_state">> = K, State} = KV, Body) ->
-                case get_value(K, Body) of
-                State ->
-                    Body;
-                _ ->
-                    Body1 = lists:keystore(K, 1, Body, KV),
-                    lists:keystore(
-                        <<"_replication_state_time">>, 1, Body1,
-                        {<<"_replication_state_time">>, timestamp()})
-                end;
-            ({K, _V} = KV, Body) ->
-                lists:keystore(K, 1, Body, KV)
-        end,
-        RepDocBody, KVs),
-    case NewRepDocBody of
-    RepDocBody ->
-        ok;
-    _ ->
-        % Might not succeed - when the replication doc is deleted right
-        % before this update (not an error, ignore).
-        couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
-    end.
-
-
-% RFC3339 timestamps.
-% Note: doesn't include fractional seconds (RFC3339 says they're optional).
-timestamp() ->
-    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
-    UTime = erlang:universaltime(),
-    LocalTime = calendar:universal_time_to_local_time(UTime),
-    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
-        calendar:datetime_to_gregorian_seconds(UTime),
-    zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
-    iolist_to_binary(
-        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
-            [Year, Month, Day, Hour, Min, Sec,
-                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
-
-zone(Hr, Min) when Hr >= 0, Min >= 0 ->
-    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
-zone(Hr, Min) ->
-    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
-
-
-
-ensure_rep_db_exists() ->
-    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
-    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
-    case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}]) of
-    {ok, Db} ->
-        Db;
-    _Error ->
-        {ok, Db} = couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}])
-    end,
-    ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
-    {ok, Db}.
-
-
-ensure_rep_ddoc_exists(RepDb, DDocID) ->
-    case couch_db:open_doc(RepDb, DDocID, []) of
-    {ok, _Doc} ->
-        ok;
-    _ ->
-        DDoc = couch_doc:from_json_obj({[
-            {<<"_id">>, DDocID},
-            {<<"language">>, <<"javascript">>},
-            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-        ]}),
-        {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
-     end.
-
-
-% pretty-print replication id
-pp_rep_id(#rep{id = RepId}) ->
-    pp_rep_id(RepId);
-pp_rep_id({Base, Extension}) ->
-    Base ++ Extension.
-
-
-rep_state(RepId) ->
-    case ets:lookup(?REP_TO_STATE, RepId) of
-    [{RepId, RepState}] ->
-        RepState;
-    [] ->
-        nil
-    end.
-
-
-error_reason({error, Reason}) ->
-    Reason;
-error_reason(Reason) ->
-    Reason.
-
-
-retries_value("infinity") ->
-    infinity;
-retries_value(Value) ->
-    list_to_integer(Value).
-
-
-state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
-    Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
-    case Left of
-    infinity ->
-        State#rep_state{wait = Wait2};
-    _ ->
-        State#rep_state{retries_left = Left - 1, wait = Wait2}
-    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couchdb/couch_replication_notifier.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_replication_notifier.erl b/src/couchdb/couch_replication_notifier.erl
deleted file mode 100644
index c686c2b..0000000
--- a/src/couchdb/couch_replication_notifier.erl
+++ /dev/null
@@ -1,57 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replication_notifier).
-
--behaviour(gen_event).
-
-% public API
--export([start_link/1, stop/1, notify/1]).
-
-% gen_event callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_event/2, handle_call/2, handle_info/2]).
-
--include("couch_db.hrl").
-
-start_link(FunAcc) ->
-    couch_event_sup:start_link(couch_replication,
-        {couch_replication_notifier, make_ref()}, FunAcc).
-
-notify(Event) ->
-    gen_event:notify(couch_replication, Event).
-
-stop(Pid) ->
-    couch_event_sup:stop(Pid).
-
-
-init(FunAcc) ->
-    {ok, FunAcc}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-handle_event(Event, Fun) when is_function(Fun, 1) ->
-    Fun(Event),
-    {ok, Fun};
-handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) ->
-    Acc2 = Fun(Event, Acc),
-    {ok, {Fun, Acc2}}.
-
-handle_call(_Msg, State) ->
-    {reply, ok, State}.
-
-handle_info(_Msg, State) ->
-    {ok, State}.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couchdb/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_replicator.erl b/src/couchdb/couch_replicator.erl
deleted file mode 100644
index 40cb9a4..0000000
--- a/src/couchdb/couch_replicator.erl
+++ /dev/null
@@ -1,942 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator).
--behaviour(gen_server).
-
-% public API
--export([replicate/1]).
-
-% meant to be used only by the replicator database listener
--export([async_replicate/1]).
--export([cancel_replication/1]).
-
-% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
-
--include("couch_db.hrl").
--include("couch_api_wrap.hrl").
--include("couch_replicator.hrl").
-
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
-]).
-
--import(couch_replicator_utils, [
-    start_db_compaction_notifier/2,
-    stop_db_compaction_notifier/1
-]).
-
--record(rep_state, {
-    rep_details,
-    source_name,
-    target_name,
-    source,
-    target,
-    history,
-    checkpoint_history,
-    start_seq,
-    committed_seq,
-    current_through_seq,
-    seqs_in_progress = [],
-    highest_seq_done = {0, ?LOWEST_SEQ},
-    source_log,
-    target_log,
-    rep_starttime,
-    src_starttime,
-    tgt_starttime,
-    timer, % checkpoint timer
-    changes_queue,
-    changes_manager,
-    changes_reader,
-    workers,
-    stats = #rep_stats{},
-    session_id,
-    source_db_compaction_notifier = nil,
-    target_db_compaction_notifier = nil,
-    source_monitor = nil,
-    target_monitor = nil,
-    source_seq = nil
-}).
-
-
-replicate(#rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep) ->
-    case get_value(cancel, Options, false) of
-    true ->
-        case get_value(id, Options, nil) of
-        nil ->
-            cancel_replication(RepId);
-        RepId2 ->
-            cancel_replication(RepId2, UserCtx)
-        end;
-    false ->
-        {ok, Listener} = rep_result_listener(RepId),
-        Result = do_replication_loop(Rep),
-        couch_replication_notifier:stop(Listener),
-        Result
-    end.
-
-
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
-    case async_replicate(Rep) of
-    {ok, _Pid} ->
-        case get_value(continuous, Options, false) of
-        true ->
-            {ok, {continuous, ?l2b(BaseId ++ Ext)}};
-        false ->
-            wait_for_result(Id)
-        end;
-    Error ->
-        Error
-    end.
-
-
-async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
-    RepChildId = BaseId ++ Ext,
-    Source = couch_api_wrap:db_uri(Src),
-    Target = couch_api_wrap:db_uri(Tgt),
-    Timeout = get_value(connection_timeout, Rep#rep.options),
-    ChildSpec = {
-        RepChildId,
-        {gen_server, start_link, [?MODULE, Rep, [{timeout, Timeout}]]},
-        temporary,
-        1,
-        worker,
-        [?MODULE]
-    },
-    % All these nested cases to attempt starting/restarting a replication child
-    % are ugly and not 100% race condition free. The following patch submission
-    % is a solution:
-    %
-    % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
-    %
-    case supervisor:start_child(couch_rep_sup, ChildSpec) of
-    {ok, Pid} ->
-        ?LOG_INFO("starting new replication `~s` at ~p (`~s` -> `~s`)",
-            [RepChildId, Pid, Source, Target]),
-        {ok, Pid};
-    {error, already_present} ->
-        case supervisor:restart_child(couch_rep_sup, RepChildId) of
-        {ok, Pid} ->
-            ?LOG_INFO("restarting replication `~s` at ~p (`~s` -> `~s`)",
-                [RepChildId, Pid, Source, Target]),
-            {ok, Pid};
-        {error, running} ->
-            %% this error occurs if multiple replicators are racing
-            %% each other to start and somebody else won. Just grab
-            %% the Pid by calling start_child again.
-            {error, {already_started, Pid}} =
-                supervisor:start_child(couch_rep_sup, ChildSpec),
-            ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
-                [RepChildId, Pid, Source, Target]),
-            {ok, Pid};
-        {error, {'EXIT', {badarg,
-            [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
-            % Clause to deal with a change in the supervisor module introduced
-            % in R14B02. For more details consult the thread at:
-            %     http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
-            _ = supervisor:delete_child(couch_rep_sup, RepChildId),
-            async_replicate(Rep);
-        {error, _} = Error ->
-            Error
-        end;
-    {error, {already_started, Pid}} ->
-        ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
-            [RepChildId, Pid, Source, Target]),
-        {ok, Pid};
-    {error, {Error, _}} ->
-        {error, Error}
-    end.
-
-
-rep_result_listener(RepId) ->
-    ReplyTo = self(),
-    {ok, _Listener} = couch_replication_notifier:start_link(
-        fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
-                ReplyTo ! Ev;
-            (_) ->
-                ok
-        end).
-
-
-wait_for_result(RepId) ->
-    receive
-    {finished, RepId, RepResult} ->
-        {ok, RepResult};
-    {error, RepId, Reason} ->
-        {error, Reason}
-    end.
-
-
-cancel_replication({BaseId, Extension}) ->
-    FullRepId = BaseId ++ Extension,
-    ?LOG_INFO("Canceling replication `~s`...", [FullRepId]),
-    case supervisor:terminate_child(couch_rep_sup, FullRepId) of
-    ok ->
-        ?LOG_INFO("Replication `~s` canceled.", [FullRepId]),
-        case supervisor:delete_child(couch_rep_sup, FullRepId) of
-            ok ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            {error, not_found} ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            Error ->
-                Error
-        end;
-    Error ->
-        ?LOG_ERROR("Error canceling replication `~s`: ~p", [FullRepId, Error]),
-        Error
-    end.
-
-% Cancels a replication on behalf of a user.
-% Users holding the `_admin` role may cancel any replication. Other users
-% may only cancel a replication whose #rep{} details carry their own user
-% name; otherwise {unauthorized, Msg} is thrown.
-% Returns {error, not_found} when no running child with that ID exists.
-cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
-    IsAdmin = lists:member(<<"_admin">>, Roles),
-    if
-    IsAdmin ->
-        cancel_replication(RepId);
-    true ->
-        {BaseId, Ext} = RepId,
-        ChildId = BaseId ++ Ext,
-        Children = supervisor:which_children(couch_rep_sup),
-        case lists:keysearch(ChildId, 1, Children) of
-        {value, {_Id, Pid, _Type, _Modules}} when is_pid(Pid) ->
-            % Ask the replication process who started it.
-            case (catch gen_server:call(Pid, get_details, infinity)) of
-            {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
-                cancel_replication(RepId);
-            {ok, _OtherOwner} ->
-                throw({unauthorized,
-                    <<"Can't cancel a replication triggered by another user">>});
-            {'EXIT', {noproc, {gen_server, call, _Args}}} ->
-                {error, not_found};
-            Unexpected ->
-                throw(Unexpected)
-            end;
-        _NotRunning ->
-            {error, not_found}
-        end
-    end.
-
-% gen_server init callback. Delegates to do_init/1 and turns thrown terms
-% into {stop, Reason} results; `unauthorized` and `db_not_found` throws get
-% a descriptive binary message that includes the database URI.
-init(InitArgs) ->
-    try do_init(InitArgs) of
-    Result ->
-        Result
-    catch
-    throw:{unauthorized, Uri} ->
-        Msg = <<"unauthorized to access or create database ", Uri/binary>>,
-        {stop, {unauthorized, Msg}};
-    throw:{db_not_found, Uri} ->
-        {stop, {db_not_found, <<"could not open ", Uri/binary>>}};
-    throw:Other ->
-        {stop, Other}
-    end.
-
-% Builds the initial state of a replication process for the given #rep{}:
-% creates the #rep_state{} via init_state/1, creates the changes queue,
-% spawns the changes reader, the changes manager and the worker processes,
-% registers the task with couch_task_status and tells
-% couch_replication_manager that the replication started.
-% Returns {ok, #rep_state{}}.
-do_init(#rep{options = Options, id = {BaseId, Ext}} = Rep) ->
-    % Exits of linked processes are delivered as {'EXIT', Pid, Reason}
-    % messages (see handle_info/2) instead of killing this process.
-    process_flag(trap_exit, true),
-
-    #rep_state{
-        source = Source,
-        target = Target,
-        source_name = SourceName,
-        target_name = TargetName,
-        start_seq = {_Ts, StartSeq},
-        source_seq = SourceCurSeq,
-        committed_seq = {_, CommittedSeq}
-    } = State = init_state(Rep),
-
-    NumWorkers = get_value(worker_processes, Options),
-    BatchSize = get_value(worker_batch_size, Options),
-    % The queue limits scale with the number of workers and the batch size.
-    {ok, ChangesQueue} = couch_work_queue:new([
-        {max_items, BatchSize * NumWorkers * 2},
-        {max_size, 100 * 1024 * NumWorkers}
-    ]),
-    % This starts the _changes reader process. It adds the changes from
-    % the source db to the ChangesQueue.
-    ChangesReader = spawn_changes_reader(StartSeq, Source, ChangesQueue, Options),
-    % Changes manager - responsible for dequeuing batches from the changes
-    % queue and delivering them to the worker processes.
-    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
-    % This starts the worker processes. They ask the changes queue manager for
-    % a batch of _changes rows to process -> check which revs are missing in the
-    % target, and for the missing ones, it copies them from the source to the target.
-    MaxConns = get_value(http_connections, Options),
-    Workers = lists:map(
-        fun(_) ->
-            {ok, Pid} = couch_replicator_worker:start_link(
-                self(), Source, Target, ChangesManager, MaxConns),
-            Pid
-        end,
-        lists:seq(1, NumWorkers)),
-
-    % Initial task status entry; the counters are refreshed by update_task/1.
-    couch_task_status:add_task([
-        {type, replication},
-        {replication_id, ?l2b(BaseId ++ Ext)},
-        {source, ?l2b(SourceName)},
-        {target, ?l2b(TargetName)},
-        {continuous, get_value(continuous, Options, false)},
-        {revisions_checked, 0},
-        {missing_revisions_found, 0},
-        {docs_read, 0},
-        {docs_written, 0},
-        {doc_write_failures, 0},
-        {source_seq, SourceCurSeq},
-        {checkpointed_source_seq, CommittedSeq},
-        {progress, 0}
-    ]),
-    couch_task_status:set_update_frequency(1000),
-
-    % Until OTP R14B03:
-    %
-    % Restarting a temporary supervised child implies that the original arguments
-    % (#rep{} record) specified in the MFA component of the supervisor
-    % child spec will always be used whenever the child is restarted.
-    % This implies the same replication performance tuning parameters will
-    % always be used. The solution is to delete the child spec (see
-    % cancel_replication/1) and then start the replication again, but this is
-    % unfortunately not immune to race conditions.
-
-    % The last ~s is empty unless the replication starts from a sequence
-    % other than ?LOWEST_SEQ.
-    ?LOG_INFO("Replication `~p` is using:~n"
-        "~c~p worker processes~n"
-        "~ca worker batch size of ~p~n"
-        "~c~p HTTP connections~n"
-        "~ca connection timeout of ~p milliseconds~n"
-        "~c~p retries per request~n"
-        "~csocket options are: ~s~s",
-        [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
-            MaxConns, $\t, get_value(connection_timeout, Options),
-            $\t, get_value(retries, Options),
-            $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
-            case StartSeq of
-            ?LOWEST_SEQ ->
-                "";
-            _ ->
-                io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
-            end]),
-
-    ?LOG_DEBUG("Worker pids are: ~p", [Workers]),
-
-    couch_replication_manager:replication_started(Rep),
-
-    {ok, State#rep_state{
-            changes_queue = ChangesQueue,
-            changes_manager = ChangesManager,
-            changes_reader = ChangesReader,
-            workers = Workers
-        }
-    }.
-
-
-% gen_server handle_info callback. Handles monitor ('DOWN') messages for
-% the source/target databases and 'EXIT' messages from the linked helper
-% processes (changes reader, changes manager, changes queue, workers).
-% Clause order matters: the known helper pids are matched before the
-% generic worker clauses.
-
-% The monitored source database went down: stop the replication.
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    ?LOG_ERROR("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-% The monitored target database went down: stop the replication.
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    ?LOG_ERROR("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
-% A normal exit of the changes reader is ignored.
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
-    {noreply, State};
-
-% An abnormal exit of the changes reader stops the replication.
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
-    ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, cancel_timer(State)};
-
-% A normal exit of the changes manager is ignored.
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
-    {noreply, State};
-
-% An abnormal exit of the changes manager stops the replication.
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
-    ?LOG_ERROR("ChangesManager process died with reason: ~p", [Reason]),
-    {stop, changes_manager_died, cancel_timer(State)};
-
-% A normal exit of the changes queue is ignored.
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
-    {noreply, State};
-
-% An abnormal exit of the changes queue stops the replication.
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
-    ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, cancel_timer(State)};
-
-% Normal exit of some other linked process, expected to be a worker.
-handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
-    case Workers -- [Pid] of
-    Workers ->
-        % Pid is not one of our workers.
-        {stop, {unknown_process_died, Pid, normal}, State};
-    [] ->
-        % The last worker finished: the changes manager is no longer
-        % needed, so kill it and record the final checkpoint.
-        catch unlink(State#rep_state.changes_manager),
-        catch exit(State#rep_state.changes_manager, kill),
-        do_last_checkpoint(State);
-    Workers2 ->
-        {noreply, State#rep_state{workers = Workers2}}
-    end;
-
-% Abnormal exit of a worker or of an unknown linked process: stop.
-handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
-    State2 = cancel_timer(State),
-    case lists:member(Pid, Workers) of
-    false ->
-        {stop, {unknown_process_died, Pid, Reason}, State2};
-    true ->
-        ?LOG_ERROR("Worker ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {worker_died, Pid, Reason}, State2}
-    end.
-
-
-% gen_server handle_call callback.
-
-% Returns the #rep{} record this replication was started with.
-handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
-    {reply, {ok, Rep}, State};
-
-% Adds Stats to the accumulated statistics. The caller is answered before
-% the state is updated.
-handle_call({add_stats, Stats}, From, State) ->
-    gen_server:reply(From, ok),
-    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
-    {noreply, State#rep_state{stats = NewStats}};
-
-% A worker reports that the batch identified by Seq is done, together with
-% the statistics increment for that batch. Updates the set of sequences in
-% progress, the highest sequence done and the "through" sequence (the one
-% used for checkpointing, see do_checkpoint/1).
-handle_call({report_seq_done, Seq, StatsInc}, From,
-    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
-        current_through_seq = ThroughSeq, stats = Stats} = State) ->
-    % Answer the caller before doing the bookkeeping.
-    gen_server:reply(From, ok),
-    % The through sequence only advances when the reported sequence is the
-    % first (lowest) element of the in-progress ordset; otherwise the
-    % sequence is just removed from the set.
-    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
-    [Seq | Rest] ->
-        {Seq, Rest};
-    [_ | _] ->
-        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
-    end,
-    NewHighestDone = lists:max([HighestDone, Seq]),
-    % With nothing left in progress, everything up to the highest sequence
-    % done has been processed.
-    NewThroughSeq = case NewSeqsInProgress of
-    [] ->
-        lists:max([NewThroughSeq0, NewHighestDone]);
-    _ ->
-        NewThroughSeq0
-    end,
-    ?LOG_DEBUG("Worker reported seq ~p, through seq was ~p, "
-        "new through seq is ~p, highest seq done was ~p, "
-        "new highest seq done is ~p~n"
-        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
-        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
-            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
-    SourceCurSeq = source_cur_seq(State),
-    NewState = State#rep_state{
-        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
-        current_through_seq = NewThroughSeq,
-        seqs_in_progress = NewSeqsInProgress,
-        highest_seq_done = NewHighestDone,
-        source_seq = SourceCurSeq
-    },
-    update_task(NewState),
-    {noreply, NewState}.
-
-
-% gen_server handle_cast callback.
-%   {db_compacted, DbName} - reopens the local source or target database
-%                            named DbName and stores the new handle.
-%   checkpoint             - records a checkpoint and schedules the next
-%                            one; a checkpoint failure stops the process.
-%   {report_seq, Seq}      - marks Seq as being in progress.
-handle_cast({db_compacted, DbName},
-    #rep_state{source = #db{name = DbName} = OldSource} = State) ->
-    {ok, Reopened} = couch_db:reopen(OldSource),
-    {noreply, State#rep_state{source = Reopened}};
-
-handle_cast({db_compacted, DbName},
-    #rep_state{target = #db{name = DbName} = OldTarget} = State) ->
-    {ok, Reopened} = couch_db:reopen(OldTarget),
-    {noreply, State#rep_state{target = Reopened}};
-
-handle_cast(checkpoint, State) ->
-    case do_checkpoint(State) of
-    {ok, Checkpointed} ->
-        NextTimer = start_timer(State),
-        {noreply, Checkpointed#rep_state{timer = NextTimer}};
-    Failure ->
-        {stop, Failure, State}
-    end;
-
-handle_cast({report_seq, Seq}, #rep_state{seqs_in_progress = InProgress} = State) ->
-    Updated = ordsets:add_element(Seq, InProgress),
-    {noreply, State#rep_state{seqs_in_progress = Updated}}.
-
-
-% gen_server code_change callback: the state is kept unchanged.
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
-% gen_server terminate callback.
-
-% Normal completion: clean up, notify listeners with the checkpoint history
-% and tell the replication manager that the replication completed.
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
-    checkpoint_history = CheckpointHistory} = State) ->
-    terminate_cleanup(State),
-    couch_replication_notifier:notify({finished, RepId, CheckpointHistory}),
-    couch_replication_manager:replication_completed(Rep);
-
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
-    % cancelled replication through ?MODULE:cancel_replication/1
-    couch_replication_notifier:notify({error, RepId, <<"cancelled">>}),
-    terminate_cleanup(State);
-
-% Any other reason is a failure: log it, clean up, notify listeners and
-% report the error to the replication manager.
-terminate(Reason, State) ->
-    #rep_state{
-        source_name = Source,
-        target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
-    } = State,
-    ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
-        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
-    terminate_cleanup(State),
-    couch_replication_notifier:notify({error, RepId, Reason}),
-    couch_replication_manager:replication_error(Rep, Reason).
-
-
-% Final cleanup shared by all terminate/2 clauses: pushes the last task
-% status update, stops both database compaction notifiers and closes the
-% source and target databases.
-terminate_cleanup(State) ->
-    update_task(State),
-    #rep_state{
-        source = Source,
-        target = Target,
-        source_db_compaction_notifier = SourceNotifier,
-        target_db_compaction_notifier = TargetNotifier
-    } = State,
-    stop_db_compaction_notifier(SourceNotifier),
-    stop_db_compaction_notifier(TargetNotifier),
-    couch_api_wrap:db_close(Source),
-    couch_api_wrap:db_close(Target).
-
-
-% Called once all workers are done and no sequence is in progress.
-% When the highest sequence done is still ?LOWEST_SEQ there is nothing to
-% checkpoint and the process stops normally. Otherwise a checkpoint at the
-% highest sequence done is recorded first; a checkpoint failure becomes
-% the stop reason.
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
-    {stop, normal, cancel_timer(State)};
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = HighestDone} = State) ->
-    FinalState = State#rep_state{current_through_seq = HighestDone},
-    case do_checkpoint(FinalState) of
-    {ok, Checkpointed} ->
-        {stop, normal, cancel_timer(Checkpointed)};
-    Failure ->
-        {stop, Failure, State}
-    end.
-
-
-% Schedules a `checkpoint` cast to the calling process after the interval
-% given by checkpoint_interval/1.
-% Returns the timer reference, or `nil` (after logging) when the timer
-% could not be created.
-start_timer(State) ->
-    Interval = checkpoint_interval(State),
-    Scheduled = timer:apply_after(Interval, gen_server, cast, [self(), checkpoint]),
-    case Scheduled of
-    {ok, TRef} ->
-        TRef;
-    Failure ->
-        ?LOG_ERROR("Replicator, error scheduling checkpoint:  ~p", [Failure]),
-        nil
-    end.
-
-
-% Cancels the pending checkpoint timer, if there is one, and returns the
-% state with its `timer` field set to `nil`.
-cancel_timer(#rep_state{timer = TRef} = State) ->
-    case TRef of
-    nil ->
-        State;
-    _ ->
-        {ok, cancel} = timer:cancel(TRef),
-        State#rep_state{timer = nil}
-    end.
-
-
-% Builds the initial #rep_state{} for the given #rep{}: opens the source
-% and target databases (creating the target when the `create_target`
-% option is set), reads their checkpoint logs to find the start sequence,
-% starts the compaction notifiers and database monitors, and starts the
-% checkpoint timer.
-init_state(Rep) ->
-    #rep{
-        source = Src, target = Tgt,
-        options = Options, user_ctx = UserCtx
-    } = Rep,
-    {ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
-    {ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
-        get_value(create_target, Options, false)),
-
-    {ok, SourceInfo} = couch_api_wrap:get_db_info(Source),
-    {ok, TargetInfo} = couch_api_wrap:get_db_info(Target),
-
-    [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
-
-    {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
-    % An explicit `since_seq` option overrides the sequence found in the logs.
-    StartSeq1 = get_value(since_seq, Options, StartSeq0),
-    % Sequences are kept as {Ts, Seq} pairs; the start sequence uses Ts = 0
-    % (the changes manager numbers its batches starting from 1).
-    StartSeq = {0, StartSeq1},
-    #doc{body={CheckpointHistory}} = SourceLog,
-    State = #rep_state{
-        rep_details = Rep,
-        source_name = couch_api_wrap:db_uri(Source),
-        target_name = couch_api_wrap:db_uri(Target),
-        source = Source,
-        target = Target,
-        history = History,
-        % Replaced by the new history on the first successful checkpoint
-        % (see do_checkpoint/1).
-        checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
-        start_seq = StartSeq,
-        current_through_seq = StartSeq,
-        committed_seq = StartSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = httpd_util:rfc1123_date(),
-        src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
-        tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
-        session_id = couch_uuids:random(),
-        source_db_compaction_notifier =
-            start_db_compaction_notifier(Source, self()),
-        target_db_compaction_notifier =
-            start_db_compaction_notifier(Target, self()),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
-        source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ)
-    },
-    State#rep_state{timer = start_timer(State)}.
-
-
-% Returns one checkpoint log document per database in DbList, in the same
-% order. The log ID is derived from the base replication ID; logs stored
-% under IDs of older replication ID versions are looked up as a fallback
-% by fold_replication_logs/6.
-find_replication_logs(DbList, #rep{id = {BaseId, _Ext}} = Rep) ->
-    CurrentLogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
-    fold_replication_logs(
-        DbList, ?REP_ID_VERSION, CurrentLogId, CurrentLogId, Rep, []).
-
-
-% Collects the checkpoint log document of each database in the list.
-%   Vsn   - replication ID version currently being tried
-%   LogId - log document ID for that version
-%   NewId - log document ID for the current version (?REP_ID_VERSION)
-% Returns the documents in the same order as the databases.
-fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
-    lists:reverse(Acc);
-
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
-    case couch_api_wrap:open_doc(Db, LogId, [ejson_body]) of
-    {error, <<"not_found">>} when Vsn > 1 ->
-        % Retry the same database with the ID of the previous version.
-        OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
-        fold_replication_logs(Dbs, Vsn - 1,
-            ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
-    {error, <<"not_found">>} ->
-        % No log under any version: use an empty document with the new ID.
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
-    {ok, Doc} when LogId =:= NewId ->
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
-    {ok, Doc} ->
-        % Found under an old ID: keep the body, but under the new ID.
-        MigratedLog = #doc{id = NewId, body = Doc#doc.body},
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
-    end.
-
-
-% Spawns a linked process that runs read_changes/4.
-% For an #httpdb{} source, the reader process stores the start sequence and
-% the configured number of retries in its process dictionary and runs with
-% `retries = 0` on the database record, so that read_changes/4 does the
-% retrying itself.
-spawn_changes_reader(StartSeq, #httpdb{retries = Retries} = HttpDb,
-    ChangesQueue, Options) ->
-    NoRetryDb = HttpDb#httpdb{retries = 0},
-    Reader = fun() ->
-        put(last_seq, StartSeq),
-        put(retries_left, Retries),
-        read_changes(StartSeq, NoRetryDb, ChangesQueue, Options)
-    end,
-    spawn_link(Reader);
-spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
-    Reader = fun() -> read_changes(StartSeq, Db, ChangesQueue, Options) end,
-    spawn_link(Reader).
-
-% Streams the changes feed of the source database, starting at StartSeq,
-% into ChangesQueue and closes the queue once the feed ends.
-%
-% Rows with an empty document ID are logged and skipped. The sequence of
-% the last row seen is kept in the process dictionary under `last_seq`, so
-% a failed HTTP request can be retried from that point. Retries are bounded
-% by the `retries_left` process dictionary entry. When the failure happened
-% before any new row was received, the reader sleeps for Db#httpdb.wait
-% milliseconds and doubles the wait for the next attempt. When no retry is
-% left, or no retry budget was ever set, the process exits with the
-% original error.
-read_changes(StartSeq, Db, ChangesQueue, Options) ->
-    try
-        couch_api_wrap:changes_since(Db, all_docs, StartSeq,
-            fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
-                case Id of
-                <<>> ->
-                    % Previous CouchDB releases had a bug which allowed a doc
-                    % with an empty ID to be inserted into databases. Such doc
-                    % is impossible to GET.
-                    ?LOG_ERROR("Replicator: ignoring document with empty ID in "
-                        "source database `~s` (_changes sequence ~p)",
-                        [couch_api_wrap:db_uri(Db), Seq]);
-                _ ->
-                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
-                end,
-                put(last_seq, Seq)
-            end, Options),
-        couch_work_queue:close(ChangesQueue)
-    catch exit:{http_request_failed, _, _, _} = Error ->
-        case get(retries_left) of
-        % is_integer/1 is needed here: when the key was never set, get/1
-        % returns `undefined`, and an atom compares greater than any number
-        % in Erlang's term order. Without the test, `undefined - 1` below
-        % would raise badarith and hide the real HTTP error.
-        N when is_integer(N), N > 0 ->
-            put(retries_left, N - 1),
-            LastSeq = get(last_seq),
-            Db2 = case LastSeq of
-            StartSeq ->
-                % No progress since the last attempt: back off.
-                ?LOG_INFO("Retrying _changes request to source database ~s"
-                    " with since=~p in ~p seconds",
-                    [couch_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
-                ok = timer:sleep(Db#httpdb.wait),
-                Db#httpdb{wait = 2 * Db#httpdb.wait};
-            _ ->
-                % Some rows were received: retry right away from LastSeq.
-                ?LOG_INFO("Retrying _changes request to source database ~s"
-                    " with since=~p", [couch_api_wrap:db_uri(Db), LastSeq]),
-                Db
-            end,
-            read_changes(LastSeq, Db2, ChangesQueue, Options);
-        _ ->
-            exit(Error)
-        end
-    end.
-
-
-% Spawns a linked process running the changes manager loop, with the
-% batch counter starting at 1.
-spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
-    Loop = fun() ->
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
-    end,
-    spawn_link(Loop).
-
-% Changes manager loop. For every {get_changes, From} request it dequeues
-% up to BatchSize changes from ChangesQueue. When a batch is available, the
-% {Ts, Seq} pair of its last row is reported to Parent as being in progress
-% and the batch is sent to the requester; when the queue is closed, the
-% requester is told so. Ts is incremented on every request.
-changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
-    receive
-    {get_changes, Requester} ->
-        case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
-        {ok, Batch} ->
-            #doc_info{high_seq = LastSeq} = lists:last(Batch),
-            BatchSeq = {Ts, LastSeq},
-            ok = gen_server:cast(Parent, {report_seq, BatchSeq}),
-            Requester ! {changes, self(), Batch, BatchSeq};
-        closed ->
-            Requester ! {closed, self()}
-        end,
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
-    end.
-
-
-% Delay used by start_timer/1 between checkpoints (passed to
-% timer:apply_after/4, i.e. milliseconds). The state argument is ignored.
-checkpoint_interval(_State) ->
-    5000.
-
-% Records a replication checkpoint.
-% Returns {ok, NewState} on success, or a
-% {checkpoint_commit_failure, Message :: binary()} tuple on failure.
-
-% Nothing new to record (the through sequence is already committed): only
-% refresh the source sequence and the task status.
-do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
-    SourceCurSeq = source_cur_seq(State),
-    NewState = State#rep_state{source_seq = SourceCurSeq},
-    update_task(NewState),
-    {ok, NewState};
-do_checkpoint(State) ->
-    #rep_state{
-        source_name=SourceName,
-        target_name=TargetName,
-        source = Source,
-        target = Target,
-        history = OldHistory,
-        start_seq = {_, StartSeq},
-        current_through_seq = {_Ts, NewSeq} = NewTsSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = ReplicationStartTime,
-        src_starttime = SrcInstanceStartTime,
-        tgt_starttime = TgtInstanceStartTime,
-        stats = Stats,
-        rep_details = #rep{options = Options},
-        session_id = SessionId
-    } = State,
-    % The clauses below match against the already bound instance start
-    % times recorded in the state, so their order is significant.
-    case commit_to_both(Source, Target) of
-    {source_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
-    {target_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
-    {SrcInstanceStartTime, TgtInstanceStartTime} ->
-        % Both instance start times are unchanged since the replication
-        % started: write the new history to both checkpoint documents.
-        ?LOG_INFO("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
-            [SourceName, TargetName, NewSeq]),
-        StartTime = ?l2b(ReplicationStartTime),
-        EndTime = ?l2b(httpd_util:rfc1123_date()),
-        NewHistoryEntry = {[
-            {<<"session_id">>, SessionId},
-            {<<"start_time">>, StartTime},
-            {<<"end_time">>, EndTime},
-            {<<"start_last_seq">>, StartSeq},
-            {<<"end_last_seq">>, NewSeq},
-            {<<"recorded_seq">>, NewSeq},
-            {<<"missing_checked">>, Stats#rep_stats.missing_checked},
-            {<<"missing_found">>, Stats#rep_stats.missing_found},
-            {<<"docs_read">>, Stats#rep_stats.docs_read},
-            {<<"docs_written">>, Stats#rep_stats.docs_written},
-            {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
-        ]},
-        BaseHistory = [
-            {<<"session_id">>, SessionId},
-            {<<"source_last_seq">>, NewSeq},
-            {<<"replication_id_version">>, ?REP_ID_VERSION}
-        ] ++ case get_value(doc_ids, Options) of
-        undefined ->
-            [];
-        _DocIds ->
-            % backwards compatibility with the result of a replication by
-            % doc IDs in versions 0.11.x and 1.0.x
-            % TODO: deprecate (use same history format, simplify code)
-            [
-                {<<"start_time">>, StartTime},
-                {<<"end_time">>, EndTime},
-                {<<"docs_read">>, Stats#rep_stats.docs_read},
-                {<<"docs_written">>, Stats#rep_stats.docs_written},
-                {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
-            ]
-        end,
-        % limit history to 50 entries
-        NewRepHistory = {
-            BaseHistory ++
-            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
-        },
-
-        try
-            {SrcRevPos, SrcRevId} = update_checkpoint(
-                Source, SourceLog#doc{body = NewRepHistory}, source),
-            {TgtRevPos, TgtRevId} = update_checkpoint(
-                Target, TargetLog#doc{body = NewRepHistory}, target),
-            SourceCurSeq = source_cur_seq(State),
-            % Remember the new revisions of the log documents for the next
-            % checkpoint update.
-            NewState = State#rep_state{
-                source_seq = SourceCurSeq,
-                checkpoint_history = NewRepHistory,
-                committed_seq = NewTsSeq,
-                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
-                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
-            },
-            update_task(NewState),
-            {ok, NewState}
-        catch throw:{checkpoint_commit_failure, _} = Failure ->
-            Failure
-        end;
-    % From here on at least one instance start time differs from the one
-    % recorded when the replication started.
-    {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Target database out of sync. "
-            "Try to increase max_dbs_open at the target's server.">>};
-    {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source database out of sync. "
-            "Try to increase max_dbs_open at the source's server.">>};
-    {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source and target databases out of "
-            "sync. Try to increase max_dbs_open at both servers.">>}
-    end.
-
-
-% Same as update_checkpoint/2, but a thrown checkpoint_commit_failure is
-% re-thrown with a message saying which checkpoint document (DbType is
-% `source` or `target` at the call sites) could not be updated.
-update_checkpoint(Db, Doc, DbType) ->
-    try
-        update_checkpoint(Db, Doc)
-    catch throw:{checkpoint_commit_failure, Reason} ->
-        DbTypeBin = to_binary(DbType),
-        ReasonBin = to_binary(Reason),
-        Message = <<"Error updating the ", DbTypeBin/binary,
-            " checkpoint document: ", ReasonBin/binary>>,
-        throw({checkpoint_commit_failure, Message})
-    end.
-
-% Writes the checkpoint document Doc to Db with the `delay_commit` option.
-% Returns the revision returned by couch_api_wrap:update_doc/3 (callers
-% destructure it as {Pos, RevId}).
-% Throws {checkpoint_commit_failure, Reason} when the update fails.
-update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
-    try
-        case couch_api_wrap:update_doc(Db, Doc, [delay_commit]) of
-        {ok, PosRevId} ->
-            PosRevId;
-        {error, Reason} ->
-            throw({checkpoint_commit_failure, Reason})
-        end
-    catch throw:conflict ->
-        % LogBody is already bound, so the pattern below only matches when
-        % the stored document has exactly the body we tried to write.
-        case (catch couch_api_wrap:open_doc(Db, LogId, [ejson_body])) of
-        {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
-            % This means that we were able to update successfully the
-            % checkpoint doc in a previous attempt but we got a connection
-            % error (timeout for e.g.) before receiving the success response.
-            % Therefore the request was retried and we got a conflict, as the
-            % revision we sent is not the current one.
-            % We confirm this by verifying the doc body we just got is the same
-            % that we have just sent.
-            {Pos, RevId};
-        _ ->
-            throw({checkpoint_commit_failure, conflict})
-        end
-    end.
-
-
-% Runs ensure_full_commit on both databases, the source one in a linked
-% helper process and the target one in the calling process.
-% Returns {SourceStartTime, TargetStartTime} when both commits succeed,
-% {target_error, Error} when the target commit failed (checked first), or
-% {source_error, Error} when only the source commit failed.
-commit_to_both(Source, Target) ->
-    % commit the src async
-    ParentPid = self(),
-    SrcCommitPid = spawn_link(
-        fun() ->
-            Result = (catch couch_api_wrap:ensure_full_commit(Source)),
-            ParentPid ! {self(), Result}
-        end),
-
-    % commit tgt sync
-    TargetResult = (catch couch_api_wrap:ensure_full_commit(Target)),
-
-    SourceResult = receive
-    {SrcCommitPid, Result} ->
-        % Drop the link and flush the helper's 'EXIT' message in case it
-        % was already delivered, so it never reaches handle_info/2.
-        unlink(SrcCommitPid),
-        receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
-        Result;
-    {'EXIT', SrcCommitPid, Reason} ->
-        % The helper died before sending its result.
-        {error, Reason}
-    end,
-    case TargetResult of
-    {ok, TargetStartTime} ->
-        case SourceResult of
-        {ok, SourceStartTime} ->
-            {SourceStartTime, TargetStartTime};
-        SourceError ->
-            {source_error, SourceError}
-        end;
-    TargetError ->
-        {target_error, TargetError}
-    end.
-
-
-% Compares the checkpoint logs stored on source and target.
-% Returns {StartSeq, History}: when both logs carry the same session ID,
-% the sequence and history recorded in the source log are used; otherwise
-% both histories are scanned for a common session (compare_rep_history/2).
-compare_replication_logs(SrcDoc, TgtDoc) ->
-    #doc{body = {SrcProps}} = SrcDoc,
-    #doc{body = {TgtProps}} = TgtDoc,
-    SrcSession = get_value(<<"session_id">>, SrcProps),
-    TgtSession = get_value(<<"session_id">>, TgtProps),
-    case SrcSession == TgtSession of
-    false ->
-        SrcHistory = get_value(<<"history">>, SrcProps, []),
-        TgtHistory = get_value(<<"history">>, TgtProps, []),
-        ?LOG_INFO("Replication records differ. "
-                "Scanning histories to find a common ancestor.", []),
-        ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
-                [SrcProps, TgtProps]),
-        compare_rep_history(SrcHistory, TgtHistory);
-    true ->
-        % Same session ID on both sides: the recorded history is valid.
-        LastSeq = get_value(<<"source_last_seq">>, SrcProps, ?LOWEST_SEQ),
-        History = get_value(<<"history">>, SrcProps, []),
-        {LastSeq, History}
-    end.
-
-% Walks the source and target histories in parallel, looking for a session
-% ID present in both.
-% Returns {RecordedSeq, RemainingHistory} for the first common session
-% found, or {?LOWEST_SEQ, []} when either history runs out first.
-compare_rep_history(S, T) when S =:= []; T =:= [] ->
-    ?LOG_INFO("no common ancestry -- performing full replication", []),
-    {?LOWEST_SEQ, []};
-compare_rep_history([{SrcEntry} | SrcTail], [{TgtEntry} | TgtTail] = TgtHistory) ->
-    SrcSession = get_value(<<"session_id">>, SrcEntry),
-    case has_session_id(SrcSession, TgtHistory) of
-    true ->
-        SrcSeq = get_value(<<"recorded_seq">>, SrcEntry, ?LOWEST_SEQ),
-        ?LOG_INFO("found a common replication record with source_seq ~p",
-            [SrcSeq]),
-        {SrcSeq, SrcTail};
-    false ->
-        TgtSession = get_value(<<"session_id">>, TgtEntry),
-        case has_session_id(TgtSession, SrcTail) of
-        true ->
-            TgtSeq = get_value(<<"recorded_seq">>, TgtEntry, ?LOWEST_SEQ),
-            ?LOG_INFO("found a common replication record with source_seq ~p",
-                [TgtSeq]),
-            {TgtSeq, TgtTail};
-        false ->
-            compare_rep_history(SrcTail, TgtTail)
-        end
-    end.
-
-
-% Returns true when some entry of the history list has a `session_id`
-% property exactly equal to SessionId, false otherwise.
-has_session_id(_SessionId, []) ->
-    false;
-has_session_id(SessionId, [{Props} | Rest]) ->
-    get_value(<<"session_id">>, Props, nil) =:= SessionId
-        orelse has_session_id(SessionId, Rest).
-
-
-% Monitors a local database (#db{}) and returns the value of
-% couch_db:monitor/1; any other database handle yields `nil`.
-db_monitor(Db) ->
-    case Db of
-    #db{} ->
-        couch_db:monitor(Db);
-    _HttpDb ->
-        nil
-    end.
-
-
-% Returns the current update sequence of the source database.
-% For an #httpdb{} source the info request is made with `retries = 3`, and
-% any failure falls back to the sequence already stored in the state. For
-% other sources the info request must succeed.
-source_cur_seq(#rep_state{source = #httpdb{} = HttpDb, source_seq = KnownSeq}) ->
-    Reply = (catch couch_api_wrap:get_db_info(HttpDb#httpdb{retries = 3})),
-    case Reply of
-    {ok, DbInfo} ->
-        get_value(<<"update_seq">>, DbInfo, KnownSeq);
-    _ ->
-        KnownSeq
-    end;
-source_cur_seq(#rep_state{source = LocalDb, source_seq = KnownSeq}) ->
-    {ok, DbInfo} = couch_api_wrap:get_db_info(LocalDb),
-    get_value(<<"update_seq">>, DbInfo, KnownSeq).
-
-
-% Pushes the current counters and sequences to couch_task_status.
-% `progress` is the integer percentage of the through sequence relative to
-% the source's current sequence; it is 0 when the source sequence is 0 and
-% `null` when either sequence is not a number.
-update_task(State) ->
-    #rep_state{
-        current_through_seq = {_, ThroughSeq},
-        committed_seq = {_, CommittedSeq},
-        source_seq = SourceSeq,
-        stats = Stats
-    } = State,
-    #rep_stats{
-        missing_checked = Checked,
-        missing_found = Found,
-        docs_read = Read,
-        docs_written = Written,
-        doc_write_failures = Failures
-    } = Stats,
-    Progress = case is_number(ThroughSeq) andalso is_number(SourceSeq) of
-    false ->
-        null;
-    true when SourceSeq =:= 0 ->
-        0;
-    true ->
-        (ThroughSeq * 100) div SourceSeq
-    end,
-    couch_task_status:update([
-        {revisions_checked, Checked},
-        {missing_revisions_found, Found},
-        {docs_read, Read},
-        {docs_written, Written},
-        {doc_write_failures, Failures},
-        {source_seq, SourceSeq},
-        {checkpointed_source_seq, CommittedSeq},
-        {progress, Progress}
-    ]).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couchdb/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_replicator.hrl b/src/couchdb/couch_replicator.hrl
deleted file mode 100644
index 20c0bc3..0000000
--- a/src/couchdb/couch_replicator.hrl
+++ /dev/null
@@ -1,30 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-% Current version of the replication ID format. It is passed to
-% couch_replicator_utils:replication_id/2 and stored in checkpoint
-% documents as `replication_id_version`.
--define(REP_ID_VERSION, 2).
-
-% Description of a single replication.
--record(rep, {
-    id,         % {BaseId, Extension} pair (see replication_id/1)
-    source,     % parsed `source` property of the replication document
-    target,     % parsed `target` property of the replication document
-    options,    % replication options, read with get_value/2,3
-    user_ctx,   % user context the replication runs under
-    doc_id      % `_id` property of the replication document, if any
-}).
-
-% Replication counters; every field starts at 0.
--record(rep_stats, {
-    missing_checked = 0,
-    missing_found = 0,
-    docs_read = 0,
-    docs_written = 0,
-    doc_write_failures = 0
-}).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couchdb/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_replicator_utils.erl b/src/couchdb/couch_replicator_utils.erl
deleted file mode 100644
index 6cc4db8..0000000
--- a/src/couchdb/couch_replicator_utils.erl
+++ /dev/null
@@ -1,382 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_utils).
-
--export([parse_rep_doc/2]).
--export([open_db/1, close_db/1]).
--export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
--export([replication_id/2]).
--export([sum_stats/2]).
-
--include("couch_db.hrl").
--include("couch_api_wrap.hrl").
--include("couch_replicator.hrl").
--include("../ibrowse/ibrowse.hrl").
-
--import(couch_util, [
-    get_value/2,
-    get_value/3
-]).
-
-
-parse_rep_doc({Props}, UserCtx) ->
-    ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
-    Options = make_options(Props),
-    case get_value(cancel, Options, false) andalso
-        (get_value(id, Options, nil) =/= nil) of
-    true ->
-        {ok, #rep{options = Options, user_ctx = UserCtx}};
-    false ->
-        Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options),
-        Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams, Options),
-        Rep = #rep{
-            source = Source,
-            target = Target,
-            options = Options,
-            user_ctx = UserCtx,
-            doc_id = get_value(<<"_id">>, Props)
-        },
-        {ok, Rep#rep{id = replication_id(Rep)}}
-    end.
-
-
-replication_id(#rep{options = Options} = Rep) ->
-    BaseId = replication_id(Rep, ?REP_ID_VERSION),
-    {BaseId, maybe_append_options([continuous, create_target], Options)}.
-
-
-% Versioned clauses for generating replication IDs.
-% If a change is made to how replications are identified,
-% please add a new clause and increase ?REP_ID_VERSION.
-
-replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
-    {ok, HostName} = inet:gethostname(),
-    Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
-    P when is_number(P) ->
-        P;
-    _ ->
-        % On restart we might be called before the couch_httpd process is
-        % started.
-        % TODO: we might be under an SSL socket server only, or both under
-        % SSL and a non-SSL socket.
-        % ... mochiweb_socket_server:get(https, port)
-        list_to_integer(couch_config:get("httpd", "port", "5984"))
-    end,
-    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
-    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
-    maybe_append_filters([HostName, Port, Src, Tgt], Rep);
-
-replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
-    {ok, HostName} = inet:gethostname(),
-    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
-    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
-    maybe_append_filters([HostName, Src, Tgt], Rep).
-
-
-maybe_append_filters(Base,
-        #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
-    Base2 = Base ++
-        case get_value(filter, Options) of
-        undefined ->
-            case get_value(doc_ids, Options) of
-            undefined ->
-                [];
-            DocIds ->
-                [DocIds]
-            end;
-        Filter ->
-            [filter_code(Filter, Source, UserCtx),
-                get_value(query_params, Options, {[]})]
-        end,
-    couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
-
-
-filter_code(Filter, Source, UserCtx) ->
-    {DDocName, FilterName} =
-    case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
-    {match, [DDocName0, FilterName0]} ->
-        {DDocName0, FilterName0};
-    _ ->
-        throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
-    end,
-    Db = case (catch couch_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of
-    {ok, Db0} ->
-        Db0;
-    DbError ->
-        DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
-           [couch_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]),
-        throw({error, iolist_to_binary(DbErrorMsg)})
-    end,
-    try
-        Body = case (catch couch_api_wrap:open_doc(
-            Db, <<"_design/", DDocName/binary>>, [ejson_body])) of
-        {ok, #doc{body = Body0}} ->
-            Body0;
-        DocError ->
-            DocErrorMsg = io_lib:format(
-                "Couldn't open document `_design/~s` from source "
-                "database `~s`: ~s", [DDocName, couch_api_wrap:db_uri(Source),
-                    couch_util:to_binary(DocError)]),
-            throw({error, iolist_to_binary(DocErrorMsg)})
-        end,
-        Code = couch_util:get_nested_json_value(
-            Body, [<<"filters">>, FilterName]),
-        re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
-    after
-        couch_api_wrap:db_close(Db)
-    end.
-
-
-maybe_append_options(Options, RepOptions) ->
-    lists:foldl(fun(Option, Acc) ->
-        Acc ++
-        case get_value(Option, RepOptions, false) of
-        true ->
-            "+" ++ atom_to_list(Option);
-        false ->
-            ""
-        end
-    end, [], Options).
-
-
-get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    case OAuth of
-    nil ->
-        {remote, Url, Headers -- DefaultHeaders};
-    #oauth{} ->
-        {remote, Url, Headers -- DefaultHeaders, OAuth}
-    end;
-get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
-    {local, DbName, UserCtx}.
-
-
-parse_rep_db({Props}, ProxyParams, Options) ->
-    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
-    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
-    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
-    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
-    DefaultHeaders = (#httpdb{})#httpdb.headers,
-    OAuth = case get_value(<<"oauth">>, AuthProps) of
-    undefined ->
-        nil;
-    {OauthProps} ->
-        #oauth{
-            consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
-            token = ?b2l(get_value(<<"token">>, OauthProps)),
-            token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
-            consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)),
-            signature_method =
-                case get_value(<<"signature_method">>, OauthProps) of
-                undefined ->        hmac_sha1;
-                <<"PLAINTEXT">> ->  plaintext;
-                <<"HMAC-SHA1">> ->  hmac_sha1;
-                <<"RSA-SHA1">> ->   rsa_sha1
-                end
-        }
-    end,
-    #httpdb{
-        url = Url,
-        oauth = OAuth,
-        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
-        ibrowse_options = lists:keysort(1,
-            [{socket_options, get_value(socket_options, Options)} |
-                ProxyParams ++ ssl_params(Url)]),
-        timeout = get_value(connection_timeout, Options),
-        http_connections = get_value(http_connections, Options),
-        retries = get_value(retries, Options)
-    };
-parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
-parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
-    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
-parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
-    DbName.
-
-
-maybe_add_trailing_slash(Url) when is_binary(Url) ->
-    maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
-    case lists:last(Url) of
-    $/ ->
-        Url;
-    _ ->
-        Url ++ "/"
-    end.
-
-
-make_options(Props) ->
-    Options = lists:ukeysort(1, convert_options(Props)),
-    DefWorkers = couch_config:get("replicator", "worker_processes", "4"),
-    DefBatchSize = couch_config:get("replicator", "worker_batch_size", "500"),
-    DefConns = couch_config:get("replicator", "http_connections", "20"),
-    DefTimeout = couch_config:get("replicator", "connection_timeout", "30000"),
-    DefRetries = couch_config:get("replicator", "retries_per_request", "10"),
-    {ok, DefSocketOptions} = couch_util:parse_term(
-        couch_config:get("replicator", "socket_options",
-            "[{keepalive, true}, {nodelay, false}]")),
-    lists:ukeymerge(1, Options, lists:keysort(1, [
-        {connection_timeout, list_to_integer(DefTimeout)},
-        {retries, list_to_integer(DefRetries)},
-        {http_connections, list_to_integer(DefConns)},
-        {socket_options, DefSocketOptions},
-        {worker_batch_size, list_to_integer(DefBatchSize)},
-        {worker_processes, list_to_integer(DefWorkers)}
-    ])).
-
-
-convert_options([])->
-    [];
-convert_options([{<<"cancel">>, V} | R]) ->
-    [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
-        IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
-    Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)),
-    [{id, Id} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | R]) ->
-    [{create_target, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | R]) ->
-    [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
-    [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
-    [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
-    convert_options(R);
-convert_options([{<<"doc_ids">>, V} | R]) ->
-    % Ensure same behaviour as old replicator: accept a list of percent
-    % encoded doc IDs.
-    DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
-    [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
-    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
-    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
-    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
-    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
-    [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
-    {ok, SocketOptions} = couch_util:parse_term(V),
-    [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
-    [{since_seq, V} | convert_options(R)];
-convert_options([_ | R]) -> % skip unknown option
-    convert_options(R).
-
-
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
-    parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
-    [];
-parse_proxy_params(ProxyUrl) ->
-    #url{
-        host = Host,
-        port = Port,
-        username = User,
-        password = Passwd
-    } = ibrowse_lib:parse_url(ProxyUrl),
-    [{proxy_host, Host}, {proxy_port, Port}] ++
-        case is_list(User) andalso is_list(Passwd) of
-        false ->
-            [];
-        true ->
-            [{proxy_user, User}, {proxy_password, Passwd}]
-        end.
-
-
-ssl_params(Url) ->
-    case ibrowse_lib:parse_url(Url) of
-    #url{protocol = https} ->
-        Depth = list_to_integer(
-            couch_config:get("replicator", "ssl_certificate_max_depth", "3")
-        ),
-        VerifyCerts = couch_config:get("replicator", "verify_ssl_certificates"),
-        CertFile = couch_config:get("replicator", "cert_file", nil),
-        KeyFile = couch_config:get("replicator", "key_file", nil),
-        Password = couch_config:get("replicator", "password", nil),
-        SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
-        SslOpts1 = case CertFile /= nil andalso KeyFile /= nil of
-            true ->
-                case Password of
-                    nil -> 
-                        [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
-                    _ -> 
-                        [{certfile, CertFile}, {keyfile, KeyFile},
-                            {password, Password}] ++ SslOpts
-                end;
-            false -> SslOpts
-        end,
-        [{is_ssl, true}, {ssl_options, SslOpts1}];
-    #url{protocol = http} ->
-        []
-    end.
-
-ssl_verify_options(Value) ->
-    ssl_verify_options(Value, erlang:system_info(otp_release)).
-
-ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" ->
-    CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, verify_peer}, {cacertfile, CAFile}];
-ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" ->
-    [{verify, verify_none}];
-ssl_verify_options(true, _OTPVersion) ->
-    CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
-    [{verify, 2}, {cacertfile, CAFile}];
-ssl_verify_options(false, _OTPVersion) ->
-    [{verify, 0}].
-
-
-open_db(#db{name = Name, user_ctx = UserCtx, options = Options}) ->
-    {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx} | Options]),
-    Db;
-open_db(HttpDb) ->
-    HttpDb.
-
-
-close_db(#db{} = Db) ->
-    couch_db:close(Db);
-close_db(_HttpDb) ->
-    ok.
-
-
-start_db_compaction_notifier(#db{name = DbName}, Server) ->
-    {ok, Notifier} = couch_db_update_notifier:start_link(
-        fun({compacted, DbName1}) when DbName1 =:= DbName ->
-                ok = gen_server:cast(Server, {db_compacted, DbName});
-            (_) ->
-                ok
-        end),
-    Notifier;
-start_db_compaction_notifier(_, _) ->
-    nil.
-
-
-stop_db_compaction_notifier(nil) ->
-    ok;
-stop_db_compaction_notifier(Notifier) ->
-    couch_db_update_notifier:stop(Notifier).
-
-
-sum_stats(#rep_stats{} = S1, #rep_stats{} = S2) ->
-    #rep_stats{
-        missing_checked =
-            S1#rep_stats.missing_checked + S2#rep_stats.missing_checked,
-        missing_found = S1#rep_stats.missing_found + S2#rep_stats.missing_found,
-        docs_read = S1#rep_stats.docs_read + S2#rep_stats.docs_read,
-        docs_written = S1#rep_stats.docs_written + S2#rep_stats.docs_written,
-        doc_write_failures =
-            S1#rep_stats.doc_write_failures + S2#rep_stats.doc_write_failures
-    }.


Mime
View raw message