couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From beno...@apache.org
Subject [1/8] git commit: refactor couch_replicator. close #COUCHDB-1323 .
Date Mon, 05 Dec 2011 09:33:29 GMT
Updated Branches:
  refs/heads/master 0c6f529a7 -> f913ca6e8


refactor couch_replicator. close #COUCHDB-1323 .

Move all modules under couch_replicator namespace


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/f913ca6e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/f913ca6e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/f913ca6e

Branch: refs/heads/master
Commit: f913ca6e8efd8ab76babb87d5d69bc6c493833e0
Parents: ad52679
Author: benoitc <benoitc@apache.org>
Authored: Sun Dec 4 14:08:26 2011 +0100
Committer: benoitc <benoitc@apache.org>
Committed: Mon Dec 5 10:33:07 2011 +0100

----------------------------------------------------------------------
 etc/couchdb/default.ini.tpl.in                     |    4 +-
 src/couch_replicator/Makefile.am                   |   32 +-
 src/couch_replicator/src/couch_api_wrap.erl        |  775 ---------------
 src/couch_replicator/src/couch_api_wrap.hrl        |   36 -
 src/couch_replicator/src/couch_api_wrap_httpc.erl  |  286 ------
 src/couch_replicator/src/couch_httpc_pool.erl      |  138 ---
 .../src/couch_httpd_replicator.erl                 |   66 --
 src/couch_replicator/src/couch_rep_sup.erl         |   31 -
 .../src/couch_replication_manager.erl              |  626 ------------
 .../src/couch_replication_notifier.erl             |   57 --
 src/couch_replicator/src/couch_replicator.app.src  |   22 +-
 src/couch_replicator/src/couch_replicator.erl      |   74 +-
 .../src/couch_replicator_api_wrap.erl              |  775 +++++++++++++++
 .../src/couch_replicator_api_wrap.hrl              |   36 +
 .../src/couch_replicator_httpc.erl                 |  286 ++++++
 .../src/couch_replicator_httpc_pool.erl            |  138 +++
 .../src/couch_replicator_httpd.erl                 |   66 ++
 .../src/couch_replicator_job_sup.erl               |   31 +
 .../src/couch_replicator_manager.erl               |  626 ++++++++++++
 .../src/couch_replicator_notifier.erl              |   57 ++
 .../src/couch_replicator_utils.erl                 |   12 +-
 .../src/couch_replicator_worker.erl                |   20 +-
 src/couch_replicator/test/001-httpc-pool.t         |    8 +-
 src/couchdb/couch_primary_sup.erl                  |    6 +-
 24 files changed, 2104 insertions(+), 2104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/etc/couchdb/default.ini.tpl.in
----------------------------------------------------------------------
diff --git a/etc/couchdb/default.ini.tpl.in b/etc/couchdb/default.ini.tpl.in
index d5ee910..4bc485e 100644
--- a/etc/couchdb/default.ini.tpl.in
+++ b/etc/couchdb/default.ini.tpl.in
@@ -87,7 +87,7 @@ stats_aggregator={couch_stats_aggregator, start, []}
 stats_collector={couch_stats_collector, start, []}
 uuids={couch_uuids, start, []}
 auth_cache={couch_auth_cache, start_link, []}
-replication_manager={couch_replication_manager, start_link, []}
+replicator_manager={couch_replicator_manager, start_link, []}
 os_daemons={couch_os_daemons, start_link, []}
 compaction_daemon={couch_compaction_daemon, start_link, []}
 
@@ -99,7 +99,7 @@ _utils = {couch_httpd_misc_handlers, handle_utils_dir_req, "%localdatadir%/www"}
 _all_dbs = {couch_httpd_misc_handlers, handle_all_dbs_req}
 _active_tasks = {couch_httpd_misc_handlers, handle_task_status_req}
 _config = {couch_httpd_misc_handlers, handle_config_req}
-_replicate = {couch_httpd_replicator, handle_req}
+_replicate = {couch_replicator_httpd, handle_req}
 _uuids = {couch_httpd_misc_handlers, handle_uuids_req}
 _restart = {couch_httpd_misc_handlers, handle_restart_req}
 _stats = {couch_httpd_stats_handlers, handle_stats_req}

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/Makefile.am
----------------------------------------------------------------------
diff --git a/src/couch_replicator/Makefile.am b/src/couch_replicator/Makefile.am
index 5a4a0e6..169a7e3 100644
--- a/src/couch_replicator/Makefile.am
+++ b/src/couch_replicator/Makefile.am
@@ -18,18 +18,18 @@ couch_replicatorinclude_DATA = $(include_files)
 couch_replicatorebin_DATA = $(compiled_files)
 
 include_files = \
-	src/couch_api_wrap.hrl \
-    src/couch_replicator.hrl \
+	src/couch_replicator_api_wrap.hrl \
+	src/couch_replicator.hrl \
 	src/couch_replicator_js_functions.hrl
 
 source_files = \
-	src/couch_api_wrap_httpc.erl \
-	src/couch_api_wrap.erl \
-	src/couch_httpc_pool.erl \
-	src/couch_httpd_replicator.erl \
-	src/couch_rep_sup.erl \
-	src/couch_replication_manager.erl \
-	src/couch_replication_notifier.erl \
+	src/couch_replicator_api_wrap.erl \
+	src/couch_replicator_httpc_pool.erl \
+	src/couch_replicator_httpc.erl \
+	src/couch_replicator_httpd.erl \
+	src/couch_replicator_jon_sup.erl \
+	src/couch_replicator_notifier.erl \
+	src/couch_replicator_manager.erl \
 	src/couch_replicator_utils.erl \
 	src/couch_replicator_worker.erl \
 	src/couch_replicator.app.src \
@@ -42,13 +42,13 @@ test_files = \
 	test/004-replication-many-leaves.t
 
 compiled_files = \
-	ebin/couch_api_wrap_httpc.beam \
-	ebin/couch_api_wrap.beam \
-	ebin/couch_httpc_pool.beam \
-	ebin/couch_httpd_replicator.beam \
-	ebin/couch_rep_sup.beam \
-	ebin/couch_replication_manager.beam \
-	ebin/couch_replication_notifier.beam \
+	ebin/couch_replicator_api_wrap.beam \
+	ebin/couch_replicator_httpc_pool.beam \
+	ebin/couch_replicator_httpc.beam \
+	ebin/couch_replicator_httpd.beam \
+	ebin/couch_replicator_job_sup.beam \
+	ebin/couch_replicator_notifier.beam \
+	ebin/couch_replicator_manager.beam \
 	ebin/couch_replicator_utils.beam \
 	ebin/couch_replicator_worker.beam \
 	ebin/couch_replicator.app \

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_api_wrap.erl b/src/couch_replicator/src/couch_api_wrap.erl
deleted file mode 100644
index 2c57008..0000000
--- a/src/couch_replicator/src/couch_api_wrap.erl
+++ /dev/null
@@ -1,775 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_api_wrap).
-
-% This module wraps the native erlang API, and allows for performing
-% operations on a remote vs. local databases via the same API.
-%
-% Notes:
-% Many options and apis aren't yet supported here, they are added as needed.
-
--include("couch_db.hrl").
--include("couch_api_wrap.hrl").
-
--export([
-    db_open/2,
-    db_open/3,
-    db_close/1,
-    get_db_info/1,
-    update_doc/3,
-    update_doc/4,
-    update_docs/3,
-    update_docs/4,
-    ensure_full_commit/1,
-    get_missing_revs/2,
-    open_doc/3,
-    open_doc_revs/6,
-    changes_since/5,
-    db_uri/1
-    ]).
-
--import(couch_api_wrap_httpc, [
-    send_req/3
-    ]).
-
--import(couch_util, [
-    encode_doc_id/1,
-    get_value/2,
-    get_value/3
-    ]).
-
-
-db_uri(#httpdb{url = Url}) ->
-    couch_util:url_strip_password(Url);
-
-db_uri(#db{name = Name}) ->
-    db_uri(Name);
-
-db_uri(DbName) ->
-    ?b2l(DbName).
-
-
-db_open(Db, Options) ->
-    db_open(Db, Options, false).
-
-db_open(#httpdb{} = Db1, _Options, Create) ->
-    {ok, Db} = couch_api_wrap_httpc:setup(Db1),
-    case Create of
-    false ->
-        ok;
-    true ->
-        send_req(Db, [{method, put}], fun(_, _, _) -> ok end)
-    end,
-    send_req(Db, [{method, head}],
-        fun(200, _, _) ->
-            {ok, Db};
-        (401, _, _) ->
-            throw({unauthorized, ?l2b(db_uri(Db))});
-        (_, _, _) ->
-            throw({db_not_found, ?l2b(db_uri(Db))})
-        end);
-db_open(DbName, Options, Create) ->
-    try
-        case Create of
-        false ->
-            ok;
-        true ->
-            ok = couch_httpd:verify_is_server_admin(
-                get_value(user_ctx, Options)),
-            couch_db:create(DbName, Options)
-        end,
-        case couch_db:open(DbName, Options) of
-        {not_found, _Reason} ->
-            throw({db_not_found, DbName});
-        {ok, _Db} = Success ->
-            Success
-        end
-    catch
-    throw:{unauthorized, _} ->
-        throw({unauthorized, DbName})
-    end.
-
-db_close(#httpdb{httpc_pool = Pool}) ->
-    unlink(Pool),
-    ok = couch_httpc_pool:stop(Pool);
-db_close(DbName) ->
-    catch couch_db:close(DbName).
-
-
-get_db_info(#httpdb{} = Db) ->
-    send_req(Db, [],
-        fun(200, _, {Props}) ->
-            {ok, Props}
-        end);
-get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
-    {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
-    {ok, Info} = couch_db:get_db_info(Db),
-    couch_db:close(Db),
-    {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
-
-
-ensure_full_commit(#httpdb{} = Db) ->
-    send_req(
-        Db,
-        [{method, post}, {path, "_ensure_full_commit"},
-            {headers, [{"Content-Type", "application/json"}]}],
-        fun(201, _, {Props}) ->
-            {ok, get_value(<<"instance_start_time">>, Props)};
-        (_, _, {Props}) ->
-            {error, get_value(<<"error">>, Props)}
-        end);
-ensure_full_commit(Db) ->
-    couch_db:ensure_full_commit(Db).
-
-
-get_missing_revs(#httpdb{} = Db, IdRevs) ->
-    JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]},
-    send_req(
-        Db,
-        [{method, post}, {path, "_revs_diff"}, {body, ?JSON_ENCODE(JsonBody)}],
-        fun(200, _, {Props}) ->
-            ConvertToNativeFun = fun({Id, {Result}}) ->
-                MissingRevs = couch_doc:parse_revs(
-                    get_value(<<"missing">>, Result)
-                ),
-                PossibleAncestors = couch_doc:parse_revs(
-                    get_value(<<"possible_ancestors">>, Result, [])
-                ),
-                {Id, MissingRevs, PossibleAncestors}
-            end,
-            {ok, lists:map(ConvertToNativeFun, Props)}
-        end);
-get_missing_revs(Db, IdRevs) ->
-    couch_db:get_missing_revs(Db, IdRevs).
-
-
-
-open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
-    Path = encode_doc_id(Id),
-    QArgs = options_to_query_args(
-        HttpDb, Path, [revs, {open_revs, Revs} | Options]),
-    Self = self(),
-    Streamer = spawn_link(fun() ->
-            send_req(
-                HttpDb,
-                [{path, Path}, {qs, QArgs},
-                    {ibrowse_options, [{stream_to, {self(), once}}]},
-                    {headers, [{"Accept", "multipart/mixed"}]}],
-                fun(200, Headers, StreamDataFun) ->
-                    remote_open_doc_revs_streamer_start(Self),
-                    {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
-                        get_value("Content-Type", Headers),
-                        StreamDataFun,
-                        fun mp_parse_mixed/1)
-                end),
-            unlink(Self)
-        end),
-    receive
-    {started_open_doc_revs, Ref} ->
-        receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc)
-    end;
-open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
-    {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
-    {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
-
-
-open_doc(#httpdb{} = Db, Id, Options) ->
-    send_req(
-        Db,
-        [{path, encode_doc_id(Id)}, {qs, options_to_query_args(Options, [])}],
-        fun(200, _, Body) ->
-            {ok, couch_doc:from_json_obj(Body)};
-        (_, _, {Props}) ->
-            {error, get_value(<<"error">>, Props)}
-        end);
-open_doc(Db, Id, Options) ->
-    case couch_db:open_doc(Db, Id, Options) of
-    {ok, _} = Ok ->
-        Ok;
-    {not_found, _Reason} ->
-        {error, <<"not_found">>}
-    end.
-
-
-update_doc(Db, Doc, Options) ->
-    update_doc(Db, Doc, Options, interactive_edit).
-
-update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
-    QArgs = case Type of
-    replicated_changes ->
-        [{"new_edits", "false"}];
-    _ ->
-        []
-    end ++ options_to_query_args(Options, []),
-    Boundary = couch_uuids:random(),
-    JsonBytes = ?JSON_ENCODE(
-        couch_doc:to_json_obj(
-          Doc, [revs, attachments, follows, att_encoding_info | Options])),
-    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
-        JsonBytes, Doc#doc.atts, true),
-    Headers = case lists:member(delay_commit, Options) of
-    true ->
-        [{"X-Couch-Full-Commit", "false"}];
-    false ->
-        []
-    end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
-    Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
-    send_req(
-        HttpDb,
-        [{method, put}, {path, encode_doc_id(DocId)},
-            {qs, QArgs}, {headers, Headers}, {body, Body}],
-        fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->
-                {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
-            (409, _, _) ->
-                throw(conflict);
-            (Code, _, {Props}) ->
-                case {Code, get_value(<<"error">>, Props)} of
-                {401, <<"unauthorized">>} ->
-                    throw({unauthorized, get_value(<<"reason">>, Props)});
-                {403, <<"forbidden">>} ->
-                    throw({forbidden, get_value(<<"reason">>, Props)});
-                {412, <<"missing_stub">>} ->
-                    throw({missing_stub, get_value(<<"reason">>, Props)});
-                {_, Error} ->
-                    {error, Error}
-                end
-        end);
-update_doc(Db, Doc, Options, Type) ->
-    couch_db:update_doc(Db, Doc, Options, Type).
-
-
-update_docs(Db, DocList, Options) ->
-    update_docs(Db, DocList, Options, interactive_edit).
-
-update_docs(_Db, [], _Options, _UpdateType) ->
-    {ok, []};
-update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
-    FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
-    Prefix = case UpdateType of
-    replicated_changes ->
-        <<"{\"new_edits\":false,\"docs\":[">>;
-    interactive_edit ->
-        <<"{\"docs\":[">>
-    end,
-    Suffix = <<"]}">>,
-    % Note: nginx and other servers don't like PUT/POST requests without
-    % a Content-Length header, so we can't do a chunked transfer encoding
-    % and JSON encode each doc only before sending it through the socket.
-    {Docs, Len} = lists:mapfoldl(
-        fun(#doc{} = Doc, Acc) ->
-            Json = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
-            {Json, Acc + iolist_size(Json)};
-        (Doc, Acc) ->
-            {Doc, Acc + iolist_size(Doc)}
-        end,
-        byte_size(Prefix) + byte_size(Suffix) + length(DocList) - 1,
-        DocList),
-    BodyFun = fun(eof) ->
-            eof;
-        ([]) ->
-            {ok, Suffix, eof};
-        ([prefix | Rest]) ->
-            {ok, Prefix, Rest};
-        ([Doc]) ->
-            {ok, Doc, []};
-        ([Doc | RestDocs]) ->
-            {ok, [Doc, ","], RestDocs}
-    end,
-    Headers = [
-        {"Content-Length", Len},
-        {"Content-Type", "application/json"},
-        {"X-Couch-Full-Commit", FullCommit}
-    ],
-    send_req(
-        HttpDb,
-        [{method, post}, {path, "_bulk_docs"},
-            {body, {BodyFun, [prefix | Docs]}}, {headers, Headers}],
-        fun(201, _, Results) when is_list(Results) ->
-                {ok, bulk_results_to_errors(DocList, Results, remote)};
-           (417, _, Results) when is_list(Results) ->
-                {ok, bulk_results_to_errors(DocList, Results, remote)}
-        end);
-update_docs(Db, DocList, Options, UpdateType) ->
-    Result = couch_db:update_docs(Db, DocList, Options, UpdateType),
-    {ok, bulk_results_to_errors(DocList, Result, UpdateType)}.
-
-
-changes_since(#httpdb{headers = Headers1} = HttpDb, Style, StartSeq,
-    UserFun, Options) ->
-    BaseQArgs = case get_value(continuous, Options, false) of
-    false ->
-        [{"feed", "normal"}];
-    true ->
-        [{"feed", "continuous"}, {"heartbeat", "10000"}]
-    end ++ [
-        {"style", atom_to_list(Style)}, {"since", couch_util:to_list(StartSeq)}
-    ],
-    DocIds = get_value(doc_ids, Options),
-    {QArgs, Method, Body, Headers} = case DocIds of
-    undefined ->
-        QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
-        {QArgs1, get, [], Headers1};
-    _ when is_list(DocIds) ->
-        Headers2 = [{"Content-Type", "application/json"} | Headers1],
-        JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
-        {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
-    end,
-    send_req(
-        HttpDb,
-        [{method, Method}, {path, "_changes"}, {qs, QArgs},
-            {headers, Headers}, {body, Body},
-            {ibrowse_options, [{stream_to, {self(), once}}]}],
-        fun(200, _, DataStreamFun) ->
-                parse_changes_feed(Options, UserFun, DataStreamFun);
-            (405, _, _) when is_list(DocIds) ->
-                % CouchDB versions < 1.1.0 don't have the builtin _changes feed
-                % filter "_doc_ids" neither support POST
-                send_req(HttpDb, [{method, get}, {path, "_changes"},
-                    {qs, BaseQArgs}, {headers, Headers1},
-                    {ibrowse_options, [{stream_to, {self(), once}}]}],
-                    fun(200, _, DataStreamFun2) ->
-                        UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
-                            case lists:member(Id, DocIds) of
-                            true ->
-                                UserFun(DocInfo);
-                            false ->
-                                ok
-                            end
-                        end,
-                        parse_changes_feed(Options, UserFun2, DataStreamFun2)
-                    end)
-        end);
-changes_since(Db, Style, StartSeq, UserFun, Options) ->
-    Filter = case get_value(doc_ids, Options) of
-    undefined ->
-        ?b2l(get_value(filter, Options, <<>>));
-    _DocIds ->
-        "_doc_ids"
-    end,
-    Args = #changes_args{
-        style = Style,
-        since = StartSeq,
-        filter = Filter,
-        feed = case get_value(continuous, Options, false) of
-            true ->
-                "continuous";
-            false ->
-                "normal"
-        end,
-        timeout = infinity
-    },
-    QueryParams = get_value(query_params, Options, {[]}),
-    Req = changes_json_req(Db, Filter, QueryParams, Options),
-    ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db),
-    ChangesFeedFun(fun({change, Change, _}, _) ->
-            UserFun(json_to_doc_info(Change));
-        (_, _) ->
-            ok
-    end).
-
-
-% internal functions
-
-maybe_add_changes_filter_q_args(BaseQS, Options) ->
-    case get_value(filter, Options) of
-    undefined ->
-        BaseQS;
-    FilterName ->
-        {Params} = get_value(query_params, Options, {[]}),
-        [{"filter", ?b2l(FilterName)} | lists:foldl(
-            fun({K, V}, QSAcc) ->
-                Ks = couch_util:to_list(K),
-                case lists:keymember(Ks, 1, QSAcc) of
-                true ->
-                    QSAcc;
-                false ->
-                    [{Ks, couch_util:to_list(V)} | QSAcc]
-                end
-            end,
-            BaseQS, Params)]
-    end.
-
-parse_changes_feed(Options, UserFun, DataStreamFun) ->
-    case get_value(continuous, Options, false) of
-    true ->
-        continuous_changes(DataStreamFun, UserFun);
-    false ->
-        EventFun = fun(Ev) ->
-            changes_ev1(Ev, fun(DocInfo, _) -> UserFun(DocInfo) end, [])
-        end,
-        json_stream_parse:events(DataStreamFun, EventFun)
-    end.
-
-changes_json_req(_Db, "", _QueryParams, _Options) ->
-    {[]};
-changes_json_req(_Db, "_doc_ids", _QueryParams, Options) ->
-    {[{<<"doc_ids">>, get_value(doc_ids, Options)}]};
-changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
-    {ok, Info} = couch_db:get_db_info(Db),
-    % simulate a request to db_name/_changes
-    {[
-        {<<"info">>, {Info}},
-        {<<"id">>, null},
-        {<<"method">>, 'GET'},
-        {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
-        {<<"query">>, {[{<<"filter">>, FilterName} | QueryParams]}},
-        {<<"headers">>, []},
-        {<<"body">>, []},
-        {<<"peer">>, <<"replicator">>},
-        {<<"form">>, []},
-        {<<"cookie">>, []},
-        {<<"userCtx">>, couch_util:json_user_ctx(Db)}
-    ]}.
-
-
-options_to_query_args(HttpDb, Path, Options) ->
-    case lists:keytake(atts_since, 1, Options) of
-    false ->
-        options_to_query_args(Options, []);
-    {value, {atts_since, []}, Options2} ->
-        options_to_query_args(Options2, []);
-    {value, {atts_since, PAs}, Options2} ->
-        QueryArgs1 = options_to_query_args(Options2, []),
-        FullUrl = couch_api_wrap_httpc:full_url(
-            HttpDb, [{path, Path}, {qs, QueryArgs1}]),
-        RevList = atts_since_arg(
-            length("GET " ++ FullUrl ++ " HTTP/1.1\r\n") +
-            length("&atts_since=") + 6,  % +6 = % encoded [ and ]
-            PAs, []),
-        [{"atts_since", ?JSON_ENCODE(RevList)} | QueryArgs1]
-    end.
-
-
-options_to_query_args([], Acc) ->
-    lists:reverse(Acc);
-options_to_query_args([ejson_body | Rest], Acc) ->
-    options_to_query_args(Rest, Acc);
-options_to_query_args([delay_commit | Rest], Acc) ->
-    options_to_query_args(Rest, Acc);
-options_to_query_args([revs | Rest], Acc) ->
-    options_to_query_args(Rest, [{"revs", "true"} | Acc]);
-options_to_query_args([{open_revs, all} | Rest], Acc) ->
-    options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
-options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
-    JsonRevs = ?b2l(?JSON_ENCODE(couch_doc:revs_to_strs(Revs))),
-    options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).
-
-
--define(MAX_URL_LEN, 7000).
-
-atts_since_arg(_UrlLen, [], Acc) ->
-    lists:reverse(Acc);
-atts_since_arg(UrlLen, [PA | Rest], Acc) ->
-    RevStr = couch_doc:rev_to_str(PA),
-    NewUrlLen = case Rest of
-    [] ->
-        % plus 2 double quotes (% encoded)
-        UrlLen + size(RevStr) + 6;
-    _ ->
-        % plus 2 double quotes and a comma (% encoded)
-        UrlLen + size(RevStr) + 9
-    end,
-    case NewUrlLen >= ?MAX_URL_LEN of
-    true ->
-        lists:reverse(Acc);
-    false ->
-        atts_since_arg(NewUrlLen, Rest, [RevStr | Acc])
-    end.
-
-
-% TODO: A less verbose, more elegant and automatic restart strategy for
-%       the exported open_doc_revs/6 function. The restart should be
-%       transparent to the caller like any other Couch API function exported
-%       by this module.
-receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) ->
-    try
-        % Left only for debugging purposes via an interactive or remote shell
-        erlang:put(open_doc_revs, {Id, Revs, Ref, Streamer}),
-        receive_docs(Streamer, Fun, Ref, Acc)
-    catch
-    error:{restart_open_doc_revs, NewRef} ->
-        receive_docs_loop(Streamer, Fun, Id, Revs, NewRef, Acc)
-    end.
-
-receive_docs(Streamer, UserFun, Ref, UserAcc) ->
-    Streamer ! {get_headers, Ref, self()},
-    receive
-    {started_open_doc_revs, NewRef} ->
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {headers, Ref, Headers} ->
-        case get_value("content-type", Headers) of
-        {"multipart/related", _} = ContentType ->
-            case doc_from_multi_part_stream(
-                ContentType,
-                fun() -> receive_doc_data(Streamer, Ref) end,
-                Ref) of
-            {ok, Doc, Parser} ->
-                case UserFun({ok, Doc}, UserAcc) of
-                {ok, UserAcc2} ->
-                    ok;
-                {skip, UserAcc2} ->
-                    couch_doc:abort_multi_part_stream(Parser)
-                end,
-                receive_docs(Streamer, UserFun, Ref, UserAcc2)
-            end;
-        {"application/json", []} ->
-            Doc = couch_doc:from_json_obj(
-                    ?JSON_DECODE(receive_all(Streamer, Ref, []))),
-            {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
-            receive_docs(Streamer, UserFun, Ref, UserAcc2);
-        {"application/json", [{"error","true"}]} ->
-            {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
-            Rev = get_value(<<"missing">>, ErrorProps),
-            Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
-            {_, UserAcc2} = UserFun(Result, UserAcc),
-            receive_docs(Streamer, UserFun, Ref, UserAcc2)
-        end;
-    {done, Ref} ->
-        {ok, UserAcc}
-    end.
-
-
-restart_remote_open_doc_revs(Ref, NewRef) ->
-    receive
-    {body_bytes, Ref, _} ->
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {body_done, Ref} ->
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {done, Ref} ->
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {headers, Ref, _} ->
-        restart_remote_open_doc_revs(Ref, NewRef)
-    after 0 ->
-        erlang:error({restart_open_doc_revs, NewRef})
-    end.
-
-
-remote_open_doc_revs_streamer_start(Parent) ->
-    receive
-    {get_headers, _Ref, Parent} ->
-        remote_open_doc_revs_streamer_start(Parent);
-    {next_bytes, _Ref, Parent} ->
-        remote_open_doc_revs_streamer_start(Parent)
-    after 0 ->
-        Parent ! {started_open_doc_revs, make_ref()}
-    end.
-
-
-receive_all(Streamer, Ref, Acc) ->
-    Streamer ! {next_bytes, Ref, self()},
-    receive
-    {started_open_doc_revs, NewRef} ->
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {body_bytes, Ref, Bytes} ->
-        receive_all(Streamer, Ref, [Bytes | Acc]);
-    {body_done, Ref} ->
-        lists:reverse(Acc)
-    end.
-
-
-mp_parse_mixed(eof) ->
-    receive {get_headers, Ref, From} ->
-        From ! {done, Ref}
-    end;
-mp_parse_mixed({headers, H}) ->
-    receive {get_headers, Ref, From} ->
-        From ! {headers, Ref, H}
-    end,
-    fun mp_parse_mixed/1;
-mp_parse_mixed({body, Bytes}) ->
-    receive {next_bytes, Ref, From} ->
-        From ! {body_bytes, Ref, Bytes}
-    end,
-    fun mp_parse_mixed/1;
-mp_parse_mixed(body_end) ->
-    receive {next_bytes, Ref, From} ->
-        From ! {body_done, Ref};
-    {get_headers, Ref, From} ->
-        self() ! {get_headers, Ref, From}
-    end,
-    fun mp_parse_mixed/1.
-
-
-receive_doc_data(Streamer, Ref) ->
-    Streamer ! {next_bytes, Ref, self()},
-    receive
-    {body_bytes, Ref, Bytes} ->
-        {Bytes, fun() -> receive_doc_data(Streamer, Ref) end};
-    {body_done, Ref} ->
-        {<<>>, fun() -> receive_doc_data(Streamer, Ref) end}
-    end.
-
-doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
-    Self = self(),
-    Parser = spawn_link(fun() ->
-        {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
-            ContentType, DataFun,
-            fun(Next) -> couch_doc:mp_parse_doc(Next, []) end),
-        unlink(Self)
-        end),
-    Parser ! {get_doc_bytes, Ref, self()},
-    receive
-    {started_open_doc_revs, NewRef} ->
-        unlink(Parser),
-        exit(Parser, kill),
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {doc_bytes, Ref, DocBytes} ->
-        Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
-        ReadAttachmentDataFun = fun() ->
-            Parser ! {get_bytes, Ref, self()},
-            receive
-            {started_open_doc_revs, NewRef} ->
-                unlink(Parser),
-                exit(Parser, kill),
-                receive {bytes, Ref, _} -> ok after 0 -> ok end,
-                restart_remote_open_doc_revs(Ref, NewRef);
-            {bytes, Ref, Bytes} ->
-                Bytes
-            end
-        end,
-        Atts2 = lists:map(
-            fun(#att{data = follows} = A) ->
-                A#att{data = ReadAttachmentDataFun};
-            (A) ->
-                A
-            end, Doc#doc.atts),
-        {ok, Doc#doc{atts = Atts2}, Parser}
-    end.
-
-
-changes_ev1(object_start, UserFun, UserAcc) ->
-    fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
-
-changes_ev2({key, <<"results">>}, UserFun, UserAcc) ->
-    fun(Ev) -> changes_ev3(Ev, UserFun, UserAcc) end;
-changes_ev2(_, UserFun, UserAcc) ->
-    fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
-
-changes_ev3(array_start, UserFun, UserAcc) ->
-    fun(Ev) -> changes_ev_loop(Ev, UserFun, UserAcc) end.
-
-changes_ev_loop(object_start, UserFun, UserAcc) ->
-    fun(Ev) ->
-        json_stream_parse:collect_object(Ev,
-            fun(Obj) ->
-                UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc),
-                fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end
-            end)
-    end;
-changes_ev_loop(array_end, _UserFun, _UserAcc) ->
-    fun(_Ev) -> changes_ev_done() end.
-
-changes_ev_done() ->
-    fun(_Ev) -> changes_ev_done() end.
-
-continuous_changes(DataFun, UserFun) ->
-    {DataFun2, _, Rest} = json_stream_parse:events(
-        DataFun,
-        fun(Ev) -> parse_changes_line(Ev, UserFun) end),
-    continuous_changes(fun() -> {Rest, DataFun2} end, UserFun).
-
-parse_changes_line(object_start, UserFun) ->
-    fun(Ev) ->
-        json_stream_parse:collect_object(Ev,
-            fun(Obj) -> UserFun(json_to_doc_info(Obj)) end)
-    end.
-
-json_to_doc_info({Props}) ->
-    RevsInfo = lists:map(
-        fun({Change}) ->
-            Rev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
-            Del = (true =:= get_value(<<"deleted">>, Change)),
-            #rev_info{rev=Rev, deleted=Del}
-        end, get_value(<<"changes">>, Props)),
-    #doc_info{
-        id = get_value(<<"id">>, Props),
-        high_seq = get_value(<<"seq">>, Props),
-        revs = RevsInfo
-    }.
-
-
-bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->
-    lists:reverse(lists:foldl(
-        fun({_, {ok, _}}, Acc) ->
-            Acc;
-        ({#doc{id = Id, revs = {Pos, [RevId | _]}}, Error}, Acc) ->
-            {_, Error, Reason} = couch_httpd:error_info(Error),
-            [ {[{id, Id}, {rev, rev_to_str({Pos, RevId})},
-                {error, Error}, {reason, Reason}]} | Acc ]
-        end,
-        [], lists:zip(Docs, Results)));
-
-bulk_results_to_errors(Docs, {ok, Results}, replicated_changes) ->
-    bulk_results_to_errors(Docs, {aborted, Results}, interactive_edit);
-
-bulk_results_to_errors(_Docs, {aborted, Results}, interactive_edit) ->
-    lists:map(
-        fun({{Id, Rev}, Err}) ->
-            {_, Error, Reason} = couch_httpd:error_info(Err),
-            {[{id, Id}, {rev, rev_to_str(Rev)}, {error, Error}, {reason, Reason}]}
-        end,
-        Results);
-
-bulk_results_to_errors(_Docs, Results, remote) ->
-    lists:reverse(lists:foldl(
-        fun({Props}, Acc) ->
-            case get_value(<<"error">>, Props, get_value(error, Props)) of
-            undefined ->
-                Acc;
-            Error ->
-                Id = get_value(<<"id">>, Props, get_value(id, Props)),
-                Rev = get_value(<<"rev">>, Props, get_value(rev, Props)),
-                Reason = get_value(<<"reason">>, Props, get_value(reason, Props)),
-                [ {[{id, Id}, {rev, rev_to_str(Rev)},
-                    {error, Error}, {reason, Reason}]} | Acc ]
-            end
-        end,
-        [], Results)).
-
-
-rev_to_str({_Pos, _Id} = Rev) ->
-    couch_doc:rev_to_str(Rev);
-rev_to_str(Rev) ->
-    Rev.
-
-
-stream_doc({JsonBytes, Atts, Boundary, Len}) ->
-    case erlang:erase({doc_streamer, Boundary}) of
-    Pid when is_pid(Pid) ->
-        unlink(Pid),
-        exit(Pid, kill);
-    _ ->
-        ok
-    end,
-    Self = self(),
-    DocStreamer = spawn_link(fun() ->
-        couch_doc:doc_to_multi_part_stream(
-            Boundary, JsonBytes, Atts,
-            fun(Data) ->
-                receive {get_data, Ref, From} ->
-                    From ! {data, Ref, Data}
-                end
-            end, true),
-        unlink(Self)
-    end),
-    erlang:put({doc_streamer, Boundary}, DocStreamer),
-    {ok, <<>>, {Len, Boundary}};
-stream_doc({0, Id}) ->
-    erlang:erase({doc_streamer, Id}),
-    eof;
-stream_doc({LenLeft, Id}) when LenLeft > 0 ->
-    Ref = make_ref(),
-    erlang:get({doc_streamer, Id}) ! {get_data, Ref, self()},
-    receive {data, Ref, Data} ->
-        {ok, Data, {LenLeft - iolist_size(Data), Id}}
-    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_api_wrap.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_api_wrap.hrl b/src/couch_replicator/src/couch_api_wrap.hrl
deleted file mode 100644
index 1a6f27a..0000000
--- a/src/couch_replicator/src/couch_api_wrap.hrl
+++ /dev/null
@@ -1,36 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-
-
--record(httpdb, {
-    url,
-    oauth = nil,
-    headers = [
-        {"Accept", "application/json"},
-        {"User-Agent", "CouchDB/" ++ couch_server:get_version()}
-    ],
-    timeout,            % milliseconds
-    ibrowse_options = [],
-    retries = 10,
-    wait = 250,         % milliseconds
-    httpc_pool = nil,
-    http_connections
-}).
-
--record(oauth, {
-    consumer_key,
-    token,
-    token_secret,
-    consumer_secret,
-    signature_method
-}).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_api_wrap_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_api_wrap_httpc.erl b/src/couch_replicator/src/couch_api_wrap_httpc.erl
deleted file mode 100644
index d05eec7..0000000
--- a/src/couch_replicator/src/couch_api_wrap_httpc.erl
+++ /dev/null
@@ -1,286 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_api_wrap_httpc).
-
--include("couch_db.hrl").
--include("couch_api_wrap.hrl").
--include("../ibrowse/ibrowse.hrl").
-
--export([setup/1]).
--export([send_req/3]).
--export([full_url/2]).
-
--import(couch_util, [
-    get_value/2,
-    get_value/3
-]).
-
--define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
--define(MAX_WAIT, 5 * 60 * 1000).
-
-
-setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
-    {ok, Pid} = couch_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
-    {ok, Db#httpdb{httpc_pool = Pid}}.
-
-
-send_req(HttpDb, Params1, Callback) ->
-    Params2 = ?replace(Params1, qs,
-        [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
-    Params = ?replace(Params2, ibrowse_options,
-        lists:keysort(1, get_value(ibrowse_options, Params2, []))),
-    {Worker, Response} = send_ibrowse_req(HttpDb, Params),
-    process_response(Response, Worker, HttpDb, Params, Callback).
-
-
-send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
-    Method = get_value(method, Params, get),
-    UserHeaders = lists:keysort(1, get_value(headers, Params, [])),
-    Headers1 = lists:ukeymerge(1, UserHeaders, BaseHeaders),
-    Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
-    Url = full_url(HttpDb, Params),
-    Body = get_value(body, Params, []),
-    case get_value(path, Params) of
-    "_changes" ->
-        {ok, Worker} = ibrowse:spawn_link_worker_process(Url);
-    _ ->
-        {ok, Worker} = couch_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool)
-    end,
-    IbrowseOptions = [
-        {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout} |
-        lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
-            HttpDb#httpdb.ibrowse_options)
-    ],
-    Response = ibrowse:send_req_direct(
-        Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity),
-    {Worker, Response}.
-
-
-process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
-    send_req(HttpDb, Params, Callback);
-
-process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
-    % ibrowse worker terminated because remote peer closed the socket
-    % -> not an error
-    send_req(HttpDb, Params, Cb);
-
-process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
-    process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
-
-process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
-    release_worker(Worker, HttpDb),
-    case list_to_integer(Code) of
-    Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
-        EJson = case Body of
-        <<>> ->
-            null;
-        Json ->
-            ?JSON_DECODE(Json)
-        end,
-        Callback(Ok, Headers, EJson);
-    R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
-        do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
-    Error ->
-        maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
-    end;
-
-process_response(Error, Worker, HttpDb, Params, Callback) ->
-    maybe_retry(Error, Worker, HttpDb, Params, Callback).
-
-
-process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
-    receive
-    {ibrowse_async_headers, ReqId, Code, Headers} ->
-        case list_to_integer(Code) of
-        Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
-            StreamDataFun = fun() ->
-                stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
-            end,
-            ibrowse:stream_next(ReqId),
-            try
-                Ret = Callback(Ok, Headers, StreamDataFun),
-                release_worker(Worker, HttpDb),
-                clean_mailbox_req(ReqId),
-                Ret
-            catch throw:{maybe_retry_req, Err} ->
-                clean_mailbox_req(ReqId),
-                maybe_retry(Err, Worker, HttpDb, Params, Callback)
-            end;
-        R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
-            do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
-        Error ->
-            report_error(Worker, HttpDb, Params, {code, Error})
-        end;
-    {ibrowse_async_response, ReqId, {error, _} = Error} ->
-        maybe_retry(Error, Worker, HttpDb, Params, Callback)
-    after HttpDb#httpdb.timeout + 500 ->
-        % Note: ibrowse should always reply with timeouts, but this doesn't
-        % seem to be always true when there's a very high rate of requests
-        % and many open connections.
-        maybe_retry(timeout, Worker, HttpDb, Params, Callback)
-    end.
-
-
-clean_mailbox_req(ReqId) ->
-    receive
-    {ibrowse_async_response, ReqId, _} ->
-        clean_mailbox_req(ReqId);
-    {ibrowse_async_response_end, ReqId} ->
-        clean_mailbox_req(ReqId)
-    after 0 ->
-        ok
-    end.
-
-
-release_worker(Worker, #httpdb{httpc_pool = Pool}) ->
-    ok = couch_httpc_pool:release_worker(Pool, Worker).
-
-
-maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
-    report_error(Worker, HttpDb, Params, {error, Error});
-
-maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
-    Params, Cb) ->
-    release_worker(Worker, HttpDb),
-    Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
-    Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
-    ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~s",
-        [Method, Url, Wait / 1000, error_cause(Error)]),
-    ok = timer:sleep(Wait),
-    Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
-    send_req(HttpDb#httpdb{retries = Retries - 1, wait = Wait2}, Params, Cb).
-
-
-report_error(Worker, HttpDb, Params, Error) ->
-    Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
-    Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
-    do_report_error(Url, Method, Error),
-    release_worker(Worker, HttpDb),
-    exit({http_request_failed, Method, Url, Error}).
-
-
-do_report_error(Url, Method, {code, Code}) ->
-    ?LOG_ERROR("Replicator, request ~s to ~p failed. The received "
-        "HTTP error code is ~p", [Method, Url, Code]);
-
-do_report_error(FullUrl, Method, Error) ->
-    ?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~s",
-        [Method, FullUrl, error_cause(Error)]).
-
-
-error_cause({error, Cause}) ->
-    lists:flatten(io_lib:format("~p", [Cause]));
-error_cause(Cause) ->
-    lists:flatten(io_lib:format("~p", [Cause])).
-
-
-stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
-    receive
-    {ibrowse_async_response, ReqId, {error, Error}} ->
-        throw({maybe_retry_req, Error});
-    {ibrowse_async_response, ReqId, <<>>} ->
-        ibrowse:stream_next(ReqId),
-        stream_data_self(HttpDb, Params, Worker, ReqId, Cb);
-    {ibrowse_async_response, ReqId, Data} ->
-        ibrowse:stream_next(ReqId),
-        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
-    {ibrowse_async_response_end, ReqId} ->
-        {<<>>, fun() -> throw({maybe_retry_req, more_data_expected}) end}
-    after T + 500 ->
-        % Note: ibrowse should always reply with timeouts, but this doesn't
-        % seem to be always true when there's a very high rate of requests
-        % and many open connections.
-        throw({maybe_retry_req, timeout})
-    end.
-
-
-full_url(#httpdb{url = BaseUrl}, Params) ->
-    Path = get_value(path, Params, []),
-    QueryArgs = get_value(qs, Params, []),
-    BaseUrl ++ Path ++ query_args_to_string(QueryArgs, []).
-
-
-query_args_to_string([], []) ->
-    "";
-query_args_to_string([], Acc) ->
-    "?" ++ string:join(lists:reverse(Acc), "&");
-query_args_to_string([{K, V} | Rest], Acc) ->
-    query_args_to_string(Rest, [K ++ "=" ++ couch_httpd:quote(V) | Acc]).
-
-
-oauth_header(#httpdb{oauth = nil}, _ConnParams) ->
-    [];
-oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
-    Consumer = {
-        OAuth#oauth.consumer_key,
-        OAuth#oauth.consumer_secret,
-        OAuth#oauth.signature_method
-    },
-    Method = case get_value(method, ConnParams, get) of
-    get -> "GET";
-    post -> "POST";
-    put -> "PUT";
-    head -> "HEAD"
-    end,
-    QSL = get_value(qs, ConnParams, []),
-    OAuthParams = oauth:signed_params(Method,
-        BaseUrl ++ get_value(path, ConnParams, []),
-        QSL, Consumer, OAuth#oauth.token, OAuth#oauth.token_secret) -- QSL,
-    [{"Authorization",
-        "OAuth " ++ oauth_uri:params_to_header_string(OAuthParams)}].
-
-
-do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
-    release_worker(Worker, HttpDb),
-    RedirectUrl = redirect_url(Headers, Url),
-    {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
-    send_req(HttpDb2, Params2, Cb).
-
-
-redirect_url(RespHeaders, OrigUrl) ->
-    MochiHeaders = mochiweb_headers:make(RespHeaders),
-    RedUrl = mochiweb_headers:get_value("Location", MochiHeaders),
-    #url{
-        host = Host,
-        host_type = HostType,
-        port = Port,
-        path = Path,  % includes query string
-        protocol = Proto
-    } = ibrowse_lib:parse_url(RedUrl),
-    #url{
-        username = User,
-        password = Passwd
-    } = ibrowse_lib:parse_url(OrigUrl),
-    Creds = case is_list(User) andalso is_list(Passwd) of
-    true ->
-        User ++ ":" ++ Passwd ++ "@";
-    false ->
-        []
-    end,
-    HostPart = case HostType of
-    ipv6_address ->
-        "[" ++ Host ++ "]";
-    _ ->
-        Host
-    end,
-    atom_to_list(Proto) ++ "://" ++ Creds ++ HostPart ++ ":" ++
-        integer_to_list(Port) ++ Path.
-
-after_redirect(RedirectUrl, 303, HttpDb, Params) ->
-    after_redirect(RedirectUrl, HttpDb, ?replace(Params, method, get));
-after_redirect(RedirectUrl, _Code, HttpDb, Params) ->
-    after_redirect(RedirectUrl, HttpDb, Params).
-
-after_redirect(RedirectUrl, HttpDb, Params) ->
-    Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
-    {HttpDb#httpdb{url = RedirectUrl}, Params2}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_httpc_pool.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_httpc_pool.erl b/src/couch_replicator/src/couch_httpc_pool.erl
deleted file mode 100644
index f6b7c26..0000000
--- a/src/couch_replicator/src/couch_httpc_pool.erl
+++ /dev/null
@@ -1,138 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_httpc_pool).
--behaviour(gen_server).
-
-% public API
--export([start_link/2, stop/1]).
--export([get_worker/1, release_worker/2]).
-
-% gen_server API
--export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
--export([code_change/3, terminate/2]).
-
--include("couch_db.hrl").
-
--import(couch_util, [
-    get_value/2,
-    get_value/3
-]).
-
--record(state, {
-    url,
-    limit,                  % max # of workers allowed
-    free = [],              % free workers (connections)
-    busy = [],              % busy workers (connections)
-    waiting = queue:new()   % blocked clients waiting for a worker
-}).
-
-
-start_link(Url, Options) ->
-    gen_server:start_link(?MODULE, {Url, Options}, []).
-
-
-stop(Pool) ->
-    ok = gen_server:call(Pool, stop, infinity).
-
-
-get_worker(Pool) ->
-    {ok, _Worker} = gen_server:call(Pool, get_worker, infinity).
-
-
-release_worker(Pool, Worker) ->
-    ok = gen_server:cast(Pool, {release_worker, Worker}).
-
-
-init({Url, Options}) ->
-    process_flag(trap_exit, true),
-    State = #state{
-        url = Url,
-        limit = get_value(max_connections, Options)
-    },
-    {ok, State}.
-
-
-handle_call(get_worker, From, #state{waiting = Waiting} = State) ->
-    #state{url = Url, limit = Limit, busy = Busy, free = Free} = State,
-    case length(Busy) >= Limit of
-    true ->
-        {noreply, State#state{waiting = queue:in(From, Waiting)}};
-    false ->
-        case Free of
-        [] ->
-           {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
-           Free2 = Free;
-        [Worker | Free2] ->
-           ok
-        end,
-        NewState = State#state{free = Free2, busy = [Worker | Busy]},
-        {reply, {ok, Worker}, NewState}
-    end;
-
-handle_call(stop, _From, State) ->
-    {stop, normal, ok, State}.
-
-
-handle_cast({release_worker, Worker}, #state{waiting = Waiting} = State) ->
-    case is_process_alive(Worker) andalso
-        lists:member(Worker, State#state.busy) of
-    true ->
-        case queue:out(Waiting) of
-        {empty, Waiting2} ->
-            Busy2 = State#state.busy -- [Worker],
-            Free2 = [Worker | State#state.free];
-        {{value, From}, Waiting2} ->
-            gen_server:reply(From, {ok, Worker}),
-            Busy2 = State#state.busy,
-            Free2 = State#state.free
-        end,
-        NewState = State#state{
-           busy = Busy2,
-           free = Free2,
-           waiting = Waiting2
-        },
-        {noreply, NewState};
-   false ->
-        {noreply, State}
-   end.
-
-
-handle_info({'EXIT', Pid, _Reason}, #state{busy = Busy, free = Free} = State) ->
-    case Free -- [Pid] of
-    Free ->
-        case Busy -- [Pid] of
-        Busy ->
-            {noreply, State};
-        Busy2 ->
-            case queue:out(State#state.waiting) of
-            {empty, _} ->
-                {noreply, State#state{busy = Busy2}};
-            {{value, From}, Waiting2} ->
-                {ok, Worker} = ibrowse:spawn_link_worker_process(State#state.url),
-                gen_server:reply(From, {ok, Worker}),
-                {noreply, State#state{busy = [Worker | Busy2], waiting = Waiting2}}
-            end
-        end;
-    Free2 ->
-        {noreply, State#state{free = Free2}}
-    end.
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
-terminate(_Reason, State) ->
-    lists:foreach(fun ibrowse_http_client:stop/1, State#state.free),
-    lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy).
-

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_httpd_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_httpd_replicator.erl b/src/couch_replicator/src/couch_httpd_replicator.erl
deleted file mode 100644
index fb1e350..0000000
--- a/src/couch_replicator/src/couch_httpd_replicator.erl
+++ /dev/null
@@ -1,66 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_httpd_replicator).
-
--include("couch_db.hrl").
-
--import(couch_httpd, [
-    send_json/2,
-    send_json/3,
-    send_method_not_allowed/2
-]).
-
--import(couch_util, [
-    to_binary/1
-]).
-
--export([handle_req/1]).
-
-
-handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
-    couch_httpd:validate_ctype(Req, "application/json"),
-    RepDoc = {Props} = couch_httpd:json_body_obj(Req),
-    validate_rep_props(Props),
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, UserCtx),
-    case couch_replicator:replicate(Rep) of
-    {error, {Error, Reason}} ->
-        send_json(
-            Req, 404,
-            {[{error, to_binary(Error)}, {reason, to_binary(Reason)}]});
-    {error, not_found} ->
-        % Tried to cancel a replication that didn't exist.
-        send_json(Req, 404, {[{error, <<"not found">>}]});
-    {error, Reason} ->
-        send_json(Req, 500, {[{error, to_binary(Reason)}]});
-    {ok, {cancelled, RepId}} ->
-        send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
-    {ok, {continuous, RepId}} ->
-        send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
-    {ok, {HistoryResults}} ->
-        send_json(Req, {[{ok, true} | HistoryResults]})
-    end;
-
-handle_req(Req) ->
-    send_method_not_allowed(Req, "POST").
-
-validate_rep_props([]) ->
-    ok;
-validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
-    lists:foreach(fun
-        ({_,V}) when is_binary(V) -> ok;
-        ({K,_}) -> throw({bad_request,
-            <<K/binary," value must be a string.">>})
-        end, Params),
-    validate_rep_props(Rest);
-validate_rep_props([_|Rest]) ->
-    validate_rep_props(Rest).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_rep_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_rep_sup.erl b/src/couch_replicator/src/couch_rep_sup.erl
deleted file mode 100644
index 1318c59..0000000
--- a/src/couch_replicator/src/couch_rep_sup.erl
+++ /dev/null
@@ -1,31 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_rep_sup).
--behaviour(supervisor).
--export([init/1, start_link/0]).
-
--include("couch_db.hrl").
-
-start_link() ->
-    supervisor:start_link({local,?MODULE}, ?MODULE, []).
-
-%%=============================================================================
-%% supervisor callbacks
-%%=============================================================================
-
-init([]) ->
-    {ok, {{one_for_one, 3, 10}, []}}.
-
-%%=============================================================================
-%% internal functions
-%%=============================================================================

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replication_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replication_manager.erl b/src/couch_replicator/src/couch_replication_manager.erl
deleted file mode 100644
index 6b66cf3..0000000
--- a/src/couch_replicator/src/couch_replication_manager.erl
+++ /dev/null
@@ -1,626 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replication_manager).
--behaviour(gen_server).
-
-% public API
--export([replication_started/1, replication_completed/1, replication_error/2]).
-
-% gen_server callbacks
--export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
--export([code_change/3, terminate/2]).
-
--include("couch_db.hrl").
--include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
-
--define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
--define(REP_TO_STATE, couch_rep_id_to_rep_state).
--define(INITIAL_WAIT, 2.5). % seconds
--define(MAX_WAIT, 600).     % seconds
-
--record(rep_state, {
-    rep,
-    starting,
-    retries_left,
-    max_retries,
-    wait = ?INITIAL_WAIT
-}).
-
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
-]).
-
--record(state, {
-    changes_feed_loop = nil,
-    db_notifier = nil,
-    rep_db_name = nil,
-    rep_start_pids = [],
-    max_retries
-}).
-
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-replication_started(#rep{id = {BaseId, _} = RepId}) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        update_rep_doc(DocId, [
-            {<<"_replication_state">>, <<"triggered">>},
-            {<<"_replication_id">>, ?l2b(BaseId)}]),
-        ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
-        ?LOG_INFO("Document `~s` triggered replication `~s`",
-            [DocId, pp_rep_id(RepId)])
-    end.
-
-
-replication_completed(#rep{id = RepId}) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]),
-        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
-        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
-            [pp_rep_id(RepId), DocId])
-    end.
-
-
-replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        % TODO: maybe add error reason to replication document
-        update_rep_doc(DocId, [
-            {<<"_replication_state">>, <<"error">>},
-            {<<"_replication_id">>, ?l2b(BaseId)}]),
-        ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
-    end.
-
-
-init(_) ->
-    process_flag(trap_exit, true),
-    ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
-    ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]),
-    Server = self(),
-    ok = couch_config:register(
-        fun("replicator", "db", NewName) ->
-            ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)});
-        ("replicator", "max_replication_retry_count", V) ->
-            ok = gen_server:cast(Server, {set_max_retries, retries_value(V)})
-        end
-    ),
-    {Loop, RepDbName} = changes_feed_loop(),
-    {ok, #state{
-        changes_feed_loop = Loop,
-        rep_db_name = RepDbName,
-        db_notifier = db_update_notifier(),
-        max_retries = retries_value(
-            couch_config:get("replicator", "max_replication_retry_count", "10"))
-    }}.
-
-
-handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) ->
-    NewState = try
-        process_update(State, Change)
-    catch
-    _Tag:Error ->
-        {RepProps} = get_value(doc, ChangeProps),
-        DocId = get_value(<<"_id">>, RepProps),
-        rep_db_update_error(Error, DocId),
-        State
-    end,
-    {reply, ok, NewState};
-
-
-handle_call({rep_started, RepId}, _From, State) ->
-    case rep_state(RepId) of
-    nil ->
-        ok;
-    RepState ->
-        NewRepState = RepState#rep_state{
-            starting = false,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries,
-            wait = ?INITIAL_WAIT
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
-    end,
-    {reply, ok, State};
-
-handle_call({rep_complete, RepId}, _From, State) ->
-    true = ets:delete(?REP_TO_STATE, RepId),
-    {reply, ok, State};
-
-handle_call({rep_error, RepId, Error}, _From, State) ->
-    {reply, ok, replication_error(State, RepId, Error)};
-
-handle_call(Msg, From, State) ->
-    ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
-        [Msg, From]),
-    {stop, {error, {unexpected_call, Msg}}, State}.
-
-
-handle_cast({rep_db_changed, NewName}, #state{rep_db_name = NewName} = State) ->
-    {noreply, State};
-
-handle_cast({rep_db_changed, _NewName}, State) ->
-    {noreply, restart(State)};
-
-handle_cast({rep_db_created, NewName}, #state{rep_db_name = NewName} = State) ->
-    {noreply, State};
-
-handle_cast({rep_db_created, _NewName}, State) ->
-    {noreply, restart(State)};
-
-handle_cast({set_max_retries, MaxRetries}, State) ->
-    {noreply, State#state{max_retries = MaxRetries}};
-
-handle_cast(Msg, State) ->
-    ?LOG_ERROR("Replication manager received unexpected cast ~p", [Msg]),
-    {stop, {error, {unexpected_cast, Msg}}, State}.
-
-
-handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
-    % replicator DB deleted
-    {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};
-
-handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
-    ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
-    {stop, {db_update_notifier_died, Reason}, State};
-
-handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
-    % one of the replication start processes terminated successfully
-    {noreply, State#state{rep_start_pids = Pids -- [From]}};
-
-handle_info({'DOWN', _Ref, _, _, _}, State) ->
-    % From a db monitor created by a replication process. Ignore.
-    {noreply, State};
-
-handle_info(Msg, State) ->
-    ?LOG_ERROR("Replication manager received unexpected message ~p", [Msg]),
-    {stop, {unexpected_msg, Msg}, State}.
-
-
-terminate(_Reason, State) ->
-    #state{
-        rep_start_pids = StartPids,
-        changes_feed_loop = Loop,
-        db_notifier = DbNotifier
-    } = State,
-    stop_all_replications(),
-    lists:foreach(
-        fun(Pid) ->
-            catch unlink(Pid),
-            catch exit(Pid, stop)
-        end,
-        [Loop | StartPids]),
-    true = ets:delete(?REP_TO_STATE),
-    true = ets:delete(?DOC_TO_REP),
-    couch_db_update_notifier:stop(DbNotifier).
-
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
-changes_feed_loop() ->
-    {ok, RepDb} = ensure_rep_db_exists(),
-    RepDbName = couch_db:name(RepDb),
-    couch_db:close(RepDb),
-    Server = self(),
-    Pid = spawn_link(
-        fun() ->
-            DbOpenOptions = [{user_ctx, RepDb#db.user_ctx}, sys_db],
-            {ok, Db} = couch_db:open_int(RepDbName, DbOpenOptions),
-            ChangesFeedFun = couch_changes:handle_changes(
-                #changes_args{
-                    include_docs = true,
-                    feed = "continuous",
-                    timeout = infinity,
-                    db_open_options = [sys_db]
-                },
-                {json_req, null},
-                Db
-            ),
-            ChangesFeedFun(
-                fun({change, Change, _}, _) ->
-                    case has_valid_rep_id(Change) of
-                    true ->
-                        ok = gen_server:call(
-                            Server, {rep_db_update, Change}, infinity);
-                    false ->
-                        ok
-                    end;
-                (_, _) ->
-                    ok
-                end
-            ),
-            couch_db:close(Db)
-        end
-    ),
-    {Pid, RepDbName}.
-
-
-has_valid_rep_id({Change}) ->
-    has_valid_rep_id(get_value(<<"id">>, Change));
-has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
-    false;
-has_valid_rep_id(_Else) ->
-    true.
-
-
-db_update_notifier() ->
-    Server = self(),
-    {ok, Notifier} = couch_db_update_notifier:start_link(
-        fun({created, DbName}) ->
-            case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
-            DbName ->
-                ok = gen_server:cast(Server, {rep_db_created, DbName});
-            _ ->
-                ok
-            end;
-        (_) ->
-            % no need to handle the 'deleted' event - the changes feed loop
-            % dies when the database is deleted
-            ok
-        end
-    ),
-    Notifier.
-
-
-restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
-    stop_all_replications(),
-    lists:foreach(
-        fun(Pid) ->
-            catch unlink(Pid),
-            catch exit(Pid, rep_db_changed)
-        end,
-        [Loop | StartPids]),
-    {NewLoop, NewRepDbName} = changes_feed_loop(),
-    State#state{
-        changes_feed_loop = NewLoop,
-        rep_db_name = NewRepDbName,
-        rep_start_pids = []
-    }.
-
-
-process_update(State, {Change}) ->
-    {RepProps} = JsonRepDoc = get_value(doc, Change),
-    DocId = get_value(<<"_id">>, RepProps),
-    case get_value(<<"deleted">>, Change, false) of
-    true ->
-        rep_doc_deleted(DocId),
-        State;
-    false ->
-        case get_value(<<"_replication_state">>, RepProps) of
-        undefined ->
-            maybe_start_replication(State, DocId, JsonRepDoc);
-        <<"triggered">> ->
-            maybe_start_replication(State, DocId, JsonRepDoc);
-        <<"completed">> ->
-            replication_complete(DocId),
-            State;
-        <<"error">> ->
-            case ets:lookup(?DOC_TO_REP, DocId) of
-            [] ->
-                maybe_start_replication(State, DocId, JsonRepDoc);
-            _ ->
-                State
-            end
-        end
-    end.
-
-
-rep_db_update_error(Error, DocId) ->
-    case Error of
-    {bad_rep_doc, Reason} ->
-        ok;
-    _ ->
-        Reason = to_binary(Error)
-    end,
-    ?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
-        [DocId, Reason]),
-    update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]).
-
-
-rep_user_ctx({RepDoc}) ->
-    case get_value(<<"user_ctx">>, RepDoc) of
-    undefined ->
-        #user_ctx{};
-    {UserCtx} ->
-        #user_ctx{
-            name = get_value(<<"name">>, UserCtx, null),
-            roles = get_value(<<"roles">>, UserCtx, [])
-        }
-    end.
-
-
-maybe_start_replication(State, DocId, RepDoc) ->
-    #rep{id = {BaseId, _} = RepId} = Rep = parse_rep_doc(RepDoc),
-    case rep_state(RepId) of
-    nil ->
-        RepState = #rep_state{
-            rep = Rep,
-            starting = true,
-            retries_left = State#state.max_retries,
-            max_retries = State#state.max_retries
-        },
-        true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
-        true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
-        ?LOG_INFO("Attempting to start replication `~s` (document `~s`).",
-            [pp_rep_id(RepId), DocId]),
-        Pid = spawn_link(fun() -> start_replication(Rep, 0) end),
-        State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
-    #rep_state{rep = #rep{doc_id = DocId}} ->
-        State;
-    #rep_state{starting = false, rep = #rep{doc_id = OtherDocId}} ->
-        ?LOG_INFO("The replication specified by the document `~s` was already"
-            " triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
-        State;
-    #rep_state{starting = true, rep = #rep{doc_id = OtherDocId}} ->
-        ?LOG_INFO("The replication specified by the document `~s` is already"
-            " being triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
-        State
-    end.
-
-
-parse_rep_doc(RepDoc) ->
-    {ok, Rep} = try
-        couch_replicator_utils:parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
-    catch
-    throw:{error, Reason} ->
-        throw({bad_rep_doc, Reason});
-    Tag:Err ->
-        throw({bad_rep_doc, to_binary({Tag, Err})})
-    end,
-    Rep.
-
-
-maybe_tag_rep_doc(DocId, {RepProps}, RepId) ->
-    case get_value(<<"_replication_id">>, RepProps) of
-    RepId ->
-        ok;
-    _ ->
-        update_rep_doc(DocId, [{<<"_replication_id">>, RepId}])
-    end.
-
-
-start_replication(Rep, Wait) ->
-    ok = timer:sleep(Wait * 1000),
-    case (catch couch_replicator:async_replicate(Rep)) of
-    {ok, _} ->
-        ok;
-    Error ->
-        replication_error(Rep, Error)
-    end.
-
-
-replication_complete(DocId) ->
-    case ets:lookup(?DOC_TO_REP, DocId) of
-    [{DocId, {BaseId, Ext} = RepId}] ->
-        case rep_state(RepId) of
-        nil ->
-            % Prior to OTP R14B02, temporary child specs remain in
-            % in the supervisor after a worker finishes - remove them.
-            % We want to be able to start the same replication but with
-            % eventually different values for parameters that don't
-            % contribute to its ID calculation.
-            _ = supervisor:delete_child(couch_rep_sup, BaseId ++ Ext);
-        #rep_state{} ->
-            ok
-        end,
-        true = ets:delete(?DOC_TO_REP, DocId);
-    _ ->
-        ok
-    end.
-
-
-rep_doc_deleted(DocId) ->
-    case ets:lookup(?DOC_TO_REP, DocId) of
-    [{DocId, RepId}] ->
-        couch_replicator:cancel_replication(RepId),
-        true = ets:delete(?REP_TO_STATE, RepId),
-        true = ets:delete(?DOC_TO_REP, DocId),
-        ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
-            " was deleted", [pp_rep_id(RepId), DocId]);
-    [] ->
-        ok
-    end.
-
-
-replication_error(State, RepId, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        State;
-    RepState ->
-        maybe_retry_replication(RepState, Error, State)
-    end.
-
-maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
-    #rep_state{
-        rep = #rep{id = RepId, doc_id = DocId},
-        max_retries = MaxRetries
-    } = RepState,
-    couch_replicator:cancel_replication(RepId),
-    true = ets:delete(?REP_TO_STATE, RepId),
-    true = ets:delete(?DOC_TO_REP, DocId),
-    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
-        "~nReached maximum retry attempts (~p).",
-        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
-    State;
-
-maybe_retry_replication(RepState, Error, State) ->
-    #rep_state{
-        rep = #rep{id = RepId, doc_id = DocId} = Rep
-    } = RepState,
-    #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
-    true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
-    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
-        "~nRestarting replication in ~p seconds.",
-        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
-    Pid = spawn_link(fun() -> start_replication(Rep, Wait) end),
-    State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
-
-
-stop_all_replications() ->
-    ?LOG_INFO("Stopping all ongoing replications because the replicator"
-        " database was deleted or changed", []),
-    ets:foldl(
-        fun({_, RepId}, _) ->
-            couch_replicator:cancel_replication(RepId)
-        end,
-        ok, ?DOC_TO_REP),
-    true = ets:delete_all_objects(?REP_TO_STATE),
-    true = ets:delete_all_objects(?DOC_TO_REP).
-
-
-update_rep_doc(RepDocId, KVs) ->
-    {ok, RepDb} = ensure_rep_db_exists(),
-    try
-        case couch_db:open_doc(RepDb, RepDocId, [ejson_body]) of
-        {ok, LatestRepDoc} ->
-            update_rep_doc(RepDb, LatestRepDoc, KVs);
-        _ ->
-            ok
-        end
-    catch throw:conflict ->
-        % Shouldn't happen, as by default only the role _replicator can
-        % update replication documents.
-        ?LOG_ERROR("Conflict error when updating replication document `~s`."
-            " Retrying.", [RepDocId]),
-        ok = timer:sleep(5),
-        update_rep_doc(RepDocId, KVs)
-    after
-        couch_db:close(RepDb)
-    end.
-
-update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
-    NewRepDocBody = lists:foldl(
-        fun({<<"_replication_state">> = K, State} = KV, Body) ->
-                case get_value(K, Body) of
-                State ->
-                    Body;
-                _ ->
-                    Body1 = lists:keystore(K, 1, Body, KV),
-                    lists:keystore(
-                        <<"_replication_state_time">>, 1, Body1,
-                        {<<"_replication_state_time">>, timestamp()})
-                end;
-            ({K, _V} = KV, Body) ->
-                lists:keystore(K, 1, Body, KV)
-        end,
-        RepDocBody, KVs),
-    case NewRepDocBody of
-    RepDocBody ->
-        ok;
-    _ ->
-        % Might not succeed - when the replication doc is deleted right
-        % before this update (not an error, ignore).
-        couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
-    end.
-
-
-% RFC3339 timestamps.
-% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
-timestamp() ->
-    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(now()),
-    UTime = erlang:universaltime(),
-    LocalTime = calendar:universal_time_to_local_time(UTime),
-    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
-        calendar:datetime_to_gregorian_seconds(UTime),
-    zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
-    iolist_to_binary(
-        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
-            [Year, Month, Day, Hour, Min, Sec,
-                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
-
-zone(Hr, Min) when Hr >= 0, Min >= 0 ->
-    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
-zone(Hr, Min) ->
-    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
-
-
-
-ensure_rep_db_exists() ->
-    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
-    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
-    case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}]) of
-    {ok, Db} ->
-        Db;
-    _Error ->
-        {ok, Db} = couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}])
-    end,
-    ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
-    {ok, Db}.
-
-
-ensure_rep_ddoc_exists(RepDb, DDocID) ->
-    case couch_db:open_doc(RepDb, DDocID, []) of
-    {ok, _Doc} ->
-        ok;
-    _ ->
-        DDoc = couch_doc:from_json_obj({[
-            {<<"_id">>, DDocID},
-            {<<"language">>, <<"javascript">>},
-            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-        ]}),
-        {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
-     end.
-
-
-% pretty-print replication id
-pp_rep_id(#rep{id = RepId}) ->
-    pp_rep_id(RepId);
-pp_rep_id({Base, Extension}) ->
-    Base ++ Extension.
-
-
-rep_state(RepId) ->
-    case ets:lookup(?REP_TO_STATE, RepId) of
-    [{RepId, RepState}] ->
-        RepState;
-    [] ->
-        nil
-    end.
-
-
-error_reason({error, Reason}) ->
-    Reason;
-error_reason(Reason) ->
-    Reason.
-
-
-retries_value("infinity") ->
-    infinity;
-retries_value(Value) ->
-    list_to_integer(Value).
-
-
-state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
-    Wait2 = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
-    case Left of
-    infinity ->
-        State#rep_state{wait = Wait2};
-    _ ->
-        State#rep_state{retries_left = Left - 1, wait = Wait2}
-    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replication_notifier.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replication_notifier.erl b/src/couch_replicator/src/couch_replication_notifier.erl
deleted file mode 100644
index c686c2b..0000000
--- a/src/couch_replicator/src/couch_replication_notifier.erl
+++ /dev/null
@@ -1,57 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replication_notifier).
-
--behaviour(gen_event).
-
-% public API
--export([start_link/1, stop/1, notify/1]).
-
-% gen_event callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_event/2, handle_call/2, handle_info/2]).
-
--include("couch_db.hrl").
-
-start_link(FunAcc) ->
-    couch_event_sup:start_link(couch_replication,
-        {couch_replication_notifier, make_ref()}, FunAcc).
-
-notify(Event) ->
-    gen_event:notify(couch_replication, Event).
-
-stop(Pid) ->
-    couch_event_sup:stop(Pid).
-
-
-init(FunAcc) ->
-    {ok, FunAcc}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-handle_event(Event, Fun) when is_function(Fun, 1) ->
-    Fun(Event),
-    {ok, Fun};
-handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) ->
-    Acc2 = Fun(Event, Acc),
-    {ok, {Fun, Acc2}}.
-
-handle_call(_Msg, State) ->
-    {reply, ok, State}.
-
-handle_info(_Msg, State) ->
-    {ok, State}.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f913ca6e/src/couch_replicator/src/couch_replicator.app.src
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator.app.src b/src/couch_replicator/src/couch_replicator.app.src
index 70485ee..c3425e6 100644
--- a/src/couch_replicator/src/couch_replicator.app.src
+++ b/src/couch_replicator/src/couch_replicator.app.src
@@ -13,20 +13,20 @@
 {application, couch_replicator, [
     {description, "CouchDB replicator"},
     {vsn, "@version@"},
-    {modules, [
-        couch_api_wrap_httpc,
-	    couch_api_wrap,
-        couch_api_httpc_pool,
-	    couch_httpd_replicator,
-	    couch_rep_sup,
-	    couch_replication_manager,
-	    couch_replication_notifier,
+    {modules, [  
+        couch_replicator_api_wrap,
+        couch_replicator_httpc,
+        couch_replicator_httpd,
+        couch_replicator_job_sup,
+        couch_replicator_notifier,
+        couch_replicator_manager,
+        couch_replicator_httpc_pool,
         couch_replicator_utils,
-	    couch_replicator_worker,
-	    couch_replicator
+        couch_replicator_worker,
+        couch_replicator
     ]},
     {registered, [
-        couch_rep_sup
+        couch_replicator_job_sup
     ]},
     {applications, [kernel, stdlib]}
 ]}.


Mime
View raw message