couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From beno...@apache.org
Subject [2/8] refactor couch_replicator. close #COUCHDB-1323 .
Date Mon, 05 Dec 2011 09:33:29 GMT
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 40cb9a4..f321bf8 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -25,7 +25,7 @@
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 
 -include("couch_db.hrl").
--include("couch_api_wrap.hrl").
+-include("couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
 
 -import(couch_util, [
@@ -84,7 +84,7 @@ replicate(#rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep) ->
     false ->
         {ok, Listener} = rep_result_listener(RepId),
         Result = do_replication_loop(Rep),
-        couch_replication_notifier:stop(Listener),
+        couch_replicator_notifier:stop(Listener),
         Result
     end.
 
@@ -105,8 +105,8 @@ do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
 
 async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
     RepChildId = BaseId ++ Ext,
-    Source = couch_api_wrap:db_uri(Src),
-    Target = couch_api_wrap:db_uri(Tgt),
+    Source = couch_replicator_api_wrap:db_uri(Src),
+    Target = couch_replicator_api_wrap:db_uri(Tgt),
     Timeout = get_value(connection_timeout, Rep#rep.options),
     ChildSpec = {
         RepChildId,
@@ -122,13 +122,13 @@ async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
     %
     % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
     %
-    case supervisor:start_child(couch_rep_sup, ChildSpec) of
+    case supervisor:start_child(couch_replicator_job_sup, ChildSpec) of
     {ok, Pid} ->
         ?LOG_INFO("starting new replication `~s` at ~p (`~s` -> `~s`)",
             [RepChildId, Pid, Source, Target]),
         {ok, Pid};
     {error, already_present} ->
-        case supervisor:restart_child(couch_rep_sup, RepChildId) of
+        case supervisor:restart_child(couch_replicator_job_sup, RepChildId) of
         {ok, Pid} ->
             ?LOG_INFO("restarting replication `~s` at ~p (`~s` -> `~s`)",
                 [RepChildId, Pid, Source, Target]),
@@ -138,7 +138,7 @@ async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
             %% each other to start and somebody else won. Just grab
             %% the Pid by calling start_child again.
             {error, {already_started, Pid}} =
-                supervisor:start_child(couch_rep_sup, ChildSpec),
+                supervisor:start_child(couch_replicator_job_sup, ChildSpec),
             ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
                 [RepChildId, Pid, Source, Target]),
             {ok, Pid};
@@ -147,7 +147,7 @@ async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
             % Clause to deal with a change in the supervisor module introduced
             % in R14B02. For more details consult the thread at:
             %     http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
-            _ = supervisor:delete_child(couch_rep_sup, RepChildId),
+            _ = supervisor:delete_child(couch_replicator_job_sup, RepChildId),
             async_replicate(Rep);
         {error, _} = Error ->
             Error
@@ -163,7 +163,7 @@ async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
 
 rep_result_listener(RepId) ->
     ReplyTo = self(),
-    {ok, _Listener} = couch_replication_notifier:start_link(
+    {ok, _Listener} = couch_replicator_notifier:start_link(
         fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
                 ReplyTo ! Ev;
             (_) ->
@@ -183,10 +183,10 @@ wait_for_result(RepId) ->
 cancel_replication({BaseId, Extension}) ->
     FullRepId = BaseId ++ Extension,
     ?LOG_INFO("Canceling replication `~s`...", [FullRepId]),
-    case supervisor:terminate_child(couch_rep_sup, FullRepId) of
+    case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
     ok ->
         ?LOG_INFO("Replication `~s` canceled.", [FullRepId]),
-        case supervisor:delete_child(couch_rep_sup, FullRepId) of
+        case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
             ok ->
                 {ok, {cancelled, ?l2b(FullRepId)}};
             {error, not_found} ->
@@ -206,7 +206,7 @@ cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
     false ->
         {BaseId, Ext} = RepId,
         case lists:keysearch(
-            BaseId ++ Ext, 1, supervisor:which_children(couch_rep_sup)) of
+            BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
         {value, {_, Pid, _, _}} when is_pid(Pid) ->
             case (catch gen_server:call(Pid, get_details, infinity)) of
             {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
@@ -321,7 +321,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}} = Rep) ->
 
     ?LOG_DEBUG("Worker pids are: ~p", [Workers]),
 
-    couch_replication_manager:replication_started(Rep),
+    couch_replicator_manager:replication_started(Rep),
 
     {ok, State#rep_state{
             changes_queue = ChangesQueue,
@@ -458,12 +458,12 @@ code_change(_OldVsn, State, _Extra) ->
 terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
     checkpoint_history = CheckpointHistory} = State) ->
     terminate_cleanup(State),
-    couch_replication_notifier:notify({finished, RepId, CheckpointHistory}),
-    couch_replication_manager:replication_completed(Rep);
+    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
+    couch_replicator_manager:replication_completed(Rep);
 
 terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
     % cancelled replication throught ?MODULE:cancel_replication/1
-    couch_replication_notifier:notify({error, RepId, <<"cancelled">>}),
+    couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}),
     terminate_cleanup(State);
 
 terminate(Reason, State) ->
@@ -475,16 +475,16 @@ terminate(Reason, State) ->
     ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
         [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
     terminate_cleanup(State),
-    couch_replication_notifier:notify({error, RepId, Reason}),
-    couch_replication_manager:replication_error(Rep, Reason).
+    couch_replicator_notifier:notify({error, RepId, Reason}),
+    couch_replicator_manager:replication_error(Rep, Reason).
 
 
 terminate_cleanup(State) ->
     update_task(State),
     stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
     stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
-    couch_api_wrap:db_close(State#rep_state.source),
-    couch_api_wrap:db_close(State#rep_state.target).
+    couch_replicator_api_wrap:db_close(State#rep_state.source),
+    couch_replicator_api_wrap:db_close(State#rep_state.target).
 
 
 do_last_checkpoint(#rep_state{seqs_in_progress = [],
@@ -523,12 +523,12 @@ init_state(Rep) ->
         source = Src, target = Tgt,
         options = Options, user_ctx = UserCtx
     } = Rep,
-    {ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
-    {ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
+    {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
+    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
         get_value(create_target, Options, false)),
 
-    {ok, SourceInfo} = couch_api_wrap:get_db_info(Source),
-    {ok, TargetInfo} = couch_api_wrap:get_db_info(Target),
+    {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
+    {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
 
     [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
 
@@ -538,8 +538,8 @@ init_state(Rep) ->
     #doc{body={CheckpointHistory}} = SourceLog,
     State = #rep_state{
         rep_details = Rep,
-        source_name = couch_api_wrap:db_uri(Source),
-        target_name = couch_api_wrap:db_uri(Target),
+        source_name = couch_replicator_api_wrap:db_uri(Source),
+        target_name = couch_replicator_api_wrap:db_uri(Target),
         source = Source,
         target = Target,
         history = History,
@@ -573,7 +573,7 @@ fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
     lists:reverse(Acc);
 
 fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
-    case couch_api_wrap:open_doc(Db, LogId, [ejson_body]) of
+    case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
     {error, <<"not_found">>} when Vsn > 1 ->
         OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
         fold_replication_logs(Dbs, Vsn - 1,
@@ -604,7 +604,7 @@ spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
 
 read_changes(StartSeq, Db, ChangesQueue, Options) ->
     try
-        couch_api_wrap:changes_since(Db, all_docs, StartSeq,
+        couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
             fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
                 case Id of
                 <<>> ->
@@ -613,7 +613,7 @@ read_changes(StartSeq, Db, ChangesQueue, Options) ->
                     % is impossible to GET.
                     ?LOG_ERROR("Replicator: ignoring document with empty ID in "
                         "source database `~s` (_changes sequence ~p)",
-                        [couch_api_wrap:db_uri(Db), Seq]);
+                        [couch_replicator_api_wrap:db_uri(Db), Seq]);
                 _ ->
                     ok = couch_work_queue:queue(ChangesQueue, DocInfo)
                 end,
@@ -629,12 +629,12 @@ read_changes(StartSeq, Db, ChangesQueue, Options) ->
             StartSeq ->
                 ?LOG_INFO("Retrying _changes request to source database ~s"
                     " with since=~p in ~p seconds",
-                    [couch_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
+                    [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
                 ok = timer:sleep(Db#httpdb.wait),
                 Db#httpdb{wait = 2 * Db#httpdb.wait};
             _ ->
                 ?LOG_INFO("Retrying _changes request to source database ~s"
-                    " with since=~p", [couch_api_wrap:db_uri(Db), LastSeq]),
+                    " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
                 Db
             end,
             read_changes(LastSeq, Db2, ChangesQueue, Options);
@@ -782,14 +782,14 @@ update_checkpoint(Db, Doc, DbType) ->
 
 update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
     try
-        case couch_api_wrap:update_doc(Db, Doc, [delay_commit]) of
+        case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
         {ok, PosRevId} ->
             PosRevId;
         {error, Reason} ->
             throw({checkpoint_commit_failure, Reason})
         end
     catch throw:conflict ->
-        case (catch couch_api_wrap:open_doc(Db, LogId, [ejson_body])) of
+        case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
         {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
             % This means that we were able to update successfully the
             % checkpoint doc in a previous attempt but we got a connection
@@ -810,12 +810,12 @@ commit_to_both(Source, Target) ->
     ParentPid = self(),
     SrcCommitPid = spawn_link(
         fun() ->
-            Result = (catch couch_api_wrap:ensure_full_commit(Source)),
+            Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
             ParentPid ! {self(), Result}
         end),
 
     % commit tgt sync
-    TargetResult = (catch couch_api_wrap:ensure_full_commit(Target)),
+    TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
 
     SourceResult = receive
     {SrcCommitPid, Result} ->
@@ -902,14 +902,14 @@ db_monitor(_HttpDb) ->
 
 
 source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq}) ->
-    case (catch couch_api_wrap:get_db_info(Db#httpdb{retries = 3})) of
+    case (catch couch_replicator_api_wrap:get_db_info(Db#httpdb{retries = 3})) of
     {ok, Info} ->
         get_value(<<"update_seq">>, Info, Seq);
     _ ->
         Seq
     end;
 source_cur_seq(#rep_state{source = Db, source_seq = Seq}) ->
-    {ok, Info} = couch_api_wrap:get_db_info(Db),
+    {ok, Info} = couch_replicator_api_wrap:get_db_info(Db),
     get_value(<<"update_seq">>, Info, Seq).
 
 

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
new file mode 100644
index 0000000..a29fe94
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -0,0 +1,775 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_api_wrap).
+
+% This module wraps the native Erlang API and allows operations to be
+% performed on remote and local databases through the same API.
+%
+% Notes:
+% Many options and APIs aren't supported here yet; they are added as needed.
+
+-include("couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+
+-export([
+    db_open/2,
+    db_open/3,
+    db_close/1,
+    get_db_info/1,
+    update_doc/3,
+    update_doc/4,
+    update_docs/3,
+    update_docs/4,
+    ensure_full_commit/1,
+    get_missing_revs/2,
+    open_doc/3,
+    open_doc_revs/6,
+    changes_since/5,
+    db_uri/1
+    ]).
+
+-import(couch_replicator_httpc, [
+    send_req/3
+    ]).
+
+-import(couch_util, [
+    encode_doc_id/1,
+    get_value/2,
+    get_value/3
+    ]).
+
+
+db_uri(#httpdb{url = Url}) ->
+    couch_util:url_strip_password(Url);
+
+db_uri(#db{name = Name}) ->
+    db_uri(Name);
+
+db_uri(DbName) ->
+    ?b2l(DbName).
+
+
+db_open(Db, Options) ->
+    db_open(Db, Options, false).
+
+db_open(#httpdb{} = Db1, _Options, Create) ->
+    {ok, Db} = couch_replicator_httpc:setup(Db1),
+    case Create of
+    false ->
+        ok;
+    true ->
+        send_req(Db, [{method, put}], fun(_, _, _) -> ok end)
+    end,
+    send_req(Db, [{method, head}],
+        fun(200, _, _) ->
+            {ok, Db};
+        (401, _, _) ->
+            throw({unauthorized, ?l2b(db_uri(Db))});
+        (_, _, _) ->
+            throw({db_not_found, ?l2b(db_uri(Db))})
+        end);
+db_open(DbName, Options, Create) ->
+    try
+        case Create of
+        false ->
+            ok;
+        true ->
+            ok = couch_httpd:verify_is_server_admin(
+                get_value(user_ctx, Options)),
+            couch_db:create(DbName, Options)
+        end,
+        case couch_db:open(DbName, Options) of
+        {not_found, _Reason} ->
+            throw({db_not_found, DbName});
+        {ok, _Db} = Success ->
+            Success
+        end
+    catch
+    throw:{unauthorized, _} ->
+        throw({unauthorized, DbName})
+    end.
+
+db_close(#httpdb{httpc_pool = Pool}) ->
+    unlink(Pool),
+    ok = couch_replicator_httpc_pool:stop(Pool);
+db_close(DbName) ->
+    catch couch_db:close(DbName).
+
+
+get_db_info(#httpdb{} = Db) ->
+    send_req(Db, [],
+        fun(200, _, {Props}) ->
+            {ok, Props}
+        end);
+get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
+    {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+    {ok, Info} = couch_db:get_db_info(Db),
+    couch_db:close(Db),
+    {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
+
+
+ensure_full_commit(#httpdb{} = Db) ->
+    send_req(
+        Db,
+        [{method, post}, {path, "_ensure_full_commit"},
+            {headers, [{"Content-Type", "application/json"}]}],
+        fun(201, _, {Props}) ->
+            {ok, get_value(<<"instance_start_time">>, Props)};
+        (_, _, {Props}) ->
+            {error, get_value(<<"error">>, Props)}
+        end);
+ensure_full_commit(Db) ->
+    couch_db:ensure_full_commit(Db).
+
+
+get_missing_revs(#httpdb{} = Db, IdRevs) ->
+    JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]},
+    send_req(
+        Db,
+        [{method, post}, {path, "_revs_diff"}, {body, ?JSON_ENCODE(JsonBody)}],
+        fun(200, _, {Props}) ->
+            ConvertToNativeFun = fun({Id, {Result}}) ->
+                MissingRevs = couch_doc:parse_revs(
+                    get_value(<<"missing">>, Result)
+                ),
+                PossibleAncestors = couch_doc:parse_revs(
+                    get_value(<<"possible_ancestors">>, Result, [])
+                ),
+                {Id, MissingRevs, PossibleAncestors}
+            end,
+            {ok, lists:map(ConvertToNativeFun, Props)}
+        end);
+get_missing_revs(Db, IdRevs) ->
+    couch_db:get_missing_revs(Db, IdRevs).
+
+
+
+open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
+    Path = encode_doc_id(Id),
+    QArgs = options_to_query_args(
+        HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+    Self = self(),
+    Streamer = spawn_link(fun() ->
+            send_req(
+                HttpDb,
+                [{path, Path}, {qs, QArgs},
+                    {ibrowse_options, [{stream_to, {self(), once}}]},
+                    {headers, [{"Accept", "multipart/mixed"}]}],
+                fun(200, Headers, StreamDataFun) ->
+                    remote_open_doc_revs_streamer_start(Self),
+                    {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+                        get_value("Content-Type", Headers),
+                        StreamDataFun,
+                        fun mp_parse_mixed/1)
+                end),
+            unlink(Self)
+        end),
+    receive
+    {started_open_doc_revs, Ref} ->
+        receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc)
+    end;
+open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
+    {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
+    {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
+
+
+open_doc(#httpdb{} = Db, Id, Options) ->
+    send_req(
+        Db,
+        [{path, encode_doc_id(Id)}, {qs, options_to_query_args(Options, [])}],
+        fun(200, _, Body) ->
+            {ok, couch_doc:from_json_obj(Body)};
+        (_, _, {Props}) ->
+            {error, get_value(<<"error">>, Props)}
+        end);
+open_doc(Db, Id, Options) ->
+    case couch_db:open_doc(Db, Id, Options) of
+    {ok, _} = Ok ->
+        Ok;
+    {not_found, _Reason} ->
+        {error, <<"not_found">>}
+    end.
+
+
+update_doc(Db, Doc, Options) ->
+    update_doc(Db, Doc, Options, interactive_edit).
+
+update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
+    QArgs = case Type of
+    replicated_changes ->
+        [{"new_edits", "false"}];
+    _ ->
+        []
+    end ++ options_to_query_args(Options, []),
+    Boundary = couch_uuids:random(),
+    JsonBytes = ?JSON_ENCODE(
+        couch_doc:to_json_obj(
+          Doc, [revs, attachments, follows, att_encoding_info | Options])),
+    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
+        JsonBytes, Doc#doc.atts, true),
+    Headers = case lists:member(delay_commit, Options) of
+    true ->
+        [{"X-Couch-Full-Commit", "false"}];
+    false ->
+        []
+    end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
+    Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
+    send_req(
+        HttpDb,
+        [{method, put}, {path, encode_doc_id(DocId)},
+            {qs, QArgs}, {headers, Headers}, {body, Body}],
+        fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->
+                {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
+            (409, _, _) ->
+                throw(conflict);
+            (Code, _, {Props}) ->
+                case {Code, get_value(<<"error">>, Props)} of
+                {401, <<"unauthorized">>} ->
+                    throw({unauthorized, get_value(<<"reason">>, Props)});
+                {403, <<"forbidden">>} ->
+                    throw({forbidden, get_value(<<"reason">>, Props)});
+                {412, <<"missing_stub">>} ->
+                    throw({missing_stub, get_value(<<"reason">>, Props)});
+                {_, Error} ->
+                    {error, Error}
+                end
+        end);
+update_doc(Db, Doc, Options, Type) ->
+    couch_db:update_doc(Db, Doc, Options, Type).
+
+
+update_docs(Db, DocList, Options) ->
+    update_docs(Db, DocList, Options, interactive_edit).
+
+update_docs(_Db, [], _Options, _UpdateType) ->
+    {ok, []};
+update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
+    FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
+    Prefix = case UpdateType of
+    replicated_changes ->
+        <<"{\"new_edits\":false,\"docs\":[">>;
+    interactive_edit ->
+        <<"{\"docs\":[">>
+    end,
+    Suffix = <<"]}">>,
+    % Note: nginx and other servers don't like PUT/POST requests without
+    % a Content-Length header, so we can't do a chunked transfer encoding
+    % and JSON encode each doc only before sending it through the socket.
+    {Docs, Len} = lists:mapfoldl(
+        fun(#doc{} = Doc, Acc) ->
+            Json = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
+            {Json, Acc + iolist_size(Json)};
+        (Doc, Acc) ->
+            {Doc, Acc + iolist_size(Doc)}
+        end,
+        byte_size(Prefix) + byte_size(Suffix) + length(DocList) - 1,
+        DocList),
+    BodyFun = fun(eof) ->
+            eof;
+        ([]) ->
+            {ok, Suffix, eof};
+        ([prefix | Rest]) ->
+            {ok, Prefix, Rest};
+        ([Doc]) ->
+            {ok, Doc, []};
+        ([Doc | RestDocs]) ->
+            {ok, [Doc, ","], RestDocs}
+    end,
+    Headers = [
+        {"Content-Length", Len},
+        {"Content-Type", "application/json"},
+        {"X-Couch-Full-Commit", FullCommit}
+    ],
+    send_req(
+        HttpDb,
+        [{method, post}, {path, "_bulk_docs"},
+            {body, {BodyFun, [prefix | Docs]}}, {headers, Headers}],
+        fun(201, _, Results) when is_list(Results) ->
+                {ok, bulk_results_to_errors(DocList, Results, remote)};
+           (417, _, Results) when is_list(Results) ->
+                {ok, bulk_results_to_errors(DocList, Results, remote)}
+        end);
+update_docs(Db, DocList, Options, UpdateType) ->
+    Result = couch_db:update_docs(Db, DocList, Options, UpdateType),
+    {ok, bulk_results_to_errors(DocList, Result, UpdateType)}.
+
+
+changes_since(#httpdb{headers = Headers1} = HttpDb, Style, StartSeq,
+    UserFun, Options) ->
+    BaseQArgs = case get_value(continuous, Options, false) of
+    false ->
+        [{"feed", "normal"}];
+    true ->
+        [{"feed", "continuous"}, {"heartbeat", "10000"}]
+    end ++ [
+        {"style", atom_to_list(Style)}, {"since", couch_util:to_list(StartSeq)}
+    ],
+    DocIds = get_value(doc_ids, Options),
+    {QArgs, Method, Body, Headers} = case DocIds of
+    undefined ->
+        QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
+        {QArgs1, get, [], Headers1};
+    _ when is_list(DocIds) ->
+        Headers2 = [{"Content-Type", "application/json"} | Headers1],
+        JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
+        {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
+    end,
+    send_req(
+        HttpDb,
+        [{method, Method}, {path, "_changes"}, {qs, QArgs},
+            {headers, Headers}, {body, Body},
+            {ibrowse_options, [{stream_to, {self(), once}}]}],
+        fun(200, _, DataStreamFun) ->
+                parse_changes_feed(Options, UserFun, DataStreamFun);
+            (405, _, _) when is_list(DocIds) ->
+                % CouchDB versions < 1.1.0 have neither the builtin _changes
+                % feed filter "_doc_ids" nor support for POST requests
+                send_req(HttpDb, [{method, get}, {path, "_changes"},
+                    {qs, BaseQArgs}, {headers, Headers1},
+                    {ibrowse_options, [{stream_to, {self(), once}}]}],
+                    fun(200, _, DataStreamFun2) ->
+                        UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
+                            case lists:member(Id, DocIds) of
+                            true ->
+                                UserFun(DocInfo);
+                            false ->
+                                ok
+                            end
+                        end,
+                        parse_changes_feed(Options, UserFun2, DataStreamFun2)
+                    end)
+        end);
+changes_since(Db, Style, StartSeq, UserFun, Options) ->
+    Filter = case get_value(doc_ids, Options) of
+    undefined ->
+        ?b2l(get_value(filter, Options, <<>>));
+    _DocIds ->
+        "_doc_ids"
+    end,
+    Args = #changes_args{
+        style = Style,
+        since = StartSeq,
+        filter = Filter,
+        feed = case get_value(continuous, Options, false) of
+            true ->
+                "continuous";
+            false ->
+                "normal"
+        end,
+        timeout = infinity
+    },
+    QueryParams = get_value(query_params, Options, {[]}),
+    Req = changes_json_req(Db, Filter, QueryParams, Options),
+    ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db),
+    ChangesFeedFun(fun({change, Change, _}, _) ->
+            UserFun(json_to_doc_info(Change));
+        (_, _) ->
+            ok
+    end).
+
+
+% internal functions
+
+maybe_add_changes_filter_q_args(BaseQS, Options) ->
+    case get_value(filter, Options) of
+    undefined ->
+        BaseQS;
+    FilterName ->
+        {Params} = get_value(query_params, Options, {[]}),
+        [{"filter", ?b2l(FilterName)} | lists:foldl(
+            fun({K, V}, QSAcc) ->
+                Ks = couch_util:to_list(K),
+                case lists:keymember(Ks, 1, QSAcc) of
+                true ->
+                    QSAcc;
+                false ->
+                    [{Ks, couch_util:to_list(V)} | QSAcc]
+                end
+            end,
+            BaseQS, Params)]
+    end.
+
+parse_changes_feed(Options, UserFun, DataStreamFun) ->
+    case get_value(continuous, Options, false) of
+    true ->
+        continuous_changes(DataStreamFun, UserFun);
+    false ->
+        EventFun = fun(Ev) ->
+            changes_ev1(Ev, fun(DocInfo, _) -> UserFun(DocInfo) end, [])
+        end,
+        json_stream_parse:events(DataStreamFun, EventFun)
+    end.
+
+changes_json_req(_Db, "", _QueryParams, _Options) ->
+    {[]};
+changes_json_req(_Db, "_doc_ids", _QueryParams, Options) ->
+    {[{<<"doc_ids">>, get_value(doc_ids, Options)}]};
+changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
+    {ok, Info} = couch_db:get_db_info(Db),
+    % simulate a request to db_name/_changes
+    {[
+        {<<"info">>, {Info}},
+        {<<"id">>, null},
+        {<<"method">>, 'GET'},
+        {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+        {<<"query">>, {[{<<"filter">>, FilterName} | QueryParams]}},
+        {<<"headers">>, []},
+        {<<"body">>, []},
+        {<<"peer">>, <<"replicator">>},
+        {<<"form">>, []},
+        {<<"cookie">>, []},
+        {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+    ]}.
+
+
+options_to_query_args(HttpDb, Path, Options) ->
+    case lists:keytake(atts_since, 1, Options) of
+    false ->
+        options_to_query_args(Options, []);
+    {value, {atts_since, []}, Options2} ->
+        options_to_query_args(Options2, []);
+    {value, {atts_since, PAs}, Options2} ->
+        QueryArgs1 = options_to_query_args(Options2, []),
+        FullUrl = couch_replicator_httpc:full_url(
+            HttpDb, [{path, Path}, {qs, QueryArgs1}]),
+        RevList = atts_since_arg(
+            length("GET " ++ FullUrl ++ " HTTP/1.1\r\n") +
+            length("&atts_since=") + 6,  % +6 = % encoded [ and ]
+            PAs, []),
+        [{"atts_since", ?JSON_ENCODE(RevList)} | QueryArgs1]
+    end.
+
+
+options_to_query_args([], Acc) ->
+    lists:reverse(Acc);
+options_to_query_args([ejson_body | Rest], Acc) ->
+    options_to_query_args(Rest, Acc);
+options_to_query_args([delay_commit | Rest], Acc) ->
+    options_to_query_args(Rest, Acc);
+options_to_query_args([revs | Rest], Acc) ->
+    options_to_query_args(Rest, [{"revs", "true"} | Acc]);
+options_to_query_args([{open_revs, all} | Rest], Acc) ->
+    options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
+options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
+    JsonRevs = ?b2l(?JSON_ENCODE(couch_doc:revs_to_strs(Revs))),
+    options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).
+
+
+-define(MAX_URL_LEN, 7000).
+
+atts_since_arg(_UrlLen, [], Acc) ->
+    lists:reverse(Acc);
+atts_since_arg(UrlLen, [PA | Rest], Acc) ->
+    RevStr = couch_doc:rev_to_str(PA),
+    NewUrlLen = case Rest of
+    [] ->
+        % plus 2 double quotes (% encoded)
+        UrlLen + size(RevStr) + 6;
+    _ ->
+        % plus 2 double quotes and a comma (% encoded)
+        UrlLen + size(RevStr) + 9
+    end,
+    case NewUrlLen >= ?MAX_URL_LEN of
+    true ->
+        lists:reverse(Acc);
+    false ->
+        atts_since_arg(NewUrlLen, Rest, [RevStr | Acc])
+    end.
+
+
+% TODO: A less verbose, more elegant and automatic restart strategy for
+%       the exported open_doc_revs/6 function. The restart should be
+%       transparent to the caller like any other Couch API function exported
+%       by this module.
+receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) ->
+    try
+        % Left only for debugging purposes via an interactive or remote shell
+        erlang:put(open_doc_revs, {Id, Revs, Ref, Streamer}),
+        receive_docs(Streamer, Fun, Ref, Acc)
+    catch
+    error:{restart_open_doc_revs, NewRef} ->
+        receive_docs_loop(Streamer, Fun, Id, Revs, NewRef, Acc)
+    end.
+
+% Pulls documents (or missing-revision errors) from Streamer one part at a
+% time and folds them into UserAcc with UserFun.
+%
+% Each iteration asks the streamer for the headers of the next part and then
+% acts on the reply:
+%   {started_open_doc_revs, NewRef} - stale messages tagged with Ref are
+%       flushed and a restart error is raised (caught by receive_docs_loop/6);
+%   {headers, Ref, Headers}         - a new part, handled according to its
+%       content-type;
+%   {done, Ref}                     - no more parts, returns {ok, UserAcc}.
+receive_docs(Streamer, UserFun, Ref, UserAcc) ->
+    Streamer ! {get_headers, Ref, self()},
+    receive
+    {started_open_doc_revs, NewRef} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {headers, Ref, Headers} ->
+        case get_value("content-type", Headers) of
+        {"multipart/related", _} = ContentType ->
+            % Document with attachments: a parser process decodes the nested
+            % multipart body, pulling its bytes through receive_doc_data/2.
+            case doc_from_multi_part_stream(
+                ContentType,
+                fun() -> receive_doc_data(Streamer, Ref) end,
+                Ref) of
+            {ok, Doc, Parser} ->
+                case UserFun({ok, Doc}, UserAcc) of
+                {ok, UserAcc2} ->
+                    ok;
+                {skip, UserAcc2} ->
+                    % The callback asked to skip this document, so the
+                    % multipart stream is aborted.
+                    couch_doc:abort_multi_part_stream(Parser)
+                end,
+                receive_docs(Streamer, UserFun, Ref, UserAcc2)
+            end;
+        {"application/json", []} ->
+            % Plain JSON document: read the whole body, then decode it.
+            Doc = couch_doc:from_json_obj(
+                    ?JSON_DECODE(receive_all(Streamer, Ref, []))),
+            {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
+            receive_docs(Streamer, UserFun, Ref, UserAcc2);
+        {"application/json", [{"error","true"}]} ->
+            % Error part: the body names the missing revision, which is
+            % reported to the callback as {{not_found, missing}, Rev}.
+            {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
+            Rev = get_value(<<"missing">>, ErrorProps),
+            Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
+            {_, UserAcc2} = UserFun(Result, UserAcc),
+            receive_docs(Streamer, UserFun, Ref, UserAcc2)
+        end;
+    {done, Ref} ->
+        {ok, UserAcc}
+    end.
+
+
+% Throws away every message still queued for the stream identified by Ref
+% and then raises {restart_open_doc_revs, NewRef}. This function never
+% returns normally.
+restart_remote_open_doc_revs(Ref, NewRef) ->
+    ok = drop_stale_stream_msgs(Ref),
+    erlang:error({restart_open_doc_revs, NewRef}).
+
+% Removes, without blocking, all stream messages tagged with Ref from the
+% mailbox of the calling process.
+drop_stale_stream_msgs(Ref) ->
+    receive
+    {headers, Ref, _} ->
+        drop_stale_stream_msgs(Ref);
+    {body_bytes, Ref, _} ->
+        drop_stale_stream_msgs(Ref);
+    {body_done, Ref} ->
+        drop_stale_stream_msgs(Ref);
+    {done, Ref} ->
+        drop_stale_stream_msgs(Ref)
+    after 0 ->
+        ok
+    end.
+
+
+% Discards any get_headers / next_bytes requests from Parent that are still
+% queued in the calling process, then tells Parent that a new stream has
+% started, identified by a fresh reference.
+remote_open_doc_revs_streamer_start(Parent) ->
+    ok = discard_pending_requests(Parent),
+    Parent ! {started_open_doc_revs, make_ref()}.
+
+% Removes, without blocking, all queued requests that were sent by Parent.
+discard_pending_requests(Parent) ->
+    receive
+    {next_bytes, _Ref, Parent} ->
+        discard_pending_requests(Parent);
+    {get_headers, _Ref, Parent} ->
+        discard_pending_requests(Parent)
+    after 0 ->
+        ok
+    end.
+
+
+% Reads the complete body of the current part from Streamer and returns its
+% chunks in arrival order. Acc holds the chunks received so far, newest
+% first. A stream restart notification flushes stale messages and raises the
+% restart error instead of returning.
+receive_all(Streamer, Ref, Acc) ->
+    Streamer ! {next_bytes, Ref, self()},
+    receive
+    {body_done, Ref} ->
+        lists:reverse(Acc);
+    {body_bytes, Ref, Chunk} ->
+        receive_all(Streamer, Ref, [Chunk | Acc]);
+    {started_open_doc_revs, NewRef} ->
+        restart_remote_open_doc_revs(Ref, NewRef)
+    end.
+
+
+% Multipart parser callback that hands each parser event to the consumer
+% process on demand. Every clause blocks until the consumer asks for the
+% matching kind of data, answers that request, and (except for eof) returns
+% itself as the callback for the next event.
+mp_parse_mixed(eof) ->
+    % End of the multipart body: answer the next header request with `done`.
+    receive {get_headers, Ref, From} ->
+        From ! {done, Ref}
+    end;
+mp_parse_mixed({headers, H}) ->
+    % Headers of a new part.
+    receive {get_headers, Ref, From} ->
+        From ! {headers, Ref, H}
+    end,
+    fun mp_parse_mixed/1;
+mp_parse_mixed({body, Bytes}) ->
+    % A chunk of the current part's body.
+    receive {next_bytes, Ref, From} ->
+        From ! {body_bytes, Ref, Bytes}
+    end,
+    fun mp_parse_mixed/1;
+mp_parse_mixed(body_end) ->
+    % End of the current part. If the consumer has already asked for the
+    % headers of the next part, that request is put back into this process's
+    % own mailbox so that the following headers/eof event can answer it.
+    receive {next_bytes, Ref, From} ->
+        From ! {body_done, Ref};
+    {get_headers, Ref, From} ->
+        self() ! {get_headers, Ref, From}
+    end,
+    fun mp_parse_mixed/1.
+
+
+% Data function for the multipart parser: requests the next body chunk from
+% Streamer and returns {Bytes, NextDataFun}. Once the body is finished it
+% yields an empty binary.
+receive_doc_data(Streamer, Ref) ->
+    Streamer ! {next_bytes, Ref, self()},
+    Next = fun() -> receive_doc_data(Streamer, Ref) end,
+    receive
+    {body_bytes, Ref, Bytes} ->
+        {Bytes, Next};
+    {body_done, Ref} ->
+        {<<>>, Next}
+    end.
+
+% Decodes one multipart/related document whose bytes are supplied by DataFun.
+%
+% A linked parser process runs the multipart parser; this process first asks
+% it for the JSON part and then replaces every attachment marked `follows`
+% with a fun that pulls attachment bytes from the parser on demand.
+% Returns {ok, Doc, Parser}. If the stream is restarted while waiting, the
+% parser is killed and the restart error is raised instead.
+doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
+    Self = self(),
+    Parser = spawn_link(fun() ->
+        % The match asserts the shape of the parser's result.
+        {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+            ContentType, DataFun,
+            fun(Next) -> couch_doc:mp_parse_doc(Next, []) end),
+        unlink(Self)
+        end),
+    Parser ! {get_doc_bytes, Ref, self()},
+    receive
+    {started_open_doc_revs, NewRef} ->
+        % Unlink before killing so the kill does not propagate to us.
+        unlink(Parser),
+        exit(Parser, kill),
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {doc_bytes, Ref, DocBytes} ->
+        Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
+        ReadAttachmentDataFun = fun() ->
+            Parser ! {get_bytes, Ref, self()},
+            receive
+            {started_open_doc_revs, NewRef} ->
+                unlink(Parser),
+                exit(Parser, kill),
+                % Drop a reply the parser may have sent before it was killed.
+                receive {bytes, Ref, _} -> ok after 0 -> ok end,
+                restart_remote_open_doc_revs(Ref, NewRef);
+            {bytes, Ref, Bytes} ->
+                Bytes
+            end
+        end,
+        % Only attachments whose data is `follows` are read from the stream;
+        % all other attachments are kept as they are.
+        Atts2 = lists:map(
+            fun(#att{data = follows} = A) ->
+                A#att{data = ReadAttachmentDataFun};
+            (A) ->
+                A
+            end, Doc#doc.atts),
+        {ok, Doc#doc{atts = Atts2}, Parser}
+    end.
+
+
+% Event handlers for a streamed changes response. Each handler consumes one
+% JSON parser event and returns the fun that handles the next event.
+
+% Expects the opening of the top-level response object.
+changes_ev1(object_start, UserFun, UserAcc) ->
+    fun(Event) -> changes_ev2(Event, UserFun, UserAcc) end.
+
+% Skips every event until the "results" key is seen.
+changes_ev2({key, <<"results">>}, UserFun, UserAcc) ->
+    fun(Event) -> changes_ev3(Event, UserFun, UserAcc) end;
+changes_ev2(_, UserFun, UserAcc) ->
+    fun(Event) -> changes_ev2(Event, UserFun, UserAcc) end.
+
+% Expects the opening of the "results" array.
+changes_ev3(array_start, UserFun, UserAcc) ->
+    fun(Event) -> changes_ev_loop(Event, UserFun, UserAcc) end.
+
+% Collects each row object of the array, converts it with json_to_doc_info/1
+% and threads the accumulator through UserFun. Stops at the end of the array.
+changes_ev_loop(object_start, UserFun, UserAcc) ->
+    OnRow = fun(Row) ->
+        NewAcc = UserFun(json_to_doc_info(Row), UserAcc),
+        fun(Event) -> changes_ev_loop(Event, UserFun, NewAcc) end
+    end,
+    fun(Event) -> json_stream_parse:collect_object(Event, OnRow) end;
+changes_ev_loop(array_end, _UserFun, _UserAcc) ->
+    fun(_Event) -> changes_ev_done() end.
+
+% Ignores every event that follows the "results" array.
+changes_ev_done() ->
+    fun(_Event) -> changes_ev_done() end.
+
+% Parses a continuous changes feed one JSON object at a time, passing each
+% row to UserFun. Loops for as long as the parser keeps returning; leftover
+% bytes from one parse are fed back in front of the remaining data.
+continuous_changes(DataFun, UserFun) ->
+    LineHandler = fun(Ev) -> parse_changes_line(Ev, UserFun) end,
+    {DataFun2, _, Rest} = json_stream_parse:events(DataFun, LineHandler),
+    continuous_changes(fun() -> {Rest, DataFun2} end, UserFun).
+
+% Handles the start of one row of a continuous changes feed: returns the
+% event handler that collects the whole row object and passes it, converted
+% by json_to_doc_info/1, to UserFun.
+parse_changes_line(object_start, UserFun) ->
+    OnRow = fun(Row) -> UserFun(json_to_doc_info(Row)) end,
+    fun(Ev) -> json_stream_parse:collect_object(Ev, OnRow) end.
+
+% Converts one decoded changes row (an EJSON object) into a #doc_info{}
+% record, with one #rev_info{} per entry of its "changes" array.
+json_to_doc_info({Props}) ->
+    Changes = get_value(<<"changes">>, Props),
+    #doc_info{
+        id = get_value(<<"id">>, Props),
+        high_seq = get_value(<<"seq">>, Props),
+        revs = lists:map(fun change_to_rev_info/1, Changes)
+    }.
+
+% Builds the #rev_info{} for a single entry of a row's "changes" array. The
+% revision counts as deleted only when "deleted" is exactly `true`.
+change_to_rev_info({Change}) ->
+    #rev_info{
+        rev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
+        deleted = (get_value(<<"deleted">>, Change) =:= true)
+    }.
+
+
+% Converts the outcome of a bulk document update into a list of EJSON error
+% objects of the form {[{id, _}, {rev, _}, {error, _}, {reason, _}]}.
+% Successful entries produce no output.
+%
+% The third argument selects how the results are shaped:
+%   interactive_edit with {ok, Results}      - one result per document, in
+%                                              the same order as Docs;
+%   replicated_changes with {ok, Results}    - handled like aborted results;
+%   interactive_edit with {aborted, Results} - {{Id, Rev}, Error} pairs;
+%   remote                                   - decoded JSON rows from a
+%                                              remote server.
+bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->
+    lists:reverse(lists:foldl(
+        fun({_, {ok, _}}, Acc) ->
+            Acc;
+        ({#doc{id = Id, revs = {Pos, [RevId | _]}}, Err}, Acc) ->
+            % The raw error needs its own variable. Matching the result of
+            % error_info/1 against a variable that is already bound to the
+            % raw error term would fail with badmatch.
+            {_, Error, Reason} = couch_httpd:error_info(Err),
+            [ {[{id, Id}, {rev, rev_to_str({Pos, RevId})},
+                {error, Error}, {reason, Reason}]} | Acc ]
+        end,
+        [], lists:zip(Docs, Results)));
+
+bulk_results_to_errors(Docs, {ok, Results}, replicated_changes) ->
+    bulk_results_to_errors(Docs, {aborted, Results}, interactive_edit);
+
+bulk_results_to_errors(_Docs, {aborted, Results}, interactive_edit) ->
+    lists:map(
+        fun({{Id, Rev}, Err}) ->
+            {_, Error, Reason} = couch_httpd:error_info(Err),
+            {[{id, Id}, {rev, rev_to_str(Rev)}, {error, Error}, {reason, Reason}]}
+        end,
+        Results);
+
+bulk_results_to_errors(_Docs, Results, remote) ->
+    lists:reverse(lists:foldl(
+        fun({Props}, Acc) ->
+            % Keys may be binaries or atoms, so both forms are looked up.
+            case get_value(<<"error">>, Props, get_value(error, Props)) of
+            undefined ->
+                Acc;
+            Error ->
+                Id = get_value(<<"id">>, Props, get_value(id, Props)),
+                Rev = get_value(<<"rev">>, Props, get_value(rev, Props)),
+                Reason = get_value(<<"reason">>, Props, get_value(reason, Props)),
+                [ {[{id, Id}, {rev, rev_to_str(Rev)},
+                    {error, Error}, {reason, Reason}]} | Acc ]
+            end
+        end,
+        [], Results)).
+
+
+% Returns the string form of a revision given as a two-element tuple; any
+% other term is returned unchanged.
+rev_to_str(Rev) when is_tuple(Rev), tuple_size(Rev) =:= 2 ->
+    couch_doc:rev_to_str(Rev);
+rev_to_str(AlreadyString) ->
+    AlreadyString.
+
+
+% Streaming body function for sending a document as a multipart stream.
+% It is called repeatedly with the state it returned on the previous call:
+%
+%   {JsonBytes, Atts, Boundary, Len} - first call: starts a linked streamer
+%       process and returns {ok, <<>>, {Len, Boundary}};
+%   {LenLeft, Id} with LenLeft > 0   - fetches the next chunk from the
+%       streamer and returns {ok, Data, {RemainingLen, Id}};
+%   {0, Id}                          - everything was sent, returns eof.
+%
+% The pid of the streamer is kept in the process dictionary under
+% {doc_streamer, Boundary}.
+stream_doc({JsonBytes, Atts, Boundary, Len}) ->
+    % Kill a streamer left over under the same key before starting a new one.
+    case erlang:erase({doc_streamer, Boundary}) of
+    Pid when is_pid(Pid) ->
+        unlink(Pid),
+        exit(Pid, kill);
+    _ ->
+        ok
+    end,
+    Self = self(),
+    DocStreamer = spawn_link(fun() ->
+        couch_doc:doc_to_multi_part_stream(
+            Boundary, JsonBytes, Atts,
+            % Write fun: each chunk is handed over only when requested.
+            fun(Data) ->
+                receive {get_data, Ref, From} ->
+                    From ! {data, Ref, Data}
+                end
+            end, true),
+        unlink(Self)
+    end),
+    erlang:put({doc_streamer, Boundary}, DocStreamer),
+    {ok, <<>>, {Len, Boundary}};
+stream_doc({0, Id}) ->
+    erlang:erase({doc_streamer, Id}),
+    eof;
+stream_doc({LenLeft, Id}) when LenLeft > 0 ->
+    Ref = make_ref(),
+    erlang:get({doc_streamer, Id}) ! {get_data, Ref, self()},
+    receive {data, Ref, Data} ->
+        {ok, Data, {LenLeft - iolist_size(Data), Id}}
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_api_wrap.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.hrl b/src/couch_replicator/src/couch_replicator_api_wrap.hrl
new file mode 100644
index 0000000..1a6f27a
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.hrl
@@ -0,0 +1,36 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+
+% Describes a database reached over HTTP together with the settings used for
+% the requests sent to it (see couch_replicator_httpc).
+-record(httpdb, {
+    % Base URL; the request path and query string are appended to it.
+    url,
+    % nil, or an #oauth{} record used to build the Authorization header.
+    oauth = nil,
+    % Default request headers; headers given per request are merged with
+    % these.
+    headers = [
+        {"Accept", "application/json"},
+        {"User-Agent", "CouchDB/" ++ couch_server:get_version()}
+    ],
+    timeout,            % milliseconds
+    % Extra ibrowse options, merged with the options given per request.
+    ibrowse_options = [],
+    % Number of retries left before a failed request is reported as an error.
+    retries = 10,
+    % Delay before the next retry; it is doubled after every retry.
+    wait = 250,         % milliseconds
+    % nil until couch_replicator_httpc:setup/1 stores the pool pid here.
+    httpc_pool = nil,
+    % Passed to the connection pool as its max_connections option.
+    http_connections
+}).
+
+% OAuth credentials used by couch_replicator_httpc to sign requests.
+-record(oauth, {
+    consumer_key,
+    token,
+    token_secret,
+    consumer_secret,
+    signature_method
+}).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
new file mode 100644
index 0000000..6804448
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -0,0 +1,286 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_httpc).
+
+-include("couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-export([setup/1]).
+-export([send_req/3]).
+-export([full_url/2]).
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
+-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
+-define(MAX_WAIT, 5 * 60 * 1000).
+
+
+% Starts a connection pool for a database that does not have one yet and
+% returns the #httpdb{} record with the pid of that pool stored in it.
+setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
+    PoolOptions = [{max_connections, MaxConns}],
+    {ok, Pool} = couch_replicator_httpc_pool:start_link(Url, PoolOptions),
+    {ok, Db#httpdb{httpc_pool = Pool}}.
+
+
+% Sends a request described by the Params1 property list and passes the
+% response to Callback. Before sending, query string values are converted to
+% flat strings and the ibrowse options are sorted by key.
+send_req(HttpDb, Params1, Callback) ->
+    QueryArgs = [{K, ?b2l(iolist_to_binary(V))}
+        || {K, V} <- get_value(qs, Params1, [])],
+    Params2 = ?replace(Params1, qs, QueryArgs),
+    SortedOptions = lists:keysort(1, get_value(ibrowse_options, Params2, [])),
+    Params = ?replace(Params2, ibrowse_options, SortedOptions),
+    {Worker, Response} = send_ibrowse_req(HttpDb, Params),
+    process_response(Response, Worker, HttpDb, Params, Callback).
+
+
+% Builds the final headers, URL and ibrowse options for a request and sends
+% it through an ibrowse worker. Returns {Worker, Response}.
+%
+% Requests to "_changes" get a worker process of their own; every other
+% request takes a worker from the connection pool.
+send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
+    Method = get_value(method, Params, get),
+    RequestHeaders = lists:keysort(1, get_value(headers, Params, [])),
+    Headers = oauth_header(HttpDb, Params) ++
+        lists:ukeymerge(1, RequestHeaders, BaseHeaders),
+    Url = full_url(HttpDb, Params),
+    Body = get_value(body, Params, []),
+    {ok, Worker} = case get_value(path, Params) of
+    "_changes" ->
+        ibrowse:spawn_link_worker_process(Url);
+    _ ->
+        couch_replicator_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool)
+    end,
+    MergedOptions = lists:ukeymerge(1,
+        get_value(ibrowse_options, Params, []),
+        HttpDb#httpdb.ibrowse_options),
+    IbrowseOptions = [
+        {response_format, binary},
+        {inactivity_timeout, HttpDb#httpdb.timeout}
+        | MergedOptions
+    ],
+    Response = ibrowse:send_req_direct(
+        Worker, Url, Headers, Method, Body, IbrowseOptions, infinity),
+    {Worker, Response}.
+
+
+% Dispatches on the immediate result of an ibrowse request:
+%   - a closed connection or a normally terminated worker: the request is
+%     sent again;
+%   - {ibrowse_req_id, ReqId}: the response is streamed, see
+%     process_stream_response/5;
+%   - a complete response: the callback is invoked for 200, 201 and 4xx
+%     codes, redirects are followed for 301/302/303, and any other code is
+%     retried;
+%   - anything else is treated as an error and retried.
+process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
+    send_req(HttpDb, Params, Callback);
+
+process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
+    % ibrowse worker terminated because remote peer closed the socket
+    % -> not an error
+    send_req(HttpDb, Params, Cb);
+
+process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
+    process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
+
+process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
+    % The worker must be released exactly once per request. do_redirect/6
+    % and maybe_retry/5 release it themselves, so it is released here only
+    % on the path that ends in the callback. A second release could hit a
+    % worker the pool has already handed to another client.
+    case list_to_integer(Code) of
+    Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
+        release_worker(Worker, HttpDb),
+        EJson = case Body of
+        <<>> ->
+            null;
+        Json ->
+            ?JSON_DECODE(Json)
+        end,
+        Callback(Ok, Headers, EJson);
+    R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+        do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
+    Error ->
+        maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
+    end;
+
+process_response(Error, Worker, HttpDb, Params, Callback) ->
+    maybe_retry(Error, Worker, HttpDb, Params, Callback).
+
+
+% Handles a response that ibrowse delivers asynchronously as messages tagged
+% with ReqId. It waits for the response headers and then:
+%   - for 200, 201 and 4xx codes, calls Callback with a fun that yields the
+%     body chunk by chunk;
+%   - for 301/302/303, follows the redirect;
+%   - for any other code, reports the error without retrying.
+% An error message from ibrowse, or no message within the timeout, leads to
+% a retry.
+process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
+    receive
+    {ibrowse_async_headers, ReqId, Code, Headers} ->
+        case list_to_integer(Code) of
+        Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
+            StreamDataFun = fun() ->
+                stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
+            end,
+            % Ask ibrowse for the first body chunk before the callback runs.
+            ibrowse:stream_next(ReqId),
+            try
+                Ret = Callback(Ok, Headers, StreamDataFun),
+                release_worker(Worker, HttpDb),
+                % Drop response messages the callback did not consume.
+                clean_mailbox_req(ReqId),
+                Ret
+            catch throw:{maybe_retry_req, Err} ->
+                % Thrown by stream_data_self/5 when the stream fails.
+                clean_mailbox_req(ReqId),
+                maybe_retry(Err, Worker, HttpDb, Params, Callback)
+            end;
+        R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+            do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
+        Error ->
+            report_error(Worker, HttpDb, Params, {code, Error})
+        end;
+    {ibrowse_async_response, ReqId, {error, _} = Error} ->
+        maybe_retry(Error, Worker, HttpDb, Params, Callback)
+    after HttpDb#httpdb.timeout + 500 ->
+        % Note: ibrowse should always reply with timeouts, but this doesn't
+        % seem to be always true when there's a very high rate of requests
+        % and many open connections.
+        maybe_retry(timeout, Worker, HttpDb, Params, Callback)
+    end.
+
+
+% Removes, without blocking, every ibrowse response message for ReqId that
+% is still queued in the mailbox of the calling process. Returns ok.
+clean_mailbox_req(ReqId) ->
+    receive
+    {ibrowse_async_response_end, ReqId} ->
+        clean_mailbox_req(ReqId);
+    {ibrowse_async_response, ReqId, _Ignored} ->
+        clean_mailbox_req(ReqId)
+    after 0 ->
+        ok
+    end.
+
+
+% Gives Worker back to the connection pool of the given database.
+release_worker(Worker, #httpdb{} = HttpDb) ->
+    Pool = HttpDb#httpdb.httpc_pool,
+    ok = couch_replicator_httpc_pool:release_worker(Pool, Worker).
+
+
+% Retries a failed request, or reports the error once no retries are left.
+% Before a retry the worker is released and the process sleeps for the
+% current wait time. The retry runs with one retry less and twice the wait
+% time, capped at ?MAX_WAIT.
+maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
+    report_error(Worker, HttpDb, Params, {error, Error});
+
+maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
+    Params, Cb) ->
+    release_worker(Worker, HttpDb),
+    Verb = string:to_upper(atom_to_list(get_value(method, Params, get))),
+    SafeUrl = couch_util:url_strip_password(full_url(HttpDb, Params)),
+    ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~s",
+        [Verb, SafeUrl, Wait / 1000, error_cause(Error)]),
+    ok = timer:sleep(Wait),
+    NextHttpDb = HttpDb#httpdb{
+        retries = Retries - 1,
+        wait = erlang:min(Wait * 2, ?MAX_WAIT)
+    },
+    send_req(NextHttpDb, Params, Cb).
+
+
+% Logs a failed request, releases its worker and terminates the calling
+% process with {http_request_failed, Method, Url, Error}. The password is
+% stripped from the URL before it is logged or used in the exit reason.
+report_error(Worker, HttpDb, Params, Error) ->
+    Verb = string:to_upper(atom_to_list(get_value(method, Params, get))),
+    SafeUrl = couch_util:url_strip_password(full_url(HttpDb, Params)),
+    do_report_error(SafeUrl, Verb, Error),
+    release_worker(Worker, HttpDb),
+    exit({http_request_failed, Verb, SafeUrl, Error}).
+
+
+% Writes the error log entry for a failed request. Unexpected HTTP status
+% codes get a message of their own; every other error is logged with its
+% cause.
+do_report_error(Url, Method, {code, StatusCode}) ->
+    ?LOG_ERROR("Replicator, request ~s to ~p failed. The received "
+        "HTTP error code is ~p", [Method, Url, StatusCode]);
+
+do_report_error(Url, Method, OtherError) ->
+    ?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~s",
+        [Method, Url, error_cause(OtherError)]).
+
+
+% Formats the cause of an error as a flat string. A single outer
+% {error, _} wrapper is removed first.
+error_cause({error, Cause}) ->
+    format_cause(Cause);
+error_cause(Cause) ->
+    format_cause(Cause).
+
+% Pretty-prints a term into a flat string.
+format_cause(Term) ->
+    lists:flatten(io_lib:format("~p", [Term])).
+
+
+% Data function handed to the response callback of a streamed request.
+% Returns {Data, NextDataFun} for each body chunk and skips empty chunks.
+% Stream errors and timeouts are signalled by throwing
+% {maybe_retry_req, Reason}. After the end of the response an empty binary
+% is returned along with a fun that throws if more data is requested.
+stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
+    Continue = fun() ->
+        stream_data_self(HttpDb, Params, Worker, ReqId, Cb)
+    end,
+    receive
+    {ibrowse_async_response, ReqId, {error, Reason}} ->
+        throw({maybe_retry_req, Reason});
+    {ibrowse_async_response, ReqId, <<>>} ->
+        ibrowse:stream_next(ReqId),
+        Continue();
+    {ibrowse_async_response, ReqId, Chunk} ->
+        ibrowse:stream_next(ReqId),
+        {Chunk, Continue};
+    {ibrowse_async_response_end, ReqId} ->
+        {<<>>, fun() -> throw({maybe_retry_req, more_data_expected}) end}
+    after T + 500 ->
+        % Note: ibrowse should always reply with timeouts, but this doesn't
+        % seem to be always true when there's a very high rate of requests
+        % and many open connections.
+        throw({maybe_retry_req, timeout})
+    end.
+
+
+% Builds the complete request URL: the base URL of the database followed by
+% the `path` and the encoded `qs` entries of Params.
+full_url(#httpdb{url = BaseUrl}, Params) ->
+    lists:append([
+        BaseUrl,
+        get_value(path, Params, []),
+        query_args_to_string(get_value(qs, Params, []), [])
+    ]).
+
+
+% Encodes {Key, Value} query arguments as "?k1=v1&k2=v2", quoting the
+% values. Acc holds pairs that are already encoded, in reverse order.
+% Returns "" when there is nothing to encode.
+query_args_to_string([], []) ->
+    "";
+query_args_to_string(Args, Acc) ->
+    Encode = fun({K, V}) -> K ++ "=" ++ couch_httpd:quote(V) end,
+    Pairs = lists:reverse(Acc) ++ lists:map(Encode, Args),
+    "?" ++ string:join(Pairs, "&").
+
+
+% Returns the OAuth "Authorization" header for a request as a one-element
+% header list, or [] when the database has no OAuth credentials.
+%
+% The signature covers the uppercase HTTP method, the URL (base URL plus
+% path) and the query string arguments. The query string arguments are
+% removed from the signed parameters before the header is built.
+oauth_header(#httpdb{oauth = nil}, _ConnParams) ->
+    [];
+oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
+    Consumer = {
+        OAuth#oauth.consumer_key,
+        OAuth#oauth.consumer_secret,
+        OAuth#oauth.signature_method
+    },
+    % Derive the method name from the atom, as maybe_retry/5 and
+    % report_error/4 do, so that every method is supported and not only
+    % get, post, put and head.
+    Method = string:to_upper(atom_to_list(get_value(method, ConnParams, get))),
+    QSL = get_value(qs, ConnParams, []),
+    OAuthParams = oauth:signed_params(Method,
+        BaseUrl ++ get_value(path, ConnParams, []),
+        QSL, Consumer, OAuth#oauth.token, OAuth#oauth.token_secret) -- QSL,
+    [{"Authorization",
+        "OAuth " ++ oauth_uri:params_to_header_string(OAuthParams)}].
+
+
+% Follows an HTTP redirect: releases the worker of the redirected request
+% and sends the request again to the URL given by the response headers.
+do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
+    release_worker(Worker, HttpDb),
+    {NewHttpDb, NewParams} =
+        after_redirect(redirect_url(Headers, Url), Code, HttpDb, Params),
+    send_req(NewHttpDb, NewParams, Cb).
+
+
+% Builds the URL to follow after a redirect. Protocol, host, port and path
+% come from the "Location" response header; the credentials, if any, are
+% carried over from the original URL.
+redirect_url(RespHeaders, OrigUrl) ->
+    Location = mochiweb_headers:get_value(
+        "Location", mochiweb_headers:make(RespHeaders)),
+    #url{
+        host = Host,
+        host_type = HostType,
+        port = Port,
+        path = Path,  % includes query string
+        protocol = Proto
+    } = ibrowse_lib:parse_url(Location),
+    #url{
+        username = User,
+        password = Passwd
+    } = ibrowse_lib:parse_url(OrigUrl),
+    Creds = if
+    is_list(User), is_list(Passwd) ->
+        User ++ ":" ++ Passwd ++ "@";
+    true ->
+        []
+    end,
+    % IPv6 literals must be enclosed in brackets inside a URL.
+    HostPart = case HostType of
+    ipv6_address ->
+        "[" ++ Host ++ "]";
+    _ ->
+        Host
+    end,
+    lists:append([
+        atom_to_list(Proto), "://", Creds, HostPart, ":",
+        integer_to_list(Port), Path
+    ]).
+
+% Prepares the database record and the request parameters for the request
+% that follows a redirect. After a 303 the method is forced to `get`.
+after_redirect(RedirectUrl, Code, HttpDb, Params) ->
+    Params2 = case Code of
+    303 ->
+        ?replace(Params, method, get);
+    _ ->
+        Params
+    end,
+    after_redirect(RedirectUrl, HttpDb, Params2).
+
+% Points the database record at the redirect URL and removes `path` and `qs`
+% from the parameters.
+after_redirect(RedirectUrl, HttpDb, Params) ->
+    Stripped = lists:foldl(
+        fun(Key, Acc) -> lists:keydelete(Key, 1, Acc) end,
+        Params, [qs, path]),
+    {HttpDb#httpdb{url = RedirectUrl}, Stripped}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_httpc_pool.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
new file mode 100644
index 0000000..b065b7c
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
@@ -0,0 +1,138 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_httpc_pool).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/2, stop/1]).
+-export([get_worker/1, release_worker/2]).
+
+% gen_server API
+-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
+% State of a connection pool server. A worker handed out to a client is kept
+% in `busy` until it is released or exits; clients that ask for a worker
+% while the limit is reached are queued in `waiting`.
+-record(state, {
+    url,
+    limit,                  % max # of workers allowed
+    free = [],              % free workers (connections)
+    busy = [],              % busy workers (connections)
+    waiting = queue:new()   % blocked clients waiting for a worker
+}).
+
+
+% Starts a pool of ibrowse workers for Url. Options is a property list; its
+% max_connections entry is read by init/1.
+start_link(Url, Options) ->
+    gen_server:start_link(?MODULE, {Url, Options}, []).
+
+
+% Stops the pool; terminate/2 stops all of its workers.
+stop(Pool) ->
+    ok = gen_server:call(Pool, stop, infinity).
+
+
+% Returns {ok, Worker}. Blocks, without a timeout, while the pool has
+% reached its connection limit.
+get_worker(Pool) ->
+    {ok, _Worker} = gen_server:call(Pool, get_worker, infinity).
+
+
+% Gives a worker back to the pool. This is an asynchronous cast, so it
+% returns before the pool has processed the release.
+release_worker(Pool, Worker) ->
+    ok = gen_server:cast(Pool, {release_worker, Worker}).
+
+
+% gen_server callback. Exits are trapped so that the termination of a linked
+% worker arrives as an 'EXIT' message in handle_info/2.
+init({Url, Options}) ->
+    process_flag(trap_exit, true),
+    Limit = get_value(max_connections, Options),
+    {ok, #state{url = Url, limit = Limit}}.
+
+
+% get_worker: when the limit of busy workers is reached the caller is queued
+% and gets its reply later. Otherwise a free worker is reused, or a new one
+% is spawned if none is free.
+% stop: terminates the server normally.
+handle_call(get_worker, From, #state{waiting = Waiting} = State) ->
+    #state{url = Url, limit = Limit, busy = Busy, free = Free} = State,
+    case {length(Busy) >= Limit, Free} of
+    {true, _} ->
+        {noreply, State#state{waiting = queue:in(From, Waiting)}};
+    {false, []} ->
+        {ok, NewWorker} = ibrowse:spawn_link_worker_process(Url),
+        {reply, {ok, NewWorker},
+            State#state{free = [], busy = [NewWorker | Busy]}};
+    {false, [IdleWorker | OtherFree]} ->
+        {reply, {ok, IdleWorker},
+            State#state{free = OtherFree, busy = [IdleWorker | Busy]}}
+    end;
+
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State}.
+
+
+% release_worker: ignored unless the worker is alive and currently busy.
+% A released worker goes straight to the longest waiting client, if there is
+% one, and otherwise moves to the list of free workers.
+handle_cast({release_worker, Worker}, #state{waiting = Waiting} = State) ->
+    case is_process_alive(Worker) andalso
+        lists:member(Worker, State#state.busy) of
+    false ->
+        {noreply, State};
+    true ->
+        {noreply, hand_over_worker(Worker, queue:out(Waiting), State)}
+    end.
+
+% Returns the new state after Worker was released, given the result of
+% taking the next client off the waiting queue.
+hand_over_worker(Worker, {empty, Waiting}, #state{busy = Busy, free = Free} = State) ->
+    State#state{
+        busy = Busy -- [Worker],
+        free = [Worker | Free],
+        waiting = Waiting
+    };
+hand_over_worker(Worker, {{value, From}, Waiting}, State) ->
+    % The worker stays busy; it only changes hands.
+    gen_server:reply(From, {ok, Worker}),
+    State#state{waiting = Waiting}.
+
+
+% Handles the exit of a linked process. A dead free worker is simply
+% forgotten. A dead busy worker is removed and, if a client is waiting, a
+% new worker is spawned for that client. Exits of unknown processes are
+% ignored.
+handle_info({'EXIT', Pid, _Reason}, #state{busy = Busy, free = Free} = State) ->
+    case {lists:member(Pid, Free), lists:member(Pid, Busy)} of
+    {true, _} ->
+        {noreply, State#state{free = Free -- [Pid]}};
+    {false, false} ->
+        {noreply, State};
+    {false, true} ->
+        StillBusy = Busy -- [Pid],
+        case queue:out(State#state.waiting) of
+        {empty, _} ->
+            {noreply, State#state{busy = StillBusy}};
+        {{value, Client}, StillWaiting} ->
+            {ok, NewWorker} =
+                ibrowse:spawn_link_worker_process(State#state.url),
+            gen_server:reply(Client, {ok, NewWorker}),
+            {noreply, State#state{
+                busy = [NewWorker | StillBusy],
+                waiting = StillWaiting
+            }}
+        end
+    end.
+
+
+% gen_server callback; the state is kept unchanged across code upgrades.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+% gen_server callback; stops every worker of the pool, free ones first.
+terminate(_Reason, #state{free = Free, busy = Busy}) ->
+    lists:foreach(fun ibrowse_http_client:stop/1, Free),
+    lists:foreach(fun ibrowse_http_client:stop/1, Busy).
+

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_httpd.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl
new file mode 100644
index 0000000..eccf885
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_httpd.erl
@@ -0,0 +1,66 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_httpd).
+
+-include("couch_db.hrl").
+
+-import(couch_httpd, [
+    send_json/2,
+    send_json/3,
+    send_method_not_allowed/2
+]).
+
+-import(couch_util, [
+    to_binary/1
+]).
+
+-export([handle_req/1]).
+
+
+% HTTP handler for replication requests. Only POST is allowed. The JSON body
+% is validated, parsed into a replication record and passed to
+% couch_replicator:replicate/1; the outcome is sent back as JSON.
+handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
+    couch_httpd:validate_ctype(Req, "application/json"),
+    RepDoc = {Props} = couch_httpd:json_body_obj(Req),
+    validate_rep_props(Props),
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, UserCtx),
+    send_replication_result(Req, couch_replicator:replicate(Rep));
+
+handle_req(Req) ->
+    send_method_not_allowed(Req, "POST").
+
+% Maps the result of couch_replicator:replicate/1 to an HTTP response.
+send_replication_result(Req, Result) ->
+    case Result of
+    {error, {Error, Reason}} ->
+        ErrorBody = {[{error, to_binary(Error)}, {reason, to_binary(Reason)}]},
+        send_json(Req, 404, ErrorBody);
+    {error, not_found} ->
+        % Tried to cancel a replication that didn't exist.
+        send_json(Req, 404, {[{error, <<"not found">>}]});
+    {error, Reason} ->
+        send_json(Req, 500, {[{error, to_binary(Reason)}]});
+    {ok, {cancelled, RepId}} ->
+        send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
+    {ok, {continuous, RepId}} ->
+        send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
+    {ok, {HistoryResults}} ->
+        send_json(Req, {[{ok, true} | HistoryResults]})
+    end.
+
+% Checks the properties of a replication request. Every value inside a
+% "query_params" object must be a binary; otherwise a bad_request error
+% naming the offending key is thrown. All other properties are accepted.
+validate_rep_props(Props) ->
+    lists:foreach(fun validate_rep_prop/1, Props).
+
+% Validates a single property of a replication request.
+validate_rep_prop({<<"query_params">>, {Params}}) ->
+    lists:foreach(fun validate_query_param/1, Params);
+validate_rep_prop(_Other) ->
+    ok.
+
+% Accepts a query parameter only if its value is a binary.
+validate_query_param({_, V}) when is_binary(V) ->
+    ok;
+validate_query_param({K, _}) ->
+    throw({bad_request, <<K/binary," value must be a string.">>}).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_job_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_job_sup.erl b/src/couch_replicator/src/couch_replicator_job_sup.erl
new file mode 100644
index 0000000..e8a7b96
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_job_sup.erl
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_job_sup).
+-behaviour(supervisor).
+-export([init/1, start_link/0]).
+
+-include("couch_db.hrl").
+
+%% Starts the supervisor and registers it locally under the module name.
+start_link() ->
+    supervisor:start_link({local,?MODULE}, ?MODULE, []).
+
+%%=============================================================================
+%% supervisor callbacks
+%%=============================================================================
+
+%% Starts with no children. The supervisor uses the one_for_one strategy
+%% and allows at most 3 restarts within 10 seconds.
+init([]) ->
+    {ok, {{one_for_one, 3, 10}, []}}.
+
+%%=============================================================================
+%% internal functions
+%%=============================================================================

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_manager.erl b/src/couch_replicator/src/couch_replicator_manager.erl
new file mode 100644
index 0000000..c61b27b
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_manager.erl
@@ -0,0 +1,626 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_manager).
+-behaviour(gen_server).
+
+% public API
+-export([replication_started/1, replication_completed/1, replication_error/2]).
+
+% gen_server callbacks
+-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+-include("couch_replicator.hrl").
+-include("couch_replicator_js_functions.hrl").
+
+-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
+-define(REP_TO_STATE, couch_rep_id_to_rep_state).
+-define(INITIAL_WAIT, 2.5). % seconds
+-define(MAX_WAIT, 600).     % seconds
+
+-record(rep_state, {
+    rep,
+    starting,
+    retries_left,
+    max_retries,
+    wait = ?INITIAL_WAIT
+}).
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3,
+    to_binary/1
+]).
+
+-record(state, {
+    changes_feed_loop = nil,
+    db_notifier = nil,
+    rep_db_name = nil,
+    rep_start_pids = [],
+    max_retries
+}).
+
+
+% Starts the replication manager as a gen_server registered locally under the
+% name of this module.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+
+% Called when a replication has started. If the manager tracks the
+% replication, its document is updated with the "triggered" state and the
+% replication id, and the manager is told that the replication started.
+% Replications the manager does not track are ignored.
+replication_started(#rep{id = {BaseId, _} = RepId}) ->
+    case rep_state(RepId) of
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        TriggeredFields = [
+            {<<"_replication_state">>, <<"triggered">>},
+            {<<"_replication_id">>, ?l2b(BaseId)}
+        ],
+        update_rep_doc(DocId, TriggeredFields),
+        ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
+        ?LOG_INFO("Document `~s` triggered replication `~s`",
+            [DocId, pp_rep_id(RepId)]);
+    nil ->
+        ok
+    end.
+
+
+% Called when a replication has finished. If the manager tracks the
+% replication, its document is marked "completed" and the manager is asked to
+% drop the state it keeps for it. Untracked replications are ignored.
+replication_completed(#rep{id = RepId}) ->
+    case rep_state(RepId) of
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        CompletedField = {<<"_replication_state">>, <<"completed">>},
+        update_rep_doc(DocId, [CompletedField]),
+        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
+        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
+            [pp_rep_id(RepId), DocId]);
+    nil ->
+        ok
+    end.
+
+
+% Called when a replication failed. If the manager tracks the replication,
+% its document is updated with the "error" state and the replication id, and
+% the error is handed to the manager process. Untracked replications are
+% ignored.
+replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
+    case rep_state(RepId) of
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        % TODO: maybe add error reason to replication document
+        ErrorFields = [
+            {<<"_replication_state">>, <<"error">>},
+            {<<"_replication_id">>, ?l2b(BaseId)}
+        ],
+        update_rep_doc(DocId, ErrorFields),
+        ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity);
+    nil ->
+        ok
+    end.
+
+
+% gen_server callback. Traps exits, creates the two named ETS tables,
+% registers a configuration change listener, and starts the changes feed
+% loop and the database update notifier.
+init(_) ->
+    process_flag(trap_exit, true),
+    TableOpts = [named_table, set, protected],
+    ?DOC_TO_REP = ets:new(?DOC_TO_REP, TableOpts),
+    ?REP_TO_STATE = ets:new(?REP_TO_STATE, TableOpts),
+    Manager = self(),
+    % Relevant configuration changes are forwarded to this server as casts.
+    OnConfigChange =
+        fun("replicator", "db", NewName) ->
+            ok = gen_server:cast(Manager, {rep_db_changed, ?l2b(NewName)});
+        ("replicator", "max_replication_retry_count", V) ->
+            ok = gen_server:cast(Manager, {set_max_retries, retries_value(V)})
+        end,
+    ok = couch_config:register(OnConfigChange),
+    {Loop, RepDbName} = changes_feed_loop(),
+    Notifier = db_update_notifier(),
+    MaxRetries = retries_value(
+        couch_config:get("replicator", "max_replication_retry_count", "10")),
+    {ok, #state{
+        changes_feed_loop = Loop,
+        rep_db_name = RepDbName,
+        db_notifier = Notifier,
+        max_retries = MaxRetries
+    }}.
+
+
+% gen_server callback for synchronous requests.
+%
+% {rep_db_update, Change}    - processes a change row of the replicator
+%                              database; if processing raises, the error is
+%                              recorded for the document and the state is
+%                              left as it was.
+% {rep_started, RepId}       - resets the retry bookkeeping of a tracked
+%                              replication.
+% {rep_complete, RepId}      - drops the state kept for the replication.
+% {rep_error, RepId, Error}  - delegates to replication_error/3.
+% Anything else is logged and stops the server.
+handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) ->
+    NextState = try
+        process_update(State, Change)
+    catch
+    _Class:Reason ->
+        {DocProps} = get_value(doc, ChangeProps),
+        rep_db_update_error(Reason, get_value(<<"_id">>, DocProps)),
+        State
+    end,
+    {reply, ok, NextState};
+
+
+handle_call({rep_started, RepId}, _From, State) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    Tracked ->
+        MaxRetries = State#state.max_retries,
+        Restarted = Tracked#rep_state{
+            starting = false,
+            retries_left = MaxRetries,
+            max_retries = MaxRetries,
+            wait = ?INITIAL_WAIT
+        },
+        true = ets:insert(?REP_TO_STATE, {RepId, Restarted})
+    end,
+    {reply, ok, State};
+
+handle_call({rep_complete, RepId}, _From, State) ->
+    true = ets:delete(?REP_TO_STATE, RepId),
+    {reply, ok, State};
+
+handle_call({rep_error, RepId, Error}, _From, State) ->
+    StateAfterError = replication_error(State, RepId, Error),
+    {reply, ok, StateAfterError};
+
+handle_call(Request, Caller, State) ->
+    ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
+        [Request, Caller]),
+    {stop, {error, {unexpected_call, Request}}, State}.
+
+
+% gen_server callback for asynchronous requests.
+%
+% rep_db_changed / rep_db_created: nothing happens when the reported name is
+% the replicator database already in use; otherwise the manager restarts.
+% set_max_retries: stores the new retry limit in the state.
+% Anything else is logged and stops the server.
+handle_cast({rep_db_changed, NewName}, #state{rep_db_name = Current} = State)
+        when NewName =:= Current ->
+    {noreply, State};
+
+handle_cast({rep_db_changed, _OtherName}, State) ->
+    {noreply, restart(State)};
+
+handle_cast({rep_db_created, NewName}, #state{rep_db_name = Current} = State)
+        when NewName =:= Current ->
+    {noreply, State};
+
+handle_cast({rep_db_created, _OtherName}, State) ->
+    {noreply, restart(State)};
+
+handle_cast({set_max_retries, Limit}, State) ->
+    {noreply, State#state{max_retries = Limit}};
+
+handle_cast(Request, State) ->
+    ?LOG_ERROR("Replication manager received unexpected cast ~p", [Request]),
+    {stop, {error, {unexpected_cast, Request}}, State}.
+
+
+% gen_server callback for plain messages. Handles exits of the linked changes
+% feed loop, the database update notifier and the replication start
+% processes, ignores 'DOWN' messages, and stops on anything else.
+handle_info({'EXIT', Pid, normal}, #state{changes_feed_loop = Loop} = State)
+        when Pid =:= Loop ->
+    % replicator DB deleted
+    {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};
+
+handle_info({'EXIT', Pid, Reason}, #state{db_notifier = Notifier} = State)
+        when Pid =:= Notifier ->
+    ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
+    {stop, {db_update_notifier_died, Reason}, State};
+
+handle_info({'EXIT', Pid, normal}, #state{rep_start_pids = Pids} = State) ->
+    % one of the replication start processes terminated successfully
+    {noreply, State#state{rep_start_pids = lists:delete(Pid, Pids)}};
+
+handle_info({'DOWN', _Ref, _, _, _}, State) ->
+    % From a db monitor created by a replication process. Ignore.
+    {noreply, State};
+
+handle_info(Unexpected, State) ->
+    ?LOG_ERROR("Replication manager received unexpected message ~p",
+        [Unexpected]),
+    {stop, {unexpected_msg, Unexpected}, State}.
+
+
+% gen_server callback. Cancels all replications, unlinks from and kills the
+% changes feed loop and the pending replication start processes, deletes both
+% ETS tables and stops the database update notifier.
+terminate(_Reason, State) ->
+    Loop = State#state.changes_feed_loop,
+    StartPids = State#state.rep_start_pids,
+    DbNotifier = State#state.db_notifier,
+    stop_all_replications(),
+    % `catch` keeps going when an entry is not a live pid.
+    _ = [begin
+            catch unlink(Pid),
+            catch exit(Pid, stop)
+        end || Pid <- [Loop | StartPids]],
+    true = ets:delete(?REP_TO_STATE),
+    true = ets:delete(?DOC_TO_REP),
+    couch_db_update_notifier:stop(DbNotifier).
+
+
+% gen_server callback. The state needs no conversion between versions.
+code_change(_FromVsn, CurrentState, _Extra) ->
+    {ok, CurrentState}.
+
+
+% Makes sure the replicator database exists, then spawns a linked process
+% that follows its continuous changes feed. Every change whose document id is
+% accepted by has_valid_rep_id/1 is sent to the calling process as a
+% synchronous {rep_db_update, Change} request.
+% Returns {Pid, RepDbName}.
+changes_feed_loop() ->
+    {ok, RepDb} = ensure_rep_db_exists(),
+    RepDbName = couch_db:name(RepDb),
+    couch_db:close(RepDb),
+    Manager = self(),
+    FeedLoop = fun() ->
+        {ok, Db} = couch_db:open_int(
+            RepDbName, [{user_ctx, RepDb#db.user_ctx}, sys_db]),
+        ChangesArgs = #changes_args{
+            include_docs = true,
+            feed = "continuous",
+            timeout = infinity,
+            db_open_options = [sys_db]
+        },
+        ChangesFeedFun = couch_changes:handle_changes(
+            ChangesArgs, {json_req, null}, Db),
+        OnFeedEvent =
+            fun({change, Change, _}, _) ->
+                case has_valid_rep_id(Change) of
+                false ->
+                    ok;
+                true ->
+                    ok = gen_server:call(
+                        Manager, {rep_db_update, Change}, infinity)
+                end;
+            (_, _) ->
+                ok
+            end,
+        ChangesFeedFun(OnFeedEvent),
+        couch_db:close(Db)
+    end,
+    {spawn_link(FeedLoop), RepDbName}.
+
+
+% Tells whether a change row (or a bare document id) refers to a document
+% other than a design document.
+has_valid_rep_id({ChangeProps}) ->
+    DocId = get_value(<<"id">>, ChangeProps),
+    has_valid_rep_id(DocId);
+has_valid_rep_id(DocId) ->
+    case DocId of
+    <<?DESIGN_DOC_PREFIX, _/binary>> ->
+        false;
+    _ ->
+        true
+    end.
+
+
+% Starts a linked database update notifier that casts
+% {rep_db_created, DbName} to the calling process whenever a database with
+% the configured replicator database name is created. Returns the notifier.
+db_update_notifier() ->
+    Manager = self(),
+    OnDbEvent =
+        fun({created, DbName}) ->
+            Configured = couch_config:get("replicator", "db", "_replicator"),
+            RepDbName = ?l2b(Configured),
+            if
+            DbName =:= RepDbName ->
+                ok = gen_server:cast(Manager, {rep_db_created, DbName});
+            true ->
+                ok
+            end;
+        (_) ->
+            % no need to handle the 'deleted' event - the changes feed loop
+            % dies when the database is deleted
+            ok
+        end,
+    {ok, Notifier} = couch_db_update_notifier:start_link(OnDbEvent),
+    Notifier.
+
+
+% Cancels every replication, kills the current changes feed loop and the
+% pending replication start processes, and starts a fresh changes feed loop.
+% Returns the updated state.
+restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
+    stop_all_replications(),
+    _ = [begin
+            catch unlink(Pid),
+            catch exit(Pid, rep_db_changed)
+        end || Pid <- [Loop | StartPids]],
+    {FreshLoop, FreshRepDbName} = changes_feed_loop(),
+    State#state{
+        changes_feed_loop = FreshLoop,
+        rep_db_name = FreshRepDbName,
+        rep_start_pids = []
+    }.
+
+
+% Reacts to one change row of the replicator database and returns the new
+% manager state. Deleted documents stop their replication; otherwise the
+% action depends on the document's `_replication_state` field. A state value
+% that is not listed below raises a case_clause error.
+process_update(State, {Change}) ->
+    {RepProps} = JsonRepDoc = get_value(doc, Change),
+    DocId = get_value(<<"_id">>, RepProps),
+    case get_value(<<"deleted">>, Change, false) of
+    true ->
+        rep_doc_deleted(DocId),
+        State;
+    false ->
+        case get_value(<<"_replication_state">>, RepProps) of
+        Pending when Pending =:= undefined; Pending =:= <<"triggered">> ->
+            maybe_start_replication(State, DocId, JsonRepDoc);
+        <<"completed">> ->
+            replication_complete(DocId),
+            State;
+        <<"error">> ->
+            % Only retry documents the manager is not already tracking.
+            case ets:member(?DOC_TO_REP, DocId) of
+            true ->
+                State;
+            false ->
+                maybe_start_replication(State, DocId, JsonRepDoc)
+            end
+        end
+    end.
+
+
+% Logs a failure to process the replication document DocId and marks the
+% document with the "error" state. For {bad_rep_doc, Reason} errors the
+% embedded reason is logged; any other error is converted with to_binary/1.
+rep_db_update_error(Error, DocId) ->
+    Reason = case Error of
+    {bad_rep_doc, BadDocReason} ->
+        BadDocReason;
+    _ ->
+        to_binary(Error)
+    end,
+    ?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
+        [DocId, Reason]),
+    update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]).
+
+
+% Builds the #user_ctx{} for a replication document from its `user_ctx`
+% field. A missing field yields the default context; missing `name` and
+% `roles` entries default to null and [] respectively.
+rep_user_ctx({RepDoc}) ->
+    case get_value(<<"user_ctx">>, RepDoc) of
+    {CtxProps} ->
+        Name = get_value(<<"name">>, CtxProps, null),
+        Roles = get_value(<<"roles">>, CtxProps, []),
+        #user_ctx{name = Name, roles = Roles};
+    undefined ->
+        #user_ctx{}
+    end.
+
+
+% Starts the replication described by RepDoc unless the manager already
+% tracks a replication with the same id. When the replication is already
+% tracked for a different document, that is logged and this document is
+% tagged with the replication id. Returns the updated state.
+maybe_start_replication(State, DocId, RepDoc) ->
+    Rep = parse_rep_doc(RepDoc),
+    #rep{id = {BaseId, _} = RepId} = Rep,
+    case rep_state(RepId) of
+    nil ->
+        MaxRetries = State#state.max_retries,
+        Tracked = #rep_state{
+            rep = Rep,
+            starting = true,
+            retries_left = MaxRetries,
+            max_retries = MaxRetries
+        },
+        true = ets:insert(?REP_TO_STATE, {RepId, Tracked}),
+        true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
+        ?LOG_INFO("Attempting to start replication `~s` (document `~s`).",
+            [pp_rep_id(RepId), DocId]),
+        Starter = spawn_link(fun() -> start_replication(Rep, 0) end),
+        State#state{rep_start_pids = [Starter | State#state.rep_start_pids]};
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        % Already tracked for this very document.
+        State;
+    #rep_state{starting = false, rep = #rep{doc_id = OwnerDocId}} ->
+        ?LOG_INFO("The replication specified by the document `~s` was already"
+            " triggered by the document `~s`", [DocId, OwnerDocId]),
+        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
+        State;
+    #rep_state{starting = true, rep = #rep{doc_id = OwnerDocId}} ->
+        ?LOG_INFO("The replication specified by the document `~s` is already"
+            " being triggered by the document `~s`", [DocId, OwnerDocId]),
+        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
+        State
+    end.
+
+
+% Parses a replication document into a #rep{} record. Any exception raised
+% while parsing (including while building the user context) is rethrown as
+% {bad_rep_doc, Reason}.
+parse_rep_doc(RepDoc) ->
+    Parsed = try
+        couch_replicator_utils:parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
+    catch
+    throw:{error, Why} ->
+        throw({bad_rep_doc, Why});
+    Class:Term ->
+        throw({bad_rep_doc, to_binary({Class, Term})})
+    end,
+    {ok, Rep} = Parsed,
+    Rep.
+
+
+% Writes RepId into the document's `_replication_id` field unless the field
+% already holds exactly that value.
+maybe_tag_rep_doc(DocId, {RepProps}, RepId) ->
+    CurrentTag = get_value(<<"_replication_id">>, RepProps),
+    if
+    CurrentTag =:= RepId ->
+        ok;
+    true ->
+        update_rep_doc(DocId, [{<<"_replication_id">>, RepId}])
+    end.
+
+
+% Sleeps for Wait seconds and then starts the replication asynchronously.
+% Any result other than {ok, _}, including a caught exception, is reported
+% through replication_error/2.
+start_replication(Rep, Wait) ->
+    DelayMs = Wait * 1000,
+    ok = timer:sleep(DelayMs),
+    Outcome = (catch couch_replicator:async_replicate(Rep)),
+    case Outcome of
+    {ok, _} ->
+        ok;
+    _ ->
+        replication_error(Rep, Outcome)
+    end.
+
+
+% Handles a replication document that is in the "completed" state: forgets
+% the document -> replication mapping and, when no replication state is left
+% for it, removes the child spec from the job supervisor.
+replication_complete(DocId) ->
+    case ets:lookup(?DOC_TO_REP, DocId) of
+    [{DocId, {BaseId, Ext} = RepId}] ->
+        Tracked = rep_state(RepId),
+        case Tracked of
+        #rep_state{} ->
+            ok;
+        nil ->
+            % Prior to OTP R14B02, temporary child specs remain in the
+            % supervisor after a worker finishes - remove them.
+            % We want to be able to start the same replication but with
+            % eventually different values for parameters that don't
+            % contribute to its ID calculation.
+            ChildId = BaseId ++ Ext,
+            _ = supervisor:delete_child(couch_replicator_job_sup, ChildId)
+        end,
+        true = ets:delete(?DOC_TO_REP, DocId);
+    _ ->
+        ok
+    end.
+
+
+% Handles the deletion of a replication document: cancels the replication it
+% triggered, if any, and removes both ETS entries for it.
+rep_doc_deleted(DocId) ->
+    case ets:lookup(?DOC_TO_REP, DocId) of
+    [] ->
+        ok;
+    [{DocId, RepId}] ->
+        couch_replicator:cancel_replication(RepId),
+        true = ets:delete(?REP_TO_STATE, RepId),
+        true = ets:delete(?DOC_TO_REP, DocId),
+        ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
+            " was deleted", [pp_rep_id(RepId), DocId])
+    end.
+
+
+% Manager-side handling of a failed replication: a tracked replication is
+% passed to maybe_retry_replication/3, an untracked one leaves the state
+% unchanged.
+replication_error(State, RepId, Error) ->
+    case ets:lookup(?REP_TO_STATE, RepId) of
+    [{RepId, Tracked}] ->
+        maybe_retry_replication(Tracked, Error, State);
+    [] ->
+        State
+    end.
+
+% Decides what to do after a replication failed. With no retries left the
+% replication is cancelled and forgotten; otherwise the backoff state is
+% stored and a linked process is spawned to restart the replication after
+% the new wait time. Returns the updated manager state.
+maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
+    Rep = RepState#rep_state.rep,
+    MaxRetries = RepState#rep_state.max_retries,
+    #rep{id = RepId, doc_id = DocId} = Rep,
+    couch_replicator:cancel_replication(RepId),
+    true = ets:delete(?REP_TO_STATE, RepId),
+    true = ets:delete(?DOC_TO_REP, DocId),
+    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+        "~nReached maximum retry attempts (~p).",
+        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
+    State;
+
+maybe_retry_replication(RepState, Error, State) ->
+    Rep = RepState#rep_state.rep,
+    #rep{id = RepId, doc_id = DocId} = Rep,
+    BackedOff = state_after_error(RepState),
+    Wait = BackedOff#rep_state.wait,
+    true = ets:insert(?REP_TO_STATE, {RepId, BackedOff}),
+    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+        "~nRestarting replication in ~p seconds.",
+        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
+    Starter = spawn_link(fun() -> start_replication(Rep, Wait) end),
+    State#state{rep_start_pids = [Starter | State#state.rep_start_pids]}.
+
+
+% Cancels every replication recorded in the document -> replication table
+% and then empties both ETS tables.
+stop_all_replications() ->
+    ?LOG_INFO("Stopping all ongoing replications because the replicator"
+        " database was deleted or changed", []),
+    lists:foreach(
+        fun({_DocId, RepId}) ->
+            couch_replicator:cancel_replication(RepId)
+        end,
+        ets:tab2list(?DOC_TO_REP)),
+    true = ets:delete_all_objects(?REP_TO_STATE),
+    true = ets:delete_all_objects(?DOC_TO_REP).
+
+
+% Applies the key/value pairs KVs to the replication document RepDocId.
+% Opens the replicator database, reads the latest version of the document and
+% delegates to update_rep_doc/3. Does nothing (returns ok) when the document
+% cannot be opened. A `conflict` throw is logged and the whole update is
+% retried; the database handle opened here is closed in all cases.
+update_rep_doc(RepDocId, KVs) ->
+    {ok, RepDb} = ensure_rep_db_exists(),
+    try
+        case couch_db:open_doc(RepDb, RepDocId, [ejson_body]) of
+        {ok, LatestRepDoc} ->
+            update_rep_doc(RepDb, LatestRepDoc, KVs);
+        _ ->
+            ok
+        end
+    catch throw:conflict ->
+        % Shouldn't happen, as by default only the role _replicator can
+        % update replication documents.
+        ?LOG_ERROR("Conflict error when updating replication document `~s`."
+            " Retrying.", [RepDocId]),
+        % timer:sleep/1 takes milliseconds, so this is a 5 ms pause.
+        ok = timer:sleep(5),
+        % The retry opens its own database handle; this call's handle is
+        % only closed (in `after`) once the retry has returned.
+        update_rep_doc(RepDocId, KVs)
+    after
+        couch_db:close(RepDb)
+    end.
+
+% Merges KVs into the body of RepDoc and saves the document if the body
+% changed. Whenever `_replication_state` changes to a new value,
+% `_replication_state_time` is set to the current timestamp as well.
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+    ApplyKV =
+        fun({<<"_replication_state">> = Key, NewState} = KV, Body) ->
+            case get_value(Key, Body) of
+            NewState ->
+                % State unchanged: keep the body (and its timestamp) as is.
+                Body;
+            _ ->
+                WithState = lists:keystore(Key, 1, Body, KV),
+                TimeKey = <<"_replication_state_time">>,
+                lists:keystore(TimeKey, 1, WithState, {TimeKey, timestamp()})
+            end;
+        ({Key, _Value} = KV, Body) ->
+            lists:keystore(Key, 1, Body, KV)
+        end,
+    NewRepDocBody = lists:foldl(ApplyKV, RepDocBody, KVs),
+    if
+    NewRepDocBody =:= RepDocBody ->
+        ok;
+    true ->
+        % Might not succeed - when the replication doc is deleted right
+        % before this update (not an error, ignore).
+        couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
+    end.
+
+
+% RFC3339 timestamps.
+% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
+% Returns a binary such as <<"2011-12-05T09:33:29+00:00">>, expressed in
+% local time followed by the local offset from UTC.
+timestamp() ->
+    % Read the clock once and derive the local time from that reading, so the
+    % date/time fields and the UTC offset always describe the same instant.
+    UTime = erlang:universaltime(),
+    LocalTime = calendar:universal_time_to_local_time(UTime),
+    {{Year, Month, Day}, {Hour, Min, Sec}} = LocalTime,
+    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
+        calendar:datetime_to_gregorian_seconds(UTime),
+    iolist_to_binary(
+        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
+            [Year, Month, Day, Hour, Min, Sec,
+                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
+
+% Formats a UTC offset given as hours and minutes as "+HH:MM", or as
+% "-HH:MM" when either component is negative.
+zone(Hr, Min) ->
+    Sign = case Hr >= 0 andalso Min >= 0 of
+    true -> "+";
+    false -> "-"
+    end,
+    io_lib:format(Sign ++ "~2..0w:~2..0w", [abs(Hr), abs(Min)]).
+
+
+
+% Opens the configured replicator database, creating it if it cannot be
+% opened, and makes sure its design document exists. Returns {ok, Db}.
+ensure_rep_db_exists() ->
+    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
+    Db = case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}]) of
+    {ok, Opened} ->
+        Opened;
+    _Error ->
+        {ok, Created} = couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}]),
+        Created
+    end,
+    ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+    {ok, Db}.
+
+
+% Creates the design document DDocID, carrying the replication document
+% validation function, in RepDb unless a document with that id can already
+% be opened.
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+    case couch_db:open_doc(RepDb, DDocID, []) of
+    {ok, _Existing} ->
+        ok;
+    _ ->
+        DDocProps = [
+            {<<"_id">>, DDocID},
+            {<<"language">>, <<"javascript">>},
+            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+        ],
+        DDoc = couch_doc:from_json_obj({DDocProps}),
+        {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+    end.
+
+
+% Pretty-prints a replication id: the base id with the extension appended.
+% Accepts either a #rep{} record or the {Base, Extension} id itself.
+pp_rep_id(#rep{id = RepId}) ->
+    pp_rep_id(RepId);
+pp_rep_id({Base, Extension}) ->
+    lists:append(Base, Extension).
+
+
+% Looks up the state the manager keeps for a replication id. Returns the
+% stored state, or nil when the replication is not tracked.
+rep_state(RepId) ->
+    case ets:lookup(?REP_TO_STATE, RepId) of
+    [] ->
+        nil;
+    [{RepId, Tracked}] ->
+        Tracked
+    end.
+
+
+% Unwraps {error, Reason} to Reason; any other term is returned unchanged.
+error_reason(Error) ->
+    case Error of
+    {error, Reason} ->
+        Reason;
+    Other ->
+        Other
+    end.
+
+
+% Converts the configured retry count (a string) to the atom infinity or to
+% an integer.
+retries_value(Value) ->
+    case Value of
+    "infinity" ->
+        infinity;
+    _ ->
+        list_to_integer(Value)
+    end.
+
+
+% Computes the replication state after a failure: the wait time is doubled
+% (capped at ?MAX_WAIT) and, unless retries are unlimited, one retry is
+% consumed.
+state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
+    NextWait = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
+    BackedOff = State#rep_state{wait = NextWait},
+    if
+    Left =:= infinity ->
+        BackedOff;
+    true ->
+        BackedOff#rep_state{retries_left = Left - 1}
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_notifier.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_notifier.erl b/src/couch_replicator/src/couch_replicator_notifier.erl
new file mode 100644
index 0000000..3b48b71
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_notifier.erl
@@ -0,0 +1,57 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_notifier).
+
+-behaviour(gen_event).
+
+% public API
+-export([start_link/1, stop/1, notify/1]).
+
+% gen_event callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_event/2, handle_call/2, handle_info/2]).
+
+-include("couch_db.hrl").
+
+% Attaches a new handler of this module, identified by a fresh reference, to
+% the couch_replication event manager. FunAcc becomes the handler state.
+start_link(FunAcc) ->
+    HandlerId = {?MODULE, make_ref()},
+    couch_event_sup:start_link(couch_replication, HandlerId, FunAcc).
+
+% Sends Event to the couch_replication event manager.
+notify(Event) ->
+    EventMgr = couch_replication,
+    gen_event:notify(EventMgr, Event).
+
+% Stops a notifier previously started with start_link/1.
+stop(NotifierPid) ->
+    couch_event_sup:stop(NotifierPid).
+
+
+% gen_event callback. The argument is used as the handler state as is.
+init(InitialState) ->
+    {ok, InitialState}.
+
+% gen_event callback. There is nothing to clean up.
+terminate(_Why, _HandlerState) ->
+    ok.
+
+% gen_event callback. Feeds Event to the stored callback: a fun of arity 1
+% is simply called; a {Fun, Acc} pair with a fun of arity 2 is called with
+% the accumulator, whose new value is kept in the handler state.
+handle_event(Event, Callback) when is_function(Callback, 1) ->
+    _ = Callback(Event),
+    {ok, Callback};
+handle_event(Event, {Callback, Acc}) when is_function(Callback, 2) ->
+    {ok, {Callback, Callback(Event, Acc)}}.
+
+% gen_event callback. Every request is answered with ok; the state is kept.
+handle_call(_Request, HandlerState) ->
+    {reply, ok, HandlerState}.
+
+% gen_event callback. Plain messages are ignored; the state is kept.
+handle_info(_Info, HandlerState) ->
+    {ok, HandlerState}.
+
+% gen_event callback. The state needs no conversion between versions.
+code_change(_FromVsn, HandlerState, _Extra) ->
+    {ok, HandlerState}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index 6cc4db8..d4fc0e7 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -19,7 +19,7 @@
 -export([sum_stats/2]).
 
 -include("couch_db.hrl").
--include("couch_api_wrap.hrl").
+-include("couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
 -include("../ibrowse/ibrowse.hrl").
 
@@ -109,23 +109,23 @@ filter_code(Filter, Source, UserCtx) ->
     _ ->
         throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
     end,
-    Db = case (catch couch_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of
+    Db = case (catch couch_replicator_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of
     {ok, Db0} ->
         Db0;
     DbError ->
         DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
-           [couch_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]),
+           [couch_replicator_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]),
         throw({error, iolist_to_binary(DbErrorMsg)})
     end,
     try
-        Body = case (catch couch_api_wrap:open_doc(
+        Body = case (catch couch_replicator_api_wrap:open_doc(
             Db, <<"_design/", DDocName/binary>>, [ejson_body])) of
         {ok, #doc{body = Body0}} ->
             Body0;
         DocError ->
             DocErrorMsg = io_lib:format(
                 "Couldn't open document `_design/~s` from source "
-                "database `~s`: ~s", [DDocName, couch_api_wrap:db_uri(Source),
+                "database `~s`: ~s", [DDocName, couch_replicator_api_wrap:db_uri(Source),
                     couch_util:to_binary(DocError)]),
             throw({error, iolist_to_binary(DocErrorMsg)})
         end,
@@ -133,7 +133,7 @@ filter_code(Filter, Source, UserCtx) ->
             Body, [<<"filters">>, FilterName]),
         re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
     after
-        couch_api_wrap:db_close(Db)
+        couch_replicator_api_wrap:db_close(Db)
     end.
 
 


Mime
View raw message