couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dam...@apache.org
Subject svn commit: r955773 - in /couchdb/branches/new_replicator/src/couchdb: couch_api_wrap.erl couch_api_wrap.hrl couch_doc.erl couch_httpd_rep.erl couch_replicate.erl couch_replicate.hrl
Date Thu, 17 Jun 2010 22:35:49 GMT
Author: damien
Date: Thu Jun 17 22:35:49 2010
New Revision: 955773

URL: http://svn.apache.org/viewvc?rev=955773&view=rev
Log:
Reorganized and commented the new replicator code for easier understanding.

Added:
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
      - copied unchanged from r955001, couchdb/branches/new_replicator/src/couchdb/couch_replicate.hrl
Removed:
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.hrl
Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_doc.erl
    couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=955773&r1=955772&r2=955773&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Thu Jun 17 22:35:49 2010
@@ -12,13 +12,29 @@
 
 -module(couch_api_wrap).
 
+% This module wraps the native erlang API, and allows for performing
+% operations on a remote vs. local databases via the same API.
+%
+% Notes:
+% Many options and apis aren't yet supported here, they are added as needed.
+%
+% This file neesds a lot of work to "robustify" the common failures, and
+% convert the json errors back to Erlang style errors.
+%
+% Also, we open a new connection for every HTTP call, to avoid the
+% problems when requests are pipelined over a single connection and earlier
+% requests that fail and disconnect don't cause network errors for other
+% requests. This should eventually be optimized so each process has it's own
+% connection that's kept alive between requests.
+%
 
 -include("couch_db.hrl").
--include("couch_replicate.hrl").
+-include("couch_api_wrap.hrl").
 -include("../ibrowse/ibrowse.hrl").
 
 -export([
     db_open/2,
+    db_close/1,
     get_db_info/1,
     open_doc/3,
     update_doc/3,
@@ -34,6 +50,12 @@ db_open(#httpdb{}=Db, _Options) ->
 db_open(DbName, Options) ->
     couch_db:open(DbName,Options).
 
+db_close(#httpdb{}) ->
+    ok;
+db_close(DbName) ->
+    couch_db:close(DbName).
+
+
 get_db_info(#httpdb{url=Url,oauth=OAuth,headers=Headers}) ->
     Headers2 = oauth_header(Url, [], get, OAuth) ++ Headers,
     case ibrowse:send_req(Url, Headers2, get, [], [ 
@@ -67,9 +89,11 @@ open_doc(#httpdb{url=Url,oauth=OAuth,hea
 open_doc(Db, DocId, Options) ->
     couch_db:open_doc(Db, DocId, Options).
 
+
 update_doc(Db, Doc, Options) ->
     update_doc(Db,Doc,Options,interactive_edit).
 
+
 ensure_full_commit(#httpdb{url=Url,oauth=OAuth,headers=Headers}) ->
     Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers,
     #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
@@ -85,6 +109,7 @@ ensure_full_commit(#httpdb{url=Url,oauth
 ensure_full_commit(Db) ->
     couch_db:ensure_full_commit(Db).
 
+
 get_missing_revs(#httpdb{url=Url,oauth=OAuth,headers=Headers}, IdRevs) ->
     Json = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs],
     Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers,
@@ -109,22 +134,6 @@ get_missing_revs(Db, IdRevs) ->
     couch_db:get_missing_revs(Db, IdRevs).
 
 
-options_to_query_args([], Acc) ->
-    lists:reverse(Acc);
-options_to_query_args([delay_commit|Rest], Acc) ->
-    options_to_query_args(Rest, Acc);
-options_to_query_args([{atts_since,[]}|Rest], Acc) ->
-    options_to_query_args(Rest, Acc);
-options_to_query_args([{atts_since,PossibleAncestors}|Rest], Acc) ->
-    options_to_query_args(Rest, [{"atts_since",?JSON_ENCODE(
-            couch_doc:revs_to_strs(PossibleAncestors))} | Acc]).
-
-query_args_to_string([], []) ->
-    "";
-query_args_to_string([], Acc) ->
-    "?" ++ string:join(lists:reverse(Acc), "&");
-query_args_to_string([{K,V}|Rest], Acc) ->
-    query_args_to_string(Rest, [(K ++ "=" ++ V) | Acc]).
 
 open_doc_revs(#httpdb{url=Url,oauth=OAuth,headers=Headers}, Id, Revs, 
         Options, Fun, Acc) ->
@@ -163,6 +172,107 @@ open_doc_revs(Db, Id, Revs, Options, Fun
     {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
     {ok, lists:foldl(Fun, Acc, Results)}.
 
+    
+
+update_doc(#httpdb{url=Url,headers=Headers,oauth=OAuth},Doc,Options,Type) ->
+    QArgs = if Type == replicated_changes ->
+        [{"new_edits", "false"}]; true -> [] end ++ 
+        options_to_query_args(Options, []),
+    
+    Boundary = couch_uuids:random(),
+    JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments,follows|Options])),
+    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
+            JsonBytes, Doc#doc.atts, false),
+    Self = self(),
+    Headers2 = case lists:member(delay_commit, Options) of 
+            true -> [{"X-Couch-Full-Commit", "false"}];
+            false ->  []
+            end ++ [{"Content-Type", ?b2l(ContentType)}] ++ 
+            oauth_header(Url, QArgs, put, OAuth) ++ Headers,
+    Ref = make_ref(),
+    % this streams the doc data to the ibrowse requester
+    DocStreamer = spawn_link(fun() ->
+                couch_doc:doc_to_multi_part_stream(Boundary,
+                    JsonBytes, Doc#doc.atts,
+                    fun(Data) ->
+                        receive {get_data, Ref, Pid} ->
+                            Pid ! {data, Ref, Data}
+                        end
+                    end,
+                    false),
+                unlink(Self)
+            end),
+    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
+    case ibrowse:send_req_direct(Worker, Url ++ couch_util:url_encode(Doc#doc.id) ++ query_args_to_string(QArgs,
[]),
+            [{"Content-Length",Len}|Headers2], put, 
+            {fun(0) ->
+                eof;
+             (LenLeft) when LenLeft > 0 ->
+                DocStreamer ! {get_data, Ref, self()},
+                receive {data, Ref, Data} ->
+                    {ok, Data, LenLeft - iolist_size(Data)}
+                end
+            end, Len}, [], infinity) of
+    {ok, [$2,$0, _], _RespHeaders, Body} ->
+        catch ibrowse:stop_worker_process(Worker),
+        {Props} = ?JSON_DECODE(Body),
+        {ok, couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props))}
+    end;
+update_doc(Db,Doc,Options,Type) ->
+    couch_db:update_doc(Db,Doc,Options,Type).
+
+changes_since(#httpdb{url=Url,headers=Headers,oauth=OAuth}, Style,
+        StartSeq, UserFun, Acc) ->
+    Url2 = Url ++ "_changes",
+    QArgs = [{"style", atom_to_list(Style)},
+            {"since", integer_to_list(StartSeq)}],
+    Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers,        
+    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
+    {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, Url2 ++ query_args_to_string(QArgs,
""), 
+            Headers2, get, [], [
+            {response_format,binary},
+            {stream_to, {self(), once}}], infinity),
+    DataFun = fun() ->
+            receive {ibrowse_async_headers, ReqId, "200", _Headers} ->
+                stream_data_self(ReqId)
+            end
+        end,
+    EventFun = fun(Ev) ->
+            changes_ev1(Ev, UserFun, Acc)
+        end,
+    try
+        json_stream_parse:events(DataFun, EventFun)
+    after
+        catch ibrowse:stop_worker_process(Worker)
+    end;
+changes_since(Db, Style, StartSeq, UserFun, Acc) ->
+    couch_db:changes_since(Db, Style, StartSeq, UserFun, Acc).
+
+
+% internal functions
+
+options_to_query_args([], Acc) ->
+    lists:reverse(Acc);
+options_to_query_args([delay_commit|Rest], Acc) ->
+    options_to_query_args(Rest, Acc);
+options_to_query_args([{atts_since,[]}|Rest], Acc) ->
+    options_to_query_args(Rest, Acc);
+options_to_query_args([{atts_since,PossibleAncestors}|Rest], Acc) ->
+    % NOTE, we should limit the # of PossibleAncestors sent. Since a large
+    % # can exceed the max URL length. Limiting the # only results in
+    % attachments being fully copied from source to target, instead of
+    % incrementally.
+    options_to_query_args(Rest, [{"atts_since",?JSON_ENCODE(
+            couch_doc:revs_to_strs(PossibleAncestors))} | Acc]).
+
+query_args_to_string([], []) ->
+    "";
+query_args_to_string([], Acc) ->
+    "?" ++ string:join(lists:reverse(Acc), "&");
+query_args_to_string([{K,V}|Rest], Acc) ->
+    query_args_to_string(Rest, [(K ++ "=" ++ V) | Acc]).
 
 receive_docs(Streamer, UserFun, UserAcc) ->
     Streamer ! {get_headers, self()},
@@ -242,82 +352,6 @@ mp_parse_mixed(body_end) ->
         mp_parse_mixed(Next)
     end.
 
-update_doc(#httpdb{url=Url,headers=Headers,oauth=OAuth},Doc,Options,Type) ->
-    QArgs = if Type == replicated_changes ->
-        [{"new_edits", "false"}]; true -> [] end ++ 
-        options_to_query_args(Options, []),
-    
-    Boundary = couch_uuids:random(),
-    JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments,follows|Options])),
-    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
-            JsonBytes, Doc#doc.atts, false),
-    Self = self(),
-    Headers2 = case lists:member(delay_commit, Options) of 
-            true -> [{"X-Couch-Full-Commit", "false"}];
-            false ->  []
-            end ++ [{"Content-Type", ?b2l(ContentType)}] ++ 
-            oauth_header(Url, QArgs, put, OAuth) ++ Headers,
-    Ref = make_ref(),
-    % this streams the doc data to the ibrowse requester
-    DocStreamer = spawn_link(fun() ->
-                couch_doc:doc_to_multi_part_stream(Boundary,
-                    JsonBytes, Doc#doc.atts,
-                    fun(Data) ->
-                        receive {get_data, Ref, Pid} ->
-                            Pid ! {data, Ref, Data}
-                        end
-                    end,
-                    false),
-                unlink(Self)
-            end),
-    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
-    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
-    case ibrowse:send_req_direct(Worker, Url ++ couch_util:url_encode(Doc#doc.id) ++ query_args_to_string(QArgs,
[]),
-            [{"Content-Length",Len}|Headers2], put, 
-            {fun(0) ->
-                eof;
-             (LenLeft) when LenLeft > 0 ->
-                DocStreamer ! {get_data, Ref, self()},
-                receive {data, Ref, Data} ->
-                    {ok, Data, LenLeft - iolist_size(Data)}
-                end
-            end, Len}, [], infinity) of
-    {ok, [$2,$0, _], _RespHeaders, Body} ->
-        catch ibrowse:stop_worker_process(Worker),
-        {Props} = ?JSON_DECODE(Body),
-        {ok, couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props))}
-    end;
-update_doc(Db,Doc,Options,Type) ->
-    couch_db:update_doc(Db,Doc,Options,Type).
-
-changes_since(#httpdb{url=Url,headers=Headers,oauth=OAuth}, Style,
-        StartSeq, UserFun, Acc) ->
-    Url2 = Url ++ "_changes",
-    QArgs = [{"style", atom_to_list(Style)},
-            {"since", integer_to_list(StartSeq)}],
-    Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers,        
-    #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
-    {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
-    {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, Url2 ++ query_args_to_string(QArgs,
""), 
-            Headers2, get, [], [
-            {response_format,binary},
-            {stream_to, {self(), once}}], infinity),
-    DataFun = fun() ->
-            receive {ibrowse_async_headers, ReqId, "200", _Headers} ->
-                stream_data_self(ReqId)
-            end
-        end,
-    EventFun = fun(Ev) ->
-            changes_ev1(Ev, UserFun, Acc)
-        end,
-    try
-        json_stream_parse:events(DataFun, EventFun)
-    after
-        catch ibrowse:stop_worker_process(Worker)
-    end;
-changes_since(Db, Style, StartSeq, UserFun, Acc) ->
-    couch_db:changes_since(Db, Style, StartSeq, UserFun, Acc).
-
 stream_data_self(ReqId) ->
     ibrowse:stream_next(ReqId),
     receive {ibrowse_async_response, ReqId, Data} ->

Modified: couchdb/branches/new_replicator/src/couchdb/couch_doc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_doc.erl?rev=955773&r1=955772&r2=955773&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_doc.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_doc.erl Thu Jun 17 22:35:49 2010
@@ -309,8 +309,8 @@ att_foldl(#att{data=DataFun,att_len=Len}
 
 att_foldl_decode(#att{data={Fd,Sp},md5=Md5,encoding=Enc}, Fun, Acc) ->
     couch_stream:foldl_decode(Fd, Sp, Md5, Enc, Fun, Acc);
-att_foldl_decode(#att{data=Fun,att_len=Len, encoding=identity}, Fun, Acc) ->
-       fold_streamed_data(Fun, Len, Fun, Acc).
+att_foldl_decode(#att{data=Fun2,att_len=Len, encoding=identity}, Fun, Acc) ->
+       fold_streamed_data(Fun2, Len, Fun, Acc).
 
 att_to_bin(#att{data=Bin}) when is_binary(Bin) ->
     Bin;

Modified: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=955773&r1=955772&r2=955773&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Thu Jun 17 22:35:49 2010
@@ -13,7 +13,7 @@
 -module(couch_httpd_rep).
 
 -include("couch_db.hrl").
--include("couch_replicate.hrl").
+-include("couch_api_wrap.hrl").
 
 -import(couch_httpd,
     [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,
@@ -24,6 +24,23 @@
     
 -export([handle_req/1]).
 
+
+handle_req(#httpd{method='POST'}=Req) ->
+    {PostBody} = couch_httpd:json_body_obj(Req),
+    SrcDb = parse_rep_db(couch_util:get_value(<<"source">>, PostBody)),
+    TgtDb = parse_rep_db(couch_util:get_value(<<"target">>, PostBody)),
+    Options = convert_options(PostBody),
+    try couch_replicate:start(SrcDb, TgtDb, Options, Req#httpd.user_ctx) of
+    {ok, {HistoryResults}} ->
+        send_json(Req, {[{ok, true} | HistoryResults]})
+    catch
+    throw:{db_not_found, Msg} ->
+        send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]})
+    end;
+handle_req(Req) ->
+    send_method_not_allowed(Req, "POST").
+
+
 maybe_add_trailing_slash(Url) ->
     re:replace(Url, "[^/]$", "&/", [{return, list}]).
 
@@ -83,17 +100,3 @@ convert_options([_|R])-> % skip unknown 
     convert_options(R).
 
 
-handle_req(#httpd{method='POST'}=Req) ->
-    {PostBody} = couch_httpd:json_body_obj(Req),
-    SrcDb = parse_rep_db(couch_util:get_value(<<"source">>, PostBody)),
-    TgtDb = parse_rep_db(couch_util:get_value(<<"target">>, PostBody)),
-    Options = convert_options(PostBody),
-    try couch_replicate:start(SrcDb, TgtDb, Options, Req#httpd.user_ctx) of
-    {ok, {HistoryResults}} ->
-        send_json(Req, {[{ok, true} | HistoryResults]})
-    catch
-    throw:{db_not_found, Msg} ->
-        send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]})
-    end;
-handle_req(Req) ->
-    send_method_not_allowed(Req, "POST").

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=955773&r1=955772&r2=955773&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Thu Jun 17 22:35:49 2010
@@ -15,7 +15,7 @@
 -export([start/4]).
 
 -include("couch_db.hrl").
--include("couch_replicate.hrl").
+-include("couch_api_wrap.hrl").
 
 
 -record(rep_state, {
@@ -49,37 +49,40 @@ start(Src, Tgt, Options, UserCtx) ->
     _Continuous = proplists:get_value(continuous, Options, false),
     _CreateTarget = proplists:get_value(create_target, Options, false),
     
+    % initalize the replication state, looking for existing rep records
+    % for incremental replication.
     #rep_state{source=Source,target=Target,start_seq=StartSeq} = State = 
             init_state(Src, Tgt, Options, UserCtx), 
     
+    % Create the work queues
     {ok, ChangesQueue} = couch_work_queue:new(100000, 500),
     {ok, MissingRevsQueue} = couch_work_queue:new(100000, 500),
     
+    % this is starts the _changes reader process. It adds the changes from
+    % the source db to the ChangesQueue.
     spawn_changes_reader(self(), StartSeq, Source, ChangesQueue),
+    
+    % this starts the missing revs finder, it checks the target for changes
+    % in the ChangesQueue to see if they exist on the target or not. If not, 
+    % adds them to MissingRevsQueue.
     spawn_missing_revs_finder(self(), Target, ChangesQueue, MissingRevsQueue),
+    
+    % This starts the doc copy process. It gets the documents from the
+    % MissingRevsQueue, copying them from the source to the target database.
     spawn_doc_copy(self(), Source, Target, MissingRevsQueue),
     
+    % This is the checkpoint loop, it updates the replication record in the
+    % database every X seconds, so that if the replication is interuppted,
+    % it can restart near where it left off.
     {ok, State2, _Stats} = checkpoint_loop(State, gb_trees:from_orddict([]),
             #stats{}),
+    couch_api_wrap:db_close(Source),        
+    couch_api_wrap:db_close(Target),
     {ok, State2#rep_state.checkpoint_history}.
-    
 
-spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue) ->
-    spawn_link(
-        fun()->
-            couch_api_wrap:changes_since(Source, all_docs, StartSeq,
-                fun(#doc_info{high_seq=Seq,revs=Revs}=DocInfo, _)->
-                    Cp ! {seq_start, {Seq, length(Revs)}},
-                    Cp ! {add_stat, {#stats.missing_checked, length(Revs)}},
-                    ok = couch_work_queue:queue(ChangesQueue, DocInfo),
-                    {ok, ok}
-                end, ok),
-            couch_work_queue:close(ChangesQueue)
-        end).
 
 
 init_state(Src,Tgt,Options,UserCtx)->    
-    
     {ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
     {ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}]),
 
@@ -118,15 +121,134 @@ init_state(Src,Tgt,Options,UserCtx)->   
             self(), timed_checkpoint)}.
 
 
+spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue) ->
+    spawn_link(
+        fun()->
+            couch_api_wrap:changes_since(Source, all_docs, StartSeq,
+                fun(#doc_info{high_seq=Seq,revs=Revs}=DocInfo, _)->
+                    Cp ! {seq_start, {Seq, length(Revs)}},
+                    Cp ! {add_stat, {#stats.missing_checked, length(Revs)}},
+                    ok = couch_work_queue:queue(ChangesQueue, DocInfo),
+                    {ok, ok}
+                end, ok),
+            couch_work_queue:close(ChangesQueue)
+        end).
+
+
+spawn_missing_revs_finder(StatsProcess, 
+        Target, ChangesQueue, MissingRevsQueue) ->
+    % Note, we could spawn more missing revs processes here. Before that's
+    % possible the work_queue code needs to be modified to work with multiple
+    % dequeueing processes
+    spawn_link(fun() ->
+        missing_revs_finder_loop(StatsProcess, 
+                Target, ChangesQueue, MissingRevsQueue)
+        end).
+
+
+missing_revs_finder_loop(Cp, 
+        Target, ChangesQueue, MissingRevsQueue) ->
+    case couch_work_queue:dequeue(ChangesQueue) of
+    closed ->
+        couch_work_queue:close(MissingRevsQueue);
+    {ok, DocInfos} ->
+        IdRevs = [{Id, [Rev || #rev_info{rev=Rev} <- RevsInfo]} ||
+                #doc_info{id=Id,revs=RevsInfo} <- DocInfos],
+        {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs),
+        % Figured out which on the target are missing.
+        % Missing contains the id and revs missing, and any possible
+        % ancestors that already exist on the target. This enables
+        % incremental attachment replication, so the source only needs to send
+        % attachments modified since the common ancestor on target.
+
+        % Signal to the checkpointer any that are already on the target are
+        % now complete.
+        IdRevsSeqDict = dict:from_list(
+            [{Id, {[Rev || #rev_info{rev=Rev} <- RevsInfo], Seq}} ||
+                    #doc_info{id=Id,revs=RevsInfo,high_seq=Seq} <- DocInfos]),
+        NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing),
+        % signal the completion of these that aren't missing
+        lists:foreach(fun({_Id, {Revs, Seq}})->
+                Cp ! {seq_changes_done, {Seq, length(Revs)}}
+            end, dict:to_list(NonMissingIdRevsSeqDict)),
+
+        % Expand out each docs and seq into it's own work item
+        lists:foreach(fun({Id, Revs, PAs})->
+            % PA means "possible ancestor"
+            Cp ! {add_stat, {#stats.missing_found, length(Revs)}},
+            {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
+            ok = couch_work_queue:queue(MissingRevsQueue,
+                {Id, Revs, PAs, Seq})
+            end, Missing),
+        missing_revs_finder_loop(Cp, Target, ChangesQueue, 
+                MissingRevsQueue)
+    end.
+
+
+remove_missing(IdRevsSeqDict, []) ->
+    IdRevsSeqDict;
+remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _}|Rest]) ->
+    {AllChangedRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
+    case AllChangedRevs -- MissingRevs of
+    [] ->
+        remove_missing(dict:erase(MissingId, IdRevsSeqDict), Rest);
+    NotMissingRevs ->
+        IdRevsSeqDict2 =
+                dict:store(MissingId, {NotMissingRevs, Seq}, IdRevsSeqDict),
+        remove_missing(IdRevsSeqDict2, Rest)
+    end.
+
+
+spawn_doc_copy(Cp, Source, Target, MissingRevsQueue) ->
+    % Note, we could spawn many doc copy process here. Before that's possible
+    % the work_queue code needs to be modified to work with multiple
+    % dequeueing processes
+    spawn_link(fun() ->
+        doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
+    end).
+
+
+doc_copy_loop(Cp, Source, Target, MissingRevsQueue) ->
+    case couch_work_queue:dequeue(MissingRevsQueue,1) of
+    closed ->
+        Cp ! done;
+    {ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
+        couch_api_wrap:open_doc_revs(Source, Id, Revs,
+                [{atts_since,PossibleAncestors}],
+                fun({ok, Doc}, _) ->
+                    % we are called for every rev read on the source
+                    Cp ! {add_stat, {#stats.docs_read, 1}},
+                    % now write the doc to the target.
+                    case couch_api_wrap:update_doc(Target, Doc, [],
+                            replicated_changes) of
+                    {ok, _} ->
+                        Cp ! {add_stat, {#stats.docs_written, 1}};
+                    _Error ->
+                        Cp ! {add_stat, {#stats.doc_write_failures, 1}}
+                    end;
+                (_, _) ->
+                    ok
+                end, []),
+        Cp ! {seq_changes_done, {Seq, length(Revs)}},
+        doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
+    end.
+
 checkpoint_loop(State, SeqsInProgress, Stats) ->
+    % SeqsInProgress contains the number of revs for each seq foiund by the
+    % changes process.
     receive
     {seq_start, {Seq, NumChanges}} ->
+        % Add this seq to the SeqsInProgress
         SeqsInProgress2 = gb_trees:insert(Seq, NumChanges, SeqsInProgress),
         checkpoint_loop(State, SeqsInProgress2, Stats);
     {seq_changes_done, {Seq, NumChangesDone}} ->
+        % decrement the # changes for this seq by NumChangesDone 
         TotalChanges = gb_trees:get(Seq, SeqsInProgress),
         case TotalChanges - NumChangesDone of
         0 ->
+            % this seq is completely processed. Chck to see if it was the
+            % smallest seq in progess. If so, we've made progress that can
+            % be checkpointed.
             State2 =
             case gb_trees:smallest(SeqsInProgress) of
             {Seq, _} ->
@@ -137,16 +259,19 @@ checkpoint_loop(State, SeqsInProgress, S
             checkpoint_loop(State2, 
                     gb_trees:delete(Seq,SeqsInProgress), Stats);
         NewTotalChanges when NewTotalChanges > 0 ->
+            % Still some changes that need work done. Put the new count back.
             SeqsInProgress2 =
                 gb_trees:update(Seq, NewTotalChanges, SeqsInProgress),
             checkpoint_loop(State, SeqsInProgress2, Stats)
         end;
     {add_stat, {StatPos, Val}} ->
+        % Increment the stat at the pos.
         Stat = element(StatPos, Stats),
         Stats2 = setelement(StatPos, Stats, Stat + Val),
         checkpoint_loop(State, SeqsInProgress, Stats2);
     done ->
-        io:format("checkpoint_loop done~n"),
+        % This means all the worker processes have completed their work.
+        % Assert that all the seqs have been processed
         0 = gb_trees:size(SeqsInProgress),
         State2 = do_checkpoint(State, Stats),
         erlang:cancel_timer(State2#rep_state.timer),
@@ -155,6 +280,7 @@ checkpoint_loop(State, SeqsInProgress, S
         end,
         {ok, State2, Stats};
     timed_checkpoint ->
+        % every checkpoint interval while processing
         State2 = do_checkpoint(State, Stats),
         Timer = erlang:start_timer(checkpoint_interval(State), 
                 self(), timed_checkpoint),
@@ -250,89 +376,6 @@ commit_to_both(Source, Target) ->
     {SourceStartTime, TargetStartTime}.
 
 
-spawn_missing_revs_finder(StatsProcess, 
-        Target, ChangesQueue, MissingRevsQueue) ->
-    spawn_link(fun() ->
-        missing_revs_finder_loop(StatsProcess, 
-                Target, ChangesQueue, MissingRevsQueue)
-        end).
-
-
-remove_missing(IdRevsSeqDict, []) ->
-    IdRevsSeqDict;
-remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _}|Rest]) ->
-    {AllChangedRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
-    case AllChangedRevs -- MissingRevs of
-    [] ->
-        remove_missing(dict:erase(MissingId, IdRevsSeqDict), Rest);
-    NotMissingRevs ->
-        IdRevsSeqDict2 =
-                dict:store(MissingId, {NotMissingRevs, Seq}, IdRevsSeqDict),
-        remove_missing(IdRevsSeqDict2, Rest)
-    end.
-
-
-missing_revs_finder_loop(Cp, 
-        Target, ChangesQueue, MissingRevsQueue) ->
-    case couch_work_queue:dequeue(ChangesQueue) of
-    closed ->
-        io:format("missing_revs_finder_loop done~n"),
-        couch_work_queue:close(MissingRevsQueue);
-    {ok, DocInfos} ->
-        IdRevs = [{Id, [Rev || #rev_info{rev=Rev} <- RevsInfo]} ||
-                #doc_info{id=Id,revs=RevsInfo} <- DocInfos],
-        {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs),
-        IdRevsSeqDict = dict:from_list(
-            [{Id, {[Rev || #rev_info{rev=Rev} <- RevsInfo], Seq}} ||
-                    #doc_info{id=Id,revs=RevsInfo,high_seq=Seq} <- DocInfos]),
-        NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing),
-        % signal the completion of these that aren't missing
-        lists:foreach(fun({_Id, {Revs, Seq}})->
-                Cp ! {seq_changes_done, {Seq, length(Revs)}}
-            end, dict:to_list(NonMissingIdRevsSeqDict)),
-        % Expand out each into it's own work item
-        lists:foreach(fun({Id, Revs, PAs})->
-            % PA means "possible ancestor"
-            Cp ! {add_stat, {#stats.missing_found, length(Revs)}},
-            {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
-            ok = couch_work_queue:queue(MissingRevsQueue,
-                {Id, Revs, PAs, Seq})
-            end, Missing),
-        missing_revs_finder_loop(Cp, Target, ChangesQueue, 
-                MissingRevsQueue)
-    end.
-
-
-spawn_doc_copy(Cp, Source, Target, MissingRevsQueue) ->
-    spawn_link(fun() ->
-        doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
-    end).
-
-
-doc_copy_loop(Cp, Source, Target, MissingRevsQueue) ->
-    case couch_work_queue:dequeue(MissingRevsQueue,1) of
-    closed ->
-        io:format("doc_copy_loop done~n"),
-        Cp ! done;
-    {ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
-        couch_api_wrap:open_doc_revs(Source, Id, Revs,
-                [{atts_since,PossibleAncestors}],
-                fun({ok, Doc}, _) ->
-                    Cp ! {add_stat, {#stats.docs_read, 1}},
-                    case couch_api_wrap:update_doc(Target, Doc, [],
-                            replicated_changes) of
-                    {ok, _} ->
-                        Cp ! {add_stat, {#stats.docs_written, 1}};
-                    _Error ->
-                        Cp ! {add_stat, {#stats.doc_write_failures, 1}}
-                    end;
-                (_, _) ->
-                    io:format("doc error!!!!!!~n"),
-                    ok
-                end, []),
-        Cp ! {seq_changes_done, {Seq, length(Revs)}},
-        doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
-    end.
 
 
 make_replication_id(Source, Target, UserCtx, Options) ->



Mime
View raw message