couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: jch...@apache.org
Subject: svn commit: r727171 - /couchdb/trunk/src/couchdb/couch_rep.erl
Date: Tue, 16 Dec 2008 21:48:42 GMT
Author: jchris
Date: Tue Dec 16 13:48:41 2008
New Revision: 727171

URL: http://svn.apache.org/viewvc?rev=727171&view=rev
Log:
Adam Kocoloski replication performance improvements (closes COUCHDB-160)

Modified:
    couchdb/trunk/src/couchdb/couch_rep.erl

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=727171&r1=727170&r2=727171&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Tue Dec 16 13:48:41 2008
@@ -135,93 +135,11 @@
     end.
 
 pull_rep(DbTarget, DbSource, SourceSeqNum) ->
-    SaveDocsPid = spawn_link(fun() ->
-            save_docs_loop(DbTarget, 0) end),
-    OpenDocsPid = spawn_link(fun() ->
-            open_doc_revs_loop(DbSource, SaveDocsPid, 0) end),
-    OpenDocsPid ! got_it, % prime queue with got_it
-    MissingRevsPid = spawn_link(fun() ->
-            get_missing_revs_loop(DbTarget, OpenDocsPid, 0, 0) end),
-    MissingRevsPid ! got_it, % prime queue with got_it
-    self() ! got_it,
-    {ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum,
-        fun(SrcDocInfo, _, _) ->
-            #doc_info{id=Id,
-                rev=Rev,
-                conflict_revs=Conflicts,
-                deleted_conflict_revs=DelConflicts,
-                update_seq=Seq} = SrcDocInfo,
-            SrcRevs = [Rev | Conflicts] ++ DelConflicts,
-            receive got_it -> ok end,
-            MissingRevsPid !  {self(), Id, SrcRevs}, % send to the missing revs process
-            {ok, Seq}
-        end, SourceSeqNum),
-    
-    receive got_it -> ok end,
-    
-    MissingRevsPid ! {self(), shutdown},
-    receive {done, MissingRevsPid, Stats1} -> ok end,
-    
-    OpenDocsPid ! {self(), shutdown},
-    receive {done, OpenDocsPid, Stats2} -> ok end,
-    
-    SaveDocsPid ! {self(), shutdown},
-    receive {done, SaveDocsPid, Stats3} -> ok end,
-    
-    {NewSeq, Stats1 ++ Stats2 ++ Stats3}.
-
-
-get_missing_revs_loop(DbTarget, OpenDocsPid, RevsChecked, MissingFound) ->
-    receive got_it -> ok end,
-    receive
-    {Src, Id, Revs} ->
-        Src ! got_it,
-
-        MissingRevs =
-        case get_missing_revs(DbTarget, [{Id, Revs}]) of
-        {ok, [{Id, MissingRevs0}]} ->
-            OpenDocsPid ! {self(), Id, MissingRevs0},
-            MissingRevs0;
-        {ok, []} ->
-            % prime our message queue
-            self() ! got_it,
-            []
-        end,
-        get_missing_revs_loop(DbTarget, OpenDocsPid,
-                RevsChecked + length(Revs),
-                MissingFound + length(MissingRevs));
-    {Src, shutdown} ->
-        Src ! {done, self(), [{<<"missing_checked">>, RevsChecked},
-                                 {<<"missing_found">>, MissingFound}]}
-    end.
-    
-
-open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead) ->
-    receive got_it -> ok end,
-    receive
-    {Src, Id, MissingRevs} ->
-        Src ! got_it,
-        {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
-        % only save successful reads
-        Docs = [RevDoc || {ok, RevDoc} <- DocResults],
-        SaveDocsPid ! {self(), docs, Docs},
-        open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead + length(Docs));
-    {Src, shutdown} ->
-        Src ! {done, self(), [{<<"docs_read">>, DocsRead}]}
-    end.
-
-
-        
-save_docs_loop(DbTarget, DocsWritten) ->
-    receive
-    {Src, docs, Docs} ->
-        Src ! got_it,
-        ok = update_docs(DbTarget, Docs, [], false),
-        save_docs_loop(DbTarget, DocsWritten + length(Docs));
-    {Src, shutdown} ->
-        Src ! {done, self(), [{<<"docs_written">>, DocsWritten}]}
-    end.
-
+    http:set_options([{max_pipeline_length, 101}, {pipeline_timeout, 5000}]),
+    {ok, {NewSeq, Stats}} = 
+        enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, []}),
+    http:set_options([{max_pipeline_length, 2}, {pipeline_timeout, 0}]),
+    {NewSeq, Stats}.
 
 do_http_request(Url, Action, Headers) ->
     do_http_request(Url, Action, Headers, []).
@@ -243,14 +161,96 @@
         ?JSON_DECODE(ResponseBody)
     end.
 
-enum_docs0(_InFun, [], Acc) ->
-    Acc;
-enum_docs0(InFun, [DocInfo | Rest], Acc) ->
-    case InFun(DocInfo, 0, Acc) of
-    {ok, Acc2} -> enum_docs0(InFun, Rest, Acc2);
-    {stop, Acc2} -> Acc2
+save_docs_buffer(DbTarget, DocsBuffer, []) ->
+    receive
+    {Src, shutdown} ->
+        ok = update_docs(DbTarget, lists:reverse(DocsBuffer), [], false),
+        Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
+    end;
+save_docs_buffer(DbTarget, DocsBuffer, UpdateSequences) ->
+    [NextSeq|Rest] = UpdateSequences,
+    receive
+    {Src, skip, NextSeq} ->
+        Src ! got_it,
+        save_docs_buffer(DbTarget, DocsBuffer, Rest);
+    {Src, docs, {NextSeq, Docs}} ->
+        Src ! got_it,
+        case couch_util:should_flush() of
+            true ->
+                ok = update_docs(DbTarget, lists:reverse(Docs++DocsBuffer), [], 
+                    false),
+                save_docs_buffer(DbTarget, [], Rest);
+            false ->
+                save_docs_buffer(DbTarget, Docs++DocsBuffer, Rest)
+        end;
+        {Src, shutdown} ->
+        ?LOG_ERROR("received shutdown while waiting for more update_seqs", []),
+        ok = update_docs(DbTarget, lists:reverse(DocsBuffer), [], false),
+        Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
     end.
 
+pmap(F,List) ->
+    [wait_result(Worker) || Worker <- [spawn_worker(self(),F,E) || E <- List]].
+
+spawn_worker(Parent, F, E) ->
+    erlang:spawn_monitor(fun() -> Parent ! {self(), F(E)} end).
+
+wait_result({Pid,Ref}) ->
+    receive
+    {'DOWN', Ref, _, _, normal} -> receive {Pid,Result} -> Result end;
+    {'DOWN', Ref, _, _, Reason} -> exit(Reason)
+end.
+
+enum_docs_parallel(DbS, DbT, DocInfoList) ->
+    UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
+    SaveDocsPid = spawn_link(fun() -> save_docs_buffer(DbT,[],UpdateSeqs) end),
+    
+    Stats = pmap(fun(SrcDocInfo) ->
+        #doc_info{id=Id,
+            rev=Rev,
+            conflict_revs=Conflicts,
+            deleted_conflict_revs=DelConflicts,
+            update_seq=Seq} = SrcDocInfo,
+        SrcRevs = [Rev | Conflicts] ++ DelConflicts,
+        
+        case get_missing_revs(DbT, [{Id, SrcRevs}]) of
+        {ok, [{Id, MissingRevs}]} ->
+            {ok, DocResults} = open_doc_revs(DbS, Id, MissingRevs, [latest]),
+            
+            % only save successful reads
+            Docs = [RevDoc || {ok, RevDoc} <- DocResults],
+            
+            % include update_seq so we save docs in order
+            SaveDocsPid ! {self(), docs, {Seq, Docs}},
+            receive got_it -> ok end,
+            [{missing_checked, length(SrcRevs)},
+             {missing_found, length(MissingRevs)},
+             {docs_read, length(Docs)}];
+        {ok, []} ->
+            SaveDocsPid ! {self(), skip, Seq},
+            receive got_it -> ok end,
+            [{missing_checked, length(SrcRevs)}]
+        end    
+    end, DocInfoList),
+    
+    SaveDocsPid ! {self(), shutdown},
+    
+    {MissingChecked, MissingFound, DocsRead} = lists:foldl(fun(S, {C, F, R}) ->
+        C1 = C + proplists:get_value(missing_checked, S, 0),
+        F1 = F + proplists:get_value(missing_found, S, 0),
+        R1 = R + proplists:get_value(docs_read, S, 0),
+        {C1, F1, R1}
+    end, {0, 0, 0}, Stats),
+    
+    receive
+        {done, SaveDocsPid, [{<<"docs_written">>, DocsWritten}]} -> ok
+    end,
+    
+    [ {<<"missing_checked">>, MissingChecked},
+      {<<"missing_found">>, MissingFound}, 
+      {<<"docs_read">>, DocsRead},
+      {<<"docs_written">>, DocsWritten} ].
+
 fix_url(UrlBin) ->
     Url = binary_to_list(UrlBin),
     case lists:last(Url) of
@@ -276,12 +276,10 @@
 close_db(Db)->
     couch_db:close(Db).
 
-
-enum_docs_since(#http_db{uri=DbUrl, headers=Headers}=Db, Start, InFun, InAcc)->
-    Url = DbUrl ++ "_all_docs_by_seq?count=100&startkey="
-            ++ integer_to_list(Start),
+get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) ->
+    Url = DbUrl ++ "_all_docs_by_seq?count=100&startkey=" 
+        ++ integer_to_list(StartSeq),
     {Results} = do_http_request(Url, get, Headers),
-    DocInfoList=
     lists:map(fun({RowInfoList}) ->
         {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList),
         #doc_info{
@@ -292,18 +290,44 @@
                 proplists:get_value(<<"conflicts">>, RowValueProps, []),
             deleted_conflict_revs =
                 proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []),
-            deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)}
-        end, proplists:get_value(<<"rows">>, Results)),
+            deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)
+        }
+    end, proplists:get_value(<<"rows">>, Results));
+get_doc_info_list(DbSource, StartSeq) ->
+    {ok, {_Count, DocInfoList}} = couch_db:enum_docs_since(DbSource, StartSeq, 
+    fun (_, _, {100, DocInfoList}) ->
+            {stop, {100, DocInfoList}};
+        (DocInfo, _, {Count, DocInfoList}) -> 
+            {ok, {Count+1, [DocInfo|DocInfoList]}} 
+    end, {0, []}),
+    lists:reverse(DocInfoList).
+
+enum_docs_since(DbSource, DbTarget, StartSeq, InAcc) ->
+    DocInfoList = get_doc_info_list(DbSource, StartSeq),
     case DocInfoList of
     [] ->
         {ok, InAcc};
     _ ->
-        Acc2 = enum_docs0(InFun, DocInfoList, InAcc),
+        Stats = enum_docs_parallel(DbSource, DbTarget, DocInfoList),
+        OldStats = element(2, InAcc),
+        TotalStats = [
+            {<<"missing_checked">>, 
+                proplists:get_value(<<"missing_checked">>, OldStats, 0) +
+                proplists:get_value(<<"missing_checked">>, Stats, 0)},
+            {<<"missing_found">>, 
+                proplists:get_value(<<"missing_found">>, OldStats, 0) +
+                proplists:get_value(<<"missing_found">>, Stats, 0)},
+            {<<"docs_read">>, 
+                proplists:get_value(<<"docs_read">>, OldStats, 0) +
+                proplists:get_value(<<"docs_read">>, Stats, 0)},
+            {<<"docs_written">>, 
+                proplists:get_value(<<"docs_written">>, OldStats, 0) +
+                proplists:get_value(<<"docs_written">>, Stats, 0)}
+        ],
+        
         #doc_info{update_seq=LastSeq} = lists:last(DocInfoList),
-        enum_docs_since(Db, LastSeq, InFun, Acc2)
-    end;
-enum_docs_since(DbSource, StartSeq, Fun, Acc) ->
-    couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc).
+        enum_docs_since(DbSource, DbTarget, LastSeq, {LastSeq, TotalStats})
+    end.
 
 get_missing_revs(#http_db{uri=DbUrl, headers=Headers}, DocIdRevsList) ->
     {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers,



Mime
View raw message