couchdb-commits mailing list archives

From beno...@apache.org
Subject [38/57] [abbrv] [partial] initial move to rebar compilation
Date Tue, 07 Jan 2014 00:36:58 GMT
http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator.erl b/apps/couch_replicator/src/couch_replicator.erl
new file mode 100644
index 0000000..d470c8a
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator.erl
@@ -0,0 +1,965 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator).
+-behaviour(gen_server).
+
+% public API
+-export([replicate/1]).
+
+% meant to be used only by the replicator database listener
+-export([async_replicate/1]).
+-export([cancel_replication/1]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3,
+    to_binary/1
+]).
+
+-import(couch_replicator_utils, [
+    start_db_compaction_notifier/2,
+    stop_db_compaction_notifier/1
+]).
+
+-record(rep_state, {
+    rep_details,
+    source_name,
+    target_name,
+    source,
+    target,
+    history,
+    checkpoint_history,
+    start_seq,
+    committed_seq,
+    current_through_seq,
+    seqs_in_progress = [],
+    highest_seq_done = {0, ?LOWEST_SEQ},
+    source_log,
+    target_log,
+    rep_starttime,
+    src_starttime,
+    tgt_starttime,
+    timer, % checkpoint timer
+    changes_queue,
+    changes_manager,
+    changes_reader,
+    workers,
+    stats = #rep_stats{},
+    session_id,
+    source_db_compaction_notifier = nil,
+    target_db_compaction_notifier = nil,
+    source_monitor = nil,
+    target_monitor = nil,
+    source_seq = nil,
+    use_checkpoints = true,
+    checkpoint_interval = 5000
+}).
+
+
+replicate(#rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep) ->
+    case get_value(cancel, Options, false) of
+    true ->
+        case get_value(id, Options, nil) of
+        nil ->
+            cancel_replication(RepId);
+        RepId2 ->
+            cancel_replication(RepId2, UserCtx)
+        end;
+    false ->
+        {ok, Listener} = rep_result_listener(RepId),
+        Result = do_replication_loop(Rep),
+        couch_replicator_notifier:stop(Listener),
+        Result
+    end.
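+
+% Illustrative usage sketch (not part of this module's code; the #rep{}
+% record is normally built by couch_replicator_utils:parse_rep_doc/2, and
+% the database names below are assumptions for the example):
+%
+%     {ok, Rep} = couch_replicator_utils:parse_rep_doc({[
+%         {<<"source">>, <<"http://127.0.0.1:5984/db_a">>},
+%         {<<"target">>, <<"db_b">>},
+%         {<<"continuous">>, true}
+%     ]}, #user_ctx{roles = [<<"_admin">>]}),
+%     {ok, {continuous, _RepId}} = couch_replicator:replicate(Rep).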
+
+
+do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
+    case async_replicate(Rep) of
+    {ok, _Pid} ->
+        case get_value(continuous, Options, false) of
+        true ->
+            {ok, {continuous, ?l2b(BaseId ++ Ext)}};
+        false ->
+            wait_for_result(Id)
+        end;
+    Error ->
+        Error
+    end.
+
+
+async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
+    RepChildId = BaseId ++ Ext,
+    Source = couch_replicator_api_wrap:db_uri(Src),
+    Target = couch_replicator_api_wrap:db_uri(Tgt),
+    Timeout = get_value(connection_timeout, Rep#rep.options),
+    ChildSpec = {
+        RepChildId,
+        {gen_server, start_link, [?MODULE, Rep, [{timeout, Timeout}]]},
+        temporary,
+        250,
+        worker,
+        [?MODULE]
+    },
+    % All these nested cases to attempt starting/restarting a replication child
+    % are ugly and not 100% race condition free. The following patch submission
+    % is a solution:
+    %
+    % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
+    %
+    case supervisor:start_child(couch_replicator_job_sup, ChildSpec) of
+    {ok, Pid} ->
+        ?LOG_INFO("starting new replication `~s` at ~p (`~s` -> `~s`)",
+            [RepChildId, Pid, Source, Target]),
+        {ok, Pid};
+    {error, already_present} ->
+        case supervisor:restart_child(couch_replicator_job_sup, RepChildId) of
+        {ok, Pid} ->
+            ?LOG_INFO("restarting replication `~s` at ~p (`~s` -> `~s`)",
+                [RepChildId, Pid, Source, Target]),
+            {ok, Pid};
+        {error, running} ->
+            %% This error occurs if multiple replicators are racing each
+            %% other to start the same replication and somebody else won.
+            %% Just grab the Pid by calling start_child again.
+            {error, {already_started, Pid}} =
+                supervisor:start_child(couch_replicator_job_sup, ChildSpec),
+            ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
+                [RepChildId, Pid, Source, Target]),
+            {ok, Pid};
+        {error, {'EXIT', {badarg,
+            [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
+            % Clause to deal with a change in the supervisor module introduced
+            % in R14B02. For more details consult the thread at:
+            %     http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
+            _ = supervisor:delete_child(couch_replicator_job_sup, RepChildId),
+            async_replicate(Rep);
+        {error, _} = Error ->
+            Error
+        end;
+    {error, {already_started, Pid}} ->
+        ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
+            [RepChildId, Pid, Source, Target]),
+        {ok, Pid};
+    {error, {Error, _}} ->
+        {error, Error}
+    end.
+
+
+rep_result_listener(RepId) ->
+    ReplyTo = self(),
+    {ok, _Listener} = couch_replicator_notifier:start_link(
+        fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
+                ReplyTo ! Ev;
+            (_) ->
+                ok
+        end).
+
+
+wait_for_result(RepId) ->
+    receive
+    {finished, RepId, RepResult} ->
+        {ok, RepResult};
+    {error, RepId, Reason} ->
+        {error, Reason}
+    end.
+
+
+cancel_replication({BaseId, Extension}) ->
+    FullRepId = BaseId ++ Extension,
+    ?LOG_INFO("Canceling replication `~s`...", [FullRepId]),
+    case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
+    ok ->
+        ?LOG_INFO("Replication `~s` canceled.", [FullRepId]),
+        case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
+            ok ->
+                {ok, {cancelled, ?l2b(FullRepId)}};
+            {error, not_found} ->
+                {ok, {cancelled, ?l2b(FullRepId)}};
+            Error ->
+                Error
+        end;
+    Error ->
+        ?LOG_ERROR("Error canceling replication `~s`: ~p", [FullRepId, Error]),
+        Error
+    end.
+
+cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
+    case lists:member(<<"_admin">>, Roles) of
+    true ->
+        cancel_replication(RepId);
+    false ->
+        {BaseId, Ext} = RepId,
+        case lists:keysearch(
+            BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
+        {value, {_, Pid, _, _}} when is_pid(Pid) ->
+            case (catch gen_server:call(Pid, get_details, infinity)) of
+            {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
+                cancel_replication(RepId);
+            {ok, _} ->
+                throw({unauthorized,
+                    <<"Can't cancel a replication triggered by another user">>});
+            {'EXIT', {noproc, {gen_server, call, _}}} ->
+                {error, not_found};
+            Error ->
+                throw(Error)
+            end;
+        _ ->
+            {error, not_found}
+        end
+    end.
+
+init(InitArgs) ->
+    try
+        do_init(InitArgs)
+    catch
+    throw:{unauthorized, DbUri} ->
+        {stop, {unauthorized,
+            <<"unauthorized to access or create database ", DbUri/binary>>}};
+    throw:{db_not_found, DbUri} ->
+        {stop, {db_not_found, <<"could not open ", DbUri/binary>>}};
+    throw:Error ->
+        {stop, Error}
+    end.
+
+do_init(#rep{options = Options, id = {BaseId, Ext}} = Rep) ->
+    process_flag(trap_exit, true),
+
+    #rep_state{
+        source = Source,
+        target = Target,
+        source_name = SourceName,
+        target_name = TargetName,
+        start_seq = {_Ts, StartSeq},
+        source_seq = SourceCurSeq,
+        committed_seq = {_, CommittedSeq},
+        checkpoint_interval = CheckpointInterval
+    } = State = init_state(Rep),
+
+    NumWorkers = get_value(worker_processes, Options),
+    BatchSize = get_value(worker_batch_size, Options),
+    {ok, ChangesQueue} = couch_work_queue:new([
+        {max_items, BatchSize * NumWorkers * 2},
+        {max_size, 100 * 1024 * NumWorkers}
+    ]),
+    % This starts the _changes reader process. It adds the changes from
+    % the source db to the ChangesQueue.
+    ChangesReader = spawn_changes_reader(StartSeq, Source, ChangesQueue, Options),
+    % Changes manager - responsible for dequeuing batches from the changes queue
+    % and delivering them to the worker processes.
+    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
+    % This starts the worker processes. They ask the changes manager for a
+    % batch of _changes rows to process, check which revs are missing on the
+    % target, and copy the missing ones from the source to the target.
+    MaxConns = get_value(http_connections, Options),
+    Workers = lists:map(
+        fun(_) ->
+            {ok, Pid} = couch_replicator_worker:start_link(
+                self(), Source, Target, ChangesManager, MaxConns),
+            Pid
+        end,
+        lists:seq(1, NumWorkers)),
+
+    couch_task_status:add_task([
+        {type, replication},
+        {replication_id, ?l2b(BaseId ++ Ext)},
+        {doc_id, Rep#rep.doc_id},
+        {source, ?l2b(SourceName)},
+        {target, ?l2b(TargetName)},
+        {continuous, get_value(continuous, Options, false)},
+        {revisions_checked, 0},
+        {missing_revisions_found, 0},
+        {docs_read, 0},
+        {docs_written, 0},
+        {doc_write_failures, 0},
+        {source_seq, SourceCurSeq},
+        {checkpointed_source_seq, CommittedSeq},
+        {progress, 0},
+        {checkpoint_interval, CheckpointInterval}
+    ]),
+    couch_task_status:set_update_frequency(1000),
+
+    % Until OTP R14B03:
+    %
+    % Restarting a temporary supervised child implies that the original arguments
+    % (#rep{} record) specified in the MFA component of the supervisor
+    % child spec will always be used whenever the child is restarted.
+    % This implies the same replication performance tuning parameters will
+    % always be used. The solution is to delete the child spec (see
+    % cancel_replication/1) and then start the replication again, but this is
+    % unfortunately not immune to race conditions.
+
+    ?LOG_INFO("Replication `~p` is using:~n"
+        "~c~p worker processes~n"
+        "~ca worker batch size of ~p~n"
+        "~c~p HTTP connections~n"
+        "~ca connection timeout of ~p milliseconds~n"
+        "~c~p retries per request~n"
+        "~csocket options are: ~s~s",
+        [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
+            MaxConns, $\t, get_value(connection_timeout, Options),
+            $\t, get_value(retries, Options),
+            $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
+            case StartSeq of
+            ?LOWEST_SEQ ->
+                "";
+            _ ->
+                io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
+            end]),
+
+    ?LOG_DEBUG("Worker pids are: ~p", [Workers]),
+
+    couch_replicator_manager:replication_started(Rep),
+
+    {ok, State#rep_state{
+            changes_queue = ChangesQueue,
+            changes_manager = ChangesManager,
+            changes_reader = ChangesReader,
+            workers = Workers
+        }
+    }.
+
+
+handle_info(shutdown, St) ->
+    {stop, shutdown, St};
+
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
+    ?LOG_ERROR("Source database is down. Reason: ~p", [Why]),
+    {stop, source_db_down, St};
+
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
+    ?LOG_ERROR("Target database is down. Reason: ~p", [Why]),
+    {stop, target_db_down, St};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+    ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
+    {stop, changes_reader_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
+    ?LOG_ERROR("ChangesManager process died with reason: ~p", [Reason]),
+    {stop, changes_manager_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+    ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
+    {stop, changes_queue_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
+    case Workers -- [Pid] of
+    Workers ->
+        {stop, {unknown_process_died, Pid, normal}, State};
+    [] ->
+        catch unlink(State#rep_state.changes_manager),
+        catch exit(State#rep_state.changes_manager, kill),
+        do_last_checkpoint(State);
+    Workers2 ->
+        {noreply, State#rep_state{workers = Workers2}}
+    end;
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
+    State2 = cancel_timer(State),
+    case lists:member(Pid, Workers) of
+    false ->
+        {stop, {unknown_process_died, Pid, Reason}, State2};
+    true ->
+        ?LOG_ERROR("Worker ~p died with reason: ~p", [Pid, Reason]),
+        {stop, {worker_died, Pid, Reason}, State2}
+    end.
+
+
+handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
+    {reply, {ok, Rep}, State};
+
+handle_call({add_stats, Stats}, From, State) ->
+    gen_server:reply(From, ok),
+    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
+    {noreply, State#rep_state{stats = NewStats}};
+
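+% Worked example of the through-seq bookkeeping below (illustrative
+% numbers): with seqs_in_progress = [5, 6, 9] and a current through seq
+% of 4, a worker reporting seq 6 leaves [5, 9] and the through seq stays
+% at 4 (seq 5 is still in flight); reporting seq 5 then advances the
+% through seq to 5; reporting seq 9 empties the set and the through seq
+% jumps to 9, the highest seq done.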
+handle_call({report_seq_done, Seq, StatsInc}, From,
+    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
+        current_through_seq = ThroughSeq, stats = Stats} = State) ->
+    gen_server:reply(From, ok),
+    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+    [Seq | Rest] ->
+        {Seq, Rest};
+    [_ | _] ->
+        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+    end,
+    NewHighestDone = lists:max([HighestDone, Seq]),
+    NewThroughSeq = case NewSeqsInProgress of
+    [] ->
+        lists:max([NewThroughSeq0, NewHighestDone]);
+    _ ->
+        NewThroughSeq0
+    end,
+    ?LOG_DEBUG("Worker reported seq ~p, through seq was ~p, "
+        "new through seq is ~p, highest seq done was ~p, "
+        "new highest seq done is ~p~n"
+        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+    SourceCurSeq = source_cur_seq(State),
+    NewState = State#rep_state{
+        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
+        current_through_seq = NewThroughSeq,
+        seqs_in_progress = NewSeqsInProgress,
+        highest_seq_done = NewHighestDone,
+        source_seq = SourceCurSeq
+    },
+    update_task(NewState),
+    {noreply, NewState}.
+
+
+handle_cast({db_compacted, DbName},
+    #rep_state{source = #db{name = DbName} = Source} = State) ->
+    {ok, NewSource} = couch_db:reopen(Source),
+    {noreply, State#rep_state{source = NewSource}};
+
+handle_cast({db_compacted, DbName},
+    #rep_state{target = #db{name = DbName} = Target} = State) ->
+    {ok, NewTarget} = couch_db:reopen(Target),
+    {noreply, State#rep_state{target = NewTarget}};
+
+handle_cast(checkpoint, State) ->
+    case do_checkpoint(State) of
+    {ok, NewState} ->
+        {noreply, NewState#rep_state{timer = start_timer(State)}};
+    Error ->
+        {stop, Error, State}
+    end;
+
+handle_cast({report_seq, Seq},
+    #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
+    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
+    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
+
+
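+% In-place upgrade of older #rep_state tuples: a 30-element state tuple
+% predates the use_checkpoints field and a 31-element one predates
+% checkpoint_interval, so their record defaults (true and 5000) are
+% appended until the tuple matches the current record definition.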
+code_change(OldVsn, OldState, Extra) when tuple_size(OldState) =:= 30 ->
+    code_change(OldVsn, erlang:append_element(OldState, true), Extra);
+code_change(OldVsn, OldState, Extra) when tuple_size(OldState) =:= 31 ->
+    code_change(OldVsn, erlang:append_element(OldState, 5000), Extra);
+code_change(_OldVsn, #rep_state{}=State, _Extra) ->
+    {ok, State}.
+
+
+terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
+    checkpoint_history = CheckpointHistory} = State) ->
+    terminate_cleanup(State),
+    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
+    couch_replicator_manager:replication_completed(Rep, rep_stats(State));
+
+terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+    % replication cancelled through ?MODULE:cancel_replication/1
+    couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}),
+    terminate_cleanup(State);
+
+terminate(Reason, State) ->
+    #rep_state{
+        source_name = Source,
+        target_name = Target,
+        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
+    } = State,
+    ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
+        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
+    terminate_cleanup(State),
+    couch_replicator_notifier:notify({error, RepId, Reason}),
+    couch_replicator_manager:replication_error(Rep, Reason).
+
+
+terminate_cleanup(State) ->
+    update_task(State),
+    stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
+    stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
+    couch_replicator_api_wrap:db_close(State#rep_state.source),
+    couch_replicator_api_wrap:db_close(State#rep_state.target).
+
+
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+    highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
+    {stop, normal, cancel_timer(State)};
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+    highest_seq_done = Seq} = State) ->
+    case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
+    {ok, NewState} ->
+        {stop, normal, cancel_timer(NewState)};
+    Error ->
+        {stop, Error, State}
+    end.
+
+
+start_timer(State) ->
+    After = State#rep_state.checkpoint_interval,
+    case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
+    {ok, Ref} ->
+        Ref;
+    Error ->
+        ?LOG_ERROR("Replicator, error scheduling checkpoint:  ~p", [Error]),
+        nil
+    end.
+
+
+cancel_timer(#rep_state{timer = nil} = State) ->
+    State;
+cancel_timer(#rep_state{timer = Timer} = State) ->
+    {ok, cancel} = timer:cancel(Timer),
+    State#rep_state{timer = nil}.
+
+
+init_state(Rep) ->
+    #rep{
+        source = Src, target = Tgt,
+        options = Options, user_ctx = UserCtx
+    } = Rep,
+    {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
+    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
+        get_value(create_target, Options, false)),
+
+    {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
+    {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
+
+    [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
+
+    {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+    StartSeq1 = get_value(since_seq, Options, StartSeq0),
+    StartSeq = {0, StartSeq1},
+    #doc{body={CheckpointHistory}} = SourceLog,
+    State = #rep_state{
+        rep_details = Rep,
+        source_name = couch_replicator_api_wrap:db_uri(Source),
+        target_name = couch_replicator_api_wrap:db_uri(Target),
+        source = Source,
+        target = Target,
+        history = History,
+        checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
+        start_seq = StartSeq,
+        current_through_seq = StartSeq,
+        committed_seq = StartSeq,
+        source_log = SourceLog,
+        target_log = TargetLog,
+        rep_starttime = couch_util:rfc1123_date(),
+        src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
+        tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
+        session_id = couch_uuids:random(),
+        source_db_compaction_notifier =
+            start_db_compaction_notifier(Source, self()),
+        target_db_compaction_notifier =
+            start_db_compaction_notifier(Target, self()),
+        source_monitor = db_monitor(Source),
+        target_monitor = db_monitor(Target),
+        source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
+        use_checkpoints = get_value(use_checkpoints, Options, true),
+        checkpoint_interval = get_value(checkpoint_interval, Options, 5000)
+    },
+    State#rep_state{timer = start_timer(State)}.
+
+
+find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
+    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
+    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
+
+
+fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
+    lists:reverse(Acc);
+
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
+    case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
+    {error, <<"not_found">>} when Vsn > 1 ->
+        OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
+        fold_replication_logs(Dbs, Vsn - 1,
+            ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
+    {error, <<"not_found">>} ->
+        fold_replication_logs(
+            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
+    {ok, Doc} when LogId =:= NewId ->
+        fold_replication_logs(
+            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
+    {ok, Doc} ->
+        MigratedLog = #doc{id = NewId, body = Doc#doc.body},
+        fold_replication_logs(
+            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
+    end.
+
+
+spawn_changes_reader(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
+    spawn_link(fun() ->
+        put(last_seq, StartSeq),
+        put(retries_left, Db#httpdb.retries),
+        read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options)
+    end);
+spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
+    spawn_link(fun() ->
+        read_changes(StartSeq, Db, ChangesQueue, Options)
+    end).
+
+read_changes(StartSeq, Db, ChangesQueue, Options) ->
+    try
+        couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
+            fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
+                case Id of
+                <<>> ->
+                    % Previous CouchDB releases had a bug which allowed a doc
+                    % with an empty ID to be inserted into databases. Such a
+                    % doc is impossible to GET.
+                    ?LOG_ERROR("Replicator: ignoring document with empty ID in "
+                        "source database `~s` (_changes sequence ~p)",
+                        [couch_replicator_api_wrap:db_uri(Db), Seq]);
+                _ ->
+                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
+                end,
+                put(last_seq, Seq)
+            end, Options),
+        couch_work_queue:close(ChangesQueue)
+    catch exit:{http_request_failed, _, _, _} = Error ->
+        case get(retries_left) of
+        N when N > 0 ->
+            put(retries_left, N - 1),
+            LastSeq = get(last_seq),
+            Db2 = case LastSeq of
+            StartSeq ->
+                ?LOG_INFO("Retrying _changes request to source database ~s"
+                    " with since=~p in ~p seconds",
+                    [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
+                ok = timer:sleep(Db#httpdb.wait),
+                Db#httpdb{wait = 2 * Db#httpdb.wait};
+            _ ->
+                ?LOG_INFO("Retrying _changes request to source database ~s"
+                    " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
+                Db
+            end,
+            read_changes(LastSeq, Db2, ChangesQueue, Options);
+        _ ->
+            exit(Error)
+        end
+    end.
+
+
+spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
+    spawn_link(fun() ->
+        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
+    end).
+
+changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
+    receive
+    {get_changes, From} ->
+        case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
+        closed ->
+            From ! {closed, self()};
+        {ok, Changes} ->
+            #doc_info{high_seq = Seq} = lists:last(Changes),
+            ReportSeq = {Ts, Seq},
+            ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
+            From ! {changes, self(), Changes, ReportSeq}
+        end,
+        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
+    end.
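+
+% Protocol sketch between a worker and the changes manager (a summary of
+% the loop above; the worker side lives in couch_replicator_worker, and
+% process_batch/1 is a hypothetical handler name):
+%
+%     ChangesManager ! {get_changes, self()},
+%     receive
+%     {changes, ChangesManager, Changes, ReportSeq} ->
+%         process_batch(Changes);
+%     {closed, ChangesManager} ->
+%         done
+%     end.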
+
+
+do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
+    NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
+    {ok, NewState};
+do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
+    SourceCurSeq = source_cur_seq(State),
+    NewState = State#rep_state{source_seq = SourceCurSeq},
+    update_task(NewState),
+    {ok, NewState};
+do_checkpoint(State) ->
+    #rep_state{
+        source_name=SourceName,
+        target_name=TargetName,
+        source = Source,
+        target = Target,
+        history = OldHistory,
+        start_seq = {_, StartSeq},
+        current_through_seq = {_Ts, NewSeq} = NewTsSeq,
+        source_log = SourceLog,
+        target_log = TargetLog,
+        rep_starttime = ReplicationStartTime,
+        src_starttime = SrcInstanceStartTime,
+        tgt_starttime = TgtInstanceStartTime,
+        stats = Stats,
+        rep_details = #rep{options = Options},
+        session_id = SessionId
+    } = State,
+    case commit_to_both(Source, Target) of
+    {source_error, Reason} ->
+         {checkpoint_commit_failure,
+             <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
+    {target_error, Reason} ->
+         {checkpoint_commit_failure,
+             <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
+    {SrcInstanceStartTime, TgtInstanceStartTime} ->
+        ?LOG_INFO("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
+            [SourceName, TargetName, NewSeq]),
+        StartTime = ?l2b(ReplicationStartTime),
+        EndTime = ?l2b(couch_util:rfc1123_date()),
+        NewHistoryEntry = {[
+            {<<"session_id">>, SessionId},
+            {<<"start_time">>, StartTime},
+            {<<"end_time">>, EndTime},
+            {<<"start_last_seq">>, StartSeq},
+            {<<"end_last_seq">>, NewSeq},
+            {<<"recorded_seq">>, NewSeq},
+            {<<"missing_checked">>, Stats#rep_stats.missing_checked},
+            {<<"missing_found">>, Stats#rep_stats.missing_found},
+            {<<"docs_read">>, Stats#rep_stats.docs_read},
+            {<<"docs_written">>, Stats#rep_stats.docs_written},
+            {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+        ]},
+        BaseHistory = [
+            {<<"session_id">>, SessionId},
+            {<<"source_last_seq">>, NewSeq},
+            {<<"replication_id_version">>, ?REP_ID_VERSION}
+        ] ++ case get_value(doc_ids, Options) of
+        undefined ->
+            [];
+        _DocIds ->
+            % backwards compatibility with the result of a replication by
+            % doc IDs in versions 0.11.x and 1.0.x
+            % TODO: deprecate (use same history format, simplify code)
+            [
+                {<<"start_time">>, StartTime},
+                {<<"end_time">>, EndTime},
+                {<<"docs_read">>, Stats#rep_stats.docs_read},
+                {<<"docs_written">>, Stats#rep_stats.docs_written},
+                {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+            ]
+        end,
+        % limit history to 50 entries
+        NewRepHistory = {
+            BaseHistory ++
+            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+        },
+
+        try
+            {SrcRevPos, SrcRevId} = update_checkpoint(
+                Source, SourceLog#doc{body = NewRepHistory}, source),
+            {TgtRevPos, TgtRevId} = update_checkpoint(
+                Target, TargetLog#doc{body = NewRepHistory}, target),
+            SourceCurSeq = source_cur_seq(State),
+            NewState = State#rep_state{
+                source_seq = SourceCurSeq,
+                checkpoint_history = NewRepHistory,
+                committed_seq = NewTsSeq,
+                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+            },
+            update_task(NewState),
+            {ok, NewState}
+        catch throw:{checkpoint_commit_failure, _} = Failure ->
+            Failure
+        end;
+    {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+        {checkpoint_commit_failure, <<"Target database out of sync. "
+            "Try to increase max_dbs_open at the target's server.">>};
+    {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
+        {checkpoint_commit_failure, <<"Source database out of sync. "
+            "Try to increase max_dbs_open at the source's server.">>};
+    {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+        {checkpoint_commit_failure, <<"Source and target databases out of "
+            "sync. Try to increase max_dbs_open at both servers.">>}
+    end.
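+
+% Shape of the checkpoint document recorded above (a sketch assembled from
+% BaseHistory and NewHistoryEntry; all values are illustrative):
+%
+%     {
+%         "_id": "_local/<BaseId>",
+%         "session_id": "...",
+%         "source_last_seq": 42,
+%         "replication_id_version": 3,
+%         "history": [
+%             {"session_id": "...", "start_time": "...", "end_time": "...",
+%              "start_last_seq": 0, "end_last_seq": 42, "recorded_seq": 42,
+%              "missing_checked": 10, "missing_found": 10, "docs_read": 10,
+%              "docs_written": 10, "doc_write_failures": 0}
+%         ]
+%     }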
+
+
+update_checkpoint(Db, Doc, DbType) ->
+    try
+        update_checkpoint(Db, Doc)
+    catch throw:{checkpoint_commit_failure, Reason} ->
+        throw({checkpoint_commit_failure,
+            <<"Error updating the ", (to_binary(DbType))/binary,
+                " checkpoint document: ", (to_binary(Reason))/binary>>})
+    end.
+
+update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
+    try
+        case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
+        {ok, PosRevId} ->
+            PosRevId;
+        {error, Reason} ->
+            throw({checkpoint_commit_failure, Reason})
+        end
+    catch throw:conflict ->
+        case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
+        {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
+            % This means that we were able to successfully update the
+            % checkpoint doc in a previous attempt, but got a connection
+            % error (e.g. a timeout) before receiving the success response.
+            % The request was therefore retried and we got a conflict, as the
+            % revision we sent is no longer the current one.
+            % We confirm this by verifying that the doc body we just got is
+            % the same one we just sent.
+            {Pos, RevId};
+        _ ->
+            throw({checkpoint_commit_failure, conflict})
+        end
+    end.
+
+
+commit_to_both(Source, Target) ->
+    % commit the src async
+    ParentPid = self(),
+    SrcCommitPid = spawn_link(
+        fun() ->
+            Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
+            ParentPid ! {self(), Result}
+        end),
+
+    % commit tgt sync
+    TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
+
+    SourceResult = receive
+    {SrcCommitPid, Result} ->
+        unlink(SrcCommitPid),
+        receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
+        Result;
+    {'EXIT', SrcCommitPid, Reason} ->
+        {error, Reason}
+    end,
+    case TargetResult of
+    {ok, TargetStartTime} ->
+        case SourceResult of
+        {ok, SourceStartTime} ->
+            {SourceStartTime, TargetStartTime};
+        SourceError ->
+            {source_error, SourceError}
+        end;
+    TargetError ->
+        {target_error, TargetError}
+    end.
+
+
+compare_replication_logs(SrcDoc, TgtDoc) ->
+    #doc{body={RepRecProps}} = SrcDoc,
+    #doc{body={RepRecPropsTgt}} = TgtDoc,
+    case get_value(<<"session_id">>, RepRecProps) ==
+            get_value(<<"session_id">>, RepRecPropsTgt) of
+    true ->
+        % if the records have the same session id,
+        % then we have a valid replication history
+        OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
+        OldHistory = get_value(<<"history">>, RepRecProps, []),
+        {OldSeqNum, OldHistory};
+    false ->
+        SourceHistory = get_value(<<"history">>, RepRecProps, []),
+        TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
+        ?LOG_INFO("Replication records differ. "
+                "Scanning histories to find a common ancestor.", []),
+        ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+                [RepRecProps, RepRecPropsTgt]),
+        compare_rep_history(SourceHistory, TargetHistory)
+    end.
+
+compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
+    ?LOG_INFO("no common ancestry -- performing full replication", []),
+    {?LOWEST_SEQ, []};
+compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
+    SourceId = get_value(<<"session_id">>, S),
+    case has_session_id(SourceId, Target) of
+    true ->
+        RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
+        ?LOG_INFO("found a common replication record with source_seq ~p",
+            [RecordSeqNum]),
+        {RecordSeqNum, SourceRest};
+    false ->
+        TargetId = get_value(<<"session_id">>, T),
+        case has_session_id(TargetId, SourceRest) of
+        true ->
+            RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
+            ?LOG_INFO("found a common replication record with source_seq ~p",
+                [RecordSeqNum]),
+            {RecordSeqNum, TargetRest};
+        false ->
+            compare_rep_history(SourceRest, TargetRest)
+        end
+    end.
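+
+% Worked example of the ancestor scan above: with source history sessions
+% [s3, s2, s1] and target history sessions [s4, s2, s1], s3 is not in the
+% target history and s4 is not in the source tail, so the scan recurses to
+% ([s2, s1], [s2, s1]) and stops at the shared session s2, resuming from
+% s2's recorded_seq.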
+
+
+has_session_id(_SessionId, []) ->
+    false;
+has_session_id(SessionId, [{Props} | Rest]) ->
+    case get_value(<<"session_id">>, Props, nil) of
+    SessionId ->
+        true;
+    _Else ->
+        has_session_id(SessionId, Rest)
+    end.
+
+
+db_monitor(#db{} = Db) ->
+    couch_db:monitor(Db);
+db_monitor(_HttpDb) ->
+    nil.
+
+
+source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq}) ->
+    case (catch couch_replicator_api_wrap:get_db_info(Db#httpdb{retries = 3})) of
+    {ok, Info} ->
+        get_value(<<"update_seq">>, Info, Seq);
+    _ ->
+        Seq
+    end;
+source_cur_seq(#rep_state{source = Db, source_seq = Seq}) ->
+    {ok, Info} = couch_replicator_api_wrap:get_db_info(Db),
+    get_value(<<"update_seq">>, Info, Seq).
+
+
+update_task(State) ->
+    #rep_state{
+        current_through_seq = {_, CurSeq},
+        source_seq = SourceCurSeq
+    } = State,
+    couch_task_status:update(
+        rep_stats(State) ++ [
+        {source_seq, SourceCurSeq},
+        case is_number(CurSeq) andalso is_number(SourceCurSeq) of
+        true ->
+            case SourceCurSeq of
+            0 ->
+                {progress, 0};
+            _ ->
+                {progress, (CurSeq * 100) div SourceCurSeq}
+            end;
+        false ->
+            {progress, null}
+        end
+    ]).
+
+
+rep_stats(State) ->
+    #rep_state{
+        committed_seq = {_, CommittedSeq},
+        stats = Stats
+    } = State,
+    [
+        {revisions_checked, Stats#rep_stats.missing_checked},
+        {missing_revisions_found, Stats#rep_stats.missing_found},
+        {docs_read, Stats#rep_stats.docs_read},
+        {docs_written, Stats#rep_stats.docs_written},
+        {doc_write_failures, Stats#rep_stats.doc_write_failures},
+        {checkpointed_source_seq, CommittedSeq}
+    ].
+

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator.hrl b/apps/couch_replicator/src/couch_replicator.hrl
new file mode 100644
index 0000000..018aa4b
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator.hrl
@@ -0,0 +1,30 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(REP_ID_VERSION, 3).
+
+-record(rep, {
+    id,
+    source,
+    target,
+    options,
+    user_ctx,
+    doc_id
+}).
+
+-record(rep_stats, {
+    missing_checked = 0,
+    missing_found = 0,
+    docs_read = 0,
+    docs_written = 0,
+    doc_write_failures = 0
+}).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_api_wrap.erl b/apps/couch_replicator/src/couch_replicator_api_wrap.erl
new file mode 100644
index 0000000..311025b
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -0,0 +1,877 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_api_wrap).
+
+% This module wraps the native Erlang API and allows performing the same
+% operations on both remote and local databases.
+%
+% Notes:
+% Many options and APIs aren't yet supported here; they are added as needed.
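+%
+% Illustrative sketch of the symmetric API (the URL and database names are
+% assumptions for the example):
+%
+%     {ok, Remote} = couch_replicator_api_wrap:db_open(
+%         #httpdb{url = "http://127.0.0.1:5984/db_a/"}, []),
+%     {ok, Local} = couch_replicator_api_wrap:db_open(<<"db_b">>, []),
+%     {ok, _RemoteInfo} = couch_replicator_api_wrap:get_db_info(Remote),
+%     {ok, _LocalInfo} = couch_replicator_api_wrap:get_db_info(Local).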
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+
+-export([
+    db_open/2,
+    db_open/3,
+    db_close/1,
+    get_db_info/1,
+    update_doc/3,
+    update_doc/4,
+    update_docs/3,
+    update_docs/4,
+    ensure_full_commit/1,
+    get_missing_revs/2,
+    open_doc/3,
+    open_doc_revs/6,
+    changes_since/5,
+    db_uri/1
+    ]).
+
+-import(couch_replicator_httpc, [
+    send_req/3
+    ]).
+
+-import(couch_util, [
+    encode_doc_id/1,
+    get_value/2,
+    get_value/3
+    ]).
+
+-define(MAX_WAIT, 5 * 60 * 1000).
+
+db_uri(#httpdb{url = Url}) ->
+    couch_util:url_strip_password(Url);
+
+db_uri(#db{name = Name}) ->
+    db_uri(Name);
+
+db_uri(DbName) ->
+    ?b2l(DbName).
+
+
+db_open(Db, Options) ->
+    db_open(Db, Options, false).
+
+db_open(#httpdb{} = Db1, _Options, Create) ->
+    {ok, Db} = couch_replicator_httpc:setup(Db1),
+    case Create of
+    false ->
+        ok;
+    true ->
+        send_req(Db, [{method, put}], fun(_, _, _) -> ok end)
+    end,
+    send_req(Db, [{method, head}],
+        fun(200, _, _) ->
+            {ok, Db};
+        (401, _, _) ->
+            throw({unauthorized, ?l2b(db_uri(Db))});
+        (_, _, _) ->
+            throw({db_not_found, ?l2b(db_uri(Db))})
+        end);
+db_open(DbName, Options, Create) ->
+    try
+        case Create of
+        false ->
+            ok;
+        true ->
+            ok = couch_httpd:verify_is_server_admin(
+                get_value(user_ctx, Options)),
+            couch_db:create(DbName, Options)
+        end,
+        case couch_db:open(DbName, Options) of
+        {error, illegal_database_name, _} ->
+            throw({db_not_found, DbName});
+        {not_found, _Reason} ->
+            throw({db_not_found, DbName});
+        {ok, _Db} = Success ->
+            Success
+        end
+    catch
+    throw:{unauthorized, _} ->
+        throw({unauthorized, DbName})
+    end.
+
+db_close(#httpdb{httpc_pool = Pool}) ->
+    unlink(Pool),
+    ok = couch_replicator_httpc_pool:stop(Pool);
+db_close(DbName) ->
+    catch couch_db:close(DbName).
+
+
+get_db_info(#httpdb{} = Db) ->
+    send_req(Db, [],
+        fun(200, _, {Props}) ->
+            {ok, Props}
+        end);
+get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
+    {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+    {ok, Info} = couch_db:get_db_info(Db),
+    couch_db:close(Db),
+    {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
+
+
+ensure_full_commit(#httpdb{} = Db) ->
+    send_req(
+        Db,
+        [{method, post}, {path, "_ensure_full_commit"},
+            {headers, [{"Content-Type", "application/json"}]}],
+        fun(201, _, {Props}) ->
+            {ok, get_value(<<"instance_start_time">>, Props)};
+        (_, _, {Props}) ->
+            {error, get_value(<<"error">>, Props)}
+        end);
+ensure_full_commit(Db) ->
+    couch_db:ensure_full_commit(Db).
+
+
+get_missing_revs(#httpdb{} = Db, IdRevs) ->
+    JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]},
+    send_req(
+        Db,
+        [{method, post}, {path, "_revs_diff"}, {body, ?JSON_ENCODE(JsonBody)},
+            {headers, [{"Content-Type", "application/json"}]}],
+        fun(200, _, {Props}) ->
+            ConvertToNativeFun = fun({Id, {Result}}) ->
+                MissingRevs = couch_doc:parse_revs(
+                    get_value(<<"missing">>, Result)
+                ),
+                PossibleAncestors = couch_doc:parse_revs(
+                    get_value(<<"possible_ancestors">>, Result, [])
+                ),
+                {Id, MissingRevs, PossibleAncestors}
+            end,
+            {ok, lists:map(ConvertToNativeFun, Props)}
+        end);
+get_missing_revs(Db, IdRevs) ->
+    couch_db:get_missing_revs(Db, IdRevs).
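+
+% The _revs_diff exchange above, sketched with an illustrative doc ID and
+% revision strings:
+%
+%     request body:  {"docid": ["2-def", "2-ghi"]}
+%     response:      {"docid": {"missing": ["2-def"],
+%                               "possible_ancestors": ["1-abc"]}}
+%
+% ConvertToNativeFun then yields {<<"docid">>, MissingRevs,
+% PossibleAncestors} with the rev strings parsed into rev terms.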
+
+
+
+open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) ->
+    Path = encode_doc_id(Id),
+    QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+    Url = couch_util:url_strip_password(
+        couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
+    ),
+    ?LOG_ERROR("Replication crashing because GET ~s failed", [Url]),
+    exit(kaboom);
+open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
+    Path = encode_doc_id(Id),
+    QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+    {Pid, Ref} = spawn_monitor(fun() ->
+        Self = self(),
+        Callback = fun(200, Headers, StreamDataFun) ->
+            remote_open_doc_revs_streamer_start(Self),
+            {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+                get_value("Content-Type", Headers),
+                StreamDataFun,
+                fun mp_parse_mixed/1
+            )
+        end,
+        Streamer = spawn_link(fun() ->
+            Params = [
+                {path, Path},
+                {qs, QS},
+                {ibrowse_options, [{stream_to, {self(), once}}]},
+                {headers, [{"Accept", "multipart/mixed"}]}
+            ],
+            % We're setting retries to 0 here to avoid the case where the
+            % Streamer retries the request and ends up jumbling together two
+            % different response bodies.  Retries are handled explicitly by
+            % open_doc_revs itself.
+            send_req(HttpDb#httpdb{retries = 0}, Params, Callback)
+        end),
+        % If this process dies normally we could leave the
+        % Streamer process hanging around, keeping an
+        % HTTP connection open. This is a bit of a
+        % hammer approach to making sure it releases
+        % that connection back to the pool.
+        spawn(fun() ->
+            Ref = erlang:monitor(process, Self),
+            receive
+                {'DOWN', Ref, process, Self, normal} ->
+                    exit(Streamer, {streamer_parent_died, Self});
+                {'DOWN', Ref, process, Self, _} ->
+                    ok
+                end
+        end),
+        receive
+        {started_open_doc_revs, Ref} ->
+            Ret = receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc),
+            exit({exit_ok, Ret})
+        end
+    end),
+    receive
+        {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+            Ret;
+        {'DOWN', Ref, process, Pid, {{nocatch, {missing_stub,_} = Stub}, _}} ->
+            throw(Stub);
+        {'DOWN', Ref, process, Pid, Else} ->
+            Url = couch_util:url_strip_password(
+                couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
+            ),
+            #httpdb{retries = Retries, wait = Wait0} = HttpDb,
+            Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
+            ?LOG_INFO("Retrying GET to ~s in ~p seconds due to error ~p",
+                [Url, Wait / 1000, error_reason(Else)]
+            ),
+            ok = timer:sleep(Wait),
+            RetryDb = HttpDb#httpdb{
+                retries = Retries - 1,
+                wait = Wait
+            },
+            open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc)
+    end;
+open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
+    {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
+    {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
+
+error_reason({http_request_failed, "GET", _Url, {error, timeout}}) ->
+    timeout;
+error_reason({http_request_failed, "GET", _Url, {error, {_, req_timedout}}}) ->
+    req_timedout;
+error_reason({http_request_failed, "GET", _Url, Error}) ->
+    Error;
+error_reason(Else) ->
+    Else.
+
+open_doc(#httpdb{} = Db, Id, Options) ->
+    send_req(
+        Db,
+        [{path, encode_doc_id(Id)}, {qs, options_to_query_args(Options, [])}],
+        fun(200, _, Body) ->
+            {ok, couch_doc:from_json_obj(Body)};
+        (_, _, {Props}) ->
+            {error, get_value(<<"error">>, Props)}
+        end);
+open_doc(Db, Id, Options) ->
+    case couch_db:open_doc(Db, Id, Options) of
+    {ok, _} = Ok ->
+        Ok;
+    {not_found, _Reason} ->
+        {error, <<"not_found">>}
+    end.
+
+
+update_doc(Db, Doc, Options) ->
+    update_doc(Db, Doc, Options, interactive_edit).
+
+update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
+    QArgs = case Type of
+    replicated_changes ->
+        [{"new_edits", "false"}];
+    _ ->
+        []
+    end ++ options_to_query_args(Options, []),
+    Boundary = couch_uuids:random(),
+    JsonBytes = ?JSON_ENCODE(
+        couch_doc:to_json_obj(
+          Doc, [revs, attachments, follows, att_encoding_info | Options])),
+    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
+        JsonBytes, Doc#doc.atts, true),
+    Headers = case lists:member(delay_commit, Options) of
+    true ->
+        [{"X-Couch-Full-Commit", "false"}];
+    false ->
+        []
+    end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
+    Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
+    send_req(
+        % A crash here bubbles all the way back up to run_user_fun inside
+        % open_doc_revs, which will retry the whole thing.  That's the
+        % appropriate course of action, since we've already started streaming
+        % the response body from the GET request.
+        HttpDb#httpdb{retries = 0},
+        [{method, put}, {path, encode_doc_id(DocId)},
+            {qs, QArgs}, {headers, Headers}, {body, Body}],
+        fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->
+                {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
+            (409, _, _) ->
+                throw(conflict);
+            (Code, _, {Props}) ->
+                case {Code, get_value(<<"error">>, Props)} of
+                {401, <<"unauthorized">>} ->
+                    throw({unauthorized, get_value(<<"reason">>, Props)});
+                {403, <<"forbidden">>} ->
+                    throw({forbidden, get_value(<<"reason">>, Props)});
+                {412, <<"missing_stub">>} ->
+                    throw({missing_stub, get_value(<<"reason">>, Props)});
+                {_, Error} ->
+                    {error, Error}
+                end
+        end);
+update_doc(Db, Doc, Options, Type) ->
+    couch_db:update_doc(Db, Doc, Options, Type).
+
+
+update_docs(Db, DocList, Options) ->
+    update_docs(Db, DocList, Options, interactive_edit).
+
+update_docs(_Db, [], _Options, _UpdateType) ->
+    {ok, []};
+update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
+    FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
+    Prefix = case UpdateType of
+    replicated_changes ->
+        <<"{\"new_edits\":false,\"docs\":[">>;
+    interactive_edit ->
+        <<"{\"docs\":[">>
+    end,
+    Suffix = <<"]}">>,
+    % Note: nginx and other servers don't like PUT/POST requests without
+    % a Content-Length header, so we can't use chunked transfer encoding
+    % and JSON-encode each doc lazily just before sending it through the
+    % socket; the total body length is computed up front instead.
+    {Docs, Len} = lists:mapfoldl(
+        fun(#doc{} = Doc, Acc) ->
+            Json = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
+            {Json, Acc + iolist_size(Json)};
+        (Doc, Acc) ->
+            {Doc, Acc + iolist_size(Doc)}
+        end,
+        byte_size(Prefix) + byte_size(Suffix) + length(DocList) - 1,
+        DocList),
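+    % Length arithmetic sketch (illustrative sizes): for two docs whose JSON
+    % encodings are 25 and 30 bytes under the interactive_edit prefix,
+    % Len = 9 ("{\"docs\":[") + 2 ("]}") + 1 separating comma + 25 + 30 = 67,
+    % exactly the number of bytes BodyFun below will stream.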
+    BodyFun = fun(eof) ->
+            eof;
+        ([]) ->
+            {ok, Suffix, eof};
+        ([prefix | Rest]) ->
+            {ok, Prefix, Rest};
+        ([Doc]) ->
+            {ok, Doc, []};
+        ([Doc | RestDocs]) ->
+            {ok, [Doc, ","], RestDocs}
+    end,
+    Headers = [
+        {"Content-Length", Len},
+        {"Content-Type", "application/json"},
+        {"X-Couch-Full-Commit", FullCommit}
+    ],
+    send_req(
+        HttpDb,
+        [{method, post}, {path, "_bulk_docs"},
+            {body, {BodyFun, [prefix | Docs]}}, {headers, Headers}],
+        fun(201, _, Results) when is_list(Results) ->
+                {ok, bulk_results_to_errors(DocList, Results, remote)};
+           (417, _, Results) when is_list(Results) ->
+                {ok, bulk_results_to_errors(DocList, Results, remote)}
+        end);
+update_docs(Db, DocList, Options, UpdateType) ->
+    Result = couch_db:update_docs(Db, DocList, Options, UpdateType),
+    {ok, bulk_results_to_errors(DocList, Result, UpdateType)}.
+
+
+changes_since(#httpdb{headers = Headers1} = HttpDb, Style, StartSeq,
+    UserFun, Options) ->
+    HeartBeat = erlang:max(1000, HttpDb#httpdb.timeout div 3),
+    BaseQArgs = case get_value(continuous, Options, false) of
+    false ->
+        [{"feed", "normal"}];
+    true ->
+        [{"feed", "continuous"}]
+    end ++ [
+        {"style", atom_to_list(Style)}, {"since", ?JSON_ENCODE(StartSeq)},
+        {"heartbeat", integer_to_list(HeartBeat)}
+    ],
+    DocIds = get_value(doc_ids, Options),
+    {QArgs, Method, Body, Headers} = case DocIds of
+    undefined ->
+        QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
+        {QArgs1, get, [], Headers1};
+    _ when is_list(DocIds) ->
+        Headers2 = [{"Content-Type", "application/json"} | Headers1],
+        JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
+        {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
+    end,
+    send_req(
+        HttpDb,
+        [{method, Method}, {path, "_changes"}, {qs, QArgs},
+            {headers, Headers}, {body, Body},
+            {ibrowse_options, [{stream_to, {self(), once}}]}],
+        fun(200, _, DataStreamFun) ->
+                parse_changes_feed(Options, UserFun, DataStreamFun);
+            (405, _, _) when is_list(DocIds) ->
+                % CouchDB versions < 1.1.0 don't have the built-in _changes
+                % feed filter "_doc_ids", nor do they support POST
+                send_req(HttpDb, [{method, get}, {path, "_changes"},
+                    {qs, BaseQArgs}, {headers, Headers1},
+                    {ibrowse_options, [{stream_to, {self(), once}}]}],
+                    fun(200, _, DataStreamFun2) ->
+                        UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
+                            case lists:member(Id, DocIds) of
+                            true ->
+                                UserFun(DocInfo);
+                            false ->
+                                ok
+                            end
+                        end,
+                        parse_changes_feed(Options, UserFun2, DataStreamFun2)
+                    end)
+        end);
+changes_since(Db, Style, StartSeq, UserFun, Options) ->
+    Filter = case get_value(doc_ids, Options) of
+    undefined ->
+        ?b2l(get_value(filter, Options, <<>>));
+    _DocIds ->
+        "_doc_ids"
+    end,
+    Args = #changes_args{
+        style = Style,
+        since = StartSeq,
+        filter = Filter,
+        feed = case get_value(continuous, Options, false) of
+            true ->
+                "continuous";
+            false ->
+                "normal"
+        end,
+        timeout = infinity
+    },
+    QueryParams = get_value(query_params, Options, {[]}),
+    Req = changes_json_req(Db, Filter, QueryParams, Options),
+    ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db),
+    ChangesFeedFun(fun({change, Change, _}, _) ->
+            UserFun(json_to_doc_info(Change));
+        (_, _) ->
+            ok
+    end).
+
+
+% internal functions
+
+maybe_add_changes_filter_q_args(BaseQS, Options) ->
+    case get_value(filter, Options) of
+    undefined ->
+        BaseQS;
+    FilterName ->
+        {Params} = get_value(query_params, Options, {[]}),
+        [{"filter", ?b2l(FilterName)} | lists:foldl(
+            fun({K, V}, QSAcc) ->
+                Ks = couch_util:to_list(K),
+                case lists:keymember(Ks, 1, QSAcc) of
+                true ->
+                    QSAcc;
+                false ->
+                    [{Ks, couch_util:to_list(V)} | QSAcc]
+                end
+            end,
+            BaseQS, Params)]
+    end.
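+
+% Example of the expansion above (filter and parameter names are
+% illustrative): with {filter, <<"ddoc/by_type">>} and
+% {query_params, {[{<<"type">>, <<"invoice">>}]}} in Options, the result is
+%
+%     [{"filter", "ddoc/by_type"}, {"type", "invoice"} | BaseQS]
+%
+% where keys already present in BaseQS take precedence over the filter's
+% query parameters.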
+
+parse_changes_feed(Options, UserFun, DataStreamFun) ->
+    case get_value(continuous, Options, false) of
+    true ->
+        continuous_changes(DataStreamFun, UserFun);
+    false ->
+        EventFun = fun(Ev) ->
+            changes_ev1(Ev, fun(DocInfo, _) -> UserFun(DocInfo) end, [])
+        end,
+        json_stream_parse:events(DataStreamFun, EventFun)
+    end.
+
+changes_json_req(_Db, "", _QueryParams, _Options) ->
+    {[]};
+changes_json_req(_Db, "_doc_ids", _QueryParams, Options) ->
+    {[{<<"doc_ids">>, get_value(doc_ids, Options)}]};
+changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
+    {ok, Info} = couch_db:get_db_info(Db),
+    % simulate a request to db_name/_changes
+    {[
+        {<<"info">>, {Info}},
+        {<<"id">>, null},
+        {<<"method">>, 'GET'},
+        {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+        {<<"query">>, {[{<<"filter">>, FilterName} | QueryParams]}},
+        {<<"headers">>, []},
+        {<<"body">>, []},
+        {<<"peer">>, <<"replicator">>},
+        {<<"form">>, []},
+        {<<"cookie">>, []},
+        {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+    ]}.
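+
+% The EJSON above mimics the request object that couch_httpd passes to
+% design-document changes filters, so filtered replication from a local
+% source exercises the same filter code path as a filtered _changes HTTP
+% request; for the builtin "_doc_ids" filter only the doc_ids list is
+% needed.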
+
+
+options_to_query_args(HttpDb, Path, Options) ->
+    case lists:keytake(atts_since, 1, Options) of
+    false ->
+        options_to_query_args(Options, []);
+    {value, {atts_since, []}, Options2} ->
+        options_to_query_args(Options2, []);
+    {value, {atts_since, PAs}, Options2} ->
+        QueryArgs1 = options_to_query_args(Options2, []),
+        FullUrl = couch_replicator_httpc:full_url(
+            HttpDb, [{path, Path}, {qs, QueryArgs1}]),
+        RevList = atts_since_arg(
+            length("GET " ++ FullUrl ++ " HTTP/1.1\r\n") +
+            length("&atts_since=") + 6,  % +6 = % encoded [ and ]
+            PAs, []),
+        [{"atts_since", ?JSON_ENCODE(RevList)} | QueryArgs1]
+    end.
+
+
+options_to_query_args([], Acc) ->
+    lists:reverse(Acc);
+options_to_query_args([ejson_body | Rest], Acc) ->
+    options_to_query_args(Rest, Acc);
+options_to_query_args([delay_commit | Rest], Acc) ->
+    options_to_query_args(Rest, Acc);
+options_to_query_args([revs | Rest], Acc) ->
+    options_to_query_args(Rest, [{"revs", "true"} | Acc]);
+options_to_query_args([{open_revs, all} | Rest], Acc) ->
+    options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
+options_to_query_args([latest | Rest], Acc) ->
+    options_to_query_args(Rest, [{"latest", "true"} | Acc]);
+options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
+    JsonRevs = ?b2l(?JSON_ENCODE(couch_doc:revs_to_strs(Revs))),
+    options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).
+
+
+-define(MAX_URL_LEN, 7000).
+
+atts_since_arg(_UrlLen, [], Acc) ->
+    lists:reverse(Acc);
+atts_since_arg(UrlLen, [PA | Rest], Acc) ->
+    RevStr = couch_doc:rev_to_str(PA),
+    NewUrlLen = case Rest of
+    [] ->
+        % plus 2 percent-encoded double quotes (%22, 3 chars each = 6)
+        UrlLen + size(RevStr) + 6;
+    _ ->
+        % plus 2 percent-encoded double quotes and a comma (3 chars each = 9)
+        UrlLen + size(RevStr) + 9
+    end,
+    case NewUrlLen >= ?MAX_URL_LEN of
+    true ->
+        lists:reverse(Acc);
+    false ->
+        atts_since_arg(NewUrlLen, Rest, [RevStr | Acc])
+    end.
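+
+% ?MAX_URL_LEN (7000) keeps the whole GET request line below common
+% URL-length limits. Revisions that would push the URL past the cap are
+% silently dropped from atts_since, which is safe: the source may then
+% send attachment data the target already has, costing bandwidth but not
+% correctness.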
+
+
+% TODO: come up with a less verbose, more elegant and automatic restart
+%       strategy for the exported open_doc_revs/6 function. The restart
+%       should be transparent to the caller, like any other Couch API
+%       function exported by this module.
+receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) ->
+    try
+        % Left only for debugging purposes via an interactive or remote shell
+        erlang:put(open_doc_revs, {Id, Revs, Ref, Streamer}),
+        receive_docs(Streamer, Fun, Ref, Acc)
+    catch
+    error:{restart_open_doc_revs, NewRef} ->
+        receive_docs_loop(Streamer, Fun, Id, Revs, NewRef, Acc)
+    end.
+
+receive_docs(Streamer, UserFun, Ref, UserAcc) ->
+    Streamer ! {get_headers, Ref, self()},
+    receive
+    {started_open_doc_revs, NewRef} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {headers, Ref, Headers} ->
+        case get_value("content-type", Headers) of
+        {"multipart/related", _} = ContentType ->
+            case doc_from_multi_part_stream(
+                ContentType,
+                fun() -> receive_doc_data(Streamer, Ref) end,
+                Ref) of
+            {ok, Doc, Parser} ->
+                case run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref) of
+                {ok, UserAcc2} ->
+                    ok;
+                {skip, UserAcc2} ->
+                    couch_doc:abort_multi_part_stream(Parser)
+                end,
+                receive_docs(Streamer, UserFun, Ref, UserAcc2)
+            end;
+        {"application/json", []} ->
+            Doc = couch_doc:from_json_obj(
+                    ?JSON_DECODE(receive_all(Streamer, Ref, []))),
+            {_, UserAcc2} = run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref),
+            receive_docs(Streamer, UserFun, Ref, UserAcc2);
+        {"application/json", [{"error","true"}]} ->
+            {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
+            Rev = get_value(<<"missing">>, ErrorProps),
+            Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
+            {_, UserAcc2} = run_user_fun(UserFun, Result, UserAcc, Ref),
+            receive_docs(Streamer, UserFun, Ref, UserAcc2)
+        end;
+    {done, Ref} ->
+        {ok, UserAcc}
+    end.
+
+
+run_user_fun(UserFun, Arg, UserAcc, OldRef) ->
+    {Pid, Ref} = spawn_monitor(fun() ->
+        try UserFun(Arg, UserAcc) of
+            Resp ->
+                exit({exit_ok, Resp})
+        catch
+            throw:Reason ->
+                exit({exit_throw, Reason});
+            error:Reason ->
+                exit({exit_error, Reason});
+            exit:Reason ->
+                exit({exit_exit, Reason})
+        end
+    end),
+    receive
+        {started_open_doc_revs, NewRef} ->
+            erlang:demonitor(Ref, [flush]),
+            exit(Pid, kill),
+            restart_remote_open_doc_revs(OldRef, NewRef);
+        {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+            Ret;
+        {'DOWN', Ref, process, Pid, {exit_throw, Reason}} ->
+            throw(Reason);
+        {'DOWN', Ref, process, Pid, {exit_error, Reason}} ->
+            erlang:error(Reason);
+        {'DOWN', Ref, process, Pid, {exit_exit, Reason}} ->
+            erlang:exit(Reason)
+    end.
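+
+% The user callback runs in a separate monitored process so that a
+% {started_open_doc_revs, NewRef} notification (sent when the remote
+% stream restarts) can preempt it: the callback process is killed and
+% the receive loop restarts with the new reference, while throws, errors
+% and exits raised by the callback are re-raised here with their
+% original class.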
+
+
+restart_remote_open_doc_revs(Ref, NewRef) ->
+    receive
+    {body_bytes, Ref, _} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {body_done, Ref} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {done, Ref} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {headers, Ref, _} ->
+        restart_remote_open_doc_revs(Ref, NewRef)
+    after 0 ->
+        erlang:error({restart_open_doc_revs, NewRef})
+    end.
+
+
+remote_open_doc_revs_streamer_start(Parent) ->
+    receive
+    {get_headers, _Ref, Parent} ->
+        remote_open_doc_revs_streamer_start(Parent);
+    {next_bytes, _Ref, Parent} ->
+        remote_open_doc_revs_streamer_start(Parent)
+    after 0 ->
+        Parent ! {started_open_doc_revs, make_ref()}
+    end.
+
+
+receive_all(Streamer, Ref, Acc) ->
+    Streamer ! {next_bytes, Ref, self()},
+    receive
+    {started_open_doc_revs, NewRef} ->
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {body_bytes, Ref, Bytes} ->
+        receive_all(Streamer, Ref, [Bytes | Acc]);
+    {body_done, Ref} ->
+        lists:reverse(Acc)
+    end.
+
+
+mp_parse_mixed(eof) ->
+    receive {get_headers, Ref, From} ->
+        From ! {done, Ref}
+    end;
+mp_parse_mixed({headers, H}) ->
+    receive {get_headers, Ref, From} ->
+        From ! {headers, Ref, H}
+    end,
+    fun mp_parse_mixed/1;
+mp_parse_mixed({body, Bytes}) ->
+    receive {next_bytes, Ref, From} ->
+        From ! {body_bytes, Ref, Bytes}
+    end,
+    fun mp_parse_mixed/1;
+mp_parse_mixed(body_end) ->
+    receive {next_bytes, Ref, From} ->
+        From ! {body_done, Ref};
+    {get_headers, Ref, From} ->
+        self() ! {get_headers, Ref, From}
+    end,
+    fun mp_parse_mixed/1.
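+
+% mp_parse_mixed/1 is the parser callback for the multipart body; it
+% bridges the push-style parser events to the pull-style protocol used
+% by receive_docs/4 and receive_all/3: consumers ask with
+% {get_headers, Ref, From} or {next_bytes, Ref, From} and receive
+% {headers, ...}, {body_bytes, ...}, {body_done, ...} or {done, ...}.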
+
+
+receive_doc_data(Streamer, Ref) ->
+    Streamer ! {next_bytes, Ref, self()},
+    receive
+    {body_bytes, Ref, Bytes} ->
+        {Bytes, fun() -> receive_doc_data(Streamer, Ref) end};
+    {body_done, Ref} ->
+        {<<>>, fun() -> receive_doc_data(Streamer, Ref) end}
+    end.
+
+doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
+    Self = self(),
+    Parser = spawn_link(fun() ->
+        {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+            ContentType, DataFun,
+            fun(Next) -> couch_doc:mp_parse_doc(Next, []) end),
+        unlink(Self)
+        end),
+    Parser ! {get_doc_bytes, Ref, self()},
+    receive
+    {started_open_doc_revs, NewRef} ->
+        unlink(Parser),
+        exit(Parser, kill),
+        restart_remote_open_doc_revs(Ref, NewRef);
+    {doc_bytes, Ref, DocBytes} ->
+        Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
+        ReadAttachmentDataFun = fun() ->
+            link(Parser),
+            Parser ! {get_bytes, Ref, self()},
+            receive
+            {started_open_doc_revs, NewRef} ->
+                unlink(Parser),
+                exit(Parser, kill),
+                receive {bytes, Ref, _} -> ok after 0 -> ok end,
+                restart_remote_open_doc_revs(Ref, NewRef);
+            {bytes, Ref, Bytes} ->
+                Bytes
+            end
+        end,
+        Atts2 = lists:map(
+            fun(#att{data = follows} = A) ->
+                A#att{data = ReadAttachmentDataFun};
+            (A) ->
+                A
+            end, Doc#doc.atts),
+        {ok, Doc#doc{atts = Atts2}, Parser}
+    end.
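+
+% Attachments arrive with data = follows; each such stub is swapped for
+% a fun that links to the parser and pulls the attachment bytes on
+% demand, so attachment bodies need not be buffered in memory together
+% with the document JSON.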
+
+
+changes_ev1(object_start, UserFun, UserAcc) ->
+    fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
+
+changes_ev2({key, <<"results">>}, UserFun, UserAcc) ->
+    fun(Ev) -> changes_ev3(Ev, UserFun, UserAcc) end;
+changes_ev2(_, UserFun, UserAcc) ->
+    fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
+
+changes_ev3(array_start, UserFun, UserAcc) ->
+    fun(Ev) -> changes_ev_loop(Ev, UserFun, UserAcc) end.
+
+changes_ev_loop(object_start, UserFun, UserAcc) ->
+    fun(Ev) ->
+        json_stream_parse:collect_object(Ev,
+            fun(Obj) ->
+                UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc),
+                fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end
+            end)
+    end;
+changes_ev_loop(array_end, _UserFun, _UserAcc) ->
+    fun(_Ev) -> changes_ev_done() end.
+
+changes_ev_done() ->
+    fun(_Ev) -> changes_ev_done() end.
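+
+% The changes_ev* functions form a small state machine driven by
+% json_stream_parse:events/2: wait for the top-level object, skip keys
+% until "results", expect the array start, then collect each row object
+% and feed it through json_to_doc_info/1 into the user function.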
+
+continuous_changes(DataFun, UserFun) ->
+    {DataFun2, _, Rest} = json_stream_parse:events(
+        DataFun,
+        fun(Ev) -> parse_changes_line(Ev, UserFun) end),
+    continuous_changes(fun() -> {Rest, DataFun2} end, UserFun).
+
+parse_changes_line(object_start, UserFun) ->
+    fun(Ev) ->
+        json_stream_parse:collect_object(Ev,
+            fun(Obj) -> UserFun(json_to_doc_info(Obj)) end)
+    end.
+
+json_to_doc_info({Props}) ->
+    RevsInfo = lists:map(
+        fun({Change}) ->
+            Rev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
+            Del = (true =:= get_value(<<"deleted">>, Change)),
+            #rev_info{rev=Rev, deleted=Del}
+        end, get_value(<<"changes">>, Props)),
+    #doc_info{
+        id = get_value(<<"id">>, Props),
+        high_seq = get_value(<<"seq">>, Props),
+        revs = RevsInfo
+    }.
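+
+% Example (hypothetical row): the changes-feed row
+%   {[{<<"seq">>, 12}, {<<"id">>, <<"doc1">>},
+%     {<<"changes">>, [{[{<<"rev">>, <<"2-abc">>}]}]}]}
+% becomes
+%   #doc_info{id = <<"doc1">>, high_seq = 12,
+%             revs = [#rev_info{rev = {2, <<"abc">>}, deleted = false}]}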
+
+
+bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->
+    lists:reverse(lists:foldl(
+        fun({_, {ok, _}}, Acc) ->
+            Acc;
+        ({#doc{id = Id, revs = {Pos, [RevId | _]}}, Error}, Acc) ->
+            {_, Error, Reason} = couch_httpd:error_info(Error),
+            [ {[{id, Id}, {rev, rev_to_str({Pos, RevId})},
+                {error, Error}, {reason, Reason}]} | Acc ]
+        end,
+        [], lists:zip(Docs, Results)));
+
+bulk_results_to_errors(Docs, {ok, Results}, replicated_changes) ->
+    bulk_results_to_errors(Docs, {aborted, Results}, interactive_edit);
+
+bulk_results_to_errors(_Docs, {aborted, Results}, interactive_edit) ->
+    lists:map(
+        fun({{Id, Rev}, Err}) ->
+            {_, Error, Reason} = couch_httpd:error_info(Err),
+            {[{id, Id}, {rev, rev_to_str(Rev)}, {error, Error}, {reason, Reason}]}
+        end,
+        Results);
+
+bulk_results_to_errors(_Docs, Results, remote) ->
+    lists:reverse(lists:foldl(
+        fun({Props}, Acc) ->
+            case get_value(<<"error">>, Props, get_value(error, Props)) of
+            undefined ->
+                Acc;
+            Error ->
+                Id = get_value(<<"id">>, Props, get_value(id, Props)),
+                Rev = get_value(<<"rev">>, Props, get_value(rev, Props)),
+                Reason = get_value(<<"reason">>, Props, get_value(reason, Props)),
+                [ {[{id, Id}, {rev, rev_to_str(Rev)},
+                    {error, Error}, {reason, Reason}]} | Acc ]
+            end
+        end,
+        [], Results)).
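+
+% The clauses above normalize per-document outcomes from the different
+% update paths (interactive edits, replicated changes, and a remote
+% _bulk_docs response) into one uniform list of {id, rev, error, reason}
+% objects; the remote clause looks up both binary and atom keys since
+% the props may come from decoded JSON or from local terms.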
+
+
+rev_to_str({_Pos, _Id} = Rev) ->
+    couch_doc:rev_to_str(Rev);
+rev_to_str(Rev) ->
+    Rev.
+
+write_fun() ->
+    fun(Data) ->
+        receive {get_data, Ref, From} ->
+            From ! {data, Ref, Data}
+        end
+    end.
+
+stream_doc({JsonBytes, Atts, Boundary, Len}) ->
+    case erlang:erase({doc_streamer, Boundary}) of
+    Pid when is_pid(Pid) ->
+        unlink(Pid),
+        exit(Pid, kill);
+    _ ->
+        ok
+    end,
+    DocStreamer = spawn_link(
+        couch_doc,
+        doc_to_multi_part_stream,
+        [Boundary, JsonBytes, Atts, write_fun(), true]
+    ),
+    erlang:put({doc_streamer, Boundary}, DocStreamer),
+    {ok, <<>>, {Len, Boundary}};
+stream_doc({0, Id}) ->
+    erlang:erase({doc_streamer, Id}),
+    eof;
+stream_doc({LenLeft, Id}) when LenLeft > 0 ->
+    Ref = make_ref(),
+    erlang:get({doc_streamer, Id}) ! {get_data, Ref, self()},
+    receive {data, Ref, Data} ->
+        {ok, Data, {LenLeft - iolist_size(Data), Id}}
+    end.
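+
+% stream_doc/1 follows the ibrowse streaming-body convention (an
+% assumption based on its {ok, Data, NewState} / eof returns): the first
+% call spawns a couch_doc:doc_to_multi_part_stream/5 producer keyed by
+% the boundary, and subsequent calls pull chunks until LenLeft hits 0.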

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator_api_wrap.hrl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_api_wrap.hrl b/apps/couch_replicator/src/couch_replicator_api_wrap.hrl
new file mode 100644
index 0000000..1a6f27a
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator_api_wrap.hrl
@@ -0,0 +1,36 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+
+-record(httpdb, {
+    url,
+    oauth = nil,
+    headers = [
+        {"Accept", "application/json"},
+        {"User-Agent", "CouchDB/" ++ couch_server:get_version()}
+    ],
+    timeout,            % milliseconds
+    ibrowse_options = [],
+    retries = 10,
+    wait = 250,         % milliseconds
+    httpc_pool = nil,
+    http_connections
+}).
+
+-record(oauth, {
+    consumer_key,
+    token,
+    token_secret,
+    consumer_secret,
+    signature_method
+}).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_httpc.erl b/apps/couch_replicator/src/couch_replicator_httpc.erl
new file mode 100644
index 0000000..c13bf18
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator_httpc.erl
@@ -0,0 +1,297 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_httpc).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
+
+-export([setup/1]).
+-export([send_req/3]).
+-export([full_url/2]).
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
+-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
+-define(MAX_WAIT, 5 * 60 * 1000).
+
+
+setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
+    {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
+    {ok, Db#httpdb{httpc_pool = Pid}}.
+
+
+send_req(HttpDb, Params1, Callback) ->
+    Params2 = ?replace(Params1, qs,
+        [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
+    Params = ?replace(Params2, ibrowse_options,
+        lists:keysort(1, get_value(ibrowse_options, Params2, []))),
+    {Worker, Response} = send_ibrowse_req(HttpDb, Params),
+    process_response(Response, Worker, HttpDb, Params, Callback).
+
+
+send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
+    Method = get_value(method, Params, get),
+    UserHeaders = lists:keysort(1, get_value(headers, Params, [])),
+    Headers1 = lists:ukeymerge(1, UserHeaders, BaseHeaders),
+    Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
+    Url = full_url(HttpDb, Params),
+    Body = get_value(body, Params, []),
+    case get_value(path, Params) of
+    "_changes" ->
+        {ok, Worker} = ibrowse:spawn_link_worker_process(Url);
+    _ ->
+        {ok, Worker} = couch_replicator_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool)
+    end,
+    IbrowseOptions = [
+        {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout} |
+        lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
+            HttpDb#httpdb.ibrowse_options)
+    ],
+    Response = ibrowse:send_req_direct(
+        Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity),
+    {Worker, Response}.
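+
+% "_changes" requests get a dedicated, freshly spawned ibrowse worker
+% rather than one from the shared pool, presumably because a continuous
+% feed can hold its connection open indefinitely and would otherwise
+% starve the pool for ordinary document requests.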
+
+
+process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
+    send_req(HttpDb, Params, Callback);
+
+process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
+    % ibrowse worker terminated because remote peer closed the socket
+    % -> not an error
+    send_req(HttpDb, Params, Cb);
+
+process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
+    process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
+
+process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
+    release_worker(Worker, HttpDb),
+    case list_to_integer(Code) of
+    Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
+        EJson = case Body of
+        <<>> ->
+            null;
+        Json ->
+            ?JSON_DECODE(Json)
+        end,
+        Callback(Ok, Headers, EJson);
+    R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+        do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
+    Error ->
+        maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
+    end;
+
+process_response(Error, Worker, HttpDb, Params, Callback) ->
+    maybe_retry(Error, Worker, HttpDb, Params, Callback).
+
+
+process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
+    receive
+    {ibrowse_async_headers, ReqId, Code, Headers} ->
+        case list_to_integer(Code) of
+        Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
+            StreamDataFun = fun() ->
+                stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
+            end,
+            ibrowse:stream_next(ReqId),
+            try
+                Ret = Callback(Ok, Headers, StreamDataFun),
+                release_worker(Worker, HttpDb),
+                clean_mailbox_req(ReqId),
+                Ret
+            catch throw:{maybe_retry_req, Err} ->
+                clean_mailbox_req(ReqId),
+                maybe_retry(Err, Worker, HttpDb, Params, Callback)
+            end;
+        R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+            do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
+        Error ->
+            report_error(Worker, HttpDb, Params, {code, Error})
+        end;
+    {ibrowse_async_response, ReqId, {error, _} = Error} ->
+        maybe_retry(Error, Worker, HttpDb, Params, Callback)
+    after HttpDb#httpdb.timeout + 500 ->
+        % Note: ibrowse should always respond, even if only with a timeout
+        % error, but in practice that does not always hold under a very high
+        % request rate with many open connections.
+        maybe_retry(timeout, Worker, HttpDb, Params, Callback)
+    end.
+
+
+clean_mailbox_req(ReqId) ->
+    receive
+    {ibrowse_async_response, ReqId, _} ->
+        clean_mailbox_req(ReqId);
+    {ibrowse_async_response_end, ReqId} ->
+        clean_mailbox_req(ReqId)
+    after 0 ->
+        ok
+    end.
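+
+% Draining any leftover ibrowse_async_* messages keeps stale chunks from
+% a finished (or abandoned) request from being mistaken for part of the
+% next response handled by this process.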
+
+
+release_worker(Worker, #httpdb{httpc_pool = Pool}) ->
+    ok = couch_replicator_httpc_pool:release_worker(Pool, Worker).
+
+
+maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
+    report_error(Worker, HttpDb, Params, {error, Error});
+
+maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
+    Params, Cb) ->
+    release_worker(Worker, HttpDb),
+    Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
+    Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
+    ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~s",
+        [Method, Url, Wait / 1000, error_cause(Error)]),
+    ok = timer:sleep(Wait),
+    Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
+    send_req(HttpDb#httpdb{retries = Retries - 1, wait = Wait2}, Params, Cb).
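+
+% Retries back off exponentially: the wait doubles after every failed
+% attempt and is capped at ?MAX_WAIT (5 minutes). With the #httpdb{}
+% defaults of retries = 10 and wait = 250, the sleep sequence is
+% 250, 500, 1000, 2000, ... milliseconds before the request is reported
+% as failed.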
+
+
+report_error(Worker, HttpDb, Params, Error) ->
+    Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
+    Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
+    do_report_error(Url, Method, Error),
+    release_worker(Worker, HttpDb),
+    exit({http_request_failed, Method, Url, Error}).
+
+
+do_report_error(Url, Method, {code, Code}) ->
+    ?LOG_ERROR("Replicator, request ~s to ~p failed. The received "
+        "HTTP error code is ~p", [Method, Url, Code]);
+
+do_report_error(FullUrl, Method, Error) ->
+    ?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~s",
+        [Method, FullUrl, error_cause(Error)]).
+
+
+error_cause({error, Cause}) ->
+    lists:flatten(io_lib:format("~p", [Cause]));
+error_cause(Cause) ->
+    lists:flatten(io_lib:format("~p", [Cause])).
+
+
+stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
+    case accumulate_messages(ReqId, [], T + 500) of
+    {Data, ibrowse_async_response} ->
+        ibrowse:stream_next(ReqId),
+        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
+    {Data, ibrowse_async_response_end} ->
+        {Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
+    end.
+
+accumulate_messages(ReqId, Acc, Timeout) ->
+    receive
+    {ibrowse_async_response, ReqId, {error, Error}} ->
+        throw({maybe_retry_req, Error});
+    {ibrowse_async_response, ReqId, <<>>} ->
+        accumulate_messages(ReqId, Acc, Timeout);
+    {ibrowse_async_response, ReqId, Data} ->
+        accumulate_messages(ReqId, [Data | Acc], 0);
+    {ibrowse_async_response_end, ReqId} ->
+        {iolist_to_binary(lists:reverse(Acc)), ibrowse_async_response_end}
+    after Timeout ->
+        % Note: ibrowse should always respond, even if only with a timeout
+        % error, but in practice that does not always hold under a very high
+        % request rate with many open connections.
+        if Acc =:= [] ->
+            throw({maybe_retry_req, timeout});
+        true ->
+            {iolist_to_binary(lists:reverse(Acc)), ibrowse_async_response}
+        end
+    end.
+
+
+full_url(#httpdb{url = BaseUrl}, Params) ->
+    Path = get_value(path, Params, []),
+    QueryArgs = get_value(qs, Params, []),
+    BaseUrl ++ Path ++ query_args_to_string(QueryArgs, []).
+
+
+query_args_to_string([], []) ->
+    "";
+query_args_to_string([], Acc) ->
+    "?" ++ string:join(lists:reverse(Acc), "&");
+query_args_to_string([{K, V} | Rest], Acc) ->
+    query_args_to_string(Rest, [K ++ "=" ++ couch_httpd:quote(V) | Acc]).
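+
+% Illustrative example (hypothetical values):
+%   full_url(#httpdb{url = "http://host:5984/db/"},
+%       [{path, "_changes"}, {qs, [{"feed", "normal"}, {"since", "42"}]}])
+% returns "http://host:5984/db/_changes?feed=normal&since=42".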
+
+
+oauth_header(#httpdb{oauth = nil}, _ConnParams) ->
+    [];
+oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
+    Consumer = {
+        OAuth#oauth.consumer_key,
+        OAuth#oauth.consumer_secret,
+        OAuth#oauth.signature_method
+    },
+    Method = case get_value(method, ConnParams, get) of
+    get -> "GET";
+    post -> "POST";
+    put -> "PUT";
+    head -> "HEAD"
+    end,
+    QSL = get_value(qs, ConnParams, []),
+    OAuthParams = oauth:sign(Method,
+        BaseUrl ++ get_value(path, ConnParams, []),
+        QSL, Consumer, OAuth#oauth.token, OAuth#oauth.token_secret) -- QSL,
+    [{"Authorization",
+        "OAuth " ++ oauth:header_params_encode(OAuthParams)}].
+
+
+do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
+    release_worker(Worker, HttpDb),
+    RedirectUrl = redirect_url(Headers, Url),
+    {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
+    send_req(HttpDb2, Params2, Cb).
+
+
+redirect_url(RespHeaders, OrigUrl) ->
+    MochiHeaders = mochiweb_headers:make(RespHeaders),
+    RedUrl = mochiweb_headers:get_value("Location", MochiHeaders),
+    #url{
+        host = Host,
+        host_type = HostType,
+        port = Port,
+        path = Path,  % includes query string
+        protocol = Proto
+    } = ibrowse_lib:parse_url(RedUrl),
+    #url{
+        username = User,
+        password = Passwd
+    } = ibrowse_lib:parse_url(OrigUrl),
+    Creds = case is_list(User) andalso is_list(Passwd) of
+    true ->
+        User ++ ":" ++ Passwd ++ "@";
+    false ->
+        []
+    end,
+    HostPart = case HostType of
+    ipv6_address ->
+        "[" ++ Host ++ "]";
+    _ ->
+        Host
+    end,
+    atom_to_list(Proto) ++ "://" ++ Creds ++ HostPart ++ ":" ++
+        integer_to_list(Port) ++ Path.
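+
+% The Location header normally carries no credentials, so username and
+% password are copied over from the original URL; IPv6 hosts are
+% re-bracketed since ibrowse_lib:parse_url/1 returns the bare address.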
+
+after_redirect(RedirectUrl, 303, HttpDb, Params) ->
+    after_redirect(RedirectUrl, HttpDb, ?replace(Params, method, get));
+after_redirect(RedirectUrl, _Code, HttpDb, Params) ->
+    after_redirect(RedirectUrl, HttpDb, Params).
+
+after_redirect(RedirectUrl, HttpDb, Params) ->
+    Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
+    {HttpDb#httpdb{url = RedirectUrl}, Params2}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/add91738/apps/couch_replicator/src/couch_replicator_httpc_pool.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_httpc_pool.erl b/apps/couch_replicator/src/couch_replicator_httpc_pool.erl
new file mode 100644
index 0000000..a82a2df
--- /dev/null
+++ b/apps/couch_replicator/src/couch_replicator_httpc_pool.erl
@@ -0,0 +1,138 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_httpc_pool).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/2, stop/1]).
+-export([get_worker/1, release_worker/2]).
+
+% gen_server API
+-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-import(couch_util, [
+    get_value/2,
+    get_value/3
+]).
+
+-record(state, {
+    url,
+    limit,                  % max # of workers allowed
+    free = [],              % free workers (connections)
+    busy = [],              % busy workers (connections)
+    waiting = queue:new()   % blocked clients waiting for a worker
+}).
+
+
+start_link(Url, Options) ->
+    gen_server:start_link(?MODULE, {Url, Options}, []).
+
+
+stop(Pool) ->
+    ok = gen_server:call(Pool, stop, infinity).
+
+
+get_worker(Pool) ->
+    {ok, _Worker} = gen_server:call(Pool, get_worker, infinity).
+
+
+release_worker(Pool, Worker) ->
+    ok = gen_server:cast(Pool, {release_worker, Worker}).
+
+
+init({Url, Options}) ->
+    process_flag(trap_exit, true),
+    State = #state{
+        url = Url,
+        limit = get_value(max_connections, Options)
+    },
+    {ok, State}.
+
+
+handle_call(get_worker, From, #state{waiting = Waiting} = State) ->
+    #state{url = Url, limit = Limit, busy = Busy, free = Free} = State,
+    case length(Busy) >= Limit of
+    true ->
+        {noreply, State#state{waiting = queue:in(From, Waiting)}};
+    false ->
+        case Free of
+        [] ->
+           {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
+           Free2 = Free;
+        [Worker | Free2] ->
+           ok
+        end,
+        NewState = State#state{free = Free2, busy = [Worker | Busy]},
+        {reply, {ok, Worker}, NewState}
+    end;
+
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State}.
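+
+% Checkout policy: once the number of busy workers reaches the limit,
+% callers are parked in the waiting queue and answered later from
+% release_worker; below the limit an idle connection is reused when one
+% exists, otherwise a new ibrowse worker is spawned on demand.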
+
+
+handle_cast({release_worker, Worker}, #state{waiting = Waiting} = State) ->
+    case is_process_alive(Worker) andalso
+        lists:member(Worker, State#state.busy) of
+    true ->
+        case queue:out(Waiting) of
+        {empty, Waiting2} ->
+            Busy2 = State#state.busy -- [Worker],
+            Free2 = [Worker | State#state.free];
+        {{value, From}, Waiting2} ->
+            gen_server:reply(From, {ok, Worker}),
+            Busy2 = State#state.busy,
+            Free2 = State#state.free
+        end,
+        NewState = State#state{
+           busy = Busy2,
+           free = Free2,
+           waiting = Waiting2
+        },
+        {noreply, NewState};
+    false ->
+        {noreply, State}
+    end.
+
+
+handle_info({'EXIT', Pid, _Reason}, #state{busy = Busy, free = Free} = State) ->
+    case Free -- [Pid] of
+    Free ->
+        case Busy -- [Pid] of
+        Busy ->
+            {noreply, State};
+        Busy2 ->
+            case queue:out(State#state.waiting) of
+            {empty, _} ->
+                {noreply, State#state{busy = Busy2}};
+            {{value, From}, Waiting2} ->
+                {ok, Worker} = ibrowse:spawn_link_worker_process(State#state.url),
+                gen_server:reply(From, {ok, Worker}),
+                {noreply, State#state{busy = [Worker | Busy2], waiting = Waiting2}}
+            end
+        end;
+    Free2 ->
+        {noreply, State#state{free = Free2}}
+    end.
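+
+% A crashed worker is removed from whichever list held it; when a busy
+% worker dies while clients are queued, a replacement is spawned right
+% away and handed to the first waiter.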
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+terminate(_Reason, State) ->
+    lists:foreach(fun ibrowse_http_client:stop/1, State#state.free),
+    lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy).
+

