couchdb-commits mailing list archives

From: kocol...@apache.org
Subject: svn commit: r802888 - in /couchdb/trunk/src/couchdb: Makefile.am couch_httpd_misc_handlers.erl couch_rep.erl couch_rep_att.erl couch_rep_changes_feed.erl couch_rep_missing_revs.erl couch_rep_reader.erl couch_rep_writer.erl
Date: Mon, 10 Aug 2009 18:37:43 GMT
Author: kocolosk
Date: Mon Aug 10 18:37:43 2009
New Revision: 802888

URL: http://svn.apache.org/viewvc?rev=802888&view=rev
Log:
new replicator using _changes feed for continuous replication
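
This commit replaces the old enumerate-and-buffer replicator with a pipeline of four linked processes (changes feed -> missing revs -> reader -> writer), driven entirely by the JSON body POSTed to _replicate; a "continuous" flag in that body (checked in couch_rep_changes_feed below) selects the _changes-based continuous mode. A minimal sketch of invoking the new entry points from the Erlang shell, with database names and URL purely illustrative (#user_ctx{} is the record from couch_db.hrl):

    %% one-shot replication via the convenience clauses of replicate/2
    couch_rep:replicate(<<"db_a">>, <<"db_b">>).

    %% continuous replication takes the full EJSON body, as POSTed to _replicate
    Body = {[{<<"source">>, <<"db_a">>},
             {<<"target">>, <<"http://127.0.0.1:5984/db_b/">>},
             {<<"continuous">>, true}]},
    couch_rep:replicate(Body, #user_ctx{}).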

Added:
    couchdb/trunk/src/couchdb/couch_rep_att.erl
    couchdb/trunk/src/couchdb/couch_rep_reader.erl
    couchdb/trunk/src/couchdb/couch_rep_writer.erl
Modified:
    couchdb/trunk/src/couchdb/Makefile.am
    couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl
    couchdb/trunk/src/couchdb/couch_rep.erl
    couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
    couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl

Modified: couchdb/trunk/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/Makefile.am?rev=802888&r1=802887&r2=802888&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/Makefile.am (original)
+++ couchdb/trunk/src/couchdb/Makefile.am Mon Aug 10 18:37:43 2009
@@ -74,10 +74,13 @@
     couch_query_servers.erl \
     couch_ref_counter.erl \
     couch_rep.erl \
+    couch_rep_att.erl \
     couch_rep_changes_feed.erl \
     couch_rep_httpc.erl \
     couch_rep_missing_revs.erl \
+    couch_rep_reader.erl \
     couch_rep_sup.erl \
+    couch_rep_writer.erl \
     couch_server.erl \
     couch_server_sup.erl \
     couch_stats_aggregator.erl \
@@ -123,10 +126,13 @@
     couch_query_servers.beam \
     couch_ref_counter.beam \
     couch_rep.beam \
+    couch_rep_att.beam \
     couch_rep_changes_feed.beam \
-    couch_rep_missing_revs.beam \
     couch_rep_httpc.beam \
+    couch_rep_missing_revs.beam \
+    couch_rep_reader.beam \
     couch_rep_sup.beam \
+    couch_rep_writer.beam \
     couch_server.beam \
     couch_server_sup.beam \
     couch_stats_aggregator.beam \

Modified: couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl?rev=802888&r1=802887&r2=802888&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl Mon Aug 10 18:37:43 2009
@@ -77,32 +77,9 @@
 handle_task_status_req(Req) ->
     send_method_not_allowed(Req, "GET,HEAD").
 
-% add trailing slash if missing
-fix_db_url(UrlBin) ->
-    ?l2b(case lists:last(Url = ?b2l(UrlBin)) of
-    $/ -> Url;
-    _  -> Url ++ "/"
-    end).
-
-
-get_rep_endpoint(_Req, {Props}) ->
-    Url = proplists:get_value(<<"url">>, Props),
-    {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
-    Auth = proplists:get_value(<<"auth">>, Props, undefined),
-    ?LOG_DEBUG("AUTH ~p", [Auth]),
-    {remote, fix_db_url(Url), [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], Auth};
-get_rep_endpoint(_Req, <<"http://",_/binary>>=Url) ->
-    {remote, fix_db_url(Url), [], []};
-get_rep_endpoint(_Req, <<"https://",_/binary>>=Url) ->
-    {remote, fix_db_url(Url), [], []};
-get_rep_endpoint(#httpd{user_ctx=UserCtx}, <<DbName/binary>>) ->
-    {local, DbName, UserCtx}.
-
 handle_replicate_req(#httpd{method='POST'}=Req) ->
-    {Props} = couch_httpd:json_body_obj(Req),
-    Source = get_rep_endpoint(Req, proplists:get_value(<<"source">>, Props)),
-    Target = get_rep_endpoint(Req, proplists:get_value(<<"target">>, Props)),
-    case couch_rep:replicate(Source, Target) of
+    PostBody = couch_httpd:json_body_obj(Req),
+    case couch_rep:replicate(PostBody, Req#httpd.user_ctx) of
     {ok, {JsonResults}} ->
         send_json(Req, {[{ok, true} | JsonResults]});
     {error, {Type, Details}} ->
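
With endpoint parsing moved into couch_rep, the handler now just decodes the JSON body and delegates, passing the requesting user's context along. For reference, the relocated get_rep_endpoint/2 (an internal fun; see the couch_rep.erl hunks below) still accepts the same three source/target forms, and its results are hashed together with the local hostname to form the replication id. A sketch with illustrative values:

    UserCtx = #user_ctx{},

    %% plain URL string: a trailing slash is added if missing
    {remote, "http://example.org/db/", []} =
        get_rep_endpoint(UserCtx, <<"http://example.org/db">>),

    %% object form may carry extra headers (and, optionally, oauth credentials)
    {remote, "http://example.org/db/", [{"X-Couch-Foo","bar"}]} =
        get_rep_endpoint(UserCtx, {[{<<"url">>, <<"http://example.org/db">>},
                                    {<<"headers">>,
                                        {[{<<"X-Couch-Foo">>, <<"bar">>}]}}]}),

    %% anything else is treated as a local database name
    {local, <<"shared_docs">>, UserCtx} =
        get_rep_endpoint(UserCtx, <<"shared_docs">>).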

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=802888&r1=802887&r2=802888&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Mon Aug 10 18:37:43 2009
@@ -12,155 +12,97 @@
 
 -module(couch_rep).
 -behaviour(gen_server).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
     code_change/3]).
 
 -export([replicate/2]).
 
--define(BUFFER_NDOCS, 1000).
--define(BUFFER_NATTACHMENTS, 50).
--define(BUFFER_MEMORY, 10000000). %% bytes
-
 -include("couch_db.hrl").
--include("../ibrowse/ibrowse.hrl").
 
-%% @spec replicate(Source::binary(), Target::binary()) ->
-%%      {ok, Stats} | {error, Reason}
-%% @doc Triggers a replication.  Stats is a JSON Object with the following
-%%      keys: session_id (UUID), source_last_seq (integer), and history (array).
-%%      Each element of the history is an Object with keys start_time, end_time,
-%%      start_last_seq, end_last_seq, missing_checked, missing_found, docs_read,
-%%      and docs_written.
-%%
-%%      The supervisor will try to restart the replication in case of any error
-%%      other than shutdown.  Just call this function again to listen for the
-%%      result of the retry.
-replicate(Source, Target) ->
+-record(state, {
+    changes_feed,
+    missing_revs,
+    reader,
+    writer,
 
-    {ok, HostName} = inet:gethostname(),
-    RepId = couch_util:to_hex(
-            erlang:md5(term_to_binary([HostName, Source, Target]))),
-    Args = [?MODULE, [RepId, Source,Target], []],
+    source,
+    target,
+    init_args,
+
+    start_seq,
+    history,
+    source_log,
+    target_log,
+    rep_starttime,
+    src_starttime,
+    tgt_starttime,
+    checkpoint_history = nil,
 
+    listeners = [],
+    complete = false,
+    committed_seq = 0,
+
+    stats = nil
+}).
+
+%% convenience function to do a simple replication from the shell
+replicate(Source, Target) when is_list(Source) ->
+    replicate(?l2b(Source), Target);
+replicate(Source, Target) when is_binary(Source), is_list(Target) ->
+    replicate(Source, ?l2b(Target));
+replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
+    replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{});
+
+%% function handling POST to _replicate
+replicate(PostBody, UserCtx) ->
+    RepId = make_replication_id(PostBody, UserCtx),
     Replicator = {RepId,
-        {gen_server, start_link, Args},
+        {gen_server, start_link, [?MODULE, [RepId, PostBody, UserCtx], []]},
         transient,
         1,
         worker,
         [?MODULE]
     },
 
-    Server = case supervisor:start_child(couch_rep_sup, Replicator) of
-        {ok, Pid} ->
-            ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
-            Pid;
-        {error, already_present} ->
-            case supervisor:restart_child(couch_rep_sup, RepId) of
-                {ok, Pid} ->
-                    ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
-                    Pid;
-                {error, running} ->
-                    %% this error occurs if multiple replicators are racing
-                    %% each other to start and somebody else won.  Just grab
-                    %% the Pid by calling start_child again.
-                    {error, {already_started, Pid}} =
-                        supervisor:start_child(couch_rep_sup, Replicator),
-                    ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
-                    Pid
-            end;
-        {error, {already_started, Pid}} ->
-            ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
-            Pid
-    end,
+    Server = start_replication_server(Replicator),
 
-    case gen_server:call(Server, get_result, infinity) of
-        retry -> replicate(Source, Target);
-        Else -> Else
+    try gen_server:call(Server, get_result, infinity) of
+    retry -> replicate(PostBody, UserCtx);
+    Else -> Else
+    catch
+    exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} ->
+        %% oops, this replication just finished -- restart it.
+        replicate(PostBody, UserCtx);
+    exit:{normal, {gen_server, call, [Server, get_result , infinity]}} ->
+        %% we made the call during terminate
+        replicate(PostBody, UserCtx)
     end.
 
-%%=============================================================================
-%% gen_server callbacks
-%%=============================================================================
-
--record(old_http_db, {
-    uri,
-    headers,
-    oauth
-}).
-
-
--record(state, {
-    context,
-    current_seq,
-    source,
-    target,
-    stats,
-    enum_pid,
-    docs_buffer = [],
-    listeners = [],
-    done = false
-}).
-
-
-init([RepId, Source, Target]) ->
+init([RepId, {PostProps}, UserCtx] = InitArgs) ->
     process_flag(trap_exit, true),
 
-    {ok, DbSrc, SrcName} = open_db(Source),
-    {ok, DbTgt, TgtName} =  open_db(Target),
-
-    DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+    SourceProps = proplists:get_value(<<"source">>, PostProps),
+    TargetProps = proplists:get_value(<<"target">>, PostProps),
 
-    {ok, InfoSrc} = get_db_info(DbSrc),
-    {ok, InfoTgt} = get_db_info(DbTgt),
+    Source = open_db(SourceProps, UserCtx),
+    Target = open_db(TargetProps, UserCtx),
 
-    ReplicationStartTime = httpd_util:rfc1123_date(),
-    SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc),
-    TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt),
-
-    RepRecDocSrc =
-    case open_doc(DbSrc, DocKey, []) of
-    {ok, SrcDoc} ->
-        ?LOG_DEBUG("Found existing replication record on source", []),
-        SrcDoc;
-    _ -> #doc{id=DocKey}
-    end,
-
-    RepRecDocTgt =
-    case open_doc(DbTgt, DocKey, []) of
-    {ok, TgtDoc} ->
-        ?LOG_DEBUG("Found existing replication record on target", []),
-        TgtDoc;
-    _ -> #doc{id=DocKey}
-    end,
-
-    #doc{body={RepRecProps}} = RepRecDocSrc,
-    #doc{body={RepRecPropsTgt}} = RepRecDocTgt,
-
-    case proplists:get_value(<<"session_id">>, RepRecProps) ==
-            proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
-    true ->
-        % if the records have the same session id,
-        % then we have a valid replication history
-        OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
-        OldHistory = proplists:get_value(<<"history">>, RepRecProps, []);
-    false ->
-        ?LOG_INFO("Replication records differ. "
-                "Performing full replication instead of incremental.", []),
-        ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
-                [RepRecProps, RepRecPropsTgt]),
-        OldSeqNum = 0,
-        OldHistory = []
-    end,
+    SourceLog = open_replication_log(Source, RepId),
+    TargetLog = open_replication_log(Target, RepId),
 
-    Context = [
-        {start_seq, OldSeqNum},
-        {history, OldHistory},
-        {rep_starttime, ReplicationStartTime},
-        {src_starttime, SrcInstanceStartTime},
-        {tgt_starttime, TgtInstanceStartTime},
-        {src_record, RepRecDocSrc},
-        {tgt_record, RepRecDocTgt}
-    ],
+    SourceInfo = dbinfo(Source),
+    TargetInfo = dbinfo(Target),
+    
+    {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
+
+    {ok, ChangesFeed} =
+    couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
+    {ok, MissingRevs} =
+    couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
+    {ok, Reader} =
+    couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
+    {ok, Writer} =
+    couch_rep_writer:start_link(self(), Target, Reader, PostProps),
 
     Stats = ets:new(replication_stats, [set, private]),
     ets:insert(Stats, {total_revs,0}),
@@ -169,158 +111,116 @@
     ets:insert(Stats, {docs_written, 0}),
     ets:insert(Stats, {doc_write_failures, 0}),
 
-    couch_task_status:add_task("Replication", <<SrcName/binary, " -> ",
-        TgtName/binary>>, "Starting"),
-
-    Parent = self(),
-    Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end),
+    {ShortId, _} = lists:split(6, RepId),
+    couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
+        [ShortId, dbname(Source), dbname(Target)]), "Starting"),
 
     State = #state{
-        context = Context,
-        current_seq = OldSeqNum,
-        enum_pid = Pid,
-        source = DbSrc,
-        target = DbTgt,
-        stats = Stats
-    },
+        changes_feed = ChangesFeed,
+        missing_revs = MissingRevs,
+        reader = Reader,
+        writer = Writer,
 
-    {ok, State}.
-handle_call(get_result, From, #state{listeners=L,done=true} = State) ->
-    {stop, normal, State#state{listeners=[From|L]}};
-handle_call(get_result, From, #state{listeners=L} = State) ->
-    {noreply, State#state{listeners=[From|L]}};
-
-handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
-    #state{
-        context = Context,
-        current_seq = Seq,
-        docs_buffer = Buffer,
         source = Source,
         target = Target,
-        stats = Stats
-    } = State,
+        init_args = InitArgs,
+        stats = Stats,
 
-    ets:update_counter(Stats, missing_revs, length(Revs)),
+        start_seq = StartSeq,
+        history = History,
+        source_log = SourceLog,
+        target_log = TargetLog,
+        rep_starttime = httpd_util:rfc1123_date(),
+        src_starttime = proplists:get_value(instance_start_time, SourceInfo),
+        tgt_starttime = proplists:get_value(instance_start_time, TargetInfo)
+    },
+    {ok, State}.
 
-    %% get document(s)
-    {ok, DocResults} = open_doc_revs(Source, Id, Revs, [latest]),
-    Docs = [RevDoc || {ok, RevDoc} <- DocResults],
-    ets:update_counter(Stats, docs_read, length(Docs)),
-
-    %% save them (maybe in a buffer)
-    {NewBuffer, NewContext} =
-    case should_flush(lists:flatlength([Docs|Buffer])) of
-        true ->
-            Docs2 = lists:flatten([Docs|Buffer]),
-            try update_docs(Target, Docs2, [], replicated_changes) of
-            {ok, Errors} ->
-                dump_update_errors(Errors),
-                ets:update_counter(Stats, doc_write_failures, length(Errors)),
-                ets:update_counter(Stats, docs_written, length(Docs2) -
-                        length(Errors)),
-                {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
-                {[], Ctxt}
-            catch
-            throw:attachment_write_failed ->
-                ?LOG_ERROR("attachment request failed during write to disk", []),
-                exit({internal_server_error, replication_link_failure})
-            end;
-        false ->
-            {[Docs | Buffer], Context}
-    end,
+handle_call(get_result, From, #state{complete=true, listeners=[]} = State) ->
+    {stop, normal, State#state{listeners=[From]}};
+handle_call(get_result, From, State) ->
+    Listeners = State#state.listeners,
+    {noreply, State#state{listeners=[From|Listeners]}}.
 
-    {reply, ok, State#state{context=NewContext, docs_buffer=NewBuffer}};
+handle_cast(_Msg, State) ->
+    {noreply, State}.
 
-handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
-    ets:update_counter(State#state.stats, total_revs, RevsCount),
-    case State#state.listeners of
-    [] ->
-        % still waiting for the first listener to send a request
-        {noreply, State#state{current_seq=LastSeq,done=true}};
-    _ ->
-        {stop, normal, ok, State#state{current_seq=LastSeq}}
-    end.
+handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
+    couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
+    {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+
+handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
+        when SourceSeq > N ->
+    MissingRevs = State#state.missing_revs,
+    ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
+    couch_task_status:update("W Processed source update #~p", [SourceSeq]),
+    {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+handle_info({writer_checkpoint, _}, State) ->
+    {noreply, State};
 
-handle_cast({increment_update_seq, Seq}, State) ->
-    couch_task_status:update("Processed source update #~p", [Seq]),
-    {noreply, State#state{current_seq=Seq}}.
+handle_info({update_stats, Key, N}, State) ->
+    ets:update_counter(State#state.stats, Key, N),
+    {noreply, State};
 
-handle_info({'EXIT', Pid, Reason}, #state{enum_pid=Pid} = State) ->
-    ?LOG_ERROR("replication enumerator exited with ~p .. respawning", [Reason]),
-    #state{
-        current_seq = Seq,
-        source = Src,
-        target = Tgt,
-        enum_pid = Pid
-    } = State,
-    Parent = self(),
-    NewPid = spawn_link(fun() -> enum_docs_since(Parent,Src,Tgt,{Seq,0}) end),
-    {noreply, State#state{enum_pid=NewPid}};
+handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
+    case State#state.listeners of
+    [] ->
+        {noreply, State#state{complete = true}};
+    _Else ->
+        {stop, normal, State}
+    end;
 
-%% if any linked process dies, respawn the enumerator to get things going again
-handle_info({'EXIT', _From, normal}, State) ->
-    {noreply, State};
-handle_info({'EXIT', From, Reason}, #state{enum_pid=EnumPid} = State) ->
-    ?LOG_ERROR("replicator-linked pid ~p exited with ~p", [From, Reason]),
-    exit(EnumPid, pls_restart_kthxbye),
+handle_info({'EXIT', _, normal}, State) ->
     {noreply, State};
-
-handle_info(_Msg, State) ->
-    {noreply, State}.
+handle_info({'EXIT', Pid, Reason}, State) ->
+    ?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]),
+    {stop, Reason, State}.
 
 terminate(normal, State) ->
+    % ?LOG_DEBUG("replication terminating normally", []),
     #state{
-        context = Context,
-        current_seq = Seq,
-        docs_buffer = Buffer,
+        checkpoint_history = CheckpointHistory,
+        committed_seq = NewSeq,
         listeners = Listeners,
         source = Source,
         target = Target,
-        stats = Stats
+        stats = Stats,
+        source_log = #doc{body={OldHistory}}
     } = State,
-
-    try update_docs(Target, lists:flatten(Buffer), [], replicated_changes) of
-    {ok, Errors} ->
-        dump_update_errors(Errors),
-        ets:update_counter(Stats, doc_write_failures, length(Errors)),
-        ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
-                length(Errors))
-    catch
-    throw:attachment_write_failed ->
-        ?LOG_ERROR("attachment request failed during final write", []),
-        exit({internal_server_error, replication_link_failure})
-    end,
-
     couch_task_status:update("Finishing"),
-
-    {ok, NewRepHistory, _} = do_checkpoint(Source, Target, Context, Seq, Stats),
     ets:delete(Stats),
     close_db(Target),
+    
+    NewRepHistory = case CheckpointHistory of
+    nil ->
+        {[{<<"no_changes">>, true} | OldHistory]};
+    _Else ->
+        CheckpointHistory
+    end,
 
-    [Original|Rest] = Listeners,
+    %% reply to original requester
+    [Original|OtherListeners] = lists:reverse(Listeners),
     gen_server:reply(Original, {ok, NewRepHistory}),
 
     %% maybe trigger another replication. If this replicator uses a local
     %% source Db, changes to that Db since we started will not be included in
     %% this pass.
-    case up_to_date(Source, Seq) of
+    case up_to_date(Source, NewSeq) of
         true ->
-            [gen_server:reply(R, {ok, NewRepHistory}) || R <- Rest];
+            [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners];
         false ->
-            [gen_server:reply(R, retry) || R <- Rest]
+            [gen_server:reply(R, retry) || R <- OtherListeners]
     end,
     close_db(Source);
+
 terminate(Reason, State) ->
-    ?LOG_ERROR("replicator terminating with reason ~p", [Reason]),
     #state{
         listeners = Listeners,
         source = Source,
         target = Target,
         stats = Stats
     } = State,
-
     [gen_server:reply(L, {error, Reason}) || L <- Listeners],
-
     ets:delete(Stats),
     close_db(Target),
     close_db(Source).
@@ -328,560 +228,284 @@
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%=============================================================================
-%% internal functions
-%%=============================================================================
-
-
-% we should probably write these to a special replication log
-% or have a callback where the caller decides what to do with replication
-% errors.
-dump_update_errors([]) -> ok;
-dump_update_errors([{{Id, Rev}, Error}|Rest]) ->
-    ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p",
-        [Id, couch_doc:rev_to_str(Rev), Error]),
-    dump_update_errors(Rest).
+% internal funs
 
-attachment_loop(ReqId, Conn) ->
-    couch_util:should_flush(),
-    receive
-        {From, {set_req_id, NewId}} ->
-            %% we learn the ReqId to listen for
-            From ! {self(), {ok, NewId}},
-            attachment_loop(NewId, Conn);
-        {ibrowse_async_headers, ReqId, Status, Headers} ->
-            %% we got header, give the controlling process a chance to react
-            receive
-                {From, gimme_status} ->
-                    %% send status/headers to controller
-                    From ! {self(), {status, Status, Headers}},
-                    receive
-                        {From, continue} ->
-                            %% normal case
-                            attachment_loop(ReqId, Conn);
-                        {From, fail} ->
-                            %% error, failure code
-                            ?LOG_ERROR(
-                                "streaming attachment failed with status ~p",
-                                [Status]),
-                            catch ibrowse:stop_worker_process(Conn),
-                            exit(attachment_request_failed);
-                        {From, stop_ok} ->
-                            %% stop looping, controller will start a new loop
-                            catch ibrowse:stop_worker_process(Conn),
-                            stop_ok
-                    end
-            end,
-            attachment_loop(ReqId, Conn);
-        {ibrowse_async_response, ReqId, {chunk_start,_}} ->
-            attachment_loop(ReqId, Conn);
-        {ibrowse_async_response, ReqId, chunk_end} ->
-            attachment_loop(ReqId, Conn);
-        {ibrowse_async_response, ReqId, {error, Err}} ->
-            ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
-            catch ibrowse:stop_worker_process(Conn),
-            exit(attachment_request_failed);
-        {ibrowse_async_response, ReqId, Data} ->
-            receive {From, gimme_data} -> From ! {self(), Data} end,
-            attachment_loop(ReqId, Conn);
-        {ibrowse_async_response_end, ReqId} ->
-            catch ibrowse:stop_worker_process(Conn),
-            exit(normal)
+start_replication_server(Replicator) ->
+    RepId = element(1, Replicator),
+    case supervisor:start_child(couch_rep_sup, Replicator) of
+    {ok, Pid} ->
+        ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
+        Pid;
+    {error, already_present} ->
+        case supervisor:restart_child(couch_rep_sup, RepId) of
+        {ok, Pid} ->
+            ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
+            Pid;
+        {error, running} ->
+            %% this error occurs if multiple replicators are racing
+            %% each other to start and somebody else won.  Just grab
+            %% the Pid by calling start_child again.
+            {error, {already_started, Pid}} =
+                supervisor:start_child(couch_rep_sup, Replicator),
+            ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
+            Pid
+        end;
+    {error, {already_started, Pid}} ->
+        ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
+        Pid
     end.
 
-att_stub_converter(DbS, Id, Rev,
-        #att{name=Name,data=stub,type=Type,len=Length}=Att) ->
-    #old_http_db{uri=DbUrl, headers=Headers} = DbS,
-    {Pos, [RevId|_]} = Rev,
-    Url = lists:flatten([DbUrl, couch_util:url_encode(Id), "/", couch_util:url_encode(?b2l(Name)),
-        "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
-    ?LOG_DEBUG("Attachment URL ~s", [Url]),
-    {ok, RcvFun} = make_att_stub_receiver(Url, Headers, Name,
-        Type, Length),
-    Att#att{name=Name,type=Type,data=RcvFun,len=Length}.
-
-make_att_stub_receiver(Url, Headers, Name, Type, Length) ->
-    make_att_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000).
-
-make_att_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) ->
-    ?LOG_ERROR("streaming attachment request failed after 10 retries: ~s",
-        [Url]),
-    exit({attachment_request_failed, ?l2b(["failed to replicate ", Url])});
-
-make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) ->
-    %% start the process that receives attachment data from ibrowse
-    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
-    {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
-    Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end),
-
-    %% make the async request
-    Opts = [{stream_to, Pid}, {response_format, binary}],
-    ReqId =
-    case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of
-    {ibrowse_req_id, X} ->
-        X;
-    {error, Reason} ->
-        ?LOG_INFO("retrying couch_rep attachment request in ~p " ++
-            "seconds due to {error, ~p}: ~s", [Pause/1000, Reason, Url]),
-        catch ibrowse:stop_worker_process(Conn),
-        timer:sleep(Pause),
-        make_att_stub_receiver(Url, Headers, Name, Type, Length,
-            Retries-1, 2*Pause)
-    end,
-
-    %% tell our receiver about the ReqId it needs to look for
-    Pid ! {self(), {set_req_id, ReqId}},
-    receive
-    {Pid, {ok, ReqId}} ->
-        ok;
-    {'EXIT', Pid, _Reason} ->
-        catch ibrowse:stop_worker_process(Conn),
-        timer:sleep(Pause),
-        make_att_stub_receiver(Url, Headers, Name, Type, Length,
-            Retries-1, 2*Pause)
-    end,
-
-    %% wait for headers to ensure that we have a 200 status code
-    %% this is where we follow redirects etc
-    Pid ! {self(), gimme_status},
-    receive
-    {'EXIT', Pid, attachment_request_failed} ->
-        catch ibrowse:stop_worker_process(Conn),
-        make_att_stub_receiver(Url, Headers, Name, Type, Length,
-            Retries-1, Pause);
-    {Pid, {status, StreamStatus, StreamHeaders}} ->
-        ?LOG_DEBUG("streaming attachment Status ~p Headers ~p",
-            [StreamStatus, StreamHeaders]),
-
-        ResponseCode = list_to_integer(StreamStatus),
-        if
-        ResponseCode >= 200, ResponseCode < 300 ->
-            % the normal case
-            Pid ! {self(), continue},
-            %% this function goes into the streaming attachment code.
-            %% It gets executed by the replication gen_server, so it can't
-            %% be the one to actually receive the ibrowse data.
-            {ok, fun() ->
-                Pid ! {self(), gimme_data},
-                receive
-                    {Pid, Data} ->
-                        Data;
-                    {'EXIT', Pid, attachment_request_failed} ->
-                        throw(attachment_write_failed)
-                end
-            end};
-        ResponseCode >= 300, ResponseCode < 400 ->
-            % follow the redirect
-            Pid ! {self(), stop_ok},
-            RedirectUrl = mochiweb_headers:get_value("Location",
-                mochiweb_headers:make(StreamHeaders)),
-            catch ibrowse:stop_worker_process(Conn),
-            make_att_stub_receiver(RedirectUrl, Headers, Name, Type,
-                Length, Retries - 1, Pause);
-        ResponseCode >= 400, ResponseCode < 500 ->
-            % an error... log and fail
-            ?LOG_ERROR("streaming attachment failed with code ~p: ~s",
-                [ResponseCode, Url]),
-            Pid ! {self(), fail},
-            exit(attachment_request_failed);
-        ResponseCode == 500 ->
-            % an error... log and retry
-            ?LOG_INFO("retrying couch_rep attachment request in ~p " ++
-                "seconds due to 500 response: ~s", [Pause/1000, Url]),
-            Pid ! {self(), fail},
-            catch ibrowse:stop_worker_process(Conn),
-            timer:sleep(Pause),
-            make_att_stub_receiver(Url, Headers, Name, Type, Length,
-                Retries - 1, 2*Pause)
-        end
+compare_replication_logs(SrcDoc, TgtDoc) ->
+    #doc{body={RepRecProps}} = SrcDoc,
+    #doc{body={RepRecPropsTgt}} = TgtDoc,
+    case proplists:get_value(<<"session_id">>, RepRecProps) ==
+            proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
+    true ->
+        % if the records have the same session id,
+        % then we have a valid replication history
+        OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
+        OldHistory = proplists:get_value(<<"history">>, RepRecProps, []),
+        {OldSeqNum, OldHistory};
+    false ->
+        SourceHistory = proplists:get_value(<<"history">>, RepRecProps, []),
+        TargetHistory = proplists:get_value(<<"history">>, RepRecPropsTgt, []),
+        ?LOG_INFO("Replication records differ. "
+                "Scanning histories to find a common ancestor.", []),
+        ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+                [RepRecProps, RepRecPropsTgt]),
+        compare_rep_history(SourceHistory, TargetHistory)
     end.
 
-
-open_db({remote, Url, Headers, Auth})->
-    {ok, #old_http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url};
-open_db({local, DbName, UserCtx})->
-    case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
-    {ok, Db} -> {ok, Db, DbName};
-    Error -> Error
+compare_rep_history(S, T) when length(S) =:= 0 orelse length(T) =:= 0 ->
+    ?LOG_INFO("no common ancestry -- performing full replication", []),
+    {0, []};
+compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) ->
+    SourceId = proplists:get_value(<<"session_id">>, S),
+    case has_session_id(SourceId, Target) of
+    true ->
+        RecordSeqNum = proplists:get_value(<<"recorded_seq">>, S, 0),
+        ?LOG_INFO("found a common replication record with source_seq ~p",
+            [RecordSeqNum]),
+        {RecordSeqNum, SourceRest};
+    false ->
+        TargetId = proplists:get_value(<<"session_id">>, T),
+        case has_session_id(TargetId, SourceRest) of
+        true ->
+            RecordSeqNum = proplists:get_value(<<"recorded_seq">>, T, 0),
+            ?LOG_INFO("found a common replication record with source_seq ~p",
+                [RecordSeqNum]),
+            {RecordSeqNum, TargetRest};
+        false ->
+            compare_rep_history(SourceRest, TargetRest)
+        end
     end.
 
-
-close_db(#old_http_db{})->
+close_db(#http_db{})->
     ok;
 close_db(Db)->
     couch_db:close(Db).
 
-do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
-    ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
-    [
-        {start_seq, StartSeqNum},
-        {history, OldHistory},
-        {rep_starttime, ReplicationStartTime},
-        {src_starttime, SrcInstanceStartTime},
-        {tgt_starttime, TgtInstanceStartTime},
-        {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc},
-        {tgt_record, RepRecDocTgt}
-    ] = Context,
-
-    case NewSeqNum == StartSeqNum andalso OldHistory /= [] of
-    true ->
-        % nothing changed, don't record results
-        {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context};
-    false ->
-        % something changed, record results for incremental replication,
-
-        % commit changes to both src and tgt. The src because if changes
-        % we replicated are lost, we'll record the a seq number ahead
-        % of what was committed. If those changes are lost and the seq number
-        % reverts to a previous committed value, we will skip future changes
-        % when new doc updates are given our already replicated seq nums.
-
-        % commit the src async
-        ParentPid = self(),
-        SrcCommitPid = spawn_link(fun() ->
-                ParentPid ! {self(), ensure_full_commit(Source)} end),
-
-        % commit tgt sync
-        {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
-
-        SrcInstanceStartTime2 =
-        receive
-        {SrcCommitPid, {ok, Timestamp}} ->
-            Timestamp;
-        {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
-            exit(replication_link_failure)
-        end,
-
-        RecordSeqNum =
-        if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
-                TgtInstanceStartTime2 == TgtInstanceStartTime ->
-            NewSeqNum;
-        true ->
-            ?LOG_INFO("A server has restarted sinced replication start. "
-                "Not recording the new sequence number to ensure the "
-                "replication is redone and documents reexamined.", []),
-            StartSeqNum
-        end,
-
-        NewHistoryEntry = {
-            [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
-            {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
-            {<<"start_last_seq">>, StartSeqNum},
-            {<<"end_last_seq">>, NewSeqNum},
-            {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
-            {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
-            {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
-            {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
-            {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)}
-            ]},
-        % limit history to 50 entries
-        HistEntries =lists:sublist([NewHistoryEntry |  OldHistory], 50),
-
-        NewRepHistory =
-                {[{<<"session_id">>, couch_util:new_uuid()},
-                  {<<"source_last_seq">>, RecordSeqNum},
-                  {<<"history">>, HistEntries}]},
-
-        {ok, {SrcRevPos,SrcRevId}} = update_doc(Source,
-                RepRecDocSrc#doc{body=NewRepHistory}, []),
-        {ok, {TgtRevPos,TgtRevId}} = update_doc(Target,
-                RepRecDocTgt#doc{body=NewRepHistory}, []),
-
-        NewContext = [
-            {start_seq, StartSeqNum},
-            {history, OldHistory},
-            {rep_starttime, ReplicationStartTime},
-            {src_starttime, SrcInstanceStartTime},
-            {tgt_starttime, TgtInstanceStartTime},
-            {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}},
-            {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}}
-        ],
-
-        {ok, NewRepHistory, NewContext}
-
+dbname(#http_db{} = Db) ->
+    Db#http_db.url;
+dbname(Db) ->
+    Db#db.name.
+
+dbinfo(#http_db{} = Db) ->
+    {DbProps} = couch_rep_httpc:request(Db),
+    [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps];
+dbinfo(Db) ->
+    {ok, Info} = couch_db:get_db_info(Db),
+    Info.
+
+has_session_id(_SessionId, []) ->
+    false;
+has_session_id(SessionId, [{Props} | Rest]) ->
+    case proplists:get_value(<<"session_id">>, Props, nil) of
+    SessionId ->
+        true;
+    _Else ->
+        has_session_id(SessionId, Rest)
     end.
 
-do_http_request(Url, Action, Headers, Auth) ->
-    do_http_request(Url, Action, Headers, Auth, []).
-
-do_http_request(Url, Action, Headers, Auth, JsonBody) ->
-    Headers0 = case Auth of
-        {Props} ->
-            % Add OAuth header
-            {OAuth} = proplists:get_value(<<"oauth">>, Props),
-            ConsumerKey = ?b2l(proplists:get_value(<<"consumer_key">>, OAuth)),
-            Token = ?b2l(proplists:get_value(<<"token">>, OAuth)),
-            TokenSecret = ?b2l(proplists:get_value(<<"token_secret">>, OAuth)),
-            ConsumerSecret = ?b2l(proplists:get_value(<<"consumer_secret">>, OAuth)),
-            Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1},
-            Method = case Action of
-                get -> "GET";
-                post -> "POST";
-                put -> "PUT"
-            end,
-            Params = oauth:signed_params(Method, Url, [], Consumer, Token, TokenSecret),
-            [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)} | Headers];
-        _Else ->
-            Headers
-    end,
-    do_http_request0(Url, Action, Headers0, JsonBody, 10, 1000).
-
-do_http_request0(Url, Action, Headers, Body, Retries, Pause) when is_binary(Url) ->
-    do_http_request0(?b2l(Url), Action, Headers, Body, Retries, Pause);
-do_http_request0(Url, Action, _Headers, _JsonBody, 0, _Pause) ->
-    ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
-        [Action, Url]),
-    exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
-do_http_request0(Url, Action, Headers, JsonBody, Retries, Pause) ->
-    ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
-    Body =
-    case JsonBody of
-    [] ->
-        <<>>;
+make_replication_id({Props}, UserCtx) ->
+    %% funky algorithm to preserve backwards compatibility
+    {ok, HostName} = inet:gethostname(),
+    Src = get_rep_endpoint(UserCtx, proplists:get_value(<<"source">>, Props)),
+    Tgt = get_rep_endpoint(UserCtx, proplists:get_value(<<"target">>, Props)),    
+    couch_util:to_hex(erlang:md5(term_to_binary([HostName, Src, Tgt]))).
+
+maybe_add_trailing_slash(Url) ->
+    re:replace(Url, "[^/]$", "&/", [{return, list}]).
+
+get_rep_endpoint(_UserCtx, {Props}) ->
+    Url = maybe_add_trailing_slash(proplists:get_value(<<"url">>, Props)),
+    {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
+    {Auth} = proplists:get_value(<<"auth">>, Props, {[]}),
+    case proplists:get_value(<<"oauth">>, Auth) of
+    undefined ->
+        {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+    {OAuth} ->
+        {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
+    end;
+get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) ->
+    {remote, maybe_add_trailing_slash(Url), []};
+get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) ->
+    {remote, maybe_add_trailing_slash(Url), []};
+get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
+    {local, DbName, UserCtx}.
+
+open_replication_log(#http_db{}=Db, RepId) ->
+    DocId = ?LOCAL_DOC_PREFIX ++ RepId,
+    Req = Db#http_db{resource=couch_util:url_encode(DocId)},
+    case couch_rep_httpc:request(Req) of
+    {[{<<"error">>, _}, {<<"reason">>, _}]} ->
+        ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
+        #doc{id=?l2b(DocId)};
+    Doc ->
+        ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
+        couch_doc:from_json_obj(Doc)
+    end;
+open_replication_log(Db, RepId) ->
+    DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+    case couch_db:open_doc(Db, DocId, []) of
+    {ok, Doc} ->
+        ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
+        Doc;
     _ ->
-        iolist_to_binary(?JSON_ENCODE(JsonBody))
-    end,
-    Options = case Action of
-        get -> [];
-        _ -> [{transfer_encoding, {chunked, 65535}}]
-    end ++ [
-        {content_type, "application/json; charset=utf-8"},
-        {max_pipeline_size, 101},
-        {response_format, binary}
-    ],
-    case ibrowse:send_req(Url, Headers, Action, Body, Options, infinity) of
-    {ok, Status, ResponseHeaders, ResponseBody} ->
-        ResponseCode = list_to_integer(Status),
-        if
-        ResponseCode >= 200, ResponseCode < 300 ->
-            ?JSON_DECODE(ResponseBody);
-        ResponseCode >= 300, ResponseCode < 400 ->
-            RedirectUrl = mochiweb_headers:get_value("Location",
-                mochiweb_headers:make(ResponseHeaders)),
-            do_http_request0(RedirectUrl, Action, Headers, JsonBody, Retries-1,
-                Pause);
-        ResponseCode >= 400, ResponseCode < 500 ->
-            ?JSON_DECODE(ResponseBody);
-        ResponseCode == 500 ->
-            ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds " ++
-                "due to 500 error: ~s", [Action, Pause/1000, Url]),
-            timer:sleep(Pause),
-            do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
-        end;
-    {error, Reason} ->
-        ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds due to " ++
-            "{error, ~p}: ~s", [Action, Pause/1000, Reason, Url]),
-        timer:sleep(Pause),
-        do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
+        ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
+        #doc{id=DocId}
     end.
 
-ensure_full_commit(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
-    {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post,
-        Headers, OAuth, true),
-    true = proplists:get_value(<<"ok">>, ResultProps),
-    {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)};
-ensure_full_commit(Db) ->
-    couch_db:ensure_full_commit(Db).
-
-enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
-    case get_doc_info_list(DbSource, StartSeq) of
-    [] ->
-        gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity);
-    DocInfoList ->
-        SrcRevsList = lists:map(fun(#doc_info{id=Id,revs=RevInfos}) ->
-            SrcRevs = [Rev || #rev_info{rev=Rev} <- RevInfos],
-            {Id, SrcRevs}
-        end, DocInfoList),
-        {ok, MissingRevs} = get_missing_revs(DbTarget, SrcRevsList),
-
-        %% do we need to check for success here?
-        [gen_server:call(Pid, {replicate_doc, Info}, infinity)
-            || Info <- MissingRevs ],
-
-        #doc_info{high_seq=LastSeq} = lists:last(DocInfoList),
-        RevsCount2 = RevsCount + length(SrcRevsList),
-        gen_server:cast(Pid, {increment_update_seq, LastSeq}),
-
-        enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
-    end.
-
-
-
-get_db_info(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
-    {DbProps} = do_http_request(DbUrl, get, Headers, OAuth),
-    {ok, [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps]};
-get_db_info(Db) ->
-    couch_db:get_db_info(Db).
-
-get_doc_info_list(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) ->
-    Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey="
-        ++ integer_to_list(StartSeq),
-    {Results} = do_http_request(Url, get, Headers, OAuth),
-    lists:map(fun({RowInfoList}) ->
-        {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList),
-        Seq = proplists:get_value(<<"key">>, RowInfoList),
-        Revs =
-            [#rev_info{rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)), deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)} |
-                [#rev_info{rev=Rev,deleted=false} || Rev <- couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, []))] ++
-                [#rev_info{rev=Rev,deleted=true} || Rev <- couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []))]],
-        #doc_info{
-            id=proplists:get_value(<<"id">>, RowInfoList),
-            high_seq = Seq,
-            revs = Revs
-        }
-    end, proplists:get_value(<<"rows">>, Results));
-get_doc_info_list(DbSource, StartSeq) ->
-    {ok, {_Count, DocInfoList}} = couch_db:enum_docs_since(DbSource, StartSeq,
-    fun (_, _, {100, DocInfoList}) ->
-            {stop, {100, DocInfoList}};
-        (DocInfo, _, {Count, DocInfoList}) ->
-            {ok, {Count+1, [DocInfo|DocInfoList]}}
-    end, {0, []}),
-    lists:reverse(DocInfoList).
-
-get_missing_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) ->
-    DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList],
-    {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers, OAuth,
-            {DocIdRevsList2}),
-    {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
-    DocMissingRevsList2 = [{Id, couch_doc:parse_revs(MissingRevStrs)} || {Id, MissingRevStrs} <- DocMissingRevsList],
-    {ok, DocMissingRevsList2};
-get_missing_revs(Db, DocId) ->
-    couch_db:get_missing_revs(Db, DocId).
-
-
-open_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) ->
-    [] = Options,
-    case do_http_request(DbUrl ++ couch_util:url_encode(DocId), get, Headers, OAuth) of
-    {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
-        {couch_util:to_existing_atom(ErrId), Reason};
-    Doc  ->
-        {ok, couch_doc:from_json_obj(Doc)}
+open_db({Props}, _UserCtx) ->
+    Url = maybe_add_trailing_slash(proplists:get_value(<<"url">>, Props)),
+    {AuthProps} = proplists:get_value(<<"auth">>, Props, {[]}),
+    {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
+    Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders],
+    DefaultHeaders = (#http_db{})#http_db.headers,
+    Db = #http_db{
+        url = Url,
+        auth = AuthProps,
+        headers = lists:ukeymerge(1, Headers, DefaultHeaders)
+    },
+    case couch_rep_httpc:db_exists(Db) of
+    true -> Db;
+    false -> throw({db_not_found, Url})
     end;
-open_doc(Db, DocId, Options) ->
-    couch_db:open_doc(Db, DocId, Options).
-
-open_doc_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0,
-        [latest]) ->
-    Revs = couch_doc:rev_to_strs(Revs0),
-    BaseUrl = DbUrl ++ couch_util:url_encode(DocId) ++ "?revs=true&latest=true",
-
-    %% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests
-    MaxN = trunc((8192 - length(BaseUrl))/14),
-
-    JsonResults = case length(Revs) > MaxN of
-    false ->
-        Url = ?l2b(BaseUrl ++ "&open_revs=" ++ ?JSON_ENCODE(Revs)),
-        do_http_request(Url, get, Headers, OAuth);
-    true ->
-        {_, Rest, Acc} = lists:foldl(
-        fun(Rev, {Count, RevsAcc, AccResults}) when Count =:= MaxN ->
-            QSRevs = ?JSON_ENCODE(lists:reverse(RevsAcc)),
-            Url = ?l2b(BaseUrl ++ "&open_revs=" ++ QSRevs),
-            {1, [Rev], AccResults++do_http_request(Url, get, Headers, OAuth)};
-        (Rev, {Count, RevsAcc, AccResults}) ->
-            {Count+1, [Rev|RevsAcc], AccResults}
-        end, {0, [], []}, Revs),
-        Acc ++ do_http_request(?l2b(BaseUrl ++ "&open_revs=" ++
-            ?JSON_ENCODE(lists:reverse(Rest))), get, Headers, OAuth)
-    end,
+open_db(<<"http://",_/binary>>=Url, _) ->
+    open_db({[{<<"url">>,Url}]}, []);
+open_db(<<"https://",_/binary>>=Url, _) ->
+    open_db({[{<<"url">>,Url}]}, []);
+open_db(<<DbName/binary>>, UserCtx) ->
+    case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+    {ok, Db} -> Db;
+    {not_found, no_db_file} -> throw({db_not_found, DbName})
+    end.
 
-    Results =
-    lists:map(
-        fun({[{<<"missing">>, Rev}]}) ->
-            {{not_found, missing}, couch_doc:parse_rev(Rev)};
-        ({[{<<"ok">>, JsonDoc}]}) ->
-        #doc{id=Id, revs=Rev, atts=Atts} = Doc =
-            couch_doc:from_json_obj(JsonDoc),
-        {ok, Doc#doc{atts=[att_stub_converter(DbS,Id,Rev,A) || A <- Atts]}}
-        end, JsonResults),
-    {ok, Results};
-open_doc_revs(Db, DocId, Revs, Options) ->
-    couch_db:open_doc_revs(Db, DocId, Revs, Options).
-
-%% @spec should_flush() -> true | false
-%% @doc Calculates whether it's time to flush the document buffer. Considers
-%%        - memory utilization
-%%        - number of pending document writes
-%%        - approximate number of pending attachment writes
-should_flush(DocCount) when DocCount > ?BUFFER_NDOCS ->
-    true;
-should_flush(_DocCount) ->
-    MeAndMyLinks = [self()|
-        [P || P <- element(2,process_info(self(),links)), is_pid(P)]],
+do_checkpoint(State) ->
+    #state{
+        source = Source,
+        target = Target,
+        committed_seq = NewSeqNum,
+        start_seq = StartSeqNum,
+        history = OldHistory,
+        source_log = SourceLog,
+        target_log = TargetLog,
+        rep_starttime = ReplicationStartTime,
+        src_starttime = SrcInstanceStartTime,
+        tgt_starttime = TgtInstanceStartTime,
+        stats = Stats
+    } = State,
+    ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
+    RecordSeqNum = case commit_to_both(Source, Target) of
+    {SrcInstanceStartTime, TgtInstanceStartTime} ->
+        NewSeqNum;
+    _Else ->
+        ?LOG_INFO("A server has restarted since replication start. "
+            "Not recording the new sequence number to ensure the "
+            "replication is redone and documents reexamined.", []),
+        StartSeqNum
+    end,
+    SessionId = couch_util:new_uuid(),
+    NewHistoryEntry = {[
+        {<<"session_id">>, SessionId},
+        {<<"start_time">>, list_to_binary(ReplicationStartTime)},
+        {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+        {<<"start_last_seq">>, StartSeqNum},
+        {<<"end_last_seq">>, NewSeqNum},
+        {<<"recorded_seq">>, RecordSeqNum},
+        {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+        {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+        {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+        {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+        {<<"doc_write_failures">>, 
+            ets:lookup_element(Stats, doc_write_failures, 2)}
+    ]},
+    % limit history to 50 entries
+    NewRepHistory = {[
+        {<<"session_id">>, SessionId},
+        {<<"source_last_seq">>, RecordSeqNum},
+        {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
+    ]},
+    % ?LOG_DEBUG("updating src doc ~p", [SourceLog]),
+    {SrcRevPos,SrcRevId} =
+        update_doc(Source, SourceLog#doc{body=NewRepHistory}, []),
+    % ?LOG_DEBUG("updating tgt doc ~p", [TargetLog]),
+    {TgtRevPos,TgtRevId} =
+        update_doc(Target, TargetLog#doc{body=NewRepHistory}, []),
+    State#state{
+        checkpoint_history = NewRepHistory,
+        source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+        target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+    }.
+
+commit_to_both(Source, Target) ->
+    % commit the src async
+    ParentPid = self(),
+    SrcCommitPid = spawn_link(fun() ->
+            ParentPid ! {self(), ensure_full_commit(Source)} end),
 
-    case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of
-    true -> true;
-    false ->
-        case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of
-        true ->
-            [garbage_collect(Pid) || Pid <- MeAndMyLinks],
-            memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY;
-        false -> false
-        end
-    end.
+    % commit tgt sync
+    TargetStartTime = ensure_full_commit(Target),
 
-%% @spec memory_footprint([pid()]) -> integer()
-%% @doc Sum of process and binary memory utilization for all processes in list
-memory_footprint(PidList) ->
-    memory_footprint(PidList, {0,0}).
-
-memory_footprint([], {ProcessMemory, BinaryMemory}) ->
-    ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]),
-    ProcessMemory + BinaryMemory;
-memory_footprint([Pid|Rest], {ProcAcc, BinAcc}) ->
-    case is_process_alive(Pid) of
-    true ->
-        ProcMem = element(2,process_info(Pid, memory)),
-        BinMem = binary_memory(Pid),
-        memory_footprint(Rest, {ProcMem + ProcAcc, BinMem + BinAcc});
-    false ->
-        memory_footprint(Rest, {ProcAcc, BinAcc})
-    end.
+    SourceStartTime =
+    receive
+    {SrcCommitPid, Timestamp} ->
+        Timestamp;
+    {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
+        exit(replication_link_failure)
+    end,
+    {SourceStartTime, TargetStartTime}.
+    
+ensure_full_commit(#http_db{} = Db) ->
+    Req = Db#http_db{
+        resource = "_ensure_full_commit",
+        method = post,
+        body = true
+    },
+    {ResultProps} = couch_rep_httpc:request(Req),
+    true = proplists:get_value(<<"ok">>, ResultProps),
+    proplists:get_value(<<"instance_start_time">>, ResultProps);
+ensure_full_commit(Db) ->
+    {ok, StartTime} = couch_db:ensure_full_commit(Db),
+    StartTime.
 
-%% @spec binary_memory(pid()) -> integer()
-%% @doc Memory utilization of all binaries referenced by this process.
-binary_memory(Pid) ->
-    lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
-        0, element(2,process_info(Pid, binary))).
-
-update_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options) ->
-    [] = Options,
-    Url = DbUrl ++ couch_util:url_encode(DocId),
-    {ResponseMembers} = do_http_request(Url, put, Headers, OAuth,
-            couch_doc:to_json_obj(Doc, [attachments])),
+update_doc(#http_db{} = Db, #doc{id=DocId} = Doc, []) ->
+    Req = Db#http_db{
+        resource = couch_util:url_encode(DocId),
+        method = put,
+        body = couch_doc:to_json_obj(Doc, [attachments])
+    },
+    {ResponseMembers} = couch_rep_httpc:request(Req),
     Rev = proplists:get_value(<<"rev">>, ResponseMembers),
-    {ok, couch_doc:parse_rev(Rev)};
+    couch_doc:parse_rev(Rev);
 update_doc(Db, Doc, Options) ->
-    couch_db:update_doc(Db, Doc, Options).
+    {ok, Result} = couch_db:update_doc(Db, Doc, Options),
+    Result.
 
-update_docs(_, [], _, _) ->
-    {ok, []};
-update_docs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes) ->
-    JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
-    ErrorsJson =
-        do_http_request(DbUrl ++ "_bulk_docs", post, Headers, OAuth,
-                {[{new_edits, false}, {docs, JsonDocs}]}),
-    ErrorsList =
-    lists:map(
-        fun({Props}) ->
-            Id = proplists:get_value(<<"id">>, Props),
-            Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)),
-            ErrId = couch_util:to_existing_atom(
-                    proplists:get_value(<<"error">>, Props)),
-            Reason = proplists:get_value(<<"reason">>, Props),
-            Error = {ErrId, Reason},
-            {{Id, Rev}, Error}
-        end, ErrorsJson),
-    {ok, ErrorsList};
-update_docs(Db, Docs, Options, UpdateType) ->
-    couch_db:update_docs(Db, Docs, Options, UpdateType).
-
-up_to_date(#old_http_db{}, _Seq) ->
+up_to_date(#http_db{}, _Seq) ->
     true;
 up_to_date(Source, Seq) ->
     {ok, NewDb} = couch_db:open(Source#db.name, []),
     T = NewDb#db.update_seq == Seq,
     couch_db:close(NewDb),
     T.
-
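
The new compare_replication_logs/compare_rep_history pair deserves a worked example, since it replaces the old all-or-nothing session check: instead of falling back to a full replication whenever the two log heads differ, the histories are scanned for the most recent session both sides remember. With hypothetical histories (newest first, internal fun called directly for illustration) that diverged after session s2:

    SourceHistory = [{[{<<"session_id">>, <<"s4">>}, {<<"recorded_seq">>, 40}]},
                     {[{<<"session_id">>, <<"s2">>}, {<<"recorded_seq">>, 20}]}],
    TargetHistory = [{[{<<"session_id">>, <<"s3">>}, {<<"recorded_seq">>, 30}]},
                     {[{<<"session_id">>, <<"s2">>}, {<<"recorded_seq">>, 20}]}],
    %% s4 is unknown to the target and s3 unknown to the source, but both
    %% sides remember s2, so incremental replication resumes from seq 20:
    {20, _RemainingHistory} = compare_rep_history(SourceHistory, TargetHistory).

Only if one history is exhausted without a match does it return {0, []} and force a full pass.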

Added: couchdb/trunk/src/couchdb/couch_rep_att.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_att.erl?rev=802888&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_att.erl (added)
+++ couchdb/trunk/src/couchdb/couch_rep_att.erl Mon Aug 10 18:37:43 2009
@@ -0,0 +1,100 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_att).
+
+-export([convert_stub/2, cleanup/0]).
+
+-include("couch_db.hrl").
+
+convert_stub(#att{data=stub} = Attachment, {#http_db{} = Db, Id, Rev}) ->
+    {Pos, [RevId|_]} = Rev,
+    Name = Attachment#att.name,
+    Request = Db#http_db{
+        resource = lists:flatten([couch_util:url_encode(Id), "/",
+            couch_util:url_encode(Name)]),
+        qs = [{rev, couch_doc:rev_to_str({Pos,RevId})}]
+    },
+    Ref = make_ref(),
+    RcvFun = fun() -> attachment_receiver(Ref, Request) end,
+    Attachment#att{data=RcvFun}.
+
+cleanup() ->
+    receive 
+    {ibrowse_async_response, _, _} ->
+        %% TODO maybe log, didn't expect to have data here
+        cleanup();
+    {ibrowse_async_response_end, _} -> 
+        cleanup()
+    after 0 ->
+        erase(),
+        ok
+    end.
+
+% internal funs
+
+attachment_receiver(Ref, Request) ->
+    case get(Ref) of
+    undefined ->
+        ReqId = start_http_request(Request),
+        put(Ref, ReqId),
+        receive_data(Ref, ReqId);
+    ReqId ->
+        receive_data(Ref, ReqId)
+    end.
+
+receive_data(Ref, ReqId) ->
+    receive
+    {ibrowse_async_response, ReqId, {chunk_start,_}} ->
+        receive_data(Ref, ReqId);
+    {ibrowse_async_response, ReqId, chunk_end} ->
+        receive_data(Ref, ReqId);
+    {ibrowse_async_response, ReqId, {error, Err}} ->
+        ?LOG_ERROR("streaming attachment ~p failed with ~p", [ReqId, Err]),
+        throw({attachment_request_failed, Err});
+    {ibrowse_async_response, ReqId, Data} ->
+        % ?LOG_DEBUG("got ~p bytes for ~p", [size(Data), ReqId]),
+        Data;
+    {ibrowse_async_response_end, ReqId} ->
+        ?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]),
+        throw({attachment_request_failed, premature_end})
+    end.
+
+start_http_request(Req) ->
+    %% set stream_to here because self() has changed
+    Req2 = Req#http_db{options = [{stream_to,self()} | Req#http_db.options]},
+    {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req2),
+    receive {ibrowse_async_headers, ReqId, Code, Headers} ->
+        case validate_headers(Req2, list_to_integer(Code), Headers) of
+        ok ->
+            ReqId;
+        {ok, NewReqId} ->
+            NewReqId
+        end
+    end.
+
+validate_headers(_Req, 200, _Headers) ->
+    ok;
+validate_headers(Req, Code, Headers) when Code > 299, Code < 400 ->
+    %% TODO check that the qs is actually included in the Location header
+    %% TODO this only supports one level of redirection
+    Url = mochiweb_headers:get_value("Location",mochiweb_headers:make(Headers)),
+    NewReq = Req#http_db{url=Url, resource="", qs=[]},
+    {ibrowse_req_id, ReqId} = couch_rep_httpc:request(NewReq),
+    receive {ibrowse_async_headers, ReqId, NewCode, NewHeaders} ->
+        ok = validate_headers(NewReq, list_to_integer(NewCode), NewHeaders)
+    end,
+    {ok, ReqId};
+validate_headers(Req, Code, _Headers) ->
+    #http_db{url=Url, resource=Resource} = Req,
+    ?LOG_ERROR("got ~p for ~s~s", [Code, Url, Resource]),
+    throw({attachment_request_failed, {bad_code, Code}}).

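The data funs installed by convert_stub/2 yield one ibrowse chunk per invocation, so a consumer keeps calling until it has the whole attachment. A sketch of such a consumer, assuming the caller tracks the expected byte length from the #att record (drain/2 and drain/3 are hypothetical helpers, not part of this commit):

    %% drain a lazy attachment fun; Fun() returns the next binary chunk
    drain(Fun, Len) ->
        drain(Fun, Len, <<>>).

    drain(_Fun, Len, Acc) when byte_size(Acc) >= Len ->
        Acc;
    drain(Fun, Len, Acc) ->
        Chunk = Fun(),
        drain(Fun, Len, <<Acc/binary, Chunk/binary>>).

Once a batch of documents is written, couch_rep_att:cleanup/0 flushes any stray ibrowse messages left in the consumer's mailbox, which is why couch_rep_writer below calls it after every batch.
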
Modified: couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl?rev=802888&r1=802887&r2=802888&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl Mon Aug 10 18:37:43 2009
@@ -47,6 +47,7 @@
     gen_server:call(Server, stop).
 
 init([_Parent, #http_db{}=Source, Since, PostProps]) ->
+    process_flag(trap_exit, true),
     Feed = case proplists:get_value(<<"continuous">>, PostProps, false) of
     false ->
         normal;

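With exits trapped, the death of a linked helper such as the changes loop arrives as a message rather than killing the gen_server, and terminate/2 gets a chance to run on shutdown. The corresponding handler is outside this hunk; a sketch of its likely shape:

    %% hypothetical clause; the real handler is not shown in this diff
    handle_info({'EXIT', _Pid, Reason}, State) ->
        {stop, Reason, State}.
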
Modified: couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl?rev=802888&r1=802887&r2=802888&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_missing_revs.erl Mon Aug 10 18:37:43 2009
@@ -51,19 +51,17 @@
     {ok, #state{changes_loop=Pid, target=Target, parent=Parent}}.
 
 handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) ->
+    State#state.parent ! {update_stats, missing_revs, length(Revs)},
     handle_add_missing_revs(HighSeq, Revs, From, State);
 
 handle_call(next_missing_revs, From, State) ->
-    handle_next_missing_revs(From, State);
+    handle_next_missing_revs(From, State).
 
-handle_call({update_committed_seq, N}, _From, State) ->
+handle_cast({update_committed_seq, N}, State) ->
     if State#state.high_committed_seq < N ->
         ?LOG_DEBUG("missing_revs updating committed seq to ~p", [N]);
     true -> ok end,
-    {reply, ok, State#state{high_committed_seq=N}}.
-
-handle_cast(_Msg, State) ->
-    {noreply, State}.
+    {noreply, State#state{high_committed_seq=N}}.
 
 handle_info({'EXIT', Pid, Reason}, #state{changes_loop=Pid} = State) ->
     handle_changes_loop_exit(Reason, State);
@@ -84,8 +82,9 @@
 %internal funs
 
 handle_add_missing_revs(HighSeq, [], _From, State) ->
-    maybe_checkpoint(State),
-    {reply, ok, State#state{high_source_seq=HighSeq}};
+    NewState = State#state{high_source_seq=HighSeq},
+    maybe_checkpoint(NewState),
+    {reply, ok, NewState};
 handle_add_missing_revs(HighSeq, Revs, From, #state{reply_to=nil} = State) ->
     #state{rows=Rows, count=Count} = State,
     NewState = State#state{

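Since update_committed_seq is now handled as a cast instead of a call, the call site (not shown in this hunk) presumably changes along these lines, Server being the couch_rep_missing_revs pid:

    %% fire-and-forget; nothing useful can be done with a reply here
    gen_server:cast(Server, {update_committed_seq, CommittedSeq})

A cast fits because the committed sequence is an advisory watermark; blocking the caller on a synchronous reply bought nothing.
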
Added: couchdb/trunk/src/couchdb/couch_rep_reader.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_reader.erl?rev=802888&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_reader.erl (added)
+++ couchdb/trunk/src/couchdb/couch_rep_reader.erl Mon Aug 10 18:37:43 2009
@@ -0,0 +1,268 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_reader).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
+    code_change/3]).
+
+-export([start_link/4, next/1]).
+
+-import(couch_util, [url_encode/1]).
+
+-define(BUFFER_SIZE, 1000).
+-define(MAX_CONCURRENT_REQUESTS, 100).
+-define(MAX_CONNECTIONS, 20).
+-define(MAX_PIPELINE_SIZE, 50).
+
+-include("couch_db.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-record(state, {
+    parent,
+    source,
+    missing_revs,
+    reader_loop,
+    reader_from = nil,
+    count = 0,
+    docs = queue:new(),
+    reply_to = nil,
+    complete = false,
+    monitor_count = 0,
+    monitor_count_by_seq = ets:new(monitor_count_by_seq, [set, private]),
+    monitors_by_ref = ets:new(monitors_by_ref, [set, private]),
+    pending_doc_request = nil,
+    high_missing_seq = 0
+}).
+
+start_link(Parent, Source, MissingRevs, PostProps) ->
+    gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []).
+
+next(Pid) ->
+    gen_server:call(Pid, next_docs, infinity).
+
+init([Parent, Source, MissingRevs, _PostProps]) ->
+    process_flag(trap_exit, true),
+    if is_record(Source, http_db) ->
+        #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url),
+        ibrowse:set_max_sessions(Host, Port, ?MAX_CONNECTIONS),
+        ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
+    true -> ok end,
+    Self = self(),
+    ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end),
+    State = #state{
+        parent = Parent,
+        source = Source,
+        missing_revs = MissingRevs,
+        reader_loop = ReaderLoop
+    },
+    {ok, State}.
+
+handle_call({add_docs, Docs}, From, State) ->
+    State#state.parent ! {update_stats, docs_read, length(Docs)},
+    handle_add_docs(lists:flatten(Docs), From, State);
+
+handle_call(next_docs, From, State) ->
+    handle_next_docs(From, State);
+
+handle_call({open_doc_revs, Id, Revs, HighSeq}, From, State) ->
+    handle_open_doc_revs(Id, Revs, HighSeq, From, State);
+
+handle_call({set_monitor_count, Seq, Count}, _From, State) ->
+    ets:insert(State#state.monitor_count_by_seq, {Seq,Count}),
+    {reply, ok, State};
+
+handle_call({update_high_seq, HighSeq}, _From, State) ->
+    {reply, ok, State#state{high_missing_seq=HighSeq}}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({'DOWN', Ref, _, _, Reason}, State) ->
+    handle_monitor_down(Reason, Ref, State);
+
+handle_info({'EXIT', Loop, complete}, #state{reader_loop=Loop} = State) ->
+    handle_reader_loop_complete(State).
+
+terminate(_Reason, _State) ->
+    % ?LOG_INFO("rep reader terminating with reason ~p", [_Reason]),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%internal funs
+
+handle_add_docs(DocsToAdd, From, #state{reply_to=nil} = State) ->
+    NewState = State#state{
+        docs = queue:join(State#state.docs, queue:from_list(DocsToAdd)),
+        count = State#state.count + length(DocsToAdd)
+    },
+    if NewState#state.count < ?BUFFER_SIZE ->
+        {reply, ok, NewState};
+    true ->
+        {noreply, NewState#state{reader_from=From}}
+    end;
+handle_add_docs(DocsToAdd, _From, #state{count=0} = State) ->
+    HighSeq = State#state.high_missing_seq,
+    gen_server:reply(State#state.reply_to, {HighSeq, DocsToAdd}),
+    {reply, ok, State#state{reply_to=nil}}.
+
+handle_next_docs(From, #state{count=0} = State) ->
+    if State#state.complete ->
+        {stop, normal, {complete, State#state.high_missing_seq}, State};
+    true ->
+        {noreply, State#state{reply_to=From}}
+    end;
+handle_next_docs(_From, State) ->
+    #state{
+        reader_from = ReaderFrom,
+        docs = Docs,
+        high_missing_seq = HighSeq
+    } = State,
+    if ReaderFrom =/= nil ->
+        gen_server:reply(ReaderFrom, ok);
+    true -> ok end,
+    NewState = State#state{count=0, reader_from=nil, docs=queue:new()},
+    {reply, {HighSeq, queue:to_list(Docs)}, NewState}.
+
+handle_open_doc_revs(Id, Revs, Seq, From, #state{monitor_count=N} = State)
+        when N > ?MAX_CONCURRENT_REQUESTS ->
+    {noreply, State#state{pending_doc_request={From,Id,Revs,Seq}}};
+handle_open_doc_revs(Id, Revs, Seq, _From, #state{source=#http_db{}} = State) ->
+    #state{
+        monitor_count = Count,
+        monitors_by_ref = MonitorsByRef,
+        source = Source
+    } = State,
+    {_, Ref} = spawn_document_request(Source, Id, Revs),
+    ets:insert(MonitorsByRef, {Ref, Seq}),
+    {reply, ok, State#state{monitor_count = Count+1}}.
+
+handle_monitor_down(normal, Ref, #state{pending_doc_request=nil,
+        monitor_count=1, complete=waiting_on_monitors} = State) ->
+    N = calculate_new_high_seq(State, Ref),
+    {noreply, State#state{complete=true, monitor_count=0, high_missing_seq=N}};
+handle_monitor_down(normal, Ref, #state{pending_doc_request=nil} = State) ->
+    #state{monitor_count = Count} = State,
+    HighSeq = calculate_new_high_seq(State, Ref),
+    {noreply, State#state{monitor_count = Count-1, high_missing_seq=HighSeq}};
+handle_monitor_down(normal, Ref, State) ->
+    #state{
+        source = Source,
+        monitors_by_ref = MonitorsByRef,
+        pending_doc_request = {From, Id, Revs, Seq}
+    } = State,
+    HighSeq = calculate_new_high_seq(State, Ref),
+    gen_server:reply(From, ok),
+    {_, NewRef} = spawn_document_request(Source, Id, Revs),
+    ets:insert(MonitorsByRef, {NewRef, Seq}),
+    {noreply, State#state{pending_doc_request=nil, high_missing_seq=HighSeq}};
+handle_monitor_down(Reason, _, State) ->
+    {stop, Reason, State}.
+
+handle_reader_loop_complete(#state{reply_to=nil, monitor_count=0} = State) ->
+    {noreply, State#state{complete = true}};
+handle_reader_loop_complete(#state{monitor_count=0} = State) ->
+    HighSeq = State#state.high_missing_seq,
+    gen_server:reply(State#state.reply_to, {complete, HighSeq}),
+    {stop, normal, State};
+handle_reader_loop_complete(State) ->
+    {noreply, State#state{complete = waiting_on_monitors}}.
+
+split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
+    case Length+size(Rev) > 8192 of
+    false ->
+        {[[Rev|CurrentAcc] | Rest], BaseLength, Length+size(Rev)};
+    true ->
+        {[[Rev],CurrentAcc|Rest], BaseLength, BaseLength}
+    end.
+
+open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
+    %% splits revision lists whose query strings would overflow MochiWeb's
+    %% 8192-byte URL limit into multiple smaller requests
+    BaseQS = [{revs,true}, {latest,true}],
+    BaseReq = DbS#http_db{resource=url_encode(DocId), qs=BaseQS},
+    BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs=
+
+    {RevLists, _, _} = lists:foldl(fun split_revlist/2,
+        {[[]], BaseLength, BaseLength}, couch_doc:rev_to_strs(Revs)),
+
+    Requests = [BaseReq#http_db{
+        qs = [{open_revs, ?JSON_ENCODE(RevList)} | BaseQS]
+    } || RevList <- RevLists],
+    JsonResults = lists:flatten([couch_rep_httpc:request(R) || R <- Requests]),
+
+    Transform =
+    fun({[{<<"missing">>, Rev}]}) ->
+        {{not_found, missing}, couch_doc:parse_rev(Rev)};
+    ({[{<<"ok">>, Json}]}) ->
+        #doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
+        Doc#doc{atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]}
+    end,
+    [Transform(Result) || Result <- JsonResults].
+
+reader_loop(ReaderServer, Source, MissingRevsServer) ->
+    case couch_rep_missing_revs:next(MissingRevsServer) of
+    complete ->
+        % ?LOG_INFO("reader_loop terminating with complete", []),
+        exit(complete);
+    {HighSeq, IdsRevs} ->
+        % ?LOG_DEBUG("got IdsRevs ~p", [IdsRevs]),
+        case Source of
+        #http_db{} ->
+            N = length(IdsRevs),
+            gen_server:call(ReaderServer, {set_monitor_count, HighSeq, N}),
+            [gen_server:call(ReaderServer, {open_doc_revs, Id, Revs, HighSeq})
+                || {Id,Revs} <- IdsRevs];
+        _Local ->
+            lists:foreach(fun({Id,Revs}) ->
+                {ok, Docs} = couch_db:open_doc_revs(Source, Id, Revs, [latest]),
+                JustTheDocs = [Doc || {ok, Doc} <- Docs],
+                gen_server:call(ReaderServer, {add_docs, JustTheDocs})
+            end, IdsRevs),
+            gen_server:call(ReaderServer, {update_high_seq, HighSeq})
+        end
+    end,
+    reader_loop(ReaderServer, Source, MissingRevsServer).
+
+spawn_document_request(Source, Id, Revs) ->
+    Server = self(),
+    SpawnFun = fun() ->
+        Results = open_doc_revs(Source, Id, Revs),
+        gen_server:call(Server, {add_docs, Results})
+    end,
+    spawn_monitor(SpawnFun).
+
+%% check if any more HTTP requests are pending for this update sequence
+calculate_new_high_seq(State, Ref) ->
+    #state{
+        monitors_by_ref = MonitorsByRef,
+        monitor_count_by_seq = MonitorCountBySeq,
+        high_missing_seq = OldSeq
+    } = State,
+    Seq = ets:lookup_element(MonitorsByRef, Ref, 2),
+    ets:delete(MonitorsByRef, Ref),
+    case ets:update_counter(MonitorCountBySeq, Seq, -1) of
+    0 ->
+        ets:delete(MonitorCountBySeq, Seq),
+        case ets:first(MonitorCountBySeq) of
+        Key when Key > Seq ->
+            Seq;
+        '$end_of_table' ->
+            Seq;
+        _Else ->
+            OldSeq
+        end;
+    _Else ->
+        OldSeq
+    end.

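To make the rev-list batching in open_doc_revs/3 concrete: inside the module the fold could be exercised as below. split_revlist/2 is not exported, the rev strings are hypothetical, and 7000 stands in for the computed base URL length:

    Revs = [<<"1-aaa">>, <<"1-bbb">>, <<"1-ccc">>],
    BaseLength = 7000,
    {RevLists, _, _} = lists:foldl(fun split_revlist/2,
        {[[]], BaseLength, BaseLength}, Revs),
    %% every batch in RevLists keeps its query string under the 8192-byte
    %% budget; with these inputs all three revs land in a single batch
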
Added: couchdb/trunk/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_writer.erl?rev=802888&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_writer.erl (added)
+++ couchdb/trunk/src/couchdb/couch_rep_writer.erl Mon Aug 10 18:37:43 2009
@@ -0,0 +1,68 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_writer).
+
+-export([start_link/4]).
+
+-include("couch_db.hrl").
+
+start_link(Parent, Target, Reader, _PostProps) ->
+    {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
+
+writer_loop(Parent, Reader, Target) ->
+    % ?LOG_DEBUG("writer loop begin", []),
+    case couch_rep_reader:next(Reader) of
+    {complete, FinalSeq} ->
+        % ?LOG_INFO("writer terminating normally", []),
+        Parent ! {writer_checkpoint, FinalSeq},
+        ok;
+    {HighSeq, Docs} ->
+        % ?LOG_DEBUG("writer loop trying to write ~p", [Docs]),
+        DocCount = length(Docs),
+        try write_docs(Target, Docs) of
+        {ok, []} ->
+            Parent ! {update_stats, docs_written, DocCount};
+        {ok, Errors} ->
+            ErrorCount = length(Errors),
+            Parent ! {update_stats, doc_write_failures, ErrorCount},
+            Parent ! {update_stats, docs_written, DocCount - ErrorCount}
+        catch
+        {attachment_request_failed, Err} ->
+            ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]),
+            exit({attachment_request_failed, Err, Docs})
+        end,
+        Parent ! {writer_checkpoint, HighSeq},
+        couch_rep_att:cleanup(),
+        writer_loop(Parent, Reader, Target)
+    end.
+
+write_docs(#http_db{} = Db, Docs) ->
+    JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+    ErrorsJson = couch_rep_httpc:request(Db#http_db{
+        resource = "_bulk_docs",
+        method = post,
+        body = {[{new_edits, false}, {docs, JsonDocs}]}
+    }),
+    ErrorsList =
+    lists:map(
+        fun({Props}) ->
+            Id = proplists:get_value(<<"id">>, Props),
+            Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)),
+            ErrId = couch_util:to_existing_atom(
+                    proplists:get_value(<<"error">>, Props)),
+            Reason = proplists:get_value(<<"reason">>, Props),
+            {{Id, Rev}, {ErrId, Reason}}
+        end, ErrorsJson),
+    {ok, ErrorsList};
+write_docs(Db, Docs) ->
+    couch_db:update_docs(Db, Docs, [], replicated_changes).

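For reference, each EJSON term that write_docs/2 folds over corresponds to an entry like the following in the _bulk_docs response (the id, rev, and reason shown are hypothetical):

    {[{<<"id">>, <<"example-doc">>},
      {<<"rev">>, <<"2-bcd">>},
      {<<"error">>, <<"conflict">>},
      {<<"reason">>, <<"Document update conflict.">>}]}

which the mapper turns into {{Id, Rev}, {conflict, Reason}} for the failure accounting in writer_loop/3.
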