couchdb-commits mailing list archives

From kocol...@apache.org
Subject svn commit: r807719 - in /couchdb/branches/0.10.x: ./ etc/default/ src/ src/couchdb/ test/ test/etap/
Date Tue, 25 Aug 2009 17:12:16 GMT
Author: kocolosk
Date: Tue Aug 25 17:12:15 2009
New Revision: 807719

URL: http://svn.apache.org/viewvc?rev=807719&view=rev
Log:
do what i said i did before, but this time for real

Modified:
    couchdb/branches/0.10.x/   (props changed)
    couchdb/branches/0.10.x/etc/default/couchdb   (props changed)
    couchdb/branches/0.10.x/src/   (props changed)
    couchdb/branches/0.10.x/src/couchdb/   (props changed)
    couchdb/branches/0.10.x/src/couchdb/couch_httpd_db.erl
    couchdb/branches/0.10.x/src/couchdb/couch_rep.erl
    couchdb/branches/0.10.x/src/couchdb/couch_rep_changes_feed.erl
    couchdb/branches/0.10.x/src/couchdb/couch_rep_httpc.erl
    couchdb/branches/0.10.x/src/couchdb/couch_rep_missing_revs.erl
    couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl
    couchdb/branches/0.10.x/src/couchdb/couch_rep_writer.erl
    couchdb/branches/0.10.x/test/   (props changed)
    couchdb/branches/0.10.x/test/etap/   (props changed)
    couchdb/branches/0.10.x/test/etap/112-replication-missing-revs.t   (props changed)

Propchange: couchdb/branches/0.10.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Aug 25 17:12:15 2009
@@ -3,4 +3,4 @@
 /couchdb/branches/form:729440-730015
 /couchdb/branches/list-iterator:782292-784593
 /couchdb/branches/tail_header:775760-778477
-/couchdb/trunk:807208*,807308*,807320*,807342*,807345*,807354*,807360*,807459*,807461*,807468*,807473*,807477*
+/couchdb/trunk:807208-807478

Propchange: couchdb/branches/0.10.x/etc/default/couchdb
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Aug 25 17:12:15 2009
@@ -3,4 +3,5 @@
 /couchdb/branches/form/etc/default/couchdb:729440-730015
 /couchdb/branches/list-iterator/etc/default/couchdb:782292-784593
 /couchdb/branches/tail_header/etc/default/couchdb:775760-778477
+/couchdb/trunk/etc/default/couchdb:807208-807478
 /incubator/couchdb/trunk/etc/default/couchdb:642419-694440

Propchange: couchdb/branches/0.10.x/src/
            ('svn:mergeinfo' removed)

Propchange: couchdb/branches/0.10.x/src/couchdb/
            ('svn:mergeinfo' removed)

Modified: couchdb/branches/0.10.x/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.10.x/src/couchdb/couch_httpd_db.erl?rev=807719&r1=807718&r2=807719&view=diff
==============================================================================
--- couchdb/branches/0.10.x/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/branches/0.10.x/src/couchdb/couch_httpd_db.erl Tue Aug 25 17:12:15 2009
@@ -327,13 +327,29 @@
     send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");
 
 db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) ->
-    % make the batch save
-    committed = couch_batch_save:commit_now(Db#db.name, Db#db.user_ctx),
-    {ok, DbStartTime} = couch_db:ensure_full_commit(Db),
+    UpdateSeq = couch_db:get_update_seq(Db),
+    CommittedSeq = couch_db:get_committed_update_seq(Db),
+    {ok, StartTime} =
+    case couch_httpd:qs_value(Req, "seq") of
+    undefined ->
+        committed = couch_batch_save:commit_now(Db#db.name, Db#db.user_ctx),
+        couch_db:ensure_full_commit(Db);
+    RequiredStr ->
+        RequiredSeq = list_to_integer(RequiredStr),
+        if RequiredSeq > UpdateSeq ->
+            throw({bad_request,
+                "can't do a full commit ahead of current update_seq"});
+        RequiredSeq > CommittedSeq ->
+            % user asked for an explicit sequence, don't commit any batches
+            couch_db:ensure_full_commit(Db);
+        true ->
+            {ok, Db#db.instance_start_time}
+        end
+    end,
     send_json(Req, 201, {[
-            {ok, true},
-            {instance_start_time, DbStartTime}
-        ]});
+        {ok, true},
+        {instance_start_time, StartTime}
+    ]});
 
 db_req(#httpd{path_parts=[_,<<"_ensure_full_commit">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");
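
For illustration only (this request is not part of the commit), the new
optional "seq" query parameter on _ensure_full_commit could be exercised
roughly as follows; the database name, port, and sequence number are made up:

    %% Hypothetical sketch, assuming ibrowse is started and a database named
    %% "example" exists on a default local node.
    Url = "http://127.0.0.1:5984/example/_ensure_full_commit?seq=42",
    {ok, Code, _Headers, Body} =
        ibrowse:send_req(Url, [{"Content-Type", "application/json"}], post, []),
    %% Code is "201" and Body carries {"ok":true,"instance_start_time":...}
    %% when seq 42 is already committed (no work needed) or once the commit
    %% finishes; a seq greater than the current update_seq is rejected with a
    %% 400 bad_request ("can't do a full commit ahead of current update_seq").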

Modified: couchdb/branches/0.10.x/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.10.x/src/couchdb/couch_rep.erl?rev=807719&r1=807718&r2=807719&view=diff
==============================================================================
--- couchdb/branches/0.10.x/src/couchdb/couch_rep.erl (original)
+++ couchdb/branches/0.10.x/src/couchdb/couch_rep.erl Tue Aug 25 17:12:15 2009
@@ -15,7 +15,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
     code_change/3]).
 
--export([replicate/2]).
+-export([replicate/2, checkpoint/1]).
 
 -include("couch_db.hrl").
 
@@ -28,6 +28,7 @@
     source,
     target,
     init_args,
+    checkpoint_scheduled = nil,
 
     start_seq,
     history,
@@ -73,6 +74,9 @@
         get_result(Server, PostBody, UserCtx)
     end.
 
+checkpoint(Server) ->
+    gen_server:cast(Server, do_checkpoint).
+
 get_result(Server, PostBody, UserCtx) ->
     try gen_server:call(Server, get_result, infinity) of
     retry -> replicate(PostBody, UserCtx);
@@ -137,6 +141,7 @@
         target = Target,
         init_args = InitArgs,
         stats = Stats,
+        checkpoint_scheduled = nil,
 
         start_seq = StartSeq,
         history = History,
@@ -154,19 +159,22 @@
     Listeners = State#state.listeners,
     {noreply, State#state{listeners=[From|Listeners]}}.
 
+handle_cast(do_checkpoint, State) ->
+    {noreply, do_checkpoint(State)};
+
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
     couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
-    {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+    {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
 
 handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
         when SourceSeq > N ->
     MissingRevs = State#state.missing_revs,
     ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
     couch_task_status:update("W Processed source update #~p", [SourceSeq]),
-    {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+    {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
 handle_info({writer_checkpoint, _}, State) ->
     {noreply, State};
 
@@ -188,41 +196,12 @@
     ?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]),
     {stop, Reason, State}.
 
-terminate(normal, State) ->
-    #state{
-        checkpoint_history = CheckpointHistory,
-        committed_seq = NewSeq,
-        listeners = Listeners,
-        source = Source,
-        target = Target,
-        stats = Stats,
-        source_log = #doc{body={OldHistory}}
-    } = State,
-    couch_task_status:update("Finishing"),
-    ets:delete(Stats),
-    close_db(Target),
+terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
+    do_terminate(State);
     
-    NewRepHistory = case CheckpointHistory of
-    nil ->
-        {[{<<"no_changes">>, true} | OldHistory]};
-    _Else ->
-        CheckpointHistory
-    end,
-
-    %% reply to original requester
-    [Original|OtherListeners] = lists:reverse(Listeners),
-    gen_server:reply(Original, {ok, NewRepHistory}),
-
-    %% maybe trigger another replication. If this replicator uses a local
-    %% source Db, changes to that Db since we started will not be included in
-    %% this pass.
-    case up_to_date(Source, NewSeq) of
-        true ->
-            [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners];
-        false ->
-            [gen_server:reply(R, retry) || R <- OtherListeners]
-    end,
-    close_db(Source);
+terminate(normal, State) ->
+    timer:cancel(State#state.checkpoint_scheduled),
+    do_terminate(do_checkpoint(State));
 
 terminate(Reason, State) ->
     #state{
@@ -330,6 +309,42 @@
     {ok, Info} = couch_db:get_db_info(Db),
     Info.
 
+do_terminate(State) ->
+    #state{
+        checkpoint_history = CheckpointHistory,
+        committed_seq = NewSeq,
+        listeners = Listeners,
+        source = Source,
+        target = Target,
+        stats = Stats,
+        source_log = #doc{body={OldHistory}}
+    } = State,
+    couch_task_status:update("Finishing"),
+    ets:delete(Stats),
+    close_db(Target),
+    
+    NewRepHistory = case CheckpointHistory of
+    nil ->
+        {[{<<"no_changes">>, true} | OldHistory]};
+    _Else ->
+        CheckpointHistory
+    end,
+
+    %% reply to original requester
+    [Original|OtherListeners] = lists:reverse(Listeners),
+    gen_server:reply(Original, {ok, NewRepHistory}),
+
+    %% maybe trigger another replication. If this replicator uses a local
+    %% source Db, changes to that Db since we started will not be included in
+    %% this pass.
+    case up_to_date(Source, NewSeq) of
+        true ->
+            [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners];
+        false ->
+            [gen_server:reply(R, retry) || R <- OtherListeners]
+    end,
+    close_db(Source).
+
 has_session_id(_SessionId, []) ->
     false;
 has_session_id(SessionId, [{Props} | Rest]) ->
@@ -343,6 +358,7 @@
 make_replication_id({Props}, UserCtx) ->
     %% funky algorithm to preserve backwards compatibility
     {ok, HostName} = inet:gethostname(),
+    % Port = mochiweb_socket_server:get(couch_httpd, port),
     Src = get_rep_endpoint(UserCtx, proplists:get_value(<<"source">>, Props)),
     Tgt = get_rep_endpoint(UserCtx, proplists:get_value(<<"target">>, Props)),
   
     couch_util:to_hex(erlang:md5(term_to_binary([HostName, Src, Tgt]))).
@@ -414,6 +430,18 @@
     {not_found, no_db_file} -> throw({db_not_found, DbName})
     end.
 
+schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
+    Server = self(),
+    case timer:apply_after(5000, couch_rep, checkpoint, [Server]) of
+    {ok, TRef} ->
+        State#state{checkpoint_scheduled = TRef};
+    Error ->
+        ?LOG_ERROR("tried to schedule a checkpoint but got ~p", [Error]),
+        State
+    end;
+schedule_checkpoint(State) ->
+    State.
+
 do_checkpoint(State) ->
     #state{
         source = Source,
@@ -429,7 +457,7 @@
         stats = Stats
     } = State,
     ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
-    RecordSeqNum = case commit_to_both(Source, Target) of
+    RecordSeqNum = case commit_to_both(Source, Target, NewSeqNum) of
     {SrcInstanceStartTime, TgtInstanceStartTime} ->
         NewSeqNum;
     _Else ->
@@ -462,10 +490,11 @@
 
     try
     {SrcRevPos,SrcRevId} =
-        update_doc(Source, SourceLog#doc{body=NewRepHistory}, []),
+        update_local_doc(Source, SourceLog#doc{body=NewRepHistory}),
     {TgtRevPos,TgtRevId} =
-        update_doc(Target, TargetLog#doc{body=NewRepHistory}, []),
+        update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
     State#state{
+        checkpoint_scheduled = nil,
         checkpoint_history = NewRepHistory,
         source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
         target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
@@ -476,11 +505,11 @@
     State
     end.
 
-commit_to_both(Source, Target) ->
+commit_to_both(Source, Target, RequiredSeq) ->
     % commit the src async
     ParentPid = self(),
     SrcCommitPid = spawn_link(fun() ->
-            ParentPid ! {self(), ensure_full_commit(Source)} end),
+            ParentPid ! {self(), ensure_full_commit(Source, RequiredSeq)} end),
 
     % commit tgt sync
     TargetStartTime = ensure_full_commit(Target),
@@ -494,8 +523,8 @@
     end,
     {SourceStartTime, TargetStartTime}.
     
-ensure_full_commit(#http_db{} = Db) ->
-    Req = Db#http_db{
+ensure_full_commit(#http_db{} = Target) ->
+    Req = Target#http_db{
         resource = "_ensure_full_commit",
         method = post,
         body = true
@@ -503,21 +532,58 @@
     {ResultProps} = couch_rep_httpc:request(Req),
     true = proplists:get_value(<<"ok">>, ResultProps),
     proplists:get_value(<<"instance_start_time">>, ResultProps);
-ensure_full_commit(Db) ->
-    {ok, StartTime} = couch_db:ensure_full_commit(Db),
-    StartTime.
+ensure_full_commit(Target) ->
+    {ok, NewDb} = couch_db:open(Target#db.name, []),
+    UpdateSeq = couch_db:get_update_seq(Target),
+    CommitSeq = couch_db:get_committed_update_seq(NewDb),
+    InstanceStartTime = NewDb#db.instance_start_time,
+    couch_db:close(NewDb),
+    if UpdateSeq > CommitSeq ->
+        ?LOG_DEBUG("replication needs a full commit: update ~p commit ~p",
+            [UpdateSeq, CommitSeq]),
+        {ok, DbStartTime} = couch_db:ensure_full_commit(Target),
+        DbStartTime;
+    true ->
+        ?LOG_DEBUG("replication doesn't need a full commit", []),
+        InstanceStartTime
+    end.
+
+ensure_full_commit(#http_db{} = Source, RequiredSeq) ->
+    Req = Source#http_db{
+        resource = "_ensure_full_commit",
+        method = post,
+        body = true,
+        qs = [{seq, RequiredSeq}]
+    },
+    {ResultProps} = couch_rep_httpc:request(Req),
+    case proplists:get_value(<<"ok">>, ResultProps) of
+    true ->
+        proplists:get_value(<<"instance_start_time">>, ResultProps);
+    undefined -> nil end;
+ensure_full_commit(Source, RequiredSeq) ->
+    {ok, NewDb} = couch_db:open(Source#db.name, []),
+    CommitSeq = couch_db:get_committed_update_seq(NewDb),
+    InstanceStartTime = NewDb#db.instance_start_time,
+    couch_db:close(NewDb),
+    if RequiredSeq > CommitSeq ->
+        {ok, DbStartTime} = couch_db:ensure_full_commit(Source),
+        DbStartTime;
+    true ->
+        InstanceStartTime
+    end.
 
-update_doc(#http_db{} = Db, #doc{id=DocId} = Doc, []) ->
+update_local_doc(#http_db{} = Db, #doc{id=DocId} = Doc) ->
     Req = Db#http_db{
         resource = couch_util:url_encode(DocId),
         method = put,
-        body = couch_doc:to_json_obj(Doc, [attachments])
+        body = couch_doc:to_json_obj(Doc, [attachments]),
+        headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers]
     },
     {ResponseMembers} = couch_rep_httpc:request(Req),
     Rev = proplists:get_value(<<"rev">>, ResponseMembers),
     couch_doc:parse_rev(Rev);
-update_doc(Db, Doc, Options) ->
-    {ok, Result} = couch_db:update_doc(Db, Doc, Options),
+update_local_doc(Db, Doc) ->
+    {ok, Result} = couch_db:update_doc(Db, Doc, [delay_commit]),
     Result.
 
 up_to_date(#http_db{}, _Seq) ->
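
An editorial summary of the couch_rep.erl changes above (not part of the
commit): progress messages no longer write a checkpoint directly; they call
schedule_checkpoint/1, which arms a one-shot timer at most once, so checkpoints
are batched to roughly one per five seconds:

    %% Condensed control flow, as introduced by the hunks above:
    %%   {missing_revs_checkpoint, Seq} or {writer_checkpoint, Seq}
    %%     -> schedule_checkpoint(State)        (no-op if a timer is pending)
    %%     -> timer:apply_after(5000, couch_rep, checkpoint, [Server])
    %%     -> gen_server:cast(Server, do_checkpoint)
    %%     -> do_checkpoint(State)              (resets checkpoint_scheduled)
    %% terminate(normal, ...) cancels the timer and takes a final checkpoint
    %% if one was still pending.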

Modified: couchdb/branches/0.10.x/src/couchdb/couch_rep_changes_feed.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.10.x/src/couchdb/couch_rep_changes_feed.erl?rev=807719&r1=807718&r2=807719&view=diff
==============================================================================
--- couchdb/branches/0.10.x/src/couchdb/couch_rep_changes_feed.erl (original)
+++ couchdb/branches/0.10.x/src/couchdb/couch_rep_changes_feed.erl Tue Aug 25 17:12:15 2009
@@ -67,7 +67,7 @@
     {ibrowse_async_headers, ReqId, "200", _} ->
         ibrowse:stream_next(ReqId),
         {ok, #state{conn=Pid, last_seq=Since, reqid=ReqId}};
-    {ibrowse_async_headers, ReqId, "301", Hdrs} ->
+    {ibrowse_async_headers, ReqId, Code, Hdrs} when Code=="301"; Code=="302" ->
         catch ibrowse:stop_worker_process(Pid),
         Url2 = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
         %% TODO use couch_httpc:request instead of start_http_request
@@ -97,7 +97,12 @@
     false ->
         spawn_link(fun() -> send_local_changes_once(Server, Source, Since) end);
     true ->
-        spawn_link(fun() -> send_local_changes_forever(Server, Source, Since) end)
+        spawn_link(fun() ->
+            Self = self(),
+            {ok, _} = couch_db_update_notifier:start_link(fun(Msg) ->
+                local_update_notification(Self, Source#db.name, Msg) end),
+            send_local_changes_forever(Server, Source, Since)
+        end)
     end,
     {ok, #state{changes_loop=ChangesPid}}.
 
@@ -279,7 +284,7 @@
             {<<"id">>, Id},
             {<<"changes">>, [{[{<<"rev">>,R}]} || R <- ParsedRevs]}
         ]},
-        gen_server:call(Server, {add_change, Change}),
+        gen_server:call(Server, {add_change, Change}, infinity),
         Seq
     end, 0, Rows),
     by_seq_loop(Server, Source, EndSeq+1).
@@ -312,9 +317,6 @@
 
 send_local_changes_forever(Server, Db, Since) ->
     #db{name = DbName, user_ctx = UserCtx} = Db,
-    Self = self(),
-    {ok, _} = couch_db_update_notifier:start_link(
-        fun(Msg) -> local_update_notification(Self, DbName, Msg) end),
     {ok, NewSeq} = send_local_changes_once(Server, Db, Since),
     couch_db:close(Db),
     ok = wait_db_updated(),

Modified: couchdb/branches/0.10.x/src/couchdb/couch_rep_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.10.x/src/couchdb/couch_rep_httpc.erl?rev=807719&r1=807718&r2=807719&view=diff
==============================================================================
--- couchdb/branches/0.10.x/src/couchdb/couch_rep_httpc.erl (original)
+++ couchdb/branches/0.10.x/src/couchdb/couch_rep_httpc.erl Tue Aug 25 17:12:15 2009
@@ -92,7 +92,7 @@
     Code = list_to_integer(Status),
     if Code =:= 200; Code =:= 201 ->
         ?JSON_DECODE(maybe_decompress(Headers, Body));
-    Code =:= 301 ->
+    Code =:= 301; Code =:= 302 ->
         MochiHeaders = mochiweb_headers:make(Headers),
         RedirectUrl = mochiweb_headers:get_value("Location", MochiHeaders),
         do_request(Req#http_db{url = RedirectUrl});

Modified: couchdb/branches/0.10.x/src/couchdb/couch_rep_missing_revs.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.10.x/src/couchdb/couch_rep_missing_revs.erl?rev=807719&r1=807718&r2=807719&view=diff
==============================================================================
--- couchdb/branches/0.10.x/src/couchdb/couch_rep_missing_revs.erl (original)
+++ couchdb/branches/0.10.x/src/couchdb/couch_rep_missing_revs.erl Tue Aug 25 17:12:15 2009
@@ -148,6 +148,7 @@
     Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) ->
         {Id, [couch_doc:rev_to_str(R) || {[{<<"rev">>, R}]} <- C]} end,
     IdRevsList = [Transform(Change) || Change <- Changes],
+    SeqDict = changes_dictionary(Changes),
     {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes),
     Request = Target#http_db{
         resource = "_missing_revs",
@@ -156,16 +157,23 @@
     },
     {Resp} = couch_rep_httpc:request(Request),
     {MissingRevs} = proplists:get_value(<<"missing_revs">>, Resp),
-    X = [{Id, couch_doc:parse_revs(RevStrs)} || {Id,RevStrs} <- MissingRevs],
+    X = [{Id, dict:fetch(Id, SeqDict), couch_doc:parse_revs(RevStrs)} ||
+        {Id,RevStrs} <- MissingRevs],
     {HighSeq, X};
         
 get_missing_revs(Target, Changes) ->
     Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) ->
         {Id, [R || {[{<<"rev">>, R}]} <- C]} end,
     IdRevsList = [Transform(Change) || Change <- Changes],
+    SeqDict = changes_dictionary(Changes),
     {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes),
     {ok, Results} = couch_db:get_missing_revs(Target, IdRevsList),
-    {HighSeq, Results}.
+    {HighSeq, [{Id, dict:fetch(Id, SeqDict), Revs} || {Id, Revs} <- Results]}.
+
+changes_dictionary(ChangeList) ->
+    KVs = [{proplists:get_value(<<"id">>,C), proplists:get_value(<<"seq">>,C)}
+        || {C} <- ChangeList],
+    dict:from_list(KVs).
 
 %% save a checkpoint if no revs are missing on target so we don't
 %% rescan metadata unnecessarily
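
For illustration (not part of the commit), both clauses of get_missing_revs/2
now return {HighSeq, [{DocId, SourceSeq, MissingRevs}]} triples rather than
{DocId, MissingRevs} pairs, e.g. with made-up values:

    %% Hypothetical result for two documents missing on the target:
    {42, [{<<"doc-a">>, 40, RevsA},
          {<<"doc-b">>, 42, RevsB}]}

couch_rep_reader (below) sorts this list by the SourceSeq element before
opening the documents, so replicated sequences can be tracked precisely.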

Modified: couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl?rev=807719&r1=807718&r2=807719&view=diff
==============================================================================
--- couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl (original)
+++ couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl Tue Aug 25 17:12:15 2009
@@ -38,10 +38,9 @@
     reply_to = nil,
     complete = false,
     monitor_count = 0,
-    monitor_count_by_seq = ets:new(monitor_count_by_seq, [set, private]),
-    monitors_by_ref = ets:new(monitors_by_ref, [set, private]),
     pending_doc_request = nil,
-    high_missing_seq = 0
+    requested_seqs = [],
+    opened_seqs = []
 }).
 
 start_link(Parent, Source, MissingRevs, PostProps) ->
@@ -67,28 +66,25 @@
     },
     {ok, State}.
 
-handle_call({add_docs, Docs}, From, State) ->
+handle_call({add_docs, Seq, Docs}, From, State) ->
     State#state.parent ! {update_stats, docs_read, length(Docs)},
-    handle_add_docs(lists:flatten(Docs), From, State);
+    handle_add_docs(Seq, lists:flatten(Docs), From, State);
+
+handle_call({add_request_seqs, Seqs}, _From, State) ->
+    SeqList = State#state.requested_seqs,
+    {reply, ok, State#state{requested_seqs = lists:merge(Seqs, SeqList)}};
 
 handle_call(next_docs, From, State) ->
     handle_next_docs(From, State);
 
-handle_call({open_doc_revs, Id, Revs, HighSeq}, From, State) ->
-    handle_open_doc_revs(Id, Revs, HighSeq, From, State);
-
-handle_call({set_monitor_count, Seq, Count}, _From, State) ->
-    ets:insert(State#state.monitor_count_by_seq, {Seq,Count}),
-    {reply, ok, State};
-
-handle_call({update_high_seq, HighSeq}, _From, State) ->
-    {reply, ok, State#state{high_missing_seq=HighSeq}}.
+handle_call({open_remote_doc, Id, Seq, Revs}, From, State) ->
+    handle_open_remote_doc(Id, Seq, Revs, From, State).
 
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({'DOWN', Ref, _, _, Reason}, State) ->
-    handle_monitor_down(Reason, Ref, State);
+handle_info({'DOWN', _, _, _, Reason}, State) ->
+    handle_monitor_down(Reason, State);
 
 handle_info({'EXIT', Loop, complete}, #state{reader_loop=Loop} = State) ->
     handle_reader_loop_complete(State).
@@ -102,83 +98,87 @@
 
 %internal funs
 
-handle_add_docs(DocsToAdd, From, #state{reply_to=nil} = State) ->
-    NewState = State#state{
-        docs = queue:join(State#state.docs, queue:from_list(DocsToAdd)),
-        count = State#state.count + length(DocsToAdd)
+handle_add_docs(Seq, DocsToAdd, From, #state{reply_to=nil} = State) ->
+    State1 = update_sequence_lists(Seq, State),
+    NewState = State1#state{
+        docs = queue:join(State1#state.docs, queue:from_list(DocsToAdd)),
+        count = State1#state.count + length(DocsToAdd)
     },
     if NewState#state.count < ?BUFFER_SIZE ->
         {reply, ok, NewState};
     true ->
         {noreply, NewState#state{reader_from=From}}
     end;
-handle_add_docs(DocsToAdd, _From, #state{count=0} = State) ->
-    HighSeq = State#state.high_missing_seq,
+handle_add_docs(Seq, DocsToAdd, _From, #state{count=0} = State) ->
+    NewState = update_sequence_lists(Seq, State),
+    HighSeq = calculate_new_high_seq(NewState),
     gen_server:reply(State#state.reply_to, {HighSeq, DocsToAdd}),
-    {reply, ok, State#state{reply_to=nil}}.
+    {reply, ok, NewState#state{reply_to=nil}}.
 
 handle_next_docs(From, #state{count=0} = State) ->
     if State#state.complete ->
-        {stop, normal, {complete, State#state.high_missing_seq}, State};
+        {stop, normal, {complete, calculate_new_high_seq(State)}, State};
     true ->
         {noreply, State#state{reply_to=From}}
     end;
 handle_next_docs(_From, State) ->
     #state{
         reader_from = ReaderFrom,
-        docs = Docs,
-        high_missing_seq = HighSeq
+        docs = Docs
     } = State,
     if ReaderFrom =/= nil ->
         gen_server:reply(ReaderFrom, ok);
     true -> ok end,
     NewState = State#state{count=0, reader_from=nil, docs=queue:new()},
-    {reply, {HighSeq, queue:to_list(Docs)}, NewState}.
+    {reply, {calculate_new_high_seq(State), queue:to_list(Docs)}, NewState}.
 
-handle_open_doc_revs(Id, Revs, Seq, From, #state{monitor_count=N} = State)
+handle_open_remote_doc(Id, Seq, Revs, From, #state{monitor_count=N} = State)
         when N > ?MAX_CONCURRENT_REQUESTS ->
-    {noreply, State#state{pending_doc_request={From,Id,Revs,Seq}}};
-handle_open_doc_revs(Id, Revs, Seq, _From, #state{source=#http_db{}} = State) ->
+    {noreply, State#state{pending_doc_request={From,Id,Seq,Revs}}};
+handle_open_remote_doc(Id, Seq, Revs, _, #state{source=#http_db{}} = State) ->
     #state{
         monitor_count = Count,
-        monitors_by_ref = MonitorsByRef,
         source = Source
     } = State,
-    {_, Ref} = spawn_document_request(Source, Id, Revs),
-    ets:insert(MonitorsByRef, {Ref, Seq}),
+    {_, _Ref} = spawn_document_request(Source, Id, Seq, Revs),
     {reply, ok, State#state{monitor_count = Count+1}}.
 
-handle_monitor_down(normal, Ref, #state{pending_doc_request=nil,
+handle_monitor_down(normal, #state{pending_doc_request=nil,
         monitor_count=1, complete=waiting_on_monitors} = State) ->
-    N = calculate_new_high_seq(State, Ref),
-    {noreply, State#state{complete=true, monitor_count=0, high_missing_seq=N}};
-handle_monitor_down(normal, Ref, #state{pending_doc_request=nil} = State) ->
+    {noreply, State#state{complete=true, monitor_count=0}};
+handle_monitor_down(normal, #state{pending_doc_request=nil} = State) ->
     #state{monitor_count = Count} = State,
-    HighSeq = calculate_new_high_seq(State, Ref),
-    {noreply, State#state{monitor_count = Count-1, high_missing_seq=HighSeq}};
-handle_monitor_down(normal, Ref, State) ->
+    {noreply, State#state{monitor_count = Count-1}};
+handle_monitor_down(normal, State) ->
     #state{
         source = Source,
-        monitors_by_ref = MonitorsByRef,
-        pending_doc_request = {From, Id, Revs, Seq}
+        pending_doc_request = {From, Id, Seq, Revs}
     } = State,
-    HighSeq = calculate_new_high_seq(State, Ref),
     gen_server:reply(From, ok),
-    {_, NewRef} = spawn_document_request(Source, Id, Revs),
-    ets:insert(MonitorsByRef, {NewRef, Seq}),
-    {noreply, State#state{pending_doc_request=nil, high_missing_seq=HighSeq}};
-handle_monitor_down(Reason, _, State) ->
+    {_, _NewRef} = spawn_document_request(Source, Id, Seq, Revs),
+    {noreply, State#state{pending_doc_request=nil}};
+handle_monitor_down(Reason, State) ->
     {stop, Reason, State}.
 
 handle_reader_loop_complete(#state{reply_to=nil, monitor_count=0} = State) ->
     {noreply, State#state{complete = true}};
 handle_reader_loop_complete(#state{monitor_count=0} = State) ->
-    HighSeq = State#state.high_missing_seq,
+    HighSeq = calculate_new_high_seq(State),
     gen_server:reply(State#state.reply_to, {complete, HighSeq}),
     {stop, normal, State};
 handle_reader_loop_complete(State) ->
     {noreply, State#state{complete = waiting_on_monitors}}.
 
+calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) ->
+    Open;
+calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]})
+        when Req < Open ->
+    0;
+calculate_new_high_seq(#state{opened_seqs=[]}) ->
+    0;
+calculate_new_high_seq(State) ->
+    hd(State#state.opened_seqs).
+
 split_revlist(Rev, {[CurrentAcc|Rest], BaseLength, Length}) ->
     case Length+size(Rev) > 8192 of
     false ->
@@ -187,6 +187,31 @@
         {[[Rev],CurrentAcc|Rest], BaseLength, BaseLength}
     end.
 
+% We store outstanding requested sequences and a subset of already opened
+% sequences in 2 ordered lists.  The subset of opened seqs is a) the largest
+% opened seq smaller than the smallest outstanding request seq plus b) all the
+% opened seqs greater than the smallest outstanding request.  I believe its the
+% minimal set of info needed to correctly calculate which seqs have been
+% replicated (because remote docs can be opened out-of-order) -- APK
+update_sequence_lists(Seq, State) ->
+    Requested = lists:delete(Seq, State#state.requested_seqs),
+    AllOpened = lists:merge([Seq], State#state.opened_seqs),
+    Opened = case Requested of
+    [] ->
+        [lists:last(AllOpened)];
+    [EarliestReq|_] ->
+        case lists:splitwith(fun(X) -> X < EarliestReq end, AllOpened) of
+        {[], Greater} ->
+            Greater;
+        {Less, Greater} ->
+            [lists:last(Less) | Greater]
+        end
+    end,
+    State#state{
+        requested_seqs = Requested,
+        opened_seqs = Opened
+    }.
+
 open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
     %% all this logic just splits up revision lists that are too long for
     %% MochiWeb into multiple requests
@@ -214,25 +239,25 @@
 reader_loop(ReaderServer, Source, MissingRevsServer) ->
     case couch_rep_missing_revs:next(MissingRevsServer) of
     complete ->
-        % ?LOG_INFO("reader_loop terminating with complete", []),
         exit(complete);
     {HighSeq, IdsRevs} ->
-        % ?LOG_DEBUG("got IdsRevs ~p", [IdsRevs]),
+        % to be safe, make sure Results are sorted by source_seq
+        SortedIdsRevs = lists:keysort(2, IdsRevs),
+        RequestSeqs = [S || {_,S,_} <- SortedIdsRevs],
+        gen_server:call(ReaderServer, {add_request_seqs, RequestSeqs}, infinity),
         case Source of
         #http_db{} ->
-            N = length(IdsRevs),
-            gen_server:call(ReaderServer, {set_monitor_count, HighSeq, N}),
-            [gen_server:call(ReaderServer, {open_doc_revs, Id, Revs, HighSeq})
-                || {Id,Revs} <- IdsRevs],
+            [gen_server:call(ReaderServer, {open_remote_doc, Id, Seq, Revs},
+                infinity) || {Id,Seq,Revs} <- SortedIdsRevs],
             reader_loop(ReaderServer, Source, MissingRevsServer);
         _Local ->
             Source2 = maybe_reopen_db(Source, HighSeq),
-            lists:foreach(fun({Id,Revs}) ->
+            lists:foreach(fun({Id,Seq,Revs}) ->
                 {ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]),
                 JustTheDocs = [Doc || {ok, Doc} <- Docs],
-                gen_server:call(ReaderServer, {add_docs, JustTheDocs})
-            end, IdsRevs),
-            gen_server:call(ReaderServer, {update_high_seq, HighSeq}),
+                gen_server:call(ReaderServer, {add_docs, Seq, JustTheDocs},
+                    infinity)
+            end, SortedIdsRevs),
             reader_loop(ReaderServer, Source2, MissingRevsServer)
         end
     end.
@@ -243,34 +268,10 @@
 maybe_reopen_db(Db, _HighSeq) ->
     Db.
 
-spawn_document_request(Source, Id, Revs) ->
+spawn_document_request(Source, Id, Seq, Revs) ->
     Server = self(),
     SpawnFun = fun() ->
         Results = open_doc_revs(Source, Id, Revs),
-        gen_server:call(Server, {add_docs, Results})
+        gen_server:call(Server, {add_docs, Seq, Results}, infinity)
     end,
     spawn_monitor(SpawnFun).
-
-%% check if any more HTTP requests are pending for this update sequence
-calculate_new_high_seq(State, Ref) ->
-    #state{
-        monitors_by_ref = MonitorsByRef,
-        monitor_count_by_seq = MonitorCountBySeq,
-        high_missing_seq = OldSeq
-    } = State,
-    Seq = ets:lookup_element(MonitorsByRef, Ref, 2),
-    ets:delete(MonitorsByRef, Ref),
-    case ets:update_counter(MonitorCountBySeq, Seq, -1) of
-    0 ->
-        ets:delete(MonitorCountBySeq, Seq),
-        case ets:first(MonitorCountBySeq) of
-        Key when Key > Seq ->
-            Seq;
-        '$end_of_table' ->
-            Seq;
-        _Else ->
-            OldSeq
-        end;
-    _Else ->
-        OldSeq
-    end.
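
A hypothetical walk-through of the new sequence bookkeeping above (the numbers
are made up, not part of the commit): suppose missing revisions were requested
at source sequences 3, 5 and 7 and the remote documents happen to be opened
out of order:

    %% requested_seqs   opened_seqs   calculate_new_high_seq/1
    %% [3,5,7]          []            0  (nothing opened yet)
    %% [3,7]            [5]           0  (5 is open, but 3 is still outstanding)
    %% [7]              [5]           5  (3 and 5 are done; 7 still outstanding)
    %% []               [7]           7  (everything requested so far is open)

The first row is the initial state; each subsequent row is the state after
update_sequence_lists/2 runs for the newly opened sequence.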

Modified: couchdb/branches/0.10.x/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.10.x/src/couchdb/couch_rep_writer.erl?rev=807719&r1=807718&r2=807719&view=diff
==============================================================================
--- couchdb/branches/0.10.x/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/branches/0.10.x/src/couchdb/couch_rep_writer.erl Tue Aug 25 17:12:15 2009
@@ -48,7 +48,8 @@
     ErrorsJson = couch_rep_httpc:request(Db#http_db{
         resource = "_bulk_docs",
         method = post,
-        body = {[{new_edits, false}, {docs, JsonDocs}]}
+        body = {[{new_edits, false}, {docs, JsonDocs}]},
+        headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers]
     }),
     ErrorsList =
     lists:map(
@@ -62,4 +63,4 @@
         end, ErrorsJson),
     {ok, ErrorsList};
 write_docs(Db, Docs) ->
-    couch_db:update_docs(Db, Docs, [], replicated_changes).
+    couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes).
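
An editorial note tying the writer changes to the rest of the commit (not part
of the commit itself): with "x-couch-full-commit: false" on _bulk_docs and
delay_commit on local updates, individual write batches no longer force a full
commit on the target; durability is instead handled at checkpoint time, when
commit_to_both/3 in couch_rep.erl asks the source to commit up to the sequence
being recorded and the target to flush its pending writes.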

Propchange: couchdb/branches/0.10.x/test/
            ('svn:mergeinfo' removed)

Propchange: couchdb/branches/0.10.x/test/etap/
            ('svn:mergeinfo' removed)

Propchange: couchdb/branches/0.10.x/test/etap/112-replication-missing-revs.t
            ('svn:mergeinfo' removed)


