couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [21/49] couchdb commit: updated refs/heads/1843-feature-bigcouch to 3069c01
Date Wed, 05 Feb 2014 14:50:43 GMT
Remove src/couch_replicator


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/550e8202
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/550e8202
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/550e8202

Branch: refs/heads/1843-feature-bigcouch
Commit: 550e8202ae88b667516c51701af25927411e24d3
Parents: 2acbbd3
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Tue Feb 4 17:40:14 2014 -0600
Committer: Paul J. Davis <paul.joseph.davis@gmail.com>
Committed: Tue Feb 4 17:40:14 2014 -0600

----------------------------------------------------------------------
 src/couch_replicator/README.md                  |  200 ----
 .../src/couch_replicator.app.src                |   44 -
 src/couch_replicator/src/couch_replicator.erl   | 1002 ------------------
 src/couch_replicator/src/couch_replicator.hrl   |   30 -
 .../src/couch_replicator_api_wrap.erl           |  802 --------------
 .../src/couch_replicator_api_wrap.hrl           |   36 -
 .../src/couch_replicator_app.erl                |   17 -
 .../src/couch_replicator_httpc.erl              |  297 ------
 .../src/couch_replicator_httpc_pool.erl         |  194 ----
 .../src/couch_replicator_httpd.erl              |   65 --
 .../src/couch_replicator_job_sup.erl            |   29 -
 .../src/couch_replicator_js_functions.hrl       |  151 ---
 .../src/couch_replicator_manager.erl            |  889 ----------------
 .../src/couch_replicator_notifier.erl           |   57 -
 .../src/couch_replicator_sup.erl                |   42 -
 .../src/couch_replicator_utils.erl              |  432 --------
 .../src/couch_replicator_worker.erl             |  514 ---------
 src/couch_replicator/src/json_stream_parse.erl  |  432 --------
 src/couch_replicator/test/01-load.t             |   37 -
 src/couch_replicator/test/02-httpc-pool.t       |  240 -----
 .../test/03-replication-compact.t               |  493 ---------
 .../test/04-replication-large-atts.t            |  256 -----
 .../test/05-replication-many-leaves.t           |  283 -----
 .../test/06-doc-missing-stubs.t                 |  293 -----
 24 files changed, 6835 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/README.md
----------------------------------------------------------------------
diff --git a/src/couch_replicator/README.md b/src/couch_replicator/README.md
deleted file mode 100644
index 376b3fc..0000000
--- a/src/couch_replicator/README.md
+++ /dev/null
@@ -1,200 +0,0 @@
-## Replicator
-
-### Overview
-
-Replication is a process that synchronizes two databases. All changes
-in one database (the source), inserts, updates, and deletes, are made
-to the second database (the target). The replication can be run on
-either the node containing the source (a *push* replication), or on the
-node containing the target database (a *pull* replication) or on a
-third node different from either the source or the target.
-
-#### Background
-
-This current version is based on the latest from CouchDB as partially
-documented on the CouchDB  [WIKI][1]. The first solid version that worked
-was implemented by [Adam Kocoloski][4] and subsequently picked up by [Filipe
-Manana][3], another CouchDB committer, who did two major reworkings. The
-first added the [replicator][2] db and the second some performance
-improvements and configurability to adjust factors that affect
-performance.
- 
-This was then ported and integrated with dbcore. The changes required
-were modest but there are notable differences in the use of the
-replicator db as noted below. These notes are intended to be the
-definitive resource for Cloudant users and will be kept in sync with
-code changes.
-
-### Usage
-
-The following examples all assume the use of the `_replicate2`
-endpoint. Eventually this will be replaced by `_replicate`, but this allows
-us to support both the new and old replicator during the testing
-phase, e.g.:
-
-    curl -X POST -H 'content-type:application/json'
-    http://bitdiddle.cloudant.com/_replicate2 -d
-    '{"source":"http://cust1.cloudant.com/foo","target":"http://cust2.cloudant.com/bar"}'
-
-In the remaining examples we'll just note the JSON
-bodies. `_replicate2` is really all there is to calling the
-replicator, all the parameters are in the JSON body. For example a
-simple pull replication (run from the target machine):
-
-    {"source":"http://bitdiddle.cloudant.com/foo",
-     "target":"http://nordier.cloudant.com/foo"}
-To create the target db also:
-
-    {"source":"http://mazincas.cloudant.com/foo",
-     "target":"http://nordier.cloudant.com/foo",
-     "create_target": true}
-
-A continuous replication will stay running and as changes occur in the
-source, replicate them to the target. Under the covers it makes use of
-a continuous `_changes` feed:
-
-    {"source":"http://bitdiddle.cloudant.com/foo",
-     "target":"http://nordier.cloudant.com/foo",
-     "continuous":true}
-
-A replication can be stopped by posting the same body but with a
-cancel property added:
-
-    {"source":"http://bitdiddle.cloudant.com/foo",
-     "target":"http://nordier.cloudant.com/foo",
-     "continuous":true,
-     "cancel":true}
-
-When a continuous replication is run, a replication id is returned
-that provides an additional method for cancelling the replication:
-
-    {"source":"http://bitdiddle.cloudant.com/foo",
-     "target":"http://nordier.cloudant.com/foo",
-     "continuous":true}
-
-    {"ok":true,"_local_id":"0a81b645497e6270611ec3419767a584+continuous+create_target"}
-
-To cancel:
-
-    {"replication_id":
-    "0a81b645497e6270611ec3419767a584+continuous+create_target",
-    "cancel": true}
-
-These are the main use cases. Additionally there are filter functions
-supported, JS functions that control whether a doc is replicated. They
-work similarly to views, e.g.:
-
-    {
-    "_id":"_design/foo",
-    "filters": {
-      "foo": function(doc, req) {
-                   if (doc.name == req.query.key) {
-                      return true;
-                   } else {
-                      return false;
-                   }
-                }
-       }
-    }
-
-Its use is specified in the replication as follows:
-
-    {"source":"http://nordier.cloudant.com/foo",
-     "target":"http://beezlechaus.cloudant.com/foo",
-     "filter":"foo/foo",
-     "query_params": {"key":"value"}
-    }
-
-Some applications might need to just replicate a few docs in which
-case `doc_ids` can be used:
-
-    {"source":"http://nordier.cloudant.com/foo",
-     "target":"http://mazincas.cloudant.com/foo",
-     "doc_ids":["foo","bar","baz"]
-    }
-
-### Replicator DB
-
-A replicator db is a new way to manage replications better by storing
-them in a database. One just puts a JSON doc to the replicator db with
-a body the same as if it were posted to `_replicate2`:
-
-    curl -X PUT 
-    http://bitdiddle.cloudant.com/_replicator/repl1 -d
-    '{"source":"http://cust1.cloudant.com/foo",
-      "target":"http://cust2.cloudant.com/bar"}'
-
-The doc might look something like:
-
-<pre>
-{
-    "_id": "repl1",
-    "source":  "http://cust1.cloudant.com/foo",
-    "target":  "http://cust1.cloudant.com/foo", 
-    "create_target":  true,
-    "_replication_id":  "c0ebe9256695ff083347cbf95f93e280",
-    "_replication_state":  "triggered",
-    "_replication_state_time":  "2011-06-07T16:54:35+01:00"
-}
-</pre>
-
-This replication now persists across server restarts and the
-replication can be cancelled by simply deleting the doc. The
-`_replication_state` will change to `completed` when a replication
-finishes, which may not happen in the case of a continuous
-replication. It can also change to `error` if it fails.
-
-### Advanced Configurations
-
-There are a few additional arguments that can be passed to a
-replication that govern its performance. These are to be used with
-care as they can have significant impact on the system in a
-multi-tenant environment.
-
- * worker_processes - The default is 4. This controls how many
-   separate processes will read from the changes manager and write to
-   the target. A higher number can improve throughput.
-
- * worker_batch_size - The default is 500. This controls how many
-   documents are processed. After each batch a checkpoint is written
-   so this controls how frequently checkpointing occurs.
-
- * http_connections - The maximum number of http connections used per
-   replication. The default is 20.
-
- * connection_timeout - How long a connection can remain idle, the
-   default is 30000 (30s)
-
- * retries_per_request - When requests fail the number of times to
-   retry, the default is 10. There is a wait period between each
-   attempt, that begins with 0.25s and doubles on each iteration, with
-   a cap at 5 minutes.
-
- * socket_options - a list of options that can be used with the socket
-   connections. See the [erlang documentation][6] for details.
-
-
-### Differences from CouchDB
-
-CouchDB allows the replicator db to be shared by multiple users,
-controlling access via update handlers. It also allows the replicator
-db to be changed on the fly by setting a value in the config file. We
-support neither of these in dbcore. Each account has its own
-replicator db, named _replicator, and that's it.
-
-Another difference to note is in specifying urls. Using the name of a
-local db works fine in a single CouchDB instance but makes less sense
-in a clustered environment. Though we try to determine a full url to
-use this doesn't work well in practice so it's a good rule of thumb to
-always use the full urls in both the source and target.
-
-### Overview of Code and Design
-
-----
-
-[1]: http://wiki.apache.org/couchdb/Replication
-[2]: https://gist.github.com/832610
-[3]: https://github.com/fdmanana
-[4]: https://github.com/kocolosk
-[5]: http://guide.couchdb.org/draft/replication.html
-[6]: http://www.erlang.org/doc/man/inet.html#setopts-2

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator.app.src
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator.app.src b/src/couch_replicator/src/couch_replicator.app.src
deleted file mode 100644
index 2da0d4a..0000000
--- a/src/couch_replicator/src/couch_replicator.app.src
+++ /dev/null
@@ -1,44 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-{application, couch_replicator, [
-    {description, "CouchDB replicator"},
-    {vsn, git},
-    {modules, [
-        couch_replicator,
-        couch_replicator_api_wrap,
-        couch_replicator_app,
-        couch_replicator_httpc,
-        couch_replicator_httpd,
-        couch_replicator_job_sup,
-        couch_replicator_notifier,
-        couch_replicator_manager,
-        couch_replicator_httpc_pool,
-        couch_replicator_sup,
-        couch_replicator_utils,
-        couch_replicator_worker
-    ]},
-    {registered, [
-        couch_replication,
-        couch_replicator_manager,
-        couch_replicator_job_sup
-    ]},
-    {applications, [
-        kernel,
-        stdlib,
-        twig,
-        fabric,
-        mem3,
-        couch
-    ]}
-]}.
-

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
deleted file mode 100644
index e1ff15c..0000000
--- a/src/couch_replicator/src/couch_replicator.erl
+++ /dev/null
@@ -1,1002 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator).
--behaviour(gen_server).
-
-% public API
--export([replicate/2]).
-
-% meant to be used only by the replicator database listener
--export([async_replicate/1]).
--export([cancel_replication/1]).
-
-% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
-
--include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
--include("couch_replicator.hrl").
-
--define(LOWEST_SEQ, 0).
-
--import(couch_util, [
-    get_value/2,
-    get_value/3,
-    to_binary/1
-]).
-
--import(couch_replicator_utils, [
-    start_db_compaction_notifier/2,
-    stop_db_compaction_notifier/1
-]).
-
--record(rep_state, {
-    rep_details,
-    source_name,
-    target_name,
-    source,
-    target,
-    history,
-    checkpoint_history,
-    start_seq,
-    committed_seq,
-    current_through_seq,
-    seqs_in_progress = [],
-    highest_seq_done = {0, ?LOWEST_SEQ},
-    source_log,
-    target_log,
-    rep_starttime,
-    src_starttime,
-    tgt_starttime,
-    timer, % checkpoint timer
-    changes_queue,
-    changes_manager,
-    changes_reader,
-    workers,
-    stats = #rep_stats{},
-    session_id,
-    source_db_compaction_notifier = nil,
-    target_db_compaction_notifier = nil,
-    source_monitor = nil,
-    target_monitor = nil,
-    source_seq = nil,
-    use_checkpoints = true
-}).
-
-
-replicate(PostBody, Ctx) ->
-    {ok, #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep} =
-        couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
-    case get_value(cancel, Options, false) of
-    true ->
-        case get_value(id, Options, nil) of
-        nil ->
-            cancel_replication(RepId);
-        RepId2 ->
-            cancel_replication(RepId2, UserCtx)
-        end;
-    false ->
-        {ok, Listener} = rep_result_listener(RepId),
-        Result = do_replication_loop(Rep),
-        couch_replicator_notifier:stop(Listener),
-        Result
-    end.
-
-
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
-    case async_replicate(Rep) of
-    {ok, _Pid} ->
-        case get_value(continuous, Options, false) of
-        true ->
-            {ok, {continuous, ?l2b(BaseId ++ Ext)}};
-        false ->
-            wait_for_result(Id)
-        end;
-    Error ->
-        Error
-    end.
-
-
-async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
-    RepChildId = BaseId ++ Ext,
-    Source = couch_replicator_api_wrap:db_uri(Src),
-    Target = couch_replicator_api_wrap:db_uri(Tgt),
-    Timeout = get_value(connection_timeout, Rep#rep.options),
-    ChildSpec = {
-        RepChildId,
-        {gen_server, start_link, [?MODULE, Rep, [{timeout, Timeout}]]},
-        temporary,
-        250,
-        worker,
-        [?MODULE]
-    },
-    % All these nested cases to attempt starting/restarting a replication child
-    % are ugly and not 100% race condition free. The following patch submission
-    % is a solution:
-    %
-    % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
-    %
-    case supervisor:start_child(couch_replicator_job_sup, ChildSpec) of
-    {ok, Pid} ->
-        twig:log(notice,"starting new replication `~s` at ~p (`~s` -> `~s`)",
-            [RepChildId, Pid, Source, Target]),
-        {ok, Pid};
-    {error, already_present} ->
-        case supervisor:restart_child(couch_replicator_job_sup, RepChildId) of
-        {ok, Pid} ->
-            twig:log(notice,"restarting replication `~s` at ~p (`~s` -> `~s`)",
-                [RepChildId, Pid, Source, Target]),
-            {ok, Pid};
-        {error, running} ->
-            %% this error occurs if multiple replicators are racing
-            %% each other to start and somebody else won. Just grab
-            %% the Pid by calling start_child again.
-            timer:sleep(50 + random:uniform(100)),
-            async_replicate(Rep);
-        {error, {'EXIT', {badarg,
-            [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
-            % Clause to deal with a change in the supervisor module introduced
-            % in R14B02. For more details consult the thread at:
-            %     http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
-            _ = supervisor:delete_child(couch_replicator_job_sup, RepChildId),
-            async_replicate(Rep);
-        {error, _} = Error ->
-            Error
-        end;
-    {error, {already_started, Pid}} ->
-        twig:log(notice,"replication `~s` already running at ~p (`~s` -> `~s`)",
-            [RepChildId, Pid, Source, Target]),
-        {ok, Pid};
-    {error, {Error, _}} ->
-        {error, Error}
-    end.
-
-
-rep_result_listener(RepId) ->
-    ReplyTo = self(),
-    {ok, _Listener} = couch_replicator_notifier:start_link(
-        fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
-                ReplyTo ! Ev;
-            (_) ->
-                ok
-        end).
-
-
-wait_for_result(RepId) ->
-    receive
-    {finished, RepId, RepResult} ->
-        {ok, RepResult};
-    {error, RepId, Reason} ->
-        {error, Reason}
-    end.
-
-
-cancel_replication({BaseId, Extension}) ->
-    FullRepId = BaseId ++ Extension,
-    twig:log(notice,"Canceling replication `~s`...", [FullRepId]),
-    case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
-    ok ->
-        twig:log(notice,"Replication `~s` canceled.", [FullRepId]),
-        case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
-            ok ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            {error, not_found} ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            Error ->
-                Error
-        end;
-    Error ->
-        twig:log(error,"Error canceling replication `~s`: ~p", [FullRepId, Error]),
-        Error
-    end.
-
-cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
-    case lists:member(<<"_admin">>, Roles) of
-    true ->
-        cancel_replication(RepId);
-    false ->
-        {BaseId, Ext} = RepId,
-        case lists:keysearch(
-            BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
-        {value, {_, Pid, _, _}} when is_pid(Pid) ->
-            case (catch gen_server:call(Pid, get_details, infinity)) of
-            {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
-                cancel_replication(RepId);
-            {ok, _} ->
-                throw({unauthorized,
-                    <<"Can't cancel a replication triggered by another user">>});
-            {'EXIT', {noproc, {gen_server, call, _}}} ->
-                {error, not_found};
-            Error ->
-                throw(Error)
-            end;
-        _ ->
-            {error, not_found}
-        end
-    end.
-
-init(InitArgs) ->
-    {ok, InitArgs, 0}.
-
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
-    process_flag(trap_exit, true),
-
-    #rep_state{
-        source = Source,
-        target = Target,
-        source_name = SourceName,
-        target_name = TargetName,
-        start_seq = {_Ts, StartSeq},
-        source_seq = SourceCurSeq,
-        committed_seq = {_, CommittedSeq}
-    } = State = init_state(Rep),
-
-    NumWorkers = get_value(worker_processes, Options),
-    BatchSize = get_value(worker_batch_size, Options),
-    {ok, ChangesQueue} = couch_work_queue:new([
-        {max_items, BatchSize * NumWorkers * 2},
-        {max_size, 100 * 1024 * NumWorkers}
-    ]),
-    % This starts the _changes reader process. It adds the changes from
-    % the source db to the ChangesQueue.
-    ChangesReader = spawn_changes_reader(StartSeq, Source, ChangesQueue, Options),
-    % Changes manager - responsible for dequeuing batches from the changes queue
-    % and delivering them to the worker processes.
-    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
-    % This starts the worker processes. They ask the changes queue manager for
-    % a batch of _changes rows to process -> check which revs are missing in the
-    % target, and for the missing ones, copy them from the source to the target.
-    MaxConns = get_value(http_connections, Options),
-    Workers = lists:map(
-        fun(_) ->
-            {ok, Pid} = couch_replicator_worker:start_link(
-                self(), Source, Target, ChangesManager, MaxConns),
-            Pid
-        end,
-        lists:seq(1, NumWorkers)),
-
-    couch_task_status:add_task([
-        {type, replication},
-        {user, UserCtx#user_ctx.name},
-        {replication_id, ?l2b(BaseId ++ Ext)},
-        {doc_id, Rep#rep.doc_id},
-        {source, ?l2b(SourceName)},
-        {target, ?l2b(TargetName)},
-        {continuous, get_value(continuous, Options, false)},
-        {revisions_checked, 0},
-        {missing_revisions_found, 0},
-        {docs_read, 0},
-        {docs_written, 0},
-        {changes_pending, pending(SourceCurSeq, CommittedSeq)},
-        {doc_write_failures, 0},
-        {source_seq, SourceCurSeq},
-        {checkpointed_source_seq, CommittedSeq},
-        {progress, 0}
-    ]),
-    couch_task_status:set_update_frequency(1000),
-
-    % Until OTP R14B03:
-    %
-    % Restarting a temporary supervised child implies that the original arguments
-    % (#rep{} record) specified in the MFA component of the supervisor
-    % child spec will always be used whenever the child is restarted.
-    % This implies the same replication performance tuning parameters will
-    % always be used. The solution is to delete the child spec (see
-    % cancel_replication/1) and then start the replication again, but this is
-    % unfortunately not immune to race conditions.
-
-    twig:log(notice,"Replication `~p` is using:~n"
-        "~c~p worker processes~n"
-        "~ca worker batch size of ~p~n"
-        "~c~p HTTP connections~n"
-        "~ca connection timeout of ~p milliseconds~n"
-        "~c~p retries per request~n"
-        "~csocket options are: ~s~s",
-        [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
-            MaxConns, $\t, get_value(connection_timeout, Options),
-            $\t, get_value(retries, Options),
-            $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
-            case StartSeq of
-            ?LOWEST_SEQ ->
-                "";
-            _ ->
-                io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
-            end]),
-
-    twig:log(debug,"Worker pids are: ~p", [Workers]),
-
-    couch_replicator_manager:replication_started(Rep),
-
-    {ok, State#rep_state{
-            changes_queue = ChangesQueue,
-            changes_manager = ChangesManager,
-            changes_reader = ChangesReader,
-            workers = Workers
-        }
-    }.
-
-
-handle_info(shutdown, St) ->
-    {stop, shutdown, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    twig:log(error,"Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    twig:log(error,"Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
-    twig:log(error,"ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
-    twig:log(error,"ChangesManager process died with reason: ~p", [Reason]),
-    {stop, changes_manager_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
-    twig:log(error,"ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
-    case Workers -- [Pid] of
-    Workers ->
-        twig:log(error,"unknown pid bit the dust ~p ~n",[Pid]),
-        {noreply, State#rep_state{workers = Workers}};
-        %% not clear why a stop was here before
-        %%{stop, {unknown_process_died, Pid, normal}, State};
-    [] ->
-        catch unlink(State#rep_state.changes_manager),
-        catch exit(State#rep_state.changes_manager, kill),
-        do_last_checkpoint(State);
-    Workers2 ->
-        {noreply, State#rep_state{workers = Workers2}}
-    end;
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
-    State2 = cancel_timer(State),
-    case lists:member(Pid, Workers) of
-    false ->
-        {stop, {unknown_process_died, Pid, Reason}, State2};
-    true ->
-        twig:log(error,"Worker ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {worker_died, Pid, Reason}, State2}
-    end;
-
-handle_info(timeout, InitArgs) ->
-    try do_init(InitArgs) of {ok, State} ->
-        {noreply, State}
-    catch Class:Error ->
-        Stack = erlang:get_stacktrace(),
-        {stop, shutdown, {error, Class, Error, Stack, InitArgs}}
-    end.
-
-handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
-    {reply, {ok, Rep}, State};
-
-handle_call({add_stats, Stats}, From, State) ->
-    gen_server:reply(From, ok),
-    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
-    {noreply, State#rep_state{stats = NewStats}};
-
-handle_call({report_seq_done, Seq, StatsInc}, From,
-    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
-        current_through_seq = ThroughSeq, stats = Stats} = State) ->
-    gen_server:reply(From, ok),
-    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
-    [Seq | Rest] ->
-        {Seq, Rest};
-    [_ | _] ->
-        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
-    end,
-    NewHighestDone = lists:max([HighestDone, Seq]),
-    NewThroughSeq = case NewSeqsInProgress of
-    [] ->
-        lists:max([NewThroughSeq0, NewHighestDone]);
-    _ ->
-        NewThroughSeq0
-    end,
-    twig:log(debug,"Worker reported seq ~p, through seq was ~p, "
-        "new through seq is ~p, highest seq done was ~p, "
-        "new highest seq done is ~p~n"
-        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
-        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
-            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
-    SourceCurSeq = source_cur_seq(State),
-    NewState = State#rep_state{
-        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
-        current_through_seq = NewThroughSeq,
-        seqs_in_progress = NewSeqsInProgress,
-        highest_seq_done = NewHighestDone,
-        source_seq = SourceCurSeq
-    },
-    update_task(NewState),
-    {noreply, NewState}.
-
-
-handle_cast({db_compacted, DbName},
-    #rep_state{source = #db{name = DbName} = Source} = State) ->
-    {ok, NewSource} = couch_db:reopen(Source),
-    {noreply, State#rep_state{source = NewSource}};
-
-handle_cast({db_compacted, DbName},
-    #rep_state{target = #db{name = DbName} = Target} = State) ->
-    {ok, NewTarget} = couch_db:reopen(Target),
-    {noreply, State#rep_state{target = NewTarget}};
-
-handle_cast(checkpoint, State) ->
-    case do_checkpoint(State) of
-    {ok, NewState} ->
-        {noreply, NewState#rep_state{timer = start_timer(State)}};
-    Error ->
-        {stop, Error, State}
-    end;
-
-handle_cast({report_seq, Seq},
-    #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
-    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
-    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
-
-
-code_change(_OldVsn, OldState, _Extra) when tuple_size(OldState) =:= 30 ->
-    {ok, erlang:append_element(OldState, true)};
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
-    checkpoint_history = CheckpointHistory} = State) ->
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
-    couch_replicator_manager:replication_completed(Rep, rep_stats(State));
-
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
-    % cancelled replication through ?MODULE:cancel_replication/1
-    couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}),
-    terminate_cleanup(State);
-
-terminate(Reason, #rep_state{} = State) ->
-    #rep_state{
-        source_name = Source,
-        target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
-    } = State,
-    twig:log(error,"Replication `~s` (`~s` -> `~s`) failed: ~s",
-        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, Reason}),
-    couch_replicator_manager:replication_error(Rep, Reason);
-
-terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
-    #rep{id=RepId} = InitArgs,
-    twig:log(error,"~p:~p: Replication failed to start for args ~p: ~p",
-             [Class, Error, InitArgs, Stack]),
-    case Error of
-    {unauthorized, DbUri} ->
-        NotifyError = {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
-    {db_not_found, DbUri} ->
-        NotifyError = {db_not_found, <<"could not open ", DbUri/binary>>};
-    _ ->
-        NotifyError = Error
-    end,
-    couch_replicator_notifier:notify({error, RepId, NotifyError}),
-    couch_replicator_manager:replication_error(InitArgs, NotifyError).
-
-terminate_cleanup(State) ->
-    update_task(State),
-    stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
-    stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
-    couch_replicator_api_wrap:db_close(State#rep_state.source),
-    couch_replicator_api_wrap:db_close(State#rep_state.target).
-
-
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
-    {stop, normal, cancel_timer(State)};
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = Seq} = State) ->
-    case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
-    {ok, NewState} ->
-        {stop, normal, cancel_timer(NewState)};
-    Error ->
-        {stop, Error, State}
-    end.
-
-
-start_timer(State) ->
-    After = checkpoint_interval(State),
-    case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
-    {ok, Ref} ->
-        Ref;
-    Error ->
-        twig:log(error,"Replicator, error scheduling checkpoint:  ~p", [Error]),
-        nil
-    end.
-
-
-cancel_timer(#rep_state{timer = nil} = State) ->
-    State;
-cancel_timer(#rep_state{timer = Timer} = State) ->
-    {ok, cancel} = timer:cancel(Timer),
-    State#rep_state{timer = nil}.
-
-
-init_state(Rep) ->
-    #rep{
-        source = Src, target = Tgt,
-        options = Options, user_ctx = UserCtx
-    } = Rep,
-    {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
-    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
-        get_value(create_target, Options, false)),
-
-    {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
-    {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
-
-    [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
-
-    {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
-    StartSeq1 = get_value(since_seq, Options, StartSeq0),
-    StartSeq = {0, StartSeq1},
-    #doc{body={CheckpointHistory}} = SourceLog,
-    State = #rep_state{
-        rep_details = Rep,
-        source_name = couch_replicator_api_wrap:db_uri(Source),
-        target_name = couch_replicator_api_wrap:db_uri(Target),
-        source = Source,
-        target = Target,
-        history = History,
-        checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
-        start_seq = StartSeq,
-        current_through_seq = StartSeq,
-        committed_seq = StartSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = couch_util:rfc1123_date(),
-        src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
-        tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
-        session_id = couch_uuids:random(),
-        source_db_compaction_notifier =
-            start_db_compaction_notifier(Source, self()),
-        target_db_compaction_notifier =
-            start_db_compaction_notifier(Target, self()),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
-        source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
-        use_checkpoints = get_value(use_checkpoints, Options, true)
-    },
-    State#rep_state{timer = start_timer(State)}.
-
-
-find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
-    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
-    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
-
-
-fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
-    lists:reverse(Acc);
-
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
-    case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
-    {error, <<"not_found">>} when Vsn > 1 ->
-        OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
-        fold_replication_logs(Dbs, Vsn - 1,
-            ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
-    {error, <<"not_found">>} ->
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
-    {ok, Doc} when LogId =:= NewId ->
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
-    {ok, Doc} ->
-        MigratedLog = #doc{id = NewId, body = Doc#doc.body},
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
-    end.
-
-
-spawn_changes_reader(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
-    spawn_link(fun() ->
-        put(last_seq, StartSeq),
-        put(retries_left, Db#httpdb.retries),
-        read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options)
-    end);
-spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
-    spawn_link(fun() ->
-        read_changes(StartSeq, Db, ChangesQueue, Options)
-    end).
-
-read_changes(StartSeq, Db, ChangesQueue, Options) ->
-    try
-        couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
-            fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
-                case Id of
-                <<>> ->
-                    % Previous CouchDB releases had a bug which allowed a doc
-                    % with an empty ID to be inserted into databases. Such doc
-                    % is impossible to GET.
-                    twig:log(error,"Replicator: ignoring document with empty ID in "
-                        "source database `~s` (_changes sequence ~p)",
-                        [couch_replicator_api_wrap:db_uri(Db), Seq]);
-                _ ->
-                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
-                end,
-                put(last_seq, Seq);
-            ({last_seq, LS}) ->
-                case get_value(continuous, Options) of
-                true ->
-                    % LS should never be undefined, but it doesn't hurt to be
-                    % defensive inside the replicator.
-                    Seq = case LS of undefined -> get(last_seq); _ -> LS end,
-                    read_changes(Seq, Db, ChangesQueue, Options);
-                _ ->
-                    % This clause is unreachable today, but let's plan ahead
-                    % for the future where we checkpoint against last_seq
-                    % instead of the sequence of the last change.  The two can
-                    % differ substantially in the case of a restrictive filter.
-                    ok
-                end
-            end, Options),
-        couch_work_queue:close(ChangesQueue)
-    catch exit:{http_request_failed, _, _, _} = Error ->
-        case get(retries_left) of
-        N when N > 0 ->
-            put(retries_left, N - 1),
-            LastSeq = get(last_seq),
-            Db2 = case LastSeq of
-            StartSeq ->
-                twig:log(notice,"Retrying _changes request to source database ~s"
-                    " with since=~p in ~p seconds",
-                    [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
-                ok = timer:sleep(Db#httpdb.wait),
-                Db#httpdb{wait = 2 * Db#httpdb.wait};
-            _ ->
-                twig:log(notice,"Retrying _changes request to source database ~s"
-                    " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
-                Db
-            end,
-            read_changes(LastSeq, Db2, ChangesQueue, Options);
-        _ ->
-            exit(Error)
-        end
-    end.
-
-
-spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
-    spawn_link(fun() ->
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
-    end).
-
-changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
-    receive
-    {get_changes, From} ->
-        case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
-        closed ->
-            From ! {closed, self()};
-        {ok, Changes} ->
-            #doc_info{high_seq = Seq} = lists:last(Changes),
-            ReportSeq = {Ts, Seq},
-            ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
-            From ! {changes, self(), Changes, ReportSeq}
-        end,
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
-    end.
-
-
-checkpoint_interval(_State) ->
-    5000.
-
-do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
-    NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
-    {ok, NewState};
-do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
-    SourceCurSeq = source_cur_seq(State),
-    NewState = State#rep_state{source_seq = SourceCurSeq},
-    update_task(NewState),
-    {ok, NewState};
-do_checkpoint(State) ->
-    #rep_state{
-        source_name=SourceName,
-        target_name=TargetName,
-        source = Source,
-        target = Target,
-        history = OldHistory,
-        start_seq = {_, StartSeq},
-        current_through_seq = {_Ts, NewSeq} = NewTsSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = ReplicationStartTime,
-        src_starttime = SrcInstanceStartTime,
-        tgt_starttime = TgtInstanceStartTime,
-        stats = Stats,
-        rep_details = #rep{options = Options},
-        session_id = SessionId
-    } = State,
-    case commit_to_both(Source, Target) of
-    {source_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
-    {target_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
-    {SrcInstanceStartTime, TgtInstanceStartTime} ->
-        twig:log(notice,"recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
-            [SourceName, TargetName, NewSeq]),
-        StartTime = ?l2b(ReplicationStartTime),
-        EndTime = ?l2b(couch_util:rfc1123_date()),
-        NewHistoryEntry = {[
-            {<<"session_id">>, SessionId},
-            {<<"start_time">>, StartTime},
-            {<<"end_time">>, EndTime},
-            {<<"start_last_seq">>, StartSeq},
-            {<<"end_last_seq">>, NewSeq},
-            {<<"recorded_seq">>, NewSeq},
-            {<<"missing_checked">>, Stats#rep_stats.missing_checked},
-            {<<"missing_found">>, Stats#rep_stats.missing_found},
-            {<<"docs_read">>, Stats#rep_stats.docs_read},
-            {<<"docs_written">>, Stats#rep_stats.docs_written},
-            {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
-        ]},
-        BaseHistory = [
-            {<<"session_id">>, SessionId},
-            {<<"source_last_seq">>, NewSeq},
-            {<<"replication_id_version">>, ?REP_ID_VERSION}
-        ] ++ case get_value(doc_ids, Options) of
-        undefined ->
-            [];
-        _DocIds ->
-            % backwards compatibility with the result of a replication by
-            % doc IDs in versions 0.11.x and 1.0.x
-            % TODO: deprecate (use same history format, simplify code)
-            [
-                {<<"start_time">>, StartTime},
-                {<<"end_time">>, EndTime},
-                {<<"docs_read">>, Stats#rep_stats.docs_read},
-                {<<"docs_written">>, Stats#rep_stats.docs_written},
-                {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
-            ]
-        end,
-        % limit history to 50 entries
-        NewRepHistory = {
-            BaseHistory ++
-            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
-        },
-
-        try
-            {SrcRevPos, SrcRevId} = update_checkpoint(
-                Source, SourceLog#doc{body = NewRepHistory}, source),
-            {TgtRevPos, TgtRevId} = update_checkpoint(
-                Target, TargetLog#doc{body = NewRepHistory}, target),
-            SourceCurSeq = source_cur_seq(State),
-            NewState = State#rep_state{
-                source_seq = SourceCurSeq,
-                checkpoint_history = NewRepHistory,
-                committed_seq = NewTsSeq,
-                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
-                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
-            },
-            update_task(NewState),
-            {ok, NewState}
-        catch throw:{checkpoint_commit_failure, _} = Failure ->
-            Failure
-        end;
-    {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Target database out of sync. "
-            "Try to increase max_dbs_open at the target's server.">>};
-    {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source database out of sync. "
-            "Try to increase max_dbs_open at the source's server.">>};
-    {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source and target databases out of "
-            "sync. Try to increase max_dbs_open at both servers.">>}
-    end.
-
-
-update_checkpoint(Db, Doc, DbType) ->
-    try
-        update_checkpoint(Db, Doc)
-    catch throw:{checkpoint_commit_failure, Reason} ->
-        throw({checkpoint_commit_failure,
-            <<"Error updating the ", (to_binary(DbType))/binary,
-                " checkpoint document: ", (to_binary(Reason))/binary>>})
-    end.
-
-update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
-    try
-        case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
-        {ok, PosRevId} ->
-            PosRevId;
-        {error, Reason} ->
-            throw({checkpoint_commit_failure, Reason})
-        end
-    catch throw:conflict ->
-        case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
-        {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
-            % This means that we were able to update successfully the
-            % checkpoint doc in a previous attempt but we got a connection
-            % error (timeout for e.g.) before receiving the success response.
-            % Therefore the request was retried and we got a conflict, as the
-            % revision we sent is not the current one.
-            % We confirm this by verifying the doc body we just got is the same
-            % that we have just sent.
-            {Pos, RevId};
-        _ ->
-            throw({checkpoint_commit_failure, conflict})
-        end
-    end.
-
-
-commit_to_both(Source, Target) ->
-    % commit the src async
-    ParentPid = self(),
-    SrcCommitPid = spawn_link(
-        fun() ->
-            Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
-            ParentPid ! {self(), Result}
-        end),
-
-    % commit tgt sync
-    TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
-
-    SourceResult = receive
-    {SrcCommitPid, Result} ->
-        unlink(SrcCommitPid),
-        receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
-        Result;
-    {'EXIT', SrcCommitPid, Reason} ->
-        {error, Reason}
-    end,
-    case TargetResult of
-    {ok, TargetStartTime} ->
-        case SourceResult of
-        {ok, SourceStartTime} ->
-            {SourceStartTime, TargetStartTime};
-        SourceError ->
-            {source_error, SourceError}
-        end;
-    TargetError ->
-        {target_error, TargetError}
-    end.
-
-
-compare_replication_logs(SrcDoc, TgtDoc) ->
-    #doc{body={RepRecProps}} = SrcDoc,
-    #doc{body={RepRecPropsTgt}} = TgtDoc,
-    case get_value(<<"session_id">>, RepRecProps) ==
-            get_value(<<"session_id">>, RepRecPropsTgt) of
-    true ->
-        % if the records have the same session id,
-        % then we have a valid replication history
-        OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
-        OldHistory = get_value(<<"history">>, RepRecProps, []),
-        {OldSeqNum, OldHistory};
-    false ->
-        SourceHistory = get_value(<<"history">>, RepRecProps, []),
-        TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
-        twig:log(notice,"Replication records differ. "
-                "Scanning histories to find a common ancestor.", []),
-        twig:log(debug,"Record on source:~p~nRecord on target:~p~n",
-                [RepRecProps, RepRecPropsTgt]),
-        compare_rep_history(SourceHistory, TargetHistory)
-    end.
-
-compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
-    twig:log(notice,"no common ancestry -- performing full replication", []),
-    {?LOWEST_SEQ, []};
-compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
-    SourceId = get_value(<<"session_id">>, S),
-    case has_session_id(SourceId, Target) of
-    true ->
-        RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
-        twig:log(notice,"found a common replication record with source_seq ~p",
-            [RecordSeqNum]),
-        {RecordSeqNum, SourceRest};
-    false ->
-        TargetId = get_value(<<"session_id">>, T),
-        case has_session_id(TargetId, SourceRest) of
-        true ->
-            RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
-            twig:log(notice,"found a common replication record with source_seq ~p",
-                [RecordSeqNum]),
-            {RecordSeqNum, TargetRest};
-        false ->
-            compare_rep_history(SourceRest, TargetRest)
-        end
-    end.
-
-
-has_session_id(_SessionId, []) ->
-    false;
-has_session_id(SessionId, [{Props} | Rest]) ->
-    case get_value(<<"session_id">>, Props, nil) of
-    SessionId ->
-        true;
-    _Else ->
-        has_session_id(SessionId, Rest)
-    end.
-
-
-db_monitor(#db{} = Db) ->
-    couch_db:monitor(Db);
-db_monitor(_HttpDb) ->
-    nil.
-
-
-source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq}) ->
-    case (catch couch_replicator_api_wrap:get_db_info(Db#httpdb{retries = 3})) of
-    {ok, Info} ->
-        get_value(<<"update_seq">>, Info, Seq);
-    _ ->
-        Seq
-    end;
-source_cur_seq(#rep_state{source = Db, source_seq = Seq}) ->
-    {ok, Info} = couch_replicator_api_wrap:get_db_info(Db),
-    get_value(<<"update_seq">>, Info, Seq).
-
-
-update_task(State) ->
-    #rep_state{
-        current_through_seq = {_, CurSeq},
-        source_seq = SourceCurSeq
-    } = State,
-    couch_task_status:update(
-        rep_stats(State) ++ [
-        {source_seq, SourceCurSeq},
-        case {unpack_seq(CurSeq), unpack_seq(SourceCurSeq)} of
-            {_, 0} ->
-                {progress, 0};
-            {CurSeq1, SourceCurSeq1} ->
-                {progress, (CurSeq1 * 100) div SourceCurSeq1}
-        end
-    ]).
-
-
-rep_stats(State) ->
-    #rep_state{
-        committed_seq = {_, CommittedSeq},
-        stats = Stats,
-        source_seq = SourceCurSeq
-    } = State,
-    [
-        {revisions_checked, Stats#rep_stats.missing_checked},
-        {missing_revisions_found, Stats#rep_stats.missing_found},
-        {docs_read, Stats#rep_stats.docs_read},
-        {docs_written, Stats#rep_stats.docs_written},
-        {changes_pending, pending(SourceCurSeq, CommittedSeq)},
-        {doc_write_failures, Stats#rep_stats.doc_write_failures},
-        {checkpointed_source_seq, CommittedSeq}
-    ].
-
-pending(SourceCurSeq, CommittedSeq) ->
-    unpack_seq(SourceCurSeq) - unpack_seq(CommittedSeq).
-
-unpack_seq(Seq) when is_number(Seq) ->
-    Seq;
-unpack_seq([SeqNum, _]) ->
-    SeqNum;
-unpack_seq(Seq) when is_binary(Seq) ->
-    Pattern = "^\\[?(?<seqnum>[0-9]+)",
-    Options = [{capture, [seqnum], list}],
-    {match, [SeqNum]} = re:run(Seq, Pattern, Options),
-    list_to_integer(SeqNum).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
deleted file mode 100644
index 018aa4b..0000000
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ /dev/null
@@ -1,30 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_ID_VERSION, 3).
-
--record(rep, {
-    id,
-    source,
-    target,
-    options,
-    user_ctx,
-    doc_id
-}).
-
--record(rep_stats, {
-    missing_checked = 0,
-    missing_found = 0,
-    docs_read = 0,
-    docs_written = 0,
-    doc_write_failures = 0
-}).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
deleted file mode 100644
index d072187..0000000
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ /dev/null
@@ -1,802 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_api_wrap).
-
-% This module wraps the native erlang API, and allows for performing
-% operations on a remote vs. local databases via the same API.
-%
-% Notes:
-% Many options and apis aren't yet supported here, they are added as needed.
-
--include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
-
--export([
-    db_open/2,
-    db_open/3,
-    db_close/1,
-    get_db_info/1,
-    update_doc/3,
-    update_doc/4,
-    update_docs/3,
-    update_docs/4,
-    ensure_full_commit/1,
-    get_missing_revs/2,
-    open_doc/3,
-    open_doc_revs/6,
-    changes_since/5,
-    db_uri/1
-    ]).
-
--import(couch_replicator_httpc, [
-    send_req/3
-    ]).
-
--import(couch_util, [
-    encode_doc_id/1,
-    get_value/2,
-    get_value/3
-    ]).
-
-
-db_uri(#httpdb{url = Url}) ->
-    couch_util:url_strip_password(Url);
-
-db_uri(#db{name = Name}) ->
-    db_uri(Name);
-
-db_uri(DbName) ->
-    ?b2l(DbName).
-
-
-db_open(Db, Options) ->
-    db_open(Db, Options, false).
-
-db_open(#httpdb{} = Db1, _Options, Create) ->
-    {ok, Db} = couch_replicator_httpc:setup(Db1),
-    case Create of
-    false ->
-        ok;
-    true ->
-        send_req(Db, [{method, put}], fun(_, _, _) -> ok end)
-    end,
-    send_req(Db, [{method, head}],
-        fun(200, _, _) ->
-            {ok, Db};
-        (401, _, _) ->
-            throw({unauthorized, ?l2b(db_uri(Db))});
-        (_, _, _) ->
-            throw({db_not_found, ?l2b(db_uri(Db))})
-        end);
-db_open(DbName, Options, Create) ->
-    try
-        case Create of
-        false ->
-            ok;
-        true ->
-            ok = couch_httpd:verify_is_server_admin(
-                get_value(user_ctx, Options)),
-            couch_db:create(DbName, Options)
-        end,
-        case couch_db:open(DbName, Options) of
-        {error, illegal_database_name, _} ->
-            throw({db_not_found, DbName});
-        {not_found, _Reason} ->
-            throw({db_not_found, DbName});
-        {ok, _Db} = Success ->
-            Success
-        end
-    catch
-    throw:{unauthorized, _} ->
-        throw({unauthorized, DbName})
-    end.
-
-db_close(#httpdb{httpc_pool = Pool}) ->
-    unlink(Pool),
-    ok = couch_replicator_httpc_pool:stop(Pool);
-db_close(DbName) ->
-    catch couch_db:close(DbName).
-
-
-get_db_info(#httpdb{} = Db) ->
-    send_req(Db, [],
-        fun(200, _, {Props}) ->
-            {ok, Props}
-        end);
-get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
-    {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
-    {ok, Info} = couch_db:get_db_info(Db),
-    couch_db:close(Db),
-    {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
-
-
-ensure_full_commit(#httpdb{} = Db) ->
-    send_req(
-        Db,
-        [{method, post}, {path, "_ensure_full_commit"},
-            {headers, [{"Content-Type", "application/json"}]}],
-        fun(201, _, {Props}) ->
-            {ok, get_value(<<"instance_start_time">>, Props)};
-        (_, _, {Props}) ->
-            {error, get_value(<<"error">>, Props)}
-        end);
-ensure_full_commit(Db) ->
-    couch_db:ensure_full_commit(Db).
-
-
-get_missing_revs(#httpdb{} = Db, IdRevs) ->
-    JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]},
-    send_req(
-        Db,
-        [{method, post}, {path, "_revs_diff"}, {body, ?JSON_ENCODE(JsonBody)},
-            {headers, [{"Content-Type", "application/json"}]}],
-        fun(200, _, {Props}) ->
-            ConvertToNativeFun = fun({Id, {Result}}) ->
-                MissingRevs = couch_doc:parse_revs(
-                    get_value(<<"missing">>, Result)
-                ),
-                PossibleAncestors = couch_doc:parse_revs(
-                    get_value(<<"possible_ancestors">>, Result, [])
-                ),
-                {Id, MissingRevs, PossibleAncestors}
-            end,
-            {ok, lists:map(ConvertToNativeFun, Props)}
-        end);
-get_missing_revs(Db, IdRevs) ->
-    couch_db:get_missing_revs(Db, IdRevs).
-
-
-
-open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
-    Path = encode_doc_id(Id),
-    QArgs = options_to_query_args(
-        HttpDb, Path, [revs, {open_revs, Revs} | Options]),
-    Self = self(),
-    Streamer = spawn_link(fun() ->
-            send_req(
-                HttpDb,
-                [{path, Path}, {qs, QArgs},
-                    {ibrowse_options, [{stream_to, {self(), once}}]},
-                    {headers, [{"Accept", "multipart/mixed"}]}],
-                fun(200, Headers, StreamDataFun) ->
-                    remote_open_doc_revs_streamer_start(Self),
-                    {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
-                        get_value("Content-Type", Headers),
-                        StreamDataFun,
-                        fun mp_parse_mixed/1)
-                end),
-            unlink(Self)
-        end),
-    % If this process dies normally we can leave
-    % the Streamer process hanging around keeping an
-    % HTTP connection open. This is a bit of a
-    % hammer approach to making sure it releases
-    % that connection back to the pool.
-    spawn(fun() ->
-        Ref = erlang:monitor(process, Self),
-        receive
-            {'DOWN', Ref, process, Self, normal} ->
-                exit(Streamer, {streamer_parent_died, Self});
-            {'DOWN', Ref, process, Self, _} ->
-                ok
-            end
-    end),
-    receive
-    {started_open_doc_revs, Ref} ->
-        receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc)
-    end;
-open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
-    {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
-    {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
-
-
-open_doc(#httpdb{} = Db, Id, Options) ->
-    send_req(
-        Db,
-        [{path, encode_doc_id(Id)}, {qs, options_to_query_args(Options, [])}],
-        fun(200, _, Body) ->
-            {ok, couch_doc:from_json_obj(Body)};
-        (_, _, {Props}) ->
-            {error, get_value(<<"error">>, Props)}
-        end);
-open_doc(Db, Id, Options) ->
-    case couch_db:open_doc(Db, Id, Options) of
-    {ok, _} = Ok ->
-        Ok;
-    {not_found, _Reason} ->
-        {error, <<"not_found">>}
-    end.
-
-
-update_doc(Db, Doc, Options) ->
-    update_doc(Db, Doc, Options, interactive_edit).
-
-update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
-    QArgs = case Type of
-    replicated_changes ->
-        [{"new_edits", "false"}];
-    _ ->
-        []
-    end ++ options_to_query_args(Options, []),
-    Boundary = couch_uuids:random(),
-    JsonBytes = ?JSON_ENCODE(
-        couch_doc:to_json_obj(
-          Doc, [revs, attachments, follows, att_encoding_info | Options])),
-    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
-        JsonBytes, Doc#doc.atts, true),
-    Headers = case lists:member(delay_commit, Options) of
-    true ->
-        [{"X-Couch-Full-Commit", "false"}];
-    false ->
-        []
-    end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
-    Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
-    send_req(
-        HttpDb,
-        [{method, put}, {path, encode_doc_id(DocId)},
-            {qs, QArgs}, {headers, Headers}, {body, Body}],
-        fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 orelse Code =:= 202 ->
-                {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
-            (409, _, _) ->
-                throw(conflict);
-            (Code, _, {Props}) ->
-                case {Code, get_value(<<"error">>, Props)} of
-                {401, <<"unauthorized">>} ->
-                    throw({unauthorized, get_value(<<"reason">>, Props)});
-                {403, <<"forbidden">>} ->
-                    throw({forbidden, get_value(<<"reason">>, Props)});
-                {412, <<"missing_stub">>} ->
-                    throw({missing_stub, get_value(<<"reason">>, Props)});
-                {_, Error} ->
-                    {error, Error}
-                end
-        end);
-update_doc(Db, Doc, Options, Type) ->
-    couch_db:update_doc(Db, Doc, Options, Type).
-
-
-update_docs(Db, DocList, Options) ->
-    update_docs(Db, DocList, Options, interactive_edit).
-
-update_docs(_Db, [], _Options, _UpdateType) ->
-    {ok, []};
-update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
-    FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
-    Prefix = case UpdateType of
-    replicated_changes ->
-        <<"{\"new_edits\":false,\"docs\":[">>;
-    interactive_edit ->
-        <<"{\"docs\":[">>
-    end,
-    Suffix = <<"]}">>,
-    % Note: nginx and other servers don't like PUT/POST requests without
-    % a Content-Length header, so we can't do a chunked transfer encoding
-    % and JSON encode each doc only before sending it through the socket.
-    {Docs, Len} = lists:mapfoldl(
-        fun(#doc{} = Doc, Acc) ->
-            Json = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
-            {Json, Acc + iolist_size(Json)};
-        (Doc, Acc) ->
-            {Doc, Acc + iolist_size(Doc)}
-        end,
-        byte_size(Prefix) + byte_size(Suffix) + length(DocList) - 1,
-        DocList),
-    BodyFun = fun(eof) ->
-            eof;
-        ([]) ->
-            {ok, Suffix, eof};
-        ([prefix | Rest]) ->
-            {ok, Prefix, Rest};
-        ([Doc]) ->
-            {ok, Doc, []};
-        ([Doc | RestDocs]) ->
-            {ok, [Doc, ","], RestDocs}
-    end,
-    Headers = [
-        {"Content-Length", Len},
-        {"Content-Type", "application/json"},
-        {"X-Couch-Full-Commit", FullCommit}
-    ],
-    send_req(
-        HttpDb,
-        [{method, post}, {path, "_bulk_docs"},
-            {body, {BodyFun, [prefix | Docs]}}, {headers, Headers}],
-        fun(201, _, Results) when is_list(Results) ->
-                {ok, bulk_results_to_errors(DocList, Results, remote)};
-           (417, _, Results) when is_list(Results) ->
-                {ok, bulk_results_to_errors(DocList, Results, remote)}
-        end);
-update_docs(Db, DocList, Options, UpdateType) ->
-    Result = couch_db:update_docs(Db, DocList, Options, UpdateType),
-    {ok, bulk_results_to_errors(DocList, Result, UpdateType)}.
-
-
-changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
-              Style, StartSeq, UserFun, Options) ->
-    Timeout = erlang:max(1000, InactiveTimeout - 5000),
-    BaseQArgs = case get_value(continuous, Options, false) of
-    false ->
-        [{"feed", "normal"}];
-    true ->
-        [{"feed", "continuous"}]
-    end ++ [
-        {"style", atom_to_list(Style)}, {"since", ?JSON_ENCODE(StartSeq)},
-        {"timeout", integer_to_list(Timeout)}
-           ],
-    DocIds = get_value(doc_ids, Options),
-    {QArgs, Method, Body, Headers} = case DocIds of
-    undefined ->
-        QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
-        {QArgs1, get, [], Headers1};
-    _ when is_list(DocIds) ->
-        Headers2 = [{"Content-Type", "application/json"} | Headers1],
-        JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
-        {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
-    end,
-    send_req(
-        HttpDb,
-        [{method, Method}, {path, "_changes"}, {qs, QArgs},
-            {headers, Headers}, {body, Body},
-            {ibrowse_options, [{stream_to, {self(), once}}]}],
-        fun(200, _, DataStreamFun) ->
-                parse_changes_feed(Options, UserFun, DataStreamFun);
-            (405, _, _) when is_list(DocIds) ->
-                % CouchDB versions < 1.1.0 don't have the builtin _changes feed
-                % filter "_doc_ids" neither support POST
-                send_req(HttpDb, [{method, get}, {path, "_changes"},
-                    {qs, BaseQArgs}, {headers, Headers1},
-                    {ibrowse_options, [{stream_to, {self(), once}}]}],
-                    fun(200, _, DataStreamFun2) ->
-                        UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
-                            case lists:member(Id, DocIds) of
-                            true ->
-                                UserFun(DocInfo);
-                            false ->
-                                ok
-                            end
-                        end,
-                        parse_changes_feed(Options, UserFun2, DataStreamFun2)
-                    end)
-        end);
-changes_since(Db, Style, StartSeq, UserFun, Options) ->
-    Filter = case get_value(doc_ids, Options) of
-    undefined ->
-        ?b2l(get_value(filter, Options, <<>>));
-    _DocIds ->
-        "_doc_ids"
-    end,
-    Args = #changes_args{
-        style = Style,
-        since = StartSeq,
-        filter = Filter,
-        feed = case get_value(continuous, Options, false) of
-            true ->
-                "continuous";
-            false ->
-                "normal"
-        end,
-        timeout = infinity
-    },
-    QueryParams = get_value(query_params, Options, {[]}),
-    Req = changes_json_req(Db, Filter, QueryParams, Options),
-    ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db),
-    ChangesFeedFun(fun({change, Change, _}, _) ->
-            UserFun(json_to_doc_info(Change));
-        (_, _) ->
-            ok
-    end).
-
-
-% internal functions
-
-maybe_add_changes_filter_q_args(BaseQS, Options) ->
-    case get_value(filter, Options) of
-    undefined ->
-        BaseQS;
-    FilterName ->
-        {Params} = get_value(query_params, Options, {[]}),
-        [{"filter", ?b2l(FilterName)} | lists:foldl(
-            fun({K, V}, QSAcc) ->
-                Ks = couch_util:to_list(K),
-                case lists:keymember(Ks, 1, QSAcc) of
-                true ->
-                    QSAcc;
-                false ->
-                    [{Ks, couch_util:to_list(V)} | QSAcc]
-                end
-            end,
-            BaseQS, Params)]
-    end.
-
-parse_changes_feed(Options, UserFun, DataStreamFun) ->
-    case get_value(continuous, Options, false) of
-    true ->
-        continuous_changes(DataStreamFun, UserFun);
-    false ->
-        EventFun = fun(Ev) ->
-            changes_ev1(Ev, fun(DocInfo, _) -> UserFun(DocInfo) end, [])
-        end,
-        json_stream_parse:events(DataStreamFun, EventFun)
-    end.
-
-changes_json_req(_Db, "", _QueryParams, _Options) ->
-    {[]};
-changes_json_req(_Db, "_doc_ids", _QueryParams, Options) ->
-    {[{<<"doc_ids">>, get_value(doc_ids, Options)}]};
-changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
-    {ok, Info} = couch_db:get_db_info(Db),
-    % simulate a request to db_name/_changes
-    {[
-        {<<"info">>, {Info}},
-        {<<"id">>, null},
-        {<<"method">>, 'GET'},
-        {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
-        {<<"query">>, {[{<<"filter">>, FilterName} | QueryParams]}},
-        {<<"headers">>, []},
-        {<<"body">>, []},
-        {<<"peer">>, <<"replicator">>},
-        {<<"form">>, []},
-        {<<"cookie">>, []},
-        {<<"userCtx">>, couch_util:json_user_ctx(Db)}
-    ]}.
-
-
-options_to_query_args(HttpDb, Path, Options) ->
-    case lists:keytake(atts_since, 1, Options) of
-    false ->
-        options_to_query_args(Options, []);
-    {value, {atts_since, []}, Options2} ->
-        options_to_query_args(Options2, []);
-    {value, {atts_since, PAs}, Options2} ->
-        QueryArgs1 = options_to_query_args(Options2, []),
-        FullUrl = couch_replicator_httpc:full_url(
-            HttpDb, [{path, Path}, {qs, QueryArgs1}]),
-        RevList = atts_since_arg(
-            length("GET " ++ FullUrl ++ " HTTP/1.1\r\n") +
-            length("&atts_since=") + 6,  % +6 = % encoded [ and ]
-            PAs, []),
-        [{"atts_since", ?JSON_ENCODE(RevList)} | QueryArgs1]
-    end.
-
-
-options_to_query_args([], Acc) ->
-    lists:reverse(Acc);
-options_to_query_args([ejson_body | Rest], Acc) ->
-    options_to_query_args(Rest, Acc);
-options_to_query_args([delay_commit | Rest], Acc) ->
-    options_to_query_args(Rest, Acc);
-options_to_query_args([revs | Rest], Acc) ->
-    options_to_query_args(Rest, [{"revs", "true"} | Acc]);
-options_to_query_args([{open_revs, all} | Rest], Acc) ->
-    options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
-options_to_query_args([latest | Rest], Acc) ->
-    options_to_query_args(Rest, [{"latest", "true"} | Acc]);
-options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
-    JsonRevs = ?b2l(iolist_to_binary(?JSON_ENCODE(couch_doc:revs_to_strs(Revs)))),
-    options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).
-
-
--define(MAX_URL_LEN, 7000).
-
-atts_since_arg(_UrlLen, [], Acc) ->
-    lists:reverse(Acc);
-atts_since_arg(UrlLen, [PA | Rest], Acc) ->
-    RevStr = couch_doc:rev_to_str(PA),
-    NewUrlLen = case Rest of
-    [] ->
-        % plus 2 double quotes (% encoded)
-        UrlLen + size(RevStr) + 6;
-    _ ->
-        % plus 2 double quotes and a comma (% encoded)
-        UrlLen + size(RevStr) + 9
-    end,
-    case NewUrlLen >= ?MAX_URL_LEN of
-    true ->
-        lists:reverse(Acc);
-    false ->
-        atts_since_arg(NewUrlLen, Rest, [RevStr | Acc])
-    end.
-
-
-% TODO: A less verbose, more elegant and automatic restart strategy for
-%       the exported open_doc_revs/6 function. The restart should be
-%       transparent to the caller like any other Couch API function exported
-%       by this module.
-receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) ->
-    try
-        % Left only for debugging purposes via an interactive or remote shell
-        erlang:put(open_doc_revs, {Id, Revs, Ref, Streamer}),
-        receive_docs(Streamer, Fun, Ref, Acc)
-    catch
-    error:{restart_open_doc_revs, NewRef} ->
-        receive_docs_loop(Streamer, Fun, Id, Revs, NewRef, Acc)
-    end.
-
-receive_docs(Streamer, UserFun, Ref, UserAcc) ->
-    Streamer ! {get_headers, Ref, self()},
-    receive
-    {started_open_doc_revs, NewRef} ->
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {headers, Ref, Headers} ->
-        case get_value("content-type", Headers) of
-        {"multipart/related", _} = ContentType ->
-            case couch_doc:doc_from_multi_part_stream(
-                ContentType,
-                fun() -> receive_doc_data(Streamer, Ref) end,
-                Ref) of
-            {ok, Doc, WaitFun, Parser} ->
-                case UserFun({ok, Doc}, UserAcc) of
-                {ok, UserAcc2} ->
-                    ok;
-                {skip, UserAcc2} ->
-                    couch_doc:abort_multi_part_stream(Parser)
-                end,
-                WaitFun(),
-                receive_docs(Streamer, UserFun, Ref, UserAcc2)
-            end;
-        {"application/json", []} ->
-            Doc = couch_doc:from_json_obj(
-                    ?JSON_DECODE(receive_all(Streamer, Ref, []))),
-            {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
-            receive_docs(Streamer, UserFun, Ref, UserAcc2);
-        {"application/json", [{"error","true"}]} ->
-            {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
-            Rev = get_value(<<"missing">>, ErrorProps),
-            Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
-            {_, UserAcc2} = UserFun(Result, UserAcc),
-            receive_docs(Streamer, UserFun, Ref, UserAcc2)
-        end;
-    {done, Ref} ->
-        {ok, UserAcc}
-    end.
-
-
-restart_remote_open_doc_revs(Ref, NewRef) ->
-    receive
-    {body_bytes, Ref, _} ->
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {body_done, Ref} ->
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {done, Ref} ->
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {headers, Ref, _} ->
-        restart_remote_open_doc_revs(Ref, NewRef)
-    after 0 ->
-        erlang:error({restart_open_doc_revs, NewRef})
-    end.
-
-
-remote_open_doc_revs_streamer_start(Parent) ->
-    receive
-    {get_headers, _Ref, Parent} ->
-        remote_open_doc_revs_streamer_start(Parent);
-    {next_bytes, _Ref, Parent} ->
-        remote_open_doc_revs_streamer_start(Parent)
-    after 0 ->
-        Parent ! {started_open_doc_revs, make_ref()}
-    end.
-
-
-receive_all(Streamer, Ref, Acc) ->
-    Streamer ! {next_bytes, Ref, self()},
-    receive
-    {started_open_doc_revs, NewRef} ->
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {body_bytes, Ref, Bytes} ->
-        receive_all(Streamer, Ref, [Bytes | Acc]);
-    {body_done, Ref} ->
-        lists:reverse(Acc)
-    end.
-
-
-mp_parse_mixed(eof) ->
-    receive {get_headers, Ref, From} ->
-        From ! {done, Ref}
-    end;
-mp_parse_mixed({headers, H}) ->
-    receive {get_headers, Ref, From} ->
-        From ! {headers, Ref, H}
-    end,
-    fun mp_parse_mixed/1;
-mp_parse_mixed({body, Bytes}) ->
-    receive {next_bytes, Ref, From} ->
-        From ! {body_bytes, Ref, Bytes}
-    end,
-    fun mp_parse_mixed/1;
-mp_parse_mixed(body_end) ->
-    receive {next_bytes, Ref, From} ->
-        From ! {body_done, Ref};
-    {get_headers, Ref, From} ->
-        self() ! {get_headers, Ref, From}
-    end,
-    fun mp_parse_mixed/1.
-
-
-receive_doc_data(Streamer, Ref) ->
-    Streamer ! {next_bytes, Ref, self()},
-    receive
-    {body_bytes, Ref, Bytes} ->
-        {Bytes, fun() -> receive_doc_data(Streamer, Ref) end};
-    {body_done, Ref} ->
-        {<<>>, fun() -> receive_doc_data(Streamer, Ref) end}
-    end.
-
-doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
-    Self = self(),
-    Parser = spawn_link(fun() ->
-        {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
-            ContentType, DataFun,
-            fun(Next) -> couch_replicator_utils:mp_parse_doc(Next, []) end),
-        unlink(Self)
-        end),
-    Parser ! {get_doc_bytes, Ref, self()},
-    receive
-    {started_open_doc_revs, NewRef} ->
-        unlink(Parser),
-        exit(Parser, kill),
-        restart_remote_open_doc_revs(Ref, NewRef);
-    {doc_bytes, Ref, DocBytes} ->
-        Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
-        ReadAttachmentDataFun = fun() ->
-            Parser ! {get_bytes, Ref, self()},
-            receive
-            {started_open_doc_revs, NewRef} ->
-                unlink(Parser),
-                exit(Parser, kill),
-                receive {bytes, Ref, _} -> ok after 0 -> ok end,
-                restart_remote_open_doc_revs(Ref, NewRef);
-            {bytes, Ref, Bytes} ->
-                Bytes
-            end
-        end,
-        Atts2 = lists:map(
-            fun(#att{data = follows} = A) ->
-                A#att{data = ReadAttachmentDataFun};
-            (A) ->
-                A
-            end, Doc#doc.atts),
-        {ok, Doc#doc{atts = Atts2}, Parser}
-    end.
-
-
-changes_ev1(object_start, UserFun, UserAcc) ->
-    fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
-
-changes_ev2({key, <<"results">>}, UserFun, UserAcc) ->
-    fun(Ev) -> changes_ev3(Ev, UserFun, UserAcc) end;
-changes_ev2(_, UserFun, UserAcc) ->
-    fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
-
-changes_ev3(array_start, UserFun, UserAcc) ->
-    fun(Ev) -> changes_ev_loop(Ev, UserFun, UserAcc) end.
-
-changes_ev_loop(object_start, UserFun, UserAcc) ->
-    fun(Ev) ->
-        json_stream_parse:collect_object(Ev,
-            fun(Obj) ->
-                UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc),
-                fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end
-            end)
-    end;
-changes_ev_loop(array_end, _UserFun, _UserAcc) ->
-    fun(_Ev) -> changes_ev_done() end.
-
-changes_ev_done() ->
-    fun(_Ev) -> changes_ev_done() end.
-
-continuous_changes(DataFun, UserFun) ->
-    {DataFun2, _, Rest} = json_stream_parse:events(
-        DataFun,
-        fun(Ev) -> parse_changes_line(Ev, UserFun) end),
-    continuous_changes(fun() -> {Rest, DataFun2} end, UserFun).
-
-parse_changes_line(object_start, UserFun) ->
-    fun(Ev) ->
-        json_stream_parse:collect_object(Ev,
-            fun(Obj) -> UserFun(json_to_doc_info(Obj)) end)
-    end.
-
-json_to_doc_info({Props}) ->
-    case get_value(<<"changes">>, Props) of
-    undefined ->
-        {last_seq, get_value(<<"last_seq">>, Props)};
-    Changes ->
-        RevsInfo = lists:map(
-            fun({Change}) ->
-                Rev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
-                Del = couch_replicator_utils:is_deleted(Change),
-                #rev_info{rev=Rev, deleted=Del}
-            end, Changes),
-        #doc_info{
-            id = get_value(<<"id">>, Props),
-            high_seq = get_value(<<"seq">>, Props),
-            revs = RevsInfo
-        }
-    end.
-
-
-bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->
-    lists:reverse(lists:foldl(
-        fun({_, {ok, _}}, Acc) ->
-            Acc;
-        ({#doc{id = Id, revs = {Pos, [RevId | _]}}, Error}, Acc) ->
-            {_, Error, Reason} = couch_httpd:error_info(Error),
-            [ {[{id, Id}, {rev, rev_to_str({Pos, RevId})},
-                {error, Error}, {reason, Reason}]} | Acc ]
-        end,
-        [], lists:zip(Docs, Results)));
-
-bulk_results_to_errors(Docs, {ok, Results}, replicated_changes) ->
-    bulk_results_to_errors(Docs, {aborted, Results}, interactive_edit);
-
-bulk_results_to_errors(_Docs, {aborted, Results}, interactive_edit) ->
-    lists:map(
-        fun({{Id, Rev}, Err}) ->
-            {_, Error, Reason} = couch_httpd:error_info(Err),
-            {[{id, Id}, {rev, rev_to_str(Rev)}, {error, Error}, {reason, Reason}]}
-        end,
-        Results);
-
-bulk_results_to_errors(_Docs, Results, remote) ->
-    lists:reverse(lists:foldl(
-        fun({Props}, Acc) ->
-            case get_value(<<"error">>, Props, get_value(error, Props)) of
-            undefined ->
-                Acc;
-            Error ->
-                Id = get_value(<<"id">>, Props, get_value(id, Props)),
-                Rev = get_value(<<"rev">>, Props, get_value(rev, Props)),
-                Reason = get_value(<<"reason">>, Props, get_value(reason, Props)),
-                [ {[{id, Id}, {rev, rev_to_str(Rev)},
-                    {error, Error}, {reason, Reason}]} | Acc ]
-            end
-        end,
-        [], Results)).
-
-
-rev_to_str({_Pos, _Id} = Rev) ->
-    couch_doc:rev_to_str(Rev);
-rev_to_str(Rev) ->
-    Rev.
-
-
-stream_doc({JsonBytes, Atts, Boundary, Len}) ->
-    case erlang:erase({doc_streamer, Boundary}) of
-    Pid when is_pid(Pid) ->
-        unlink(Pid),
-        exit(Pid, kill);
-    _ ->
-        ok
-    end,
-    Self = self(),
-    DocStreamer = spawn_link(fun() ->
-        couch_doc:doc_to_multi_part_stream(
-            Boundary, JsonBytes, Atts,
-            fun(Data) ->
-                receive {get_data, Ref, From} ->
-                    From ! {data, Ref, Data}
-                end
-            end, true),
-        unlink(Self)
-    end),
-    erlang:put({doc_streamer, Boundary}, DocStreamer),
-    {ok, <<>>, {Len, Boundary}};
-stream_doc({0, Id}) ->
-    erlang:erase({doc_streamer, Id}),
-    eof;
-stream_doc({LenLeft, Id}) when LenLeft > 0 ->
-    Ref = make_ref(),
-    erlang:get({doc_streamer, Id}) ! {get_data, Ref, self()},
-    receive {data, Ref, Data} ->
-        {ok, Data, {LenLeft - iolist_size(Data), Id}}
-    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_api_wrap.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.hrl b/src/couch_replicator/src/couch_replicator_api_wrap.hrl
deleted file mode 100644
index 1a6f27a..0000000
--- a/src/couch_replicator/src/couch_replicator_api_wrap.hrl
+++ /dev/null
@@ -1,36 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-
-
--record(httpdb, {
-    url,
-    oauth = nil,
-    headers = [
-        {"Accept", "application/json"},
-        {"User-Agent", "CouchDB/" ++ couch_server:get_version()}
-    ],
-    timeout,            % milliseconds
-    ibrowse_options = [],
-    retries = 10,
-    wait = 250,         % milliseconds
-    httpc_pool = nil,
-    http_connections
-}).
-
--record(oauth, {
-    consumer_key,
-    token,
-    token_secret,
-    consumer_secret,
-    signature_method
-}).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/550e8202/src/couch_replicator/src/couch_replicator_app.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_app.erl b/src/couch_replicator/src/couch_replicator_app.erl
deleted file mode 100644
index e4dc63e..0000000
--- a/src/couch_replicator/src/couch_replicator_app.erl
+++ /dev/null
@@ -1,17 +0,0 @@
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_app).
--behaviour(application).
--export([start/2, stop/1]).
-
-start(_Type, []) ->
-    couch_replicator_sup:start_link().
-
-stop([]) ->
-    ok.


Mime
View raw message