couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From beno...@apache.org
Subject [37/57] [abbrv] [partial] initial move to rebar compilation
Date Tue, 07 Jan 2014 00:36:57 GMT
http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator_httpd.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_httpd.erl b/apps/couch_replicator/src/couch_replicator_httpd.erl
new file mode 100644
index 0000000..0a21d52
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator_httpd.erl
@@ -0,0 +1,66 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_httpd).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-import(couch_httpd, [
+    send_json/2,
+    send_json/3,
+    send_method_not_allowed/2
+]).
+
+-import(couch_util, [
+    to_binary/1
+]).
+
+-export([handle_req/1]).
+
+
+% HTTP handler for the replication endpoint. Only POST with a JSON body is
+% accepted; any other method gets a 405. Response codes:
+%   200 - one-shot replication finished, or a replication was cancelled
+%   202 - continuous replication accepted
+%   404 - tried to cancel a replication that doesn't exist
+%   500 - replication error
+handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
+    couch_httpd:validate_ctype(Req, "application/json"),
+    RepDoc = {Props} = couch_httpd:json_body_obj(Req),
+    validate_rep_props(Props),
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, UserCtx),
+    case couch_replicator:replicate(Rep) of
+    {error, {Error, Reason}} ->
+        send_json(
+            Req, 500,
+            {[{error, to_binary(Error)}, {reason, to_binary(Reason)}]});
+    {error, not_found} ->
+        % Tried to cancel a replication that didn't exist.
+        send_json(Req, 404, {[{error, <<"not found">>}]});
+    {error, Reason} ->
+        send_json(Req, 500, {[{error, to_binary(Reason)}]});
+    {ok, {cancelled, RepId}} ->
+        send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
+    {ok, {continuous, RepId}} ->
+        send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
+    {ok, {HistoryResults}} ->
+        send_json(Req, {[{ok, true} | HistoryResults]})
+    end;
+
+handle_req(Req) ->
+    send_method_not_allowed(Req, "POST").
+
+% Walk the replication document's properties. Only `query_params' is
+% validated here: every value inside it must be a string (binary),
+% otherwise a 400 bad_request is thrown. All other properties are left
+% for couch_replicator_utils:parse_rep_doc/2 to check.
+validate_rep_props([]) ->
+    ok;
+validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
+    lists:foreach(fun
+        ({_,V}) when is_binary(V) -> ok;
+        ({K,_}) -> throw({bad_request,
+            <<K/binary," value must be a string.">>})
+        end, Params),
+    validate_rep_props(Rest);
+validate_rep_props([_|Rest]) ->
+    validate_rep_props(Rest).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator_job_sup.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_job_sup.erl b/apps/couch_replicator/src/couch_replicator_job_sup.erl
new file mode 100644
index 0000000..484cc1a
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator_job_sup.erl
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_job_sup).
+-behaviour(supervisor).
+-export([init/1, start_link/0]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+% Starts the (locally registered) supervisor that owns the individual
+% replication worker processes, which are added to it dynamically.
+start_link() ->
+    supervisor:start_link({local,?MODULE}, ?MODULE, []).
+
+%%=============================================================================
+%% supervisor callbacks
+%%=============================================================================
+
+% No static children - replication jobs are attached at runtime. Allow at
+% most 3 restarts within 10 seconds before the supervisor itself gives up.
+init([]) ->
+    {ok, {{one_for_one, 3, 10}, []}}.
+
+%%=============================================================================
+%% internal functions
+%%=============================================================================

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator_js_functions.hrl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_js_functions.hrl b/apps/couch_replicator/src/couch_replicator_js_functions.hrl
new file mode 100644
index 0000000..3f1db7c
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator_js_functions.hrl
@@ -0,0 +1,151 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% JavaScript validate_doc_update function that gets installed in the
+% _replicator database's design document (see couch_replicator_manager's
+% ensure_rep_ddoc_exists). It enforces who may create/edit/delete
+% replication documents and that their fields are well formed. The text is
+% a runtime binary: each `\\' sequence below yields one backslash in the
+% binary, i.e. an escaped closing quote inside the JS string literals.
+-define(REP_DB_DOC_VALIDATE_FUN, <<"
+    function(newDoc, oldDoc, userCtx) {
+        function reportError(error_msg) {
+            log('Error writing document `' + newDoc._id +
+                '\\' to the replicator database: ' + error_msg);
+            throw({forbidden: error_msg});
+        }
+
+        function validateEndpoint(endpoint, fieldName) {
+            if ((typeof endpoint !== 'string') &&
+                ((typeof endpoint !== 'object') || (endpoint === null))) {
+
+                reportError('The `' + fieldName + '\\' property must exist' +
+                    ' and be either a string or an object.');
+            }
+
+            if (typeof endpoint === 'object') {
+                if ((typeof endpoint.url !== 'string') || !endpoint.url) {
+                    reportError('The url property must exist in the `' +
+                        fieldName + '\\' field and must be a non-empty string.');
+                }
+
+                if ((typeof endpoint.auth !== 'undefined') &&
+                    ((typeof endpoint.auth !== 'object') ||
+                        endpoint.auth === null)) {
+
+                    reportError('`' + fieldName +
+                        '.auth\\' must be a non-null object.');
+                }
+
+                if ((typeof endpoint.headers !== 'undefined') &&
+                    ((typeof endpoint.headers !== 'object') ||
+                        endpoint.headers === null)) {
+
+                    reportError('`' + fieldName +
+                        '.headers\\' must be a non-null object.');
+                }
+            }
+        }
+
+        var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
+        var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
+
+        if (oldDoc && !newDoc._deleted && !isReplicator &&
+            (oldDoc._replication_state === 'triggered')) {
+            reportError('Only the replicator can edit replication documents ' +
+                'that are in the triggered state.');
+        }
+
+        if (!newDoc._deleted) {
+            validateEndpoint(newDoc.source, 'source');
+            validateEndpoint(newDoc.target, 'target');
+
+            if ((typeof newDoc.create_target !== 'undefined') &&
+                (typeof newDoc.create_target !== 'boolean')) {
+
+                reportError('The `create_target\\' field must be a boolean.');
+            }
+
+            if ((typeof newDoc.continuous !== 'undefined') &&
+                (typeof newDoc.continuous !== 'boolean')) {
+
+                reportError('The `continuous\\' field must be a boolean.');
+            }
+
+            if ((typeof newDoc.doc_ids !== 'undefined') &&
+                !isArray(newDoc.doc_ids)) {
+
+                reportError('The `doc_ids\\' field must be an array of strings.');
+            }
+
+            if ((typeof newDoc.filter !== 'undefined') &&
+                ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
+
+                reportError('The `filter\\' field must be a non-empty string.');
+            }
+
+            if ((typeof newDoc.query_params !== 'undefined') &&
+                ((typeof newDoc.query_params !== 'object') ||
+                    newDoc.query_params === null)) {
+
+                reportError('The `query_params\\' field must be an object.');
+            }
+
+            if (newDoc.user_ctx) {
+                var user_ctx = newDoc.user_ctx;
+
+                if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
+                    reportError('The `user_ctx\\' property must be a ' +
+                        'non-null object.');
+                }
+
+                if (!(user_ctx.name === null ||
+                    (typeof user_ctx.name === 'undefined') ||
+                    ((typeof user_ctx.name === 'string') &&
+                        user_ctx.name.length > 0))) {
+
+                    reportError('The `user_ctx.name\\' property must be a ' +
+                        'non-empty string or null.');
+                }
+
+                if (!isAdmin && (user_ctx.name !== userCtx.name)) {
+                    reportError('The given `user_ctx.name\\' is not valid');
+                }
+
+                if (user_ctx.roles && !isArray(user_ctx.roles)) {
+                    reportError('The `user_ctx.roles\\' property must be ' +
+                        'an array of strings.');
+                }
+
+                if (!isAdmin && user_ctx.roles) {
+                    for (var i = 0; i < user_ctx.roles.length; i++) {
+                        var role = user_ctx.roles[i];
+
+                        if (typeof role !== 'string' || role.length === 0) {
+                            reportError('Roles must be non-empty strings.');
+                        }
+                        if (userCtx.roles.indexOf(role) === -1) {
+                            reportError('Invalid role (`' + role +
+                                '\\') in the `user_ctx\\'');
+                        }
+                    }
+                }
+            } else {
+                if (!isAdmin) {
+                    reportError('The `user_ctx\\' property is missing (it is ' +
+                       'optional for admins only).');
+                }
+            }
+        } else {
+            if (!isAdmin) {
+                if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
+                    reportError('Replication documents can only be deleted by ' +
+                        'admins or by the users who created them.');
+                }
+            }
+        }
+    }
+">>).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_manager.erl b/apps/couch_replicator/src/couch_replicator_manager.erl
new file mode 100644
index 0000000..8055727
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator_manager.erl
@@ -0,0 +1,709 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_manager).
+-behaviour(gen_server).
+
+% public API
+-export([replication_started/1, replication_completed/2, replication_error/2]).
+
+-export([before_doc_update/2, after_doc_read/2]).
+
+% gen_server callbacks
+-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+-include("couch_replicator_js_functions.hrl").
+
+-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
+-define(REP_TO_STATE, couch_rep_id_to_rep_state).
+-define(INITIAL_WAIT, 2.5). % seconds
+-define(MAX_WAIT, 600).     % seconds
+-define(OWNER, <<"owner">>).
+
+-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
+
+-record(rep_state, {
+    rep,
+    starting,
+    retries_left,
+    max_retries,
+    wait = ?INITIAL_WAIT
+}).
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3,
+    to_binary/1
+]).
+
+-record(state, {
+    changes_feed_loop = nil,
+    db_notifier = nil,
+    rep_db_name = nil,
+    rep_start_pids = [],
+    max_retries
+}).
+
+
+% Starts the replication manager as a locally registered gen_server.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+% Called by a replication task once it is running. If this manager is
+% tracking the replication (i.e. it originated from a doc in the
+% replicator db), mark the doc as triggered, clear any previous error
+% reason/stats, and reset the retry bookkeeping via a rep_started call.
+replication_started(#rep{id = {BaseId, _} = RepId}) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        update_rep_doc(DocId, [
+            {<<"_replication_state">>, <<"triggered">>},
+            {<<"_replication_state_reason">>, undefined},
+            {<<"_replication_id">>, ?l2b(BaseId)},
+            {<<"_replication_stats">>, undefined}]),
+        ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
+        ?LOG_INFO("Document `~s` triggered replication `~s`",
+            [DocId, pp_rep_id(RepId)])
+    end.
+
+
+% Called when a replication finishes. Records the completed state and the
+% final stats in the replication document, then drops the tracked state.
+replication_completed(#rep{id = RepId}, Stats) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        update_rep_doc(DocId, [
+            {<<"_replication_state">>, <<"completed">>},
+            {<<"_replication_state_reason">>, undefined},
+            {<<"_replication_stats">>, {Stats}}]),
+        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
+        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
+            [pp_rep_id(RepId), DocId])
+    end.
+
+
+% Called when a replication task fails. Records the error reason in the
+% replication document; the server then decides whether to retry (see
+% maybe_retry_replication/3).
+replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        update_rep_doc(DocId, [
+            {<<"_replication_state">>, <<"error">>},
+            {<<"_replication_state_reason">>, to_binary(error_reason(Error))},
+            {<<"_replication_id">>, ?l2b(BaseId)}]),
+        ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
+    end.
+
+
+% gen_server init: creates the two protected ETS tables (doc id -> rep id
+% and rep id -> #rep_state{}), registers for config changes of the
+% replicator db name and the max retry count, and starts a changes feed
+% loop on the replicator database.
+init(_) ->
+    process_flag(trap_exit, true),
+    ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
+    ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]),
+    Server = self(),
+    ok = couch_config:register(
+        fun("replicator", "db", NewName) ->
+            ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)});
+        ("replicator", "max_replication_retry_count", V) ->
+            ok = gen_server:cast(Server, {set_max_retries, retries_value(V)})
+        end
+    ),
+    {Loop, RepDbName} = changes_feed_loop(),
+    {ok, #state{
+        changes_feed_loop = Loop,
+        rep_db_name = RepDbName,
+        db_notifier = db_update_notifier(),
+        max_retries = retries_value(
+            couch_config:get("replicator", "max_replication_retry_count", "10"))
+    }}.
+
+
+% A change from the replicator db changes feed. Errors while processing it
+% are written back into the offending replication document instead of
+% crashing the manager.
+handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) ->
+    NewState = try
+        process_update(State, Change)
+    catch
+    _Tag:Error ->
+        {RepProps} = get_value(doc, ChangeProps),
+        DocId = get_value(<<"_id">>, RepProps),
+        rep_db_update_error(Error, DocId),
+        State
+    end,
+    {reply, ok, NewState};
+
+
+% A replication started successfully: reset its retry budget and backoff.
+handle_call({rep_started, RepId}, _From, State) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    RepState ->
+        NewRepState = RepState#rep_state{
+            starting = false,
+            retries_left = State#state.max_retries,
+            max_retries = State#state.max_retries,
+            wait = ?INITIAL_WAIT
+        },
+        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
+    end,
+    {reply, ok, State};
+
+handle_call({rep_complete, RepId}, _From, State) ->
+    true = ets:delete(?REP_TO_STATE, RepId),
+    {reply, ok, State};
+
+handle_call({rep_error, RepId, Error}, _From, State) ->
+    {reply, ok, replication_error(State, RepId, Error)};
+
+% Any other call is a programming error - crash loudly.
+handle_call(Msg, From, State) ->
+    ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
+        [Msg, From]),
+    {stop, {error, {unexpected_call, Msg}}, State}.
+
+
+% Config/notifier events. Renaming or recreating the replicator db to a
+% different name restarts everything; a matching name is a no-op. A new
+% max retry count only affects replications (re)started afterwards.
+handle_cast({rep_db_changed, NewName}, #state{rep_db_name = NewName} = State) ->
+    {noreply, State};
+
+handle_cast({rep_db_changed, _NewName}, State) ->
+    {noreply, restart(State)};
+
+handle_cast({rep_db_created, NewName}, #state{rep_db_name = NewName} = State) ->
+    {noreply, State};
+
+handle_cast({rep_db_created, _NewName}, State) ->
+    {noreply, restart(State)};
+
+handle_cast({set_max_retries, MaxRetries}, State) ->
+    {noreply, State#state{max_retries = MaxRetries}};
+
+handle_cast(Msg, State) ->
+    ?LOG_ERROR("Replication manager received unexpected cast ~p", [Msg]),
+    {stop, {error, {unexpected_cast, Msg}}, State}.
+
+
+% A normal 'EXIT' from the changes feed loop means the replicator db was
+% deleted; the loop is restarted when a rep_db_created cast arrives.
+% Note the clause order matters: this clause must be tried before the
+% rep_start_pids clause below, which also matches normal exits.
+handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
+    % replicator DB deleted
+    {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};
+
+handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
+    ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
+    {stop, {db_update_notifier_died, Reason}, State};
+
+handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
+    % one of the replication start processes terminated successfully
+    {noreply, State#state{rep_start_pids = Pids -- [From]}};
+
+handle_info({'DOWN', _Ref, _, _, _}, State) ->
+    % From a db monitor created by a replication process. Ignore.
+    {noreply, State};
+
+handle_info(Msg, State) ->
+    ?LOG_ERROR("Replication manager received unexpected message ~p", [Msg]),
+    {stop, {unexpected_msg, Msg}, State}.
+
+
+% Cancel all running replications, kill the changes feed loop and any
+% pending replication starter processes, then drop the ETS tables and
+% stop the db update notifier.
+terminate(_Reason, State) ->
+    #state{
+        rep_start_pids = StartPids,
+        changes_feed_loop = Loop,
+        db_notifier = DbNotifier
+    } = State,
+    stop_all_replications(),
+    lists:foreach(
+        fun(Pid) ->
+            catch unlink(Pid),
+            catch exit(Pid, stop)
+        end,
+        [Loop | StartPids]),
+    true = ets:delete(?REP_TO_STATE),
+    true = ets:delete(?DOC_TO_REP),
+    couch_db_update_notifier:stop(DbNotifier).
+
+
+% No state migration needed across code upgrades.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+% Spawns a linked process that follows the replicator db's continuous
+% changes feed (with docs included) and forwards every non-design-doc
+% change to this server via a synchronous rep_db_update call, so changes
+% are processed in feed order. Returns {LoopPid, RepDbName}.
+changes_feed_loop() ->
+    {ok, RepDb} = ensure_rep_db_exists(),
+    RepDbName = couch_db:name(RepDb),
+    couch_db:close(RepDb),
+    Server = self(),
+    Pid = spawn_link(
+        fun() ->
+            % Reopen inside the loop process so the handle is owned here;
+            % the user_ctx is taken from the closed handle's record copy.
+            DbOpenOptions = [{user_ctx, RepDb#db.user_ctx}, sys_db],
+            {ok, Db} = couch_db:open_int(RepDbName, DbOpenOptions),
+            ChangesFeedFun = couch_changes:handle_changes(
+                #changes_args{
+                    include_docs = true,
+                    feed = "continuous",
+                    timeout = infinity
+                },
+                {json_req, null},
+                Db
+            ),
+            ChangesFeedFun(
+                fun({change, Change, _}, _) ->
+                    case has_valid_rep_id(Change) of
+                    true ->
+                        ok = gen_server:call(
+                            Server, {rep_db_update, Change}, infinity);
+                    false ->
+                        ok
+                    end;
+                (_, _) ->
+                    ok
+                end
+            )
+        end
+    ),
+    {Pid, RepDbName}.
+
+
+% Design documents in the replicator db are not replication documents;
+% everything else is considered a candidate.
+has_valid_rep_id({Change}) ->
+    has_valid_rep_id(get_value(<<"id">>, Change));
+has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
+    false;
+has_valid_rep_id(_Else) ->
+    true.
+
+
+% Registers a couch_db_update_notifier so the manager learns when the
+% replicator database is (re)created. Deletions need no handling here:
+% the changes feed loop exits on its own when its db goes away.
+db_update_notifier() ->
+    Server = self(),
+    {ok, Notifier} = couch_db_update_notifier:start_link(
+        fun({created, DbName}) ->
+            case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
+            DbName ->
+                ok = gen_server:cast(Server, {rep_db_created, DbName});
+            _ ->
+                ok
+            end;
+        (_) ->
+            % no need to handle the 'deleted' event - the changes feed loop
+            % dies when the database is deleted
+            ok
+        end
+    ),
+    Notifier.
+
+
+% Tears everything down (used when the replicator db is renamed or
+% recreated) and starts a fresh changes feed loop on the current db.
+restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
+    stop_all_replications(),
+    lists:foreach(
+        fun(Pid) ->
+            catch unlink(Pid),
+            catch exit(Pid, rep_db_changed)
+        end,
+        [Loop | StartPids]),
+    {NewLoop, NewRepDbName} = changes_feed_loop(),
+    State#state{
+        changes_feed_loop = NewLoop,
+        rep_db_name = NewRepDbName,
+        rep_start_pids = []
+    }.
+
+
+% Dispatch on a changed replication document's _replication_state:
+% deleted doc         -> cancel and forget the replication;
+% undefined/triggered -> (maybe) start it;
+% completed           -> only clean up bookkeeping;
+% error               -> retry only if we aren't already tracking it.
+process_update(State, {Change}) ->
+    {RepProps} = JsonRepDoc = get_value(doc, Change),
+    DocId = get_value(<<"_id">>, RepProps),
+    case get_value(<<"deleted">>, Change, false) of
+    true ->
+        rep_doc_deleted(DocId),
+        State;
+    false ->
+        case get_value(<<"_replication_state">>, RepProps) of
+        undefined ->
+            maybe_start_replication(State, DocId, JsonRepDoc);
+        <<"triggered">> ->
+            maybe_start_replication(State, DocId, JsonRepDoc);
+        <<"completed">> ->
+            replication_complete(DocId),
+            State;
+        <<"error">> ->
+            case ets:lookup(?DOC_TO_REP, DocId) of
+            [] ->
+                maybe_start_replication(State, DocId, JsonRepDoc);
+            _ ->
+                State
+            end
+        end
+    end.
+
+
+% Record a document-processing error in the document itself. A
+% {bad_rep_doc, Reason} already carries a printable reason (see
+% parse_rep_doc/1); anything else is stringified first.
+rep_db_update_error(Error, DocId) ->
+    case Error of
+    {bad_rep_doc, Reason} ->
+        ok;
+    _ ->
+        Reason = to_binary(Error)
+    end,
+    ?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
+        [DocId, Reason]),
+    update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>},
+                           {<<"_replication_state_reason">>, Reason}]).
+
+
+% Build the #user_ctx{} a replication should run under, taken from the
+% doc's `user_ctx' object. Defaults: name null, no roles.
+rep_user_ctx({RepDoc}) ->
+    case get_value(<<"user_ctx">>, RepDoc) of
+    undefined ->
+        #user_ctx{};
+    {UserCtx} ->
+        #user_ctx{
+            name = get_value(<<"name">>, UserCtx, null),
+            roles = get_value(<<"roles">>, UserCtx, [])
+        }
+    end.
+
+
+% Start the replication described by DocId/RepDoc unless a replication
+% with the same id is already tracked. If another document already
+% triggered (or is triggering) the same replication, only stamp this doc
+% with the shared _replication_id instead of starting a duplicate.
+maybe_start_replication(State, DocId, RepDoc) ->
+    #rep{id = {BaseId, _} = RepId} = Rep = parse_rep_doc(RepDoc),
+    case rep_state(RepId) of
+    nil ->
+        RepState = #rep_state{
+            rep = Rep,
+            starting = true,
+            retries_left = State#state.max_retries,
+            max_retries = State#state.max_retries
+        },
+        true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
+        true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
+        ?LOG_INFO("Attempting to start replication `~s` (document `~s`).",
+            [pp_rep_id(RepId), DocId]),
+        Pid = spawn_link(fun() -> start_replication(Rep, 0) end),
+        State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        % Same doc re-seen (e.g. after we tagged it) - nothing to do.
+        State;
+    #rep_state{starting = false, rep = #rep{doc_id = OtherDocId}} ->
+        ?LOG_INFO("The replication specified by the document `~s` was already"
+            " triggered by the document `~s`", [DocId, OtherDocId]),
+        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
+        State;
+    #rep_state{starting = true, rep = #rep{doc_id = OtherDocId}} ->
+        ?LOG_INFO("The replication specified by the document `~s` is already"
+            " being triggered by the document `~s`", [DocId, OtherDocId]),
+        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
+        State
+    end.
+
+
+% Wraps couch_replicator_utils:parse_rep_doc/2, normalizing any failure
+% into a {bad_rep_doc, Reason} throw so the caller can record it in the
+% offending document (see rep_db_update_error/2).
+parse_rep_doc(RepDoc) ->
+    {ok, Rep} = try
+        couch_replicator_utils:parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
+    catch
+    throw:{error, Reason} ->
+        throw({bad_rep_doc, Reason});
+    Tag:Err ->
+        throw({bad_rep_doc, to_binary({Tag, Err})})
+    end,
+    Rep.
+
+
+% Stamp the doc with the shared _replication_id, unless it already has it.
+maybe_tag_rep_doc(DocId, {RepProps}, RepId) ->
+    case get_value(<<"_replication_id">>, RepProps) of
+    RepId ->
+        ok;
+    _ ->
+        update_rep_doc(DocId, [{<<"_replication_id">>, RepId}])
+    end.
+
+
+% Runs in its own (linked) process: wait the given number of seconds,
+% then ask couch_replicator to start the job; any failure is reported
+% back through replication_error/2.
+start_replication(Rep, Wait) ->
+    ok = timer:sleep(Wait * 1000),
+    case (catch couch_replicator:async_replicate(Rep)) of
+    {ok, _} ->
+        ok;
+    Error ->
+        replication_error(Rep, Error)
+    end.
+
+
+% Clean up the doc-to-rep mapping for a completed replication.
+replication_complete(DocId) ->
+    case ets:lookup(?DOC_TO_REP, DocId) of
+    [{DocId, {BaseId, Ext} = RepId}] ->
+        case rep_state(RepId) of
+        nil ->
+            % Prior to OTP R14B02, temporary child specs remain
+            % in the supervisor after a worker finishes - remove them.
+            % We want to be able to start the same replication but with
+            % eventually different values for parameters that don't
+            % contribute to its ID calculation.
+            case erlang:system_info(otp_release) < "R14B02" of
+            true ->
+                spawn(fun() ->
+                    _ = supervisor:delete_child(couch_replicator_job_sup, BaseId ++ Ext)
+                end);
+            false ->
+                ok
+            end;
+        #rep_state{} ->
+            ok
+        end,
+        true = ets:delete(?DOC_TO_REP, DocId);
+    _ ->
+        ok
+    end.
+
+
+% The replication document was deleted: cancel its replication (if any)
+% and forget both ETS entries.
+rep_doc_deleted(DocId) ->
+    case ets:lookup(?DOC_TO_REP, DocId) of
+    [{DocId, RepId}] ->
+        couch_replicator:cancel_replication(RepId),
+        true = ets:delete(?REP_TO_STATE, RepId),
+        true = ets:delete(?DOC_TO_REP, DocId),
+        ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
+            " was deleted", [pp_rep_id(RepId), DocId]);
+    [] ->
+        ok
+    end.
+
+
+% Server-side handling of a rep_error call: retry with backoff if the
+% replication is still tracked, otherwise ignore.
+replication_error(State, RepId, Error) ->
+    case rep_state(RepId) of
+    nil ->
+        State;
+    RepState ->
+        maybe_retry_replication(RepState, Error, State)
+    end.
+
+% Retry budget exhausted: cancel the replication and drop all tracking.
+maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
+    #rep_state{
+        rep = #rep{id = RepId, doc_id = DocId},
+        max_retries = MaxRetries
+    } = RepState,
+    couch_replicator:cancel_replication(RepId),
+    true = ets:delete(?REP_TO_STATE, RepId),
+    true = ets:delete(?DOC_TO_REP, DocId),
+    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+        "~nReached maximum retry attempts (~p).",
+        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
+    State;
+
+% Retries remain: double the backoff (see state_after_error/1) and spawn
+% a new starter process that waits before retrying.
+maybe_retry_replication(RepState, Error, State) ->
+    #rep_state{
+        rep = #rep{id = RepId, doc_id = DocId} = Rep
+    } = RepState,
+    #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
+    true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
+    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+        "~nRestarting replication in ~p seconds.",
+        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
+    Pid = spawn_link(fun() -> start_replication(Rep, Wait) end),
+    State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
+
+
+% Cancel every tracked replication and clear both ETS tables. Used when
+% the replicator database is deleted/renamed and on terminate.
+stop_all_replications() ->
+    ?LOG_INFO("Stopping all ongoing replications because the replicator"
+        " database was deleted or changed", []),
+    ets:foldl(
+        fun({_, RepId}, _) ->
+            couch_replicator:cancel_replication(RepId)
+        end,
+        ok, ?DOC_TO_REP),
+    true = ets:delete_all_objects(?REP_TO_STATE),
+    true = ets:delete_all_objects(?DOC_TO_REP).
+
+
+% Load the latest revision of the replication doc and merge KVs into it.
+% A missing doc is not an error (it may have just been deleted). Update
+% conflicts are retried after a short sleep (5 ms).
+update_rep_doc(RepDocId, KVs) ->
+    {ok, RepDb} = ensure_rep_db_exists(),
+    try
+        case couch_db:open_doc(RepDb, RepDocId, [ejson_body]) of
+        {ok, LatestRepDoc} ->
+            update_rep_doc(RepDb, LatestRepDoc, KVs);
+        _ ->
+            ok
+        end
+    catch throw:conflict ->
+        % Shouldn't happen, as by default only the role _replicator can
+        % update replication documents.
+        ?LOG_ERROR("Conflict error when updating replication document `~s`."
+            " Retrying.", [RepDocId]),
+        ok = timer:sleep(5),
+        update_rep_doc(RepDocId, KVs)
+    after
+        couch_db:close(RepDb)
+    end.
+
+% Merge KVs into the doc body: a value of `undefined' deletes the key;
+% a change of _replication_state also stamps _replication_state_time.
+% The doc is only written back if the body actually changed.
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+    NewRepDocBody = lists:foldl(
+        fun({K, undefined}, Body) ->
+                lists:keydelete(K, 1, Body);
+            ({<<"_replication_state">> = K, State} = KV, Body) ->
+                case get_value(K, Body) of
+                State ->
+                    Body;
+                _ ->
+                    Body1 = lists:keystore(K, 1, Body, KV),
+                    lists:keystore(
+                        <<"_replication_state_time">>, 1, Body1,
+                        {<<"_replication_state_time">>, timestamp()})
+                end;
+            ({K, _V} = KV, Body) ->
+                lists:keystore(K, 1, Body, KV)
+        end,
+        RepDocBody, KVs),
+    case NewRepDocBody of
+    RepDocBody ->
+        ok;
+    _ ->
+        % Might not succeed - when the replication doc is deleted right
+        % before this update (not an error, ignore).
+        couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
+    end.
+
+
+% RFC3339 timestamps.
+% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
+% Returns a binary such as <<"2014-01-07T00:36:57+00:00">>: the local wall
+% clock plus the local/UTC offset derived from erlang:universaltime/0.
+timestamp() ->
+    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
+    UTime = erlang:universaltime(),
+    LocalTime = calendar:universal_time_to_local_time(UTime),
+    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
+        calendar:datetime_to_gregorian_seconds(UTime),
+    iolist_to_binary(
+        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
+            [Year, Month, Day, Hour, Min, Sec,
+                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
+
+% Format a UTC offset as a signed "+HH:MM"/"-HH:MM" iolist.
+zone(Hr, Min) when Hr >= 0, Min >= 0 ->
+    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
+zone(Hr, Min) ->
+    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
+
+
+
+% Open (or create) the configured replicator database with an
+% _admin/_replicator user context, and make sure the design document that
+% carries the validation function is present. Returns {ok, Db}; the
+% caller owns the handle and must close it.
+ensure_rep_db_exists() ->
+    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
+    case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}, nologifmissing]) of
+    {ok, Db} ->
+        Db;
+    _Error ->
+        {ok, Db} = couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}])
+    end,
+    ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+    {ok, Db}.
+
+
+% Install the validate_doc_update design doc (?REP_DB_DOC_VALIDATE_FUN)
+% into the replicator db if it isn't already there.
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+    case couch_db:open_doc(RepDb, DDocID, []) of
+    {ok, _Doc} ->
+        ok;
+    _ ->
+        DDoc = couch_doc:from_json_obj({[
+            {<<"_id">>, DDocID},
+            {<<"language">>, <<"javascript">>},
+            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+        ]}),
+        {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+     end.
+
+
+% pretty-print replication id
+% Accepts either a #rep{} record or a {Base, Extension} id tuple and
+% returns the two parts concatenated as a flat list.
+pp_rep_id(#rep{id = RepId}) ->
+    pp_rep_id(RepId);
+pp_rep_id({Base, Extension}) ->
+    Base ++ Extension.
+
+
+% Look up the #rep_state{} tracked for RepId; nil when not tracked.
+rep_state(RepId) ->
+    case ets:lookup(?REP_TO_STATE, RepId) of
+    [{RepId, RepState}] ->
+        RepState;
+    [] ->
+        nil
+    end.
+
+
+% Turn an error term into something printable; callers run the result
+% through to_binary/1, so non-{error, _} terms pass through unchanged.
+error_reason({error, {Error, Reason}})
+  when is_atom(Error), is_binary(Reason) ->
+    io_lib:format("~s: ~s", [Error, Reason]);
+error_reason({error, Reason}) ->
+    Reason;
+error_reason(Reason) ->
+    Reason.
+
+
+% Parse the max_replication_retry_count config value: the literal string
+% "infinity" or an integer count.
+retries_value("infinity") ->
+    infinity;
+retries_value(Value) ->
+    list_to_integer(Value).
+
+
+% Exponential backoff: double the wait (capped at ?MAX_WAIT seconds) and
+% burn one retry unless the budget is infinity.
+state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
+    Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
+    case Left of
+    infinity ->
+        State#rep_state{wait = Wait2};
+    _ ->
+        State#rep_state{retries_left = Left - 1, wait = Wait2}
+    end.
+
+
+% before_doc_update hook for the replicator db: writers with the
+% _replicator role pass through untouched; otherwise the writing user is
+% stamped as "owner" on unowned docs, and updating a doc owned by someone
+% else is forbidden unless the writer is an admin (admins may also claim
+% docs whose owner is null). Design documents pass through untouched.
+before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
+    Doc;
+before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
+    #user_ctx{roles = Roles, name = Name} = UserCtx,
+    case lists:member(<<"_replicator">>, Roles) of
+    true ->
+        Doc;
+    false ->
+        case couch_util:get_value(?OWNER, Body) of
+        undefined ->
+            Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+        Name ->
+            Doc;
+        Other ->
+            case (catch couch_db:check_is_admin(Db)) of
+            ok when Other =:= null ->
+                Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+            ok ->
+                Doc;
+            _ ->
+                throw({forbidden, <<"Can't update replication documents",
+                    " from other users.">>})
+            end
+        end
+    end.
+
+
+% after_doc_read hook: when a non-admin reads a replication doc they do
+% not own, endpoint credentials are stripped (see strip_credentials/1)
+% and the revision id is recomputed over the stripped body so the
+% returned doc stays internally consistent. Design docs pass through.
+after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
+    Doc;
+after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
+    #user_ctx{name = Name} = UserCtx,
+    case (catch couch_db:check_is_admin(Db)) of
+    ok ->
+        Doc;
+    _ ->
+        case couch_util:get_value(?OWNER, Body) of
+        Name ->
+            Doc;
+        _Other ->
+            Source = strip_credentials(couch_util:get_value(<<"source">>, Body)),
+            Target = strip_credentials(couch_util:get_value(<<"target">>, Body)),
+            NewBody0 = ?replace(Body, <<"source">>, Source),
+            NewBody = ?replace(NewBody0, <<"target">>, Target),
+            #doc{revs = {Pos, [_ | Revs]}} = Doc,
+            NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
+            NewRevId = couch_db:new_revid(NewDoc),
+            NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
+        end
+    end.
+
+
+% Remove authentication material from a replication endpoint: the
+% user:password@ part is cut out of http(s) URL strings, and the oauth
+% entry is dropped from {Props} endpoint objects. `undefined' (field
+% absent) passes through unchanged.
+strip_credentials(undefined) ->
+    undefined;
+strip_credentials(Url) when is_binary(Url) ->
+    re:replace(Url,
+        "http(s)?://(?:[^:]+):[^@]+@(.*)$",
+        "http\\1://\\2",
+        [{return, binary}]);
+strip_credentials({Props}) ->
+    {lists:keydelete(<<"oauth">>, 1, Props)}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator_notifier.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_notifier.erl b/apps/couch_replicator/src/couch_replicator_notifier.erl
new file mode 100644
index 0000000..39fd68b
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator_notifier.erl
@@ -0,0 +1,57 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_notifier).
+
+-behaviour(gen_event).
+
+% public API
+-export([start_link/1, stop/1, notify/1]).
+
+% gen_event callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_event/2, handle_call/2, handle_info/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+
% Register a gen_event handler (under a unique reference) on the
% couch_replication event manager. FunAcc is either a fun/1 or a
% {fun/2, Acc} pair used as the handler state; see handle_event/2.
start_link(FunAcc) ->
    couch_event_sup:start_link(couch_replication,
        {couch_replicator_notifier, make_ref()}, FunAcc).

% Broadcast a replication event to every registered notifier handler.
notify(Event) ->
    gen_event:notify(couch_replication, Event).

% Stop the event supervisor started by start_link/1 (removes the handler).
stop(Pid) ->
    couch_event_sup:stop(Pid).


% gen_event callback: the initial handler state is the FunAcc given to
% start_link/1.
init(FunAcc) ->
    {ok, FunAcc}.

terminate(_Reason, _State) ->
    ok.
+
% gen_event callback. The handler state is either a fun/1 invoked for its
% side effects only, or a {fun/2, Acc} pair folded over incoming events.
handle_event(Event, Listener) when is_function(Listener, 1) ->
    Listener(Event),
    {ok, Listener};
handle_event(Event, {FoldFun, Acc0}) when is_function(FoldFun, 2) ->
    {ok, {FoldFun, FoldFun(Event, Acc0)}}.
+
% gen_event callback: no calls are expected; reply ok and keep the state.
handle_call(_Msg, State) ->
    {reply, ok, State}.

% gen_event callback: ignore stray messages.
handle_info(_Msg, State) ->
    {ok, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_utils.erl b/apps/couch_replicator/src/couch_replicator_utils.erl
new file mode 100644
index 0000000..0baddc2
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator_utils.erl
@@ -0,0 +1,396 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_utils).
+
+-export([parse_rep_doc/2]).
+-export([open_db/1, close_db/1]).
+-export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
+-export([replication_id/2]).
+-export([sum_stats/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
+
% Parse the EJSON properties of a replication document into a #rep record.
% A cancellation request that already carries a replication id ({cancel,
% true} plus an id option) only needs the options and user context; any
% other request has its source/target endpoints parsed (with proxy settings
% applied) and its replication id computed from the resulting record.
parse_rep_doc({Props}, UserCtx) ->
    ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
    Options = make_options(Props),
    case get_value(cancel, Options, false) andalso
        (get_value(id, Options, nil) =/= nil) of
    true ->
        {ok, #rep{options = Options, user_ctx = UserCtx}};
    false ->
        Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options),
        Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams, Options),
        Rep = #rep{
            source = Source,
            target = Target,
            options = Options,
            user_ctx = UserCtx,
            doc_id = get_value(<<"_id">>, Props, null)
        },
        {ok, Rep#rep{id = replication_id(Rep)}}
    end.
+
+
% Compute the id of a replication: a base id derived from the endpoints
% using the current ?REP_ID_VERSION scheme, plus a suffix encoding the
% `continuous` and `create_target` options.
replication_id(#rep{options = Options} = Rep) ->
    BaseId = replication_id(Rep, ?REP_ID_VERSION),
    {BaseId, maybe_append_options([continuous, create_target], Options)}.
+
+
+% Versioned clauses for generating replication IDs.
+% If a change is made to how replications are identified,
+% please add a new clause and increase ?REP_ID_VERSION.
+
% Version 3: derived from the server UUID plus the source and target
% endpoints, so the id survives host name or port changes.
replication_id(#rep{user_ctx = UserCtx} = Rep, 3) ->
    UUID = couch_server:get_uuid(),
    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
    maybe_append_filters([UUID, Src, Tgt], Rep);

% Version 2: derived from the host name, the local HTTP port and the
% endpoints.
replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
    {ok, HostName} = inet:gethostname(),
    Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
    P when is_number(P) ->
        P;
    _ ->
        % On restart we might be called before the couch_httpd process is
        % started.
        % TODO: we might be under an SSL socket server only, or both under
        % SSL and a non-SSL socket.
        % ... mochiweb_socket_server:get(https, port)
        list_to_integer(couch_config:get("httpd", "port", "5984"))
    end,
    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
    maybe_append_filters([HostName, Port, Src, Tgt], Rep);

% Version 1: derived from the host name and the endpoints only.
replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
    {ok, HostName} = inet:gethostname(),
    Src = get_rep_endpoint(UserCtx, Rep#rep.source),
    Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
    maybe_append_filters([HostName, Src, Tgt], Rep).
+
+
% Append filtering information to the list of terms making up the base
% replication id: the filter's source code and query parameters when a
% `filter` option is present, otherwise the `doc_ids` list if given. The
% final term list is md5-hashed and hex-encoded to form the base id.
maybe_append_filters(Base,
        #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
    Base2 = Base ++
        case get_value(filter, Options) of
        undefined ->
            case get_value(doc_ids, Options) of
            undefined ->
                [];
            DocIds ->
                [DocIds]
            end;
        Filter ->
            [filter_code(Filter, Source, UserCtx),
                get_value(query_params, Options, {[]})]
        end,
    couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
+
+
% Fetch the source code of a user filter function (used as part of the
% replication id). Filter is a "ddocname/filtername" binary; the design doc
% is read from the (possibly remote) Source database. Throws {error, Bin}
% for an invalid filter name, an unreachable database, or a missing design
% document.
filter_code(Filter, Source, UserCtx) ->
    {DDocName, FilterName} =
    case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
    {match, [DDocName0, FilterName0]} ->
        {DDocName0, FilterName0};
    _ ->
        throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
    end,
    Db = case (catch couch_replicator_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of
    {ok, Db0} ->
        Db0;
    DbError ->
        DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
           [couch_replicator_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]),
        throw({error, iolist_to_binary(DbErrorMsg)})
    end,
    try
        Body = case (catch couch_replicator_api_wrap:open_doc(
            Db, <<"_design/", DDocName/binary>>, [ejson_body])) of
        {ok, #doc{body = Body0}} ->
            Body0;
        DocError ->
            DocErrorMsg = io_lib:format(
                "Couldn't open document `_design/~s` from source "
                "database `~s`: ~s", [DDocName, couch_replicator_api_wrap:db_uri(Source),
                    couch_util:to_binary(DocError)]),
            throw({error, iolist_to_binary(DocErrorMsg)})
        end,
        Code = couch_util:get_nested_json_value(
            Body, [<<"filters">>, FilterName]),
        % Trim surrounding whitespace so equivalent filter definitions hash
        % to the same replication id.
        re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
    after
        couch_replicator_api_wrap:db_close(Db)
    end.
+
+
% Build the replication id suffix: for each option in Options that is set
% to true in RepOptions, append "+<option>" to the result string.
maybe_append_options(Options, RepOptions) ->
    AppendOpt = fun(Option, Suffix) ->
        case get_value(Option, RepOptions, false) of
        true ->
            Suffix ++ "+" ++ atom_to_list(Option);
        false ->
            Suffix
        end
    end,
    lists:foldl(AppendOpt, [], Options).
+
+
% Normalize an endpoint for replication id computation. Remote endpoints
% reduce to the URL plus any non-default headers (plus OAuth credentials
% when present); local endpoints keep the db name and the user context.
get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
    DefaultHeaders = (#httpdb{})#httpdb.headers,
    case OAuth of
    nil ->
        {remote, Url, Headers -- DefaultHeaders};
    #oauth{} ->
        {remote, Url, Headers -- DefaultHeaders, OAuth}
    end;
get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
    {local, DbName, UserCtx}.
+
+
% Turn a replication endpoint specification into the form the replicator
% uses: a #httpdb{} record for remote endpoints (given as an object or as
% an "http(s)://..." string), or the bare binary name for local databases.
parse_rep_db({Props}, ProxyParams, Options) ->
    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
    DefaultHeaders = (#httpdb{})#httpdb.headers,
    % Optional OAuth credentials live under the "oauth" key of "auth".
    OAuth = case get_value(<<"oauth">>, AuthProps) of
    undefined ->
        nil;
    {OauthProps} ->
        #oauth{
            consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
            token = ?b2l(get_value(<<"token">>, OauthProps)),
            token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
            consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)),
            signature_method =
                case get_value(<<"signature_method">>, OauthProps) of
                undefined ->        hmac_sha1;
                <<"PLAINTEXT">> ->  plaintext;
                <<"HMAC-SHA1">> ->  hmac_sha1;
                <<"RSA-SHA1">> ->   rsa_sha1
                end
        }
    end,
    #httpdb{
        url = Url,
        oauth = OAuth,
        % User-supplied headers take precedence over the defaults
        % (ukeymerge/3 prefers its first list on key collisions).
        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
        ibrowse_options = lists:keysort(1,
            [{socket_options, get_value(socket_options, Options)} |
                ProxyParams ++ ssl_params(Url)]),
        timeout = get_value(connection_timeout, Options),
        http_connections = get_value(http_connections, Options),
        retries = get_value(retries, Options)
    };
parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
    parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
    DbName.
+
+
% Ensure an endpoint URL ends with "/" so path segments can be appended
% safely. Binary input is first converted to a list.
maybe_add_trailing_slash(Url) when is_binary(Url) ->
    maybe_add_trailing_slash(?b2l(Url));
maybe_add_trailing_slash(Url) ->
    Last = lists:last(Url),
    if
        Last =:= $/ ->
            Url;
        true ->
            Url ++ "/"
    end.
+
+
% Build the final replication options proplist: the options found in the
% replication document (converted by convert_options/1) merged over
% defaults read from the "replicator" config section. Document options win
% on conflict (ukeymerge/3 prefers its first list).
make_options(Props) ->
    Options = lists:ukeysort(1, convert_options(Props)),
    DefWorkers = couch_config:get("replicator", "worker_processes", "4"),
    DefBatchSize = couch_config:get("replicator", "worker_batch_size", "500"),
    DefConns = couch_config:get("replicator", "http_connections", "20"),
    DefTimeout = couch_config:get("replicator", "connection_timeout", "30000"),
    DefRetries = couch_config:get("replicator", "retries_per_request", "10"),
    UseCheckpoints = couch_config:get("replicator", "use_checkpoints", "true"),
    DefCheckpointInterval = couch_config:get("replicator", "checkpoint_interval", "5000"),
    {ok, DefSocketOptions} = couch_util:parse_term(
        couch_config:get("replicator", "socket_options",
            "[{keepalive, true}, {nodelay, false}]")),
    lists:ukeymerge(1, Options, lists:keysort(1, [
        {connection_timeout, list_to_integer(DefTimeout)},
        {retries, list_to_integer(DefRetries)},
        {http_connections, list_to_integer(DefConns)},
        {socket_options, DefSocketOptions},
        {worker_batch_size, list_to_integer(DefBatchSize)},
        {worker_processes, list_to_integer(DefWorkers)},
        {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
        {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
    ])).
+
+
% Convert the binary key/value pairs of a replication document into a
% proplist of atom-keyed, properly typed options. Unknown keys are skipped.
convert_options([])->
    [];
convert_options([{<<"cancel">>, V} | R]) ->
    [{cancel, V} | convert_options(R)];
convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
        IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
    % Split "BaseId+opt1+opt2..." into {BaseId, OptionsSuffix}.
    Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)),
    [{id, Id} | convert_options(R)];
convert_options([{<<"create_target">>, V} | R]) ->
    [{create_target, V} | convert_options(R)];
convert_options([{<<"continuous">>, V} | R]) ->
    [{continuous, V} | convert_options(R)];
convert_options([{<<"filter">>, V} | R]) ->
    [{filter, V} | convert_options(R)];
convert_options([{<<"query_params">>, V} | R]) ->
    [{query_params, V} | convert_options(R)];
convert_options([{<<"doc_ids">>, null} | R]) ->
    convert_options(R);
convert_options([{<<"doc_ids">>, V} | R]) ->
    % Ensure same behaviour as old replicator: accept a list of percent
    % encoded doc IDs.
    DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
    [{doc_ids, DocIds} | convert_options(R)];
convert_options([{<<"worker_processes">>, V} | R]) ->
    [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"worker_batch_size">>, V} | R]) ->
    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"http_connections">>, V} | R]) ->
    [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"connection_timeout">>, V} | R]) ->
    [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"retries_per_request">>, V} | R]) ->
    [{retries, couch_util:to_integer(V)} | convert_options(R)];
convert_options([{<<"socket_options">>, V} | R]) ->
    {ok, SocketOptions} = couch_util:parse_term(V),
    [{socket_options, SocketOptions} | convert_options(R)];
convert_options([{<<"since_seq">>, V} | R]) ->
    [{since_seq, V} | convert_options(R)];
convert_options([{<<"use_checkpoints">>, V} | R]) ->
    [{use_checkpoints, V} | convert_options(R)];
convert_options([{<<"checkpoint_interval">>, V} | R]) ->
    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
convert_options([_ | R]) -> % skip unknown option
    convert_options(R).
+
+
% Convert a proxy URL into ibrowse proxy options; returns [] when no proxy
% is configured. Proxy credentials are included only when both the user
% name and the password are present in the URL.
parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
    parse_proxy_params(?b2l(ProxyUrl));
parse_proxy_params([]) ->
    [];
parse_proxy_params(ProxyUrl) ->
    #url{
        host = Host,
        port = Port,
        username = User,
        password = Passwd
    } = ibrowse_lib:parse_url(ProxyUrl),
    [{proxy_host, Host}, {proxy_port, Port}] ++
        case is_list(User) andalso is_list(Passwd) of
        false ->
            [];
        true ->
            [{proxy_user, User}, {proxy_password, Passwd}]
        end.
+
+
% Build ibrowse SSL options for https endpoints from the "replicator"
% config section: certificate verification settings plus an optional client
% certificate/key (with an optional key password). Returns [] for http.
ssl_params(Url) ->
    case ibrowse_lib:parse_url(Url) of
    #url{protocol = https} ->
        Depth = list_to_integer(
            couch_config:get("replicator", "ssl_certificate_max_depth", "3")
        ),
        VerifyCerts = couch_config:get("replicator", "verify_ssl_certificates"),
        CertFile = couch_config:get("replicator", "cert_file", nil),
        KeyFile = couch_config:get("replicator", "key_file", nil),
        Password = couch_config:get("replicator", "password", nil),
        SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
        % A client certificate is only used when both cert and key files
        % are configured.
        SslOpts1 = case CertFile /= nil andalso KeyFile /= nil of
            true ->
                case Password of
                    nil ->
                        [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
                    _ ->
                        [{certfile, CertFile}, {keyfile, KeyFile},
                            {password, Password}] ++ SslOpts
                end;
            false -> SslOpts
        end,
        [{is_ssl, true}, {ssl_options, SslOpts1}];
    #url{protocol = http} ->
        []
    end.
+
% Return ssl verify options appropriate for the runtime's OTP release.
ssl_verify_options(Value) ->
    ssl_verify_options(Value, erlang:system_info(otp_release)).

% Releases before OTP 17 were named "R<N><suffix>" (e.g. "R13B04",
% "R16B03"); from 17 on the release is a plain number ("17", "26", ...).
% The atom verify options (verify_peer/verify_none) exist from R14 onwards.
% Note a plain string comparison such as `OTPVersion >= "R14"` is wrong
% here: numeric releases ("17"...) compare *less than* "R14"
% lexicographically, and "R9" compares greater than "R14".
ssl_verify_options(true, OTPVersion) ->
    CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
    case has_atom_verify_options(OTPVersion) of
    true ->
        [{verify, verify_peer}, {cacertfile, CAFile}];
    false ->
        % Pre-R14 numeric verification level: 2 = verify peer.
        [{verify, 2}, {cacertfile, CAFile}]
    end;
ssl_verify_options(false, OTPVersion) ->
    case has_atom_verify_options(OTPVersion) of
    true ->
        [{verify, verify_none}];
    false ->
        % Pre-R14 numeric verification level: 0 = no verification.
        [{verify, 0}]
    end.

% True when the release supports verify_peer/verify_none: R14 and later,
% including every numeric (>= 17) release name.
has_atom_verify_options([$R | Rest]) ->
    case string:to_integer(Rest) of
    {N, _} when is_integer(N) ->
        N >= 14;
    _ ->
        % Unparsable "R..." release name; assume a modern runtime.
        true
    end;
has_atom_verify_options(_) ->
    true.
+
+
% Open a fresh handle to a local database, reusing the given handle's user
% context and options; remote #httpdb{} handles are returned as-is.
open_db(#db{name = Name, user_ctx = UserCtx, options = Options}) ->
    {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx} | Options]),
    Db;
open_db(HttpDb) ->
    HttpDb.


% Counterpart of open_db/1: close local handles; no-op for remote ones.
close_db(#db{} = Db) ->
    couch_db:close(Db);
close_db(_HttpDb) ->
    ok.
+
+
% For a local database, start a db update notifier that casts
% {db_compacted, DbName} to Server whenever that database finishes
% compacting (so the server can reopen its handle). Returns nil for remote
% databases.
start_db_compaction_notifier(#db{name = DbName}, Server) ->
    {ok, Notifier} = couch_db_update_notifier:start_link(
        fun({compacted, DbName1}) when DbName1 =:= DbName ->
                ok = gen_server:cast(Server, {db_compacted, DbName});
            (_) ->
                ok
        end),
    Notifier;
start_db_compaction_notifier(_, _) ->
    nil.


% Stop a notifier returned by start_db_compaction_notifier/2; nil means the
% database was remote and there is nothing to stop.
stop_db_compaction_notifier(nil) ->
    ok;
stop_db_compaction_notifier(Notifier) ->
    couch_db_update_notifier:stop(Notifier).
+
+
% Field-wise sum of two #rep_stats{} records.
sum_stats(#rep_stats{} = S1, #rep_stats{} = S2) ->
    #rep_stats{
        missing_checked =
            S1#rep_stats.missing_checked + S2#rep_stats.missing_checked,
        missing_found = S1#rep_stats.missing_found + S2#rep_stats.missing_found,
        docs_read = S1#rep_stats.docs_read + S2#rep_stats.docs_read,
        docs_written = S1#rep_stats.docs_written + S2#rep_stats.docs_written,
        doc_write_failures =
            S1#rep_stats.doc_write_failures + S2#rep_stats.doc_write_failures
    }.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator_worker.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_worker.erl b/apps/couch_replicator/src/couch_replicator_worker.erl
new file mode 100644
index 0000000..0f65900
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator_worker.erl
@@ -0,0 +1,515 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_worker).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/5]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+% TODO: maybe make both buffer max sizes configurable
+-define(DOC_BUFFER_BYTE_SIZE, 512 * 1024).   % for remote targets
+-define(DOC_BUFFER_LEN, 10).                 % for local targets, # of documents
+-define(MAX_BULK_ATT_SIZE, 64 * 1024).
+-define(MAX_BULK_ATTS_PER_DOC, 8).
+-define(STATS_DELAY, 10000000).              % 10 seconds (in microseconds)
+
+-define(inc_stat(StatPos, Stats, Inc),
+    setelement(StatPos, Stats, element(StatPos, Stats) + Inc)).
+
+-import(couch_replicator_utils, [
+    open_db/1,
+    close_db/1,
+    start_db_compaction_notifier/2,
+    stop_db_compaction_notifier/1
+]).
+-import(couch_util, [
+    to_binary/1,
+    get_value/2,
+    get_value/3
+]).
+
+
+-record(batch, {
+    docs = [],
+    size = 0
+}).
+
+-record(state, {
+    cp,
+    loop,
+    max_parallel_conns,
+    source,
+    target,
+    readers = [],
+    writer = nil,
+    pending_fetch = nil,
+    flush_waiter = nil,
+    stats = #rep_stats{},
+    source_db_compaction_notifier = nil,
+    target_db_compaction_notifier = nil,
+    batch = #batch{}
+}).
+
+
+
% Start a replication worker. For a local source no gen_server is needed:
% a plain linked process runs queue_fetch_loop/5 directly, with the
% checkpointer Cp acting as both parent and checkpointer, and MaxConns
% unused. For a remote source a gen_server is started that manages a pool
% of parallel doc readers and a writer.
start_link(Cp, #db{} = Source, Target, ChangesManager, _MaxConns) ->
    Pid = spawn_link(fun() ->
        erlang:put(last_stats_report, now()),
        queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager)
    end),
    {ok, Pid};

start_link(Cp, Source, Target, ChangesManager, MaxConns) ->
    gen_server:start_link(
        ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []).
+
+
% gen_server init. Traps exits so terminations of the changes loop, doc
% readers and the writer arrive as 'EXIT' messages in handle_info/2. Spawns
% the changes-fetching loop, opens worker-local source/target handles and
% registers for compaction notifications on local databases.
init({Cp, Source, Target, ChangesManager, MaxConns}) ->
    process_flag(trap_exit, true),
    Parent = self(),
    LoopPid = spawn_link(fun() ->
        queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
    end),
    % Process-dictionary timestamp; presumably read by the stats reporting
    % throttle (maybe_report_stats, together with ?STATS_DELAY) — confirm.
    erlang:put(last_stats_report, now()),
    State = #state{
        cp = Cp,
        max_parallel_conns = MaxConns,
        loop = LoopPid,
        source = open_db(Source),
        target = open_db(Target),
        source_db_compaction_notifier =
            start_db_compaction_notifier(Source, self()),
        target_db_compaction_notifier =
            start_db_compaction_notifier(Target, self())
    },
    {ok, State}.
+
+
% Fetch request from the changes loop: spawn a reader right away when below
% the parallel-connection limit, otherwise park the request (reply is
% deferred, keeping the loop blocked) until a reader slot frees up — see
% the reader 'EXIT' clause in handle_info/2.
handle_call({fetch_doc, {_Id, _Revs, _PAs} = Params}, {Pid, _} = From,
    #state{loop = Pid, readers = Readers, pending_fetch = nil,
        source = Src, target = Tgt, max_parallel_conns = MaxConns} = State) ->
    case length(Readers) of
    Size when Size < MaxConns ->
        Reader = spawn_doc_reader(Src, Tgt, Params),
        NewState = State#state{
            readers = [Reader | Readers]
        },
        {reply, ok, NewState};
    _ ->
        NewState = State#state{
            pending_fetch = {From, Params}
        },
        {noreply, NewState}
    end;

% Buffer a document for a later bulk write; the batch may be flushed as a
% side effect (see maybe_flush_docs/2). Reply immediately.
handle_call({batch_doc, Doc}, From, State) ->
    gen_server:reply(From, ok),
    {noreply, maybe_flush_docs(Doc, State)};

% Merge stats reported by readers/writers into the worker totals and
% possibly forward them to the checkpointer.
handle_call({add_stats, IncStats}, From, #state{stats = Stats} = State) ->
    gen_server:reply(From, ok),
    NewStats = couch_replicator_utils:sum_stats(Stats, IncStats),
    NewStats2 = maybe_report_stats(State#state.cp, NewStats),
    {noreply, State#state{stats = NewStats2}};

% End-of-batch flush from the changes loop. The writer is started only when
% no readers are outstanding; the {ok, Stats} reply is sent later, from
% after_full_flush/1, once the writer is done.
handle_call(flush, {Pid, _} = From,
    #state{loop = Pid, writer = nil, flush_waiter = nil,
        target = Target, batch = Batch} = State) ->
    State2 = case State#state.readers of
    [] ->
        State#state{writer = spawn_writer(Target, Batch)};
    _ ->
        State
    end,
    {noreply, State2#state{flush_waiter = From}}.
+
+
% A local source or target database finished compacting (notification cast
% by the db compaction notifier): reopen the matching handle. Any other
% cast is unexpected and stops the worker.
handle_cast({db_compacted, DbName},
    #state{source = #db{name = DbName} = Source} = State) ->
    {ok, NewSource} = couch_db:reopen(Source),
    {noreply, State#state{source = NewSource}};

handle_cast({db_compacted, DbName},
    #state{target = #db{name = DbName} = Target} = State) ->
    {ok, NewTarget} = couch_db:reopen(Target),
    {noreply, State#state{target = NewTarget}};

handle_cast(Msg, State) ->
    {stop, {unexpected_async_call, Msg}, State}.
+
+
% The changes loop finished normally: assert that no work is left in flight
% (empty batch, no readers/writer, nothing pending), then stop.
handle_info({'EXIT', Pid, normal}, #state{loop = Pid} = State) ->
    #state{
        batch = #batch{docs = []}, readers = [], writer = nil,
        pending_fetch = nil, flush_waiter = nil
    } = State,
    {stop, normal, State};

% The writer finished: complete the pending flush (replies to the waiter).
handle_info({'EXIT', Pid, normal}, #state{writer = Pid} = State) ->
    {noreply, after_full_flush(State)};

% With writer =:= nil and the pid not being the loop, a normal exit can
% only be a doc reader. Free its slot: start the parked fetch if there is
% one, or start the writer when a flush is waiting and all readers are done.
handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
    #state{
        readers = Readers, writer = Writer, batch = Batch,
        source = Source, target = Target,
        pending_fetch = Fetch, flush_waiter = FlushWaiter
    } = State,
    case Readers -- [Pid] of
    Readers ->
        % Not one of our readers; ignore.
        {noreply, State};
    Readers2 ->
        State2 = case Fetch of
        nil ->
            case (FlushWaiter =/= nil) andalso (Writer =:= nil) andalso
                (Readers2 =:= [])  of
            true ->
                State#state{
                    readers = Readers2,
                    writer = spawn_writer(Target, Batch)
                };
            false ->
                State#state{readers = Readers2}
            end;
        {From, FetchParams} ->
            Reader = spawn_doc_reader(Source, Target, FetchParams),
            gen_server:reply(From, ok),
            State#state{
                readers = [Reader | Readers2],
                pending_fetch = nil
            }
        end,
        {noreply, State2}
    end;

% Any abnormal exit of a linked process takes the worker down.
handle_info({'EXIT', Pid, Reason}, State) ->
   {stop, {process_died, Pid, Reason}, State}.
+
+
% Release worker resources: close the db handles and stop the compaction
% notifiers (no-ops for remote endpoints).
terminate(_Reason, State) ->
    close_db(State#state.source),
    close_db(State#state.target),
    stop_db_compaction_notifier(State#state.source_db_compaction_notifier),
    stop_db_compaction_notifier(State#state.target_db_compaction_notifier).


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
+
+
% Main worker loop: request a batch of changes from the changes manager,
% find which revisions are missing on the target, then replicate them —
% directly and batched for a local source, or through the parent
% gen_server's reader/writer pool for a remote source. Reports the batch's
% stats to the checkpointer Cp before looping; terminates when the changes
% manager reports closed.
queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
    ChangesManager ! {get_changes, self()},
    receive
    {closed, ChangesManager} ->
        ok;
    {changes, ChangesManager, Changes, ReportSeq} ->
        Target2 = open_db(Target),
        {IdRevs, Stats0} = find_missing(Changes, Target2),
        case Source of
        #db{} ->
            Source2 = open_db(Source),
            Stats = local_process_batch(
                IdRevs, Cp, Source2, Target2, #batch{}, Stats0),
            close_db(Source2);
        #httpdb{} ->
            % Hand the revisions to the parent worker; flush returns the
            % accumulated stats for this batch.
            ok = gen_server:call(Parent, {add_stats, Stats0}, infinity),
            remote_process_batch(IdRevs, Parent),
            {ok, Stats} = gen_server:call(Parent, flush, infinity)
        end,
        close_db(Target2),
        ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
        erlang:put(last_stats_report, now()),
        ?LOG_DEBUG("Worker reported completion of seq ~p", [ReportSeq]),
        queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
    end.
+
+
% Process a batch of missing revisions from a local source. Docs are fetched
% one IdRevs entry at a time and folded into a #batch{}, which is flushed
% whenever maybe_flush_docs/3 decides its limits are exceeded; any remainder
% is flushed when the input is exhausted. Returns the accumulated stats.
local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) ->
    Stats;

% Input exhausted but docs remain buffered: flush them and finish.
local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
    case Target of
    #httpdb{} ->
        ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
    #db{} ->
        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size])
    end,
    Stats2 = flush_docs(Target, Docs),
    Stats3 = couch_replicator_utils:sum_stats(Stats, Stats2),
    local_process_batch([], Cp, Source, Target, #batch{}, Stats3);

local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) ->
    {ok, {_, DocList, Stats2, _}} = fetch_doc(
        Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp}),
    {Batch2, Stats3} = lists:foldl(
        fun(Doc, {Batch0, Stats0}) ->
            {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc),
            {Batch1, couch_replicator_utils:sum_stats(Stats0, S)}
        end,
        {Batch, Stats2}, DocList),
    local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3).
+
+
% Queue fetches for all missing revisions with the parent worker, one
% revision per {fetch_doc, ...} call. When the source is a remote database
% each revision is fetched with its own HTTP request; this makes retrying
% on transient network failures cheap and keeps request URLs short enough
% for proxies and Mochiweb.
remote_process_batch(IdRevs, Parent) ->
    lists:foreach(
        fun({Id, Revs, PAs}) ->
            [ok = gen_server:call(Parent, {fetch_doc, {Id, [Rev], PAs}},
                infinity) || Rev <- Revs]
        end,
        IdRevs).
+
+
% Spawn a linked reader process with its own handle to the source; it
% fetches the requested document revisions and feeds each one to
% remote_doc_handler/2 (which forwards to the parent worker / target).
spawn_doc_reader(Source, Target, FetchParams) ->
    Parent = self(),
    spawn_link(fun() ->
        Source2 = open_db(Source),
        fetch_doc(
            Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}),
        close_db(Source2)
    end).
+
+
% Fetch the given revisions of one document from the source, feeding each
% result to DocHandler with the accumulator Acc. If the attachment stubs
% implied by atts_since turn out to be stale on the target (missing_stub),
% retry once without atts_since so full attachment bodies are fetched.
fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
    try
        couch_replicator_api_wrap:open_doc_revs(
            Source, Id, Revs, [{atts_since, PAs}, latest], DocHandler, Acc)
    catch
    throw:{missing_stub, _} ->
        ?LOG_ERROR("Retrying fetch and update of document `~s` due to out of "
            "sync attachment stubs. Missing revisions are: ~s",
            [Id, couch_doc:revs_to_strs(Revs)]),
        couch_replicator_api_wrap:open_doc_revs(Source, Id, Revs, [latest], DocHandler, Acc)
    end.
+
+
% Doc handler for local sources. Docs that qualify for bulk writes (see
% batch_doc/1) are collected in DocList; the rest are written to the target
% immediately, with stats updated per outcome and possibly reported to the
% checkpointer Cp. Non-{ok, Doc} results are ignored.
local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
    Stats2 = ?inc_stat(#rep_stats.docs_read, Stats, 1),
    case batch_doc(Doc) of
    true ->
        {ok, {Target, [Doc | DocList], Stats2, Cp}};
    false ->
        ?LOG_DEBUG("Worker flushing doc with attachments", []),
        Target2 = open_db(Target),
        Success = (flush_doc(Target2, Doc) =:= ok),
        close_db(Target2),
        Stats3 = case Success of
        true ->
            ?inc_stat(#rep_stats.docs_written, Stats2, 1);
        false ->
            ?inc_stat(#rep_stats.doc_write_failures, Stats2, 1)
        end,
        Stats4 = maybe_report_stats(Cp, Stats3),
        {ok, {Target, DocList, Stats4, Cp}}
    end;
local_doc_handler(_, Acc) ->
    {ok, Acc}.
+
+
% Doc handler for remote sources. Attachment-less docs are sent to the
% parent worker to be batched for a bulk write.
remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
    ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
    {ok, Acc};
remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
    % Immediately flush documents with attachments received from a remote
    % source. The data property of each attachment is a function that starts
    % streaming the attachment data from the remote source, therefore it's
    % convenient to call it ASAP to avoid ibrowse inactivity timeouts.
    Stats = #rep_stats{docs_read = 1},
    ?LOG_DEBUG("Worker flushing doc with attachments", []),
    Target2 = open_db(Target),
    Success = (flush_doc(Target2, Doc) =:= ok),
    close_db(Target2),
    {Result, Stats2} = case Success of
    true ->
        {{ok, Acc}, ?inc_stat(#rep_stats.docs_written, Stats, 1)};
    false ->
        {{skip, Acc}, ?inc_stat(#rep_stats.doc_write_failures, Stats, 1)}
    end,
    ok = gen_server:call(Parent, {add_stats, Stats2}, infinity),
    Result;
remote_doc_handler(_, Acc) ->
    {ok, Acc}.
+
+
% Spawn a linked process that writes the buffered batch to the target and
% reports the resulting stats back to the worker via {add_stats, Stats}.
% Non-empty batches are logged first: size is bytes for remote targets and
% a doc count for local ones.
spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
    case {Target, Size > 0} of
    {#httpdb{}, true} ->
        ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
    {#db{}, true} ->
        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size]);
    _ ->
        ok
    end,
    Parent = self(),
    spawn_link(
        fun() ->
            Target2 = open_db(Target),
            Stats = flush_docs(Target2, DocList),
            close_db(Target2),
            ok = gen_server:call(Parent, {add_stats, Stats}, infinity)
        end).
+
+
% The writer finished while a flush was pending: hand the accumulated stats
% to the flush caller (the changes loop) and reset stats, writer and batch
% for the next batch of changes.
after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
    gen_server:reply(Waiter, {ok, Stats}),
    erlang:put(last_stats_report, now()),
    State#state{
        stats = #rep_stats{},
        flush_waiter = nil,
        writer = nil,
        batch = #batch{}
    }.
+
+
% Add one batched document to the worker state: delegate to
% maybe_flush_docs/3 for buffering/flushing, fold the resulting write stats
% into the totals, count the doc as read and possibly report to the
% checkpointer.
maybe_flush_docs(Doc, State) ->
    #state{target = Target, batch = Batch, stats = Stats, cp = Cp} = State,
    {NewBatch, WriteStats} = maybe_flush_docs(Target, Batch, Doc),
    TotalStats0 = couch_replicator_utils:sum_stats(Stats, WriteStats),
    TotalStats1 = ?inc_stat(#rep_stats.docs_read, TotalStats0, 1),
    TotalStats = maybe_report_stats(Cp, TotalStats1),
    State#state{stats = TotalStats, batch = NewBatch}.
+
+
% Add a document to the current batch, flushing when limits are exceeded.
% Remote targets buffer JSON-encoded docs up to ?DOC_BUFFER_BYTE_SIZE
% bytes; docs that don't qualify for bulk writes (see batch_doc/1) are
% written individually right away. Local targets buffer up to
% ?DOC_BUFFER_LEN whole docs. Returns {NewBatch, StatsForThisCall}.
maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
    #batch{docs = DocAcc, size = SizeAcc} = Batch,
    case batch_doc(Doc) of
    false ->
        ?LOG_DEBUG("Worker flushing doc with attachments", []),
        case flush_doc(Target, Doc) of
        ok ->
            {Batch, #rep_stats{docs_written = 1}};
        _ ->
            {Batch, #rep_stats{doc_write_failures = 1}}
        end;
    true ->
        JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
        case SizeAcc + iolist_size(JsonDoc) of
        SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE ->
            ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [SizeAcc2]),
            Stats = flush_docs(Target, [JsonDoc | DocAcc]),
            {#batch{}, Stats};
        SizeAcc2 ->
            {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, #rep_stats{}}
        end
    end;

maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
    case SizeAcc + 1 of
    SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
        ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [SizeAcc2]),
        Stats = flush_docs(Target, [Doc | DocAcc]),
        {#batch{}, Stats};
    SizeAcc2 ->
        {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, #rep_stats{}}
    end.
+
+
+% A doc qualifies for bulk batching when it has no attachments, or when it
+% has at most ?MAX_BULK_ATTS_PER_DOC attachments, each small enough and
+% fully available (not a stub).
+batch_doc(#doc{atts = []}) ->
+    true;
+batch_doc(#doc{atts = Atts}) when length(Atts) > ?MAX_BULK_ATTS_PER_DOC ->
+    false;
+batch_doc(#doc{atts = Atts}) ->
+    lists:all(
+        fun(#att{disk_len = Len, data = Data}) ->
+            (Data =/= stub) andalso (Len =< ?MAX_BULK_ATT_SIZE)
+        end, Atts).
+
+
+% Write a batch of docs to the target in one update_docs call, logging
+% each per-document error returned, and produce write/failure counts.
+% An empty batch is a no-op yielding zero stats.
+flush_docs(_Target, []) ->
+    #rep_stats{};
+
+flush_docs(Target, DocList) ->
+    {ok, Errors} = couch_replicator_api_wrap:update_docs(
+        Target, DocList, [delay_commit], replicated_changes),
+    DbUri = couch_replicator_api_wrap:db_uri(Target),
+    % Each error is an EJSON object with id/rev/error/reason properties.
+    lists:foreach(
+        fun({Props}) ->
+            ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
+                " to target database `~s`. Error: `~s`, reason: `~s`.",
+                [get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
+                    get_value(error, Props, ""), get_value(reason, Props, "")])
+        end, Errors),
+    #rep_stats{
+        docs_written = length(DocList) - length(Errors),
+        doc_write_failures = length(Errors)
+    }.
+
+% Write a single doc (one that could not be batched) to the target.
+% Returns ok on success or {error, Reason} after logging the failure.
+% A missing_stub throw is re-thrown unchanged so the caller can handle it.
+flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
+    try couch_replicator_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
+    {ok, _} ->
+        ok;
+    Error ->
+        % update_doc returned a non-ok value without throwing.
+        ?LOG_ERROR("Replicator: error writing document `~s` to `~s`: ~s",
+            [Id, couch_replicator_api_wrap:db_uri(Target), couch_util:to_binary(Error)]),
+        Error
+    catch
+    throw:{missing_stub, _} = MissingStub ->
+        throw(MissingStub);
+    throw:{Error, Reason} ->
+        % Two-element throw: log both error and reason.
+        ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
+            " to target database `~s`. Error: `~s`, reason: `~s`.",
+            [Id, couch_doc:rev_to_str({Pos, RevId}),
+                couch_replicator_api_wrap:db_uri(Target), to_binary(Error), to_binary(Reason)]),
+        {error, Error};
+    throw:Err ->
+        % Any other thrown term: log it as the error itself.
+        ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
+            " to target database `~s`. Error: `~s`.",
+            [Id, couch_doc:rev_to_str({Pos, RevId}),
+                couch_replicator_api_wrap:db_uri(Target), to_binary(Err)]),
+        {error, Err}
+    end.
+
+
+% Ask the target which revisions (out of those carried by DocInfos) it is
+% missing. Returns {Missing, Stats} where Stats records how many revisions
+% were checked and how many the target reported missing.
+find_missing(DocInfos, Target) ->
+    IdRevs = [{Id, [Rev || #rev_info{rev = Rev} <- RevsInfo]} ||
+        #doc_info{id = Id, revs = RevsInfo} <- DocInfos],
+    AllRevsCount = lists:sum([length(Revs) || {_Id, Revs} <- IdRevs]),
+    {ok, Missing} = couch_replicator_api_wrap:get_missing_revs(Target, IdRevs),
+    MissingRevsCount = lists:sum(
+        [length(MissingRevs) || {_Id, MissingRevs, _PAs} <- Missing]),
+    Stats = #rep_stats{
+        missing_checked = AllRevsCount,
+        missing_found = MissingRevsCount
+    },
+    {Missing, Stats}.
+
+
+% Push accumulated stats to the checkpointer (Cp) at most once per
+% ?STATS_DELAY microseconds. Returns the stats the caller should keep
+% accumulating: a fresh #rep_stats{} after a report, the input otherwise.
+maybe_report_stats(Cp, Stats) ->
+    Now = now(),
+    % timer:now_diff(T2, T1) is T2 - T1: measure the time elapsed SINCE
+    % the last report. The previous code had the arguments swapped, which
+    % always produced a non-positive value, so this branch never fired and
+    % stats were only ever delivered at flush time.
+    case timer:now_diff(Now, erlang:get(last_stats_report)) >= ?STATS_DELAY of
+    true ->
+        ok = gen_server:call(Cp, {add_stats, Stats}, infinity),
+        erlang:put(last_stats_report, Now),
+        #rep_stats{};
+    false ->
+        Stats
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/test/01-load.t
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/test/01-load.t b/apps/couch_replicator/test/01-load.t
new file mode 100644
index 0000000..8bd82dd
--- /dev/null
+++ b/apps/couch_replicator/test/01-load.t
@@ -0,0 +1,37 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Test that we can load each module.
+
+% Plan one etap assertion per replicator module and verify each loads.
+main(_) ->
+    test_util:init_code_path(),
+    Modules = [
+        couch_replicator_api_wrap,
+        couch_replicator_httpc,
+        couch_replicator_httpd,
+        couch_replicator_manager,
+        couch_replicator_notifier,
+        couch_replicator,
+        couch_replicator_worker,
+        couch_replicator_utils,
+        couch_replicator_job_sup
+    ],
+
+    etap:plan(length(Modules)),
+    [etap:loaded_ok(M, lists:concat(["Loaded: ", M])) || M <- Modules],
+    etap:end_tests().

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/test/02-httpc-pool.t
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/test/02-httpc-pool.t b/apps/couch_replicator/test/02-httpc-pool.t
new file mode 100755
index 0000000..a7bde6c
--- /dev/null
+++ b/apps/couch_replicator/test/02-httpc-pool.t
@@ -0,0 +1,250 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Escript entry point: plan the expected number of etap assertions, run
+% the suite, and bail with a diagnostic if the suite dies abnormally.
+main(_) ->
+    test_util:init_code_path(),
+
+    etap:plan(55),
+    % (catch ...) turns an abnormal throw/error/exit into a term we can
+    % report instead of crashing the escript.
+    case (catch test()) of
+        ok ->
+            etap:end_tests();
+        Other ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+            etap:bail(Other)
+    end,
+    ok.
+
+
+% Start a local couch server and ibrowse, run the three pool scenarios,
+% then shut the server down. Returns ok on success.
+test() ->
+    couch_server_sup:start_link(test_util:config_files()),
+    ibrowse:start(),
+
+    test_pool_full(),
+    test_worker_dead_pool_non_full(),
+    test_worker_dead_pool_full(),
+
+    couch_server_sup:stop(),
+    ok.
+
+
+% Scenario: with max_connections = 3, three clients each get a distinct
+% live worker; a fourth client blocks until one of the first three stops,
+% and then inherits the released worker.
+test_pool_full() ->
+    Pool = spawn_pool(),
+    Client1 = spawn_client(Pool),
+    Client2 = spawn_client(Pool),
+    Client3 = spawn_client(Pool),
+
+    etap:diag("Check that we can spawn the max number of connections."),
+    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+    etap:is(ping_client(Client3), ok, "Client 3 started ok."),
+
+    Worker1 = get_client_worker(Client1, "1"),
+    Worker2 = get_client_worker(Client2, "2"),
+    Worker3 = get_client_worker(Client3, "3"),
+    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+    etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
+
+    etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
+    etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
+    etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
+
+    etap:diag("Check that client 4 blocks waiting for a worker."),
+    Client4 = spawn_client(Pool),
+    etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
+
+    etap:diag("Check that stopping a client gives up its worker."),
+    etap:is(stop_client(Client1), ok, "First client stopped."),
+
+    etap:diag("And check that our blocked client has been unblocked."),
+    etap:is(ping_client(Client4), ok, "Client 4 was unblocked."),
+
+    Worker4 = get_client_worker(Client4, "4"),
+    etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
+    etap:is(Worker4, Worker1, "Client 4 got worker that client 1 got before."),
+
+    lists:foreach(fun(C) -> ok = stop_client(C) end, [Client2, Client3, Client4]),
+    stop_pool(Pool).
+
+
+% Scenario: when a worker dies while the pool is NOT full, releasing it is
+% harmless and the next client gets a fresh (different) worker.
+test_worker_dead_pool_non_full() ->
+    Pool = spawn_pool(),
+    Client1 = spawn_client(Pool),
+
+    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+    Worker1 = get_client_worker(Client1, "1"),
+    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+
+    etap:diag("Kill client's 1 worker."),
+    etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
+    etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
+
+    etap:is(stop_client(Client1), ok, "First client stopped and released its worker."),
+
+    Client2 = spawn_client(Pool),
+    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+    Worker2 = get_client_worker(Client2, "2"),
+    etap:isnt(Worker2, Worker1, "Client 2 got a different worker from client 1"),
+    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+
+    etap:is(stop_client(Client2), ok, "Second client stopped."),
+    stop_pool(Pool).
+
+
+% Scenario: with a full pool, killing a checked-out worker frees a slot so
+% a blocked client gets a brand-new worker; stopping the client whose
+% worker died is a no-op, and a normal release hands the existing worker
+% to the next waiting client.
+test_worker_dead_pool_full() ->
+    Pool = spawn_pool(),
+    Client1 = spawn_client(Pool),
+    Client2 = spawn_client(Pool),
+    Client3 = spawn_client(Pool),
+
+    etap:diag("Check that we can spawn the max number of connections."),
+    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+    etap:is(ping_client(Client3), ok, "Client 3 started ok."),
+
+    Worker1 = get_client_worker(Client1, "1"),
+    Worker2 = get_client_worker(Client2, "2"),
+    Worker3 = get_client_worker(Client3, "3"),
+    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+    etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
+
+    etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
+    etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
+    etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
+
+    etap:diag("Check that client 4 blocks waiting for a worker."),
+    Client4 = spawn_client(Pool),
+    etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
+
+    etap:diag("Kill client's 1 worker."),
+    etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
+    etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
+
+    etap:diag("Check client 4 got unblocked after first worker's death"),
+    etap:is(ping_client(Client4), ok, "Client 4 not blocked anymore."),
+
+    Worker4 = get_client_worker(Client4, "4"),
+    etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
+    etap:isnt(Worker4, Worker1, "Client 4 got a worker different from client 1."),
+    etap:isnt(Worker4, Worker2, "Client 4 got a worker different from client 2."),
+    etap:isnt(Worker4, Worker3, "Client 4 got a worker different from client 3."),
+
+    etap:diag("Check that stopping client 1 is a noop."),
+    etap:is(stop_client(Client1), ok, "First client stopped."),
+
+    etap:is(is_process_alive(Worker2), true, "Client's 2 worker still alive."),
+    etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
+    etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
+
+    etap:diag("Check that client 5 blocks waiting for a worker."),
+    Client5 = spawn_client(Pool),
+    etap:is(ping_client(Client5), timeout, "Client 5 blocked while waiting."),
+
+    etap:diag("Check that stopping client 2 gives up its worker."),
+    etap:is(stop_client(Client2), ok, "Second client stopped."),
+
+    etap:diag("Now check that client 5 has been unblocked."),
+    etap:is(ping_client(Client5), ok, "Client 5 was unblocked."),
+
+    Worker5 = get_client_worker(Client5, "5"),
+    etap:is(is_process_alive(Worker5), true, "Client's 5 worker is alive."),
+    etap:isnt(Worker5, Worker1, "Client 5 got a worker different from client 1."),
+    etap:is(Worker5, Worker2, "Client 5 got same worker as client 2."),
+    etap:isnt(Worker5, Worker3, "Client 5 got a worker different from client 3."),
+    etap:isnt(Worker5, Worker4, "Client 5 got a worker different from client 4."),
+
+    etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
+    etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
+    etap:is(is_process_alive(Worker5), true, "Client's 5 worker still alive."),
+
+    lists:foreach(fun(C) -> ok = stop_client(C) end, [Client3, Client4, Client5]),
+    stop_pool(Pool).
+
+
+% Start a test client process that checks a worker out of Pool and then
+% services ping/get_worker/stop requests via loop/4. Returns {Pid, Ref};
+% Ref tags every reply from this client.
+spawn_client(Pool) ->
+    Parent = self(),
+    Ref = make_ref(),
+    ClientFun = fun() ->
+        {ok, Worker} = couch_replicator_httpc_pool:get_worker(Pool),
+        loop(Parent, Ref, Worker, Pool)
+    end,
+    {spawn(ClientFun), Ref}.
+
+
+% Check whether a client is responsive (i.e. it obtained a worker and
+% entered its loop). Returns ok on a pong, timeout after 3 seconds.
+ping_client({Pid, Ref}) ->
+    Pid ! ping,
+    receive
+        {pong, Ref} -> ok
+    after 3000 -> timeout
+    end.
+
+
+% Fetch the pool worker pid held by a client. Bails the whole test run if
+% no reply arrives within 3 seconds.
+get_client_worker({Pid, Ref}, ClientName) ->
+    Pid ! get_worker,
+    receive
+        {worker, Ref, Worker} ->
+            Worker
+    after 3000 ->
+        etap:bail("Timeout getting client " ++ ClientName ++ " worker.")
+    end.
+
+
+% Tell a client to release its worker back to the pool and terminate.
+% Returns ok once the client confirms, timeout after 3 seconds.
+stop_client({Pid, Ref}) ->
+    Pid ! stop,
+    receive
+        {stop, Ref} -> ok
+    after 3000 -> timeout
+    end.
+
+
+% Brutally kill the worker held by a client, simulating a worker crash.
+% Returns ok once the worker pid was fetched and killed, timeout otherwise.
+kill_client_worker({Pid, Ref}) ->
+    Pid ! get_worker,
+    receive
+        {worker, Ref, Worker} ->
+            exit(Worker, kill),
+            ok
+    after 3000 ->
+        timeout
+    end.
+
+
+% Client process loop: answer ping and get_worker requests (tagging each
+% reply with Ref), and on stop release the worker back to the pool,
+% confirm to the parent, and exit by falling out of the receive.
+loop(Parent, Ref, Worker, Pool) ->
+    receive
+        ping ->
+            Parent ! {pong, Ref},
+            loop(Parent, Ref, Worker, Pool);
+        get_worker  ->
+            Parent ! {worker, Ref, Worker},
+            loop(Parent, Ref, Worker, Pool);
+        stop ->
+            couch_replicator_httpc_pool:release_worker(Pool, Worker),
+            Parent ! {stop, Ref}
+    end.
+
+
+% Start an httpc pool of at most 3 connections pointed at the local
+% CouchDB instance, using the configured bind address and port.
+spawn_pool() ->
+    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
+    Port = couch_config:get("httpd", "port", "5984"),
+    {ok, Pool} = couch_replicator_httpc_pool:start_link(
+        % Use the configured port; previously ":5984" was hard-coded and
+        % Port was left unused, breaking the test on non-default ports.
+        "http://" ++ Host ++ ":" ++ Port, [{max_connections, 3}]),
+    Pool.
+
+
+% Synchronously stop the connection pool, asserting a clean shutdown.
+stop_pool(Pool) ->
+    ok = couch_replicator_httpc_pool:stop(Pool).


Mime
View raw message