couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1031266 - in /couchdb/branches/new_replicator/src/couchdb: couch_api_wrap.erl couch_replicator_doc_copiers.erl
Date Thu, 04 Nov 2010 21:36:27 GMT
Author: fdmanana
Date: Thu Nov  4 21:36:27 2010
New Revision: 1031266

URL: http://svn.apache.org/viewvc?rev=1031266&view=rev
Log:
New replicator:

- transformed the doc copy process into a gen_server;
- allow for parallel reads;
- flush document buffers after a threshold is reached in order to avoid too large buffers


Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1031266&r1=1031265&r2=1031266&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Thu Nov  4 21:36:27 2010
@@ -278,25 +278,26 @@ update_docs(#httpdb{} = HttpDb, DocList,
     FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
     Part1 = case UpdateType of
     replicated_changes ->
-        <<"{\"new_edits\":false,\"docs\":[">>;
+        {prefix, <<"{\"new_edits\":false,\"docs\":[">>};
     interactive_edit ->
-        <<"{\"docs\":[">>
+        {prefix, <<"{\"docs\":[">>}
     end,
     BodyFun = fun(eof) ->
             eof;
         ([]) ->
             {ok, <<"]}">>, eof};
-        ([Part | RestParts]) when is_binary(Part) ->
-            {ok, Part, RestParts};
-        ([Doc | RestParts]) ->
+        ([{prefix, Prefix} | Rest]) ->
+            {ok, Prefix, Rest};
+        ([Doc]) when is_binary(Doc) ->
+            {ok, Doc, []};
+        ([Doc | RestDocs]) when is_binary(Doc) ->
+            {ok, [Doc, ","], RestDocs};
+        ([Doc]) when is_record(Doc, doc) ->
             DocJson = couch_doc:to_json_obj(Doc, [revs, attachments]),
-            Data = case RestParts of
-            [] ->
-                ?JSON_ENCODE(DocJson);
-            _ ->
-                [?JSON_ENCODE(DocJson), ","]
-            end,
-            {ok, Data, RestParts}
+            {ok, ?JSON_ENCODE(DocJson), []};
+        ([Doc | RestDocs]) when is_record(Doc, doc) ->
+            DocJson = couch_doc:to_json_obj(Doc, [revs, attachments]),
+            {ok, [?JSON_ENCODE(DocJson), ","], RestDocs}
     end,
     send_req(
         HttpDb,

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1031266&r1=1031265&r2=1031266&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Thu Nov  4 21:36:27 2010
@@ -11,112 +11,286 @@
 % the License.
 
 -module(couch_replicator_doc_copiers).
+-behaviour(gen_server).
 
+% public API
 -export([spawn_doc_copiers/5]).
 
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
 -include("couch_db.hrl").
 -include("couch_api_wrap.hrl").
 
+-define(MAX_PARALLEL_FETCHS, 10).
+-define(DOC_BUFFER_BYTE_SIZE, 1024 * 1024). % for remote targets
+-define(DOC_BUFFER_LEN, 100).               % for local targets, # of documents
+-define(MAX_BULK_ATT_SIZE, 64 * 1024).
+
+-record(state, {
+    loop,
+    cp,
+    source,
+    target,
+    report_seq = nil,
+    docs = [],
+    size_docs = 0,
+    readers = [],
+    writer = nil,
+    pending_fetch = nil,
+    pending_flush = nil,
+    stats = #rep_stats{}
+}).
+
 
 spawn_doc_copiers(Cp, Source, Target, MissingRevsQueue, CopiersCount) ->
     lists:map(
         fun(_) ->
-            spawn_link(fun() ->
-                doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
-            end)
+            {ok, Pid} = start_link(Cp, Source, Target, MissingRevsQueue),
+            Pid
         end,
         lists:seq(1, CopiersCount)).
 
 
--record(doc_acc, {
-    docs = [],
-    read = 0,
-    written = 0,
-    wfail = 0
-}).
+start_link(Cp, Source, Target, MissingRevsQueue) ->
+    gen_server:start_link(?MODULE, {Cp, Source, Target, MissingRevsQueue}, []).
 
-doc_copy_loop(Cp, Source, Target, MissingRevsQueue) ->
-    Result = case couch_work_queue:dequeue(MissingRevsQueue, 1) of
-    closed ->
-        stop;
 
-    {ok, [{doc_id, _} | _] = DocIds} ->
-        Acc = lists:foldl(
-            fun({doc_id, Id}, Acc) ->
-                ?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [self(), Id]),
-                {ok, Acc2} = couch_api_wrap:open_doc_revs(
-                    Source, Id, all, [],
-                    fun(R, A) -> doc_handler(R, Target, A) end, Acc),
-                Acc2
-            end,
-            #doc_acc{}, DocIds),
-        {Source, Acc, nil};
+init({Cp, Source, Target, MissingRevsQueue}) ->
+    process_flag(trap_exit, true),
+    Parent = self(),
+    LoopPid = spawn_link(
+        fun() -> queue_fetch_loop(Parent, MissingRevsQueue) end
+    ),
+    State = #state{
+        cp = Cp,
+        loop = LoopPid,
+        source = Source,
+        target = Target
+    },
+    {ok, State}.
 
-    {ok, [{ReportSeq, IdRevList}]} ->
-        {NewSource, Acc} = lists:foldl(
-            fun({Id, Revs, PossibleAncestors, Seq} = IdRev, {SrcDb, BulkAcc}) ->
-                ?LOG_DEBUG("Doc copier ~p got ~p", [self(), IdRev]),
-                SrcDb2 = couch_api_wrap:maybe_reopen_db(SrcDb, Seq),
-                {ok, BulkAcc2} = couch_api_wrap:open_doc_revs(
-                    SrcDb2, Id, Revs, [{atts_since, PossibleAncestors}],
-                    fun(R, A) -> doc_handler(R, Target, A) end,
-                    BulkAcc),
-                {SrcDb2, BulkAcc2}
-            end,
-            {Source, #doc_acc{}}, IdRevList),
-        {NewSource, Acc, ReportSeq}
+
+handle_call({fetch_doc, Params}, {Pid, _} = From,
+    #state{loop = Pid, readers = Readers, pending_fetch = nil} = State) ->
+    case length(Readers) of
+    Size when Size < ?MAX_PARALLEL_FETCHS ->
+        {Fetcher, Source2} = spawn_doc_reader(State#state.source, Params),
+        NewState = State#state{
+            source = Source2,
+            readers = [Fetcher | Readers]
+        },
+        {reply, ok, NewState};
+    _ ->
+        {noreply, State#state{pending_fetch = {From, Params}}}
+    end;
+
+handle_call({add_doc, Doc}, _From, #state{target = Target} = State) ->
+    #state{docs = DocAcc, size_docs = SizeAcc, stats = S} = State,
+    {DocAcc2, SizeAcc2, W, F} = maybe_flush_docs(Target, DocAcc, SizeAcc, Doc),
+    NewStats = S#rep_stats{
+        docs_read = S#rep_stats.docs_read + 1,
+        docs_written = S#rep_stats.docs_written + W,
+        doc_write_failures = S#rep_stats.doc_write_failures + F
+    },
+    NewState = State#state{
+        docs = DocAcc2, size_docs = SizeAcc2, stats = NewStats
+    },
+    {reply, ok, NewState};
+
+handle_call({add_write_stats, Stats}, _From, State) ->
+    #rep_stats{
+        docs_written = W, doc_write_failures = Wf
+    } = S = State#state.stats,
+    NewStats = S#rep_stats{
+        docs_written = W + Stats#rep_stats.docs_written,
+        doc_write_failures = Wf + Stats#rep_stats.doc_write_failures
+    },
+    {reply, ok, State#state{stats = NewStats}};
+
+handle_call({flush, ReportSeq}, {Pid, _} = From,
+    #state{loop = Pid, writer = nil, report_seq = nil,
+        pending_flush = nil} = State) ->
+    State2 = case State#state.readers of
+    [] ->
+        Writer = spawn_writer(State#state.target, State#state.docs),
+        State#state{writer = Writer};
+    _ ->
+        State
     end,
+    {noreply, State2#state{report_seq = ReportSeq, pending_flush = From}}.
+
+
+handle_cast(Msg, State) ->
+    {stop, {unexpected_async_call, Msg}, State}.
 
-    case Result of
-    {Source2, DocAcc, SeqDone} ->
-        #doc_acc{
-            written = W,
-            read = R
-        } = DocAcc2 = bulk_write_docs(DocAcc, Target),
-        DocAcc2 = bulk_write_docs(DocAcc, Target),
-        seq_done(SeqDone, Cp),
-        send_stats(DocAcc2, Cp),
-        ?LOG_DEBUG("Replicator copy process: "
-            "read ~p documents, wrote ~p documents", [R, W]),
-        doc_copy_loop(Cp, Source2, Target, MissingRevsQueue);
-    stop ->
-        ok
-    end.
 
+handle_info({'EXIT', Pid, normal}, #state{loop = Pid} = State) ->
+    #state{
+        docs = [], readers = [], writer = nil,
+        pending_fetch = nil, pending_flush = nil
+    } = State,
+    {stop, normal, State};
+
+handle_info({'EXIT', Pid, normal}, #state{writer = Pid, cp = Cp} = State) ->
+    Stats = State#state.stats,
+    ok = gen_server:cast(Cp, {add_stats, Stats}),
+    seq_done(Cp, State#state.report_seq),
+    gen_server:reply(State#state.pending_flush, ok),
+    NewState = State#state{
+        report_seq = nil,
+        writer = nil,
+        docs = [],
+        stats = #rep_stats{},
+        pending_flush = nil
+    },
+    ?LOG_DEBUG("Replication copy process, read ~p documents, wrote ~p documents",
+        [Stats#rep_stats.docs_read, Stats#rep_stats.docs_written]),
+    {noreply, NewState};
+
+handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
+    #state{
+        readers = Readers, writer = Writer, docs = Docs,
+        source = Source, target = Target,
+        pending_fetch = Fetch, pending_flush = Flush
+    } = State,
+    case Readers -- [Pid] of
+    Readers ->
+        % an ibrowse worker
+        {noreply, State};
+    Readers2 ->
+        State2 = case Fetch of
+        nil ->
+            case (Flush =/= nil) andalso (Writer =:= nil) andalso
+                (Readers2 =:= [])  of
+            true ->
+                State#state{
+                    readers = Readers2,
+                    writer = spawn_writer(Target, Docs)
+                };
+            false ->
+                State#state{readers = Readers2}
+            end;
+        {From, FetchParams} ->
+            {Fetcher, Source2} = spawn_doc_reader(Source, FetchParams),
+            gen_server:reply(From, ok),
+            State#state{
+                source = Source2,
+                readers = [Fetcher | Readers2],
+                pending_fetch = nil
+            }
+        end,
+        {noreply, State2}
+    end;
 
-doc_handler({ok, #doc{atts = []} = Doc}, _Target, Acc) ->
-    update_bulk_doc_acc(Acc, Doc);
+handle_info({'EXIT', Pid, Reason}, State) ->
+   {stop, {process_died, Pid, Reason}, State}.
 
-doc_handler({ok, Doc}, Target, Acc) ->
-    write_doc(Doc, Target, Acc);
 
-doc_handler(_, _, Acc) ->
-    Acc.
+terminate(_Reason, _State) ->
+    ok.
 
 
-update_bulk_doc_acc(#doc_acc{docs = DocAcc, read = Read} = Acc, Doc) ->
-    Acc#doc_acc{docs = [Doc | DocAcc], read = Read + 1}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
 
 
-write_doc(Doc, Db, #doc_acc{written = W, wfail = F, read = R} = Acc) ->
-    case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
-    {ok, _} ->
-        Acc#doc_acc{written = W + 1, read = R + 1};
-    {error, <<"unauthorized">>} ->
-        ?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
-            [Doc#doc.id, couch_api_wrap:db_uri(Db)]),
-        Acc#doc_acc{wfail = F + 1, read = R + 1};
-    _ ->
-        Acc#doc_acc{wfail = F + 1, read = R + 1}
+queue_fetch_loop(Parent, MissingRevsQueue) ->
+    case couch_work_queue:dequeue(MissingRevsQueue, 1) of
+    closed ->
+        ok;
+
+    {ok, [{doc_id, _} | _] = DocIdList} ->
+        lists:foreach(
+            fun({doc_id, Id}) ->
+                ok = gen_server:call(
+                    Parent, {fetch_doc, {Id, all, [], 0}}, infinity)
+            end,
+            DocIdList),
+        ok = gen_server:call(Parent, {flush, nil}, infinity),
+        queue_fetch_loop(Parent, MissingRevsQueue);
+
+    {ok, [{ReportSeq, IdRevList}]} ->
+        lists:foreach(
+            fun({Id, Revs, PAs, Seq}) ->
+                ok = gen_server:call(
+                    Parent, {fetch_doc, {Id, Revs, PAs, Seq}}, infinity)
+            end,
+            IdRevList),
+        ok = gen_server:call(Parent, {flush, ReportSeq}, infinity),
+        queue_fetch_loop(Parent, MissingRevsQueue)
     end.
 
 
-bulk_write_docs(#doc_acc{docs = []} = Acc, _) ->
-    Acc;
-bulk_write_docs(#doc_acc{docs = Docs, written = W, wfail = Wf} = Acc, Db) ->
+spawn_doc_reader(Source, {_, _, _, Seq} = FetchParams) ->
+    Source2 = couch_api_wrap:maybe_reopen_db(Source, Seq),
+    Parent = self(),
+    Pid = spawn_link(fun() -> fetch_doc(Parent, Source2, FetchParams) end),
+    {Pid, Source2}.
+
+
+fetch_doc(Parent, Source, {Id, Revs, PAs, _Seq}) ->
+    couch_api_wrap:open_doc_revs(
+        Source, Id, Revs, [{atts_since, PAs}], fun doc_handler/2, Parent).
+
+
+doc_handler({ok, Doc}, Parent) ->
+    ok = gen_server:call(Parent, {add_doc, Doc}, infinity),
+    Parent;
+doc_handler(_, Parent) ->
+    Parent.
+
+
+spawn_writer(Target, DocList) ->
+    Parent = self(),
+    spawn_link(
+        fun() ->
+            {Written, Failed} = flush_docs(Target, DocList),
+            Stats = #rep_stats{
+                docs_written = Written,
+                doc_write_failures = Failed
+            },
+            ok = gen_server:call(Parent, {add_write_stats, Stats}, infinity)
+        end).
+
+
+maybe_flush_docs(#httpdb{} = Target, DocAcc, SizeAcc, Doc) ->
+    case lists:any(
+        fun(Att) -> Att#att.disk_len > ?MAX_BULK_ATT_SIZE end, Doc#doc.atts) of
+    true ->
+        {Written, Failed} = flush_docs(Target, Doc),
+        {DocAcc, SizeAcc, Written, Failed};
+    false ->
+        JsonDoc = iolist_to_binary(
+            ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments]))),
+        case SizeAcc + byte_size(JsonDoc) of
+        SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE ->
+            {Written, Failed} = flush_docs(Target, [JsonDoc | DocAcc]),
+            {[], 0, Written, Failed};
+        SizeAcc2 ->
+            {[JsonDoc | DocAcc], SizeAcc2, 0, 0}
+        end
+    end;
+
+maybe_flush_docs(#db{} = Target, DocAcc, SizeAcc, #doc{atts = []} = Doc) ->
+    case SizeAcc + 1 of
+    SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
+        {Written, Failed} = flush_docs(Target, [Doc | DocAcc]),
+        {[], 0, Written, Failed};
+    SizeAcc2 ->
+        {[Doc | DocAcc], SizeAcc2, 0, 0}
+    end;
+
+maybe_flush_docs(#db{} = Target, DocAcc, SizeAcc, Doc) ->
+    {Written, Failed} = flush_docs(Target, Doc),
+    {DocAcc, SizeAcc, Written, Failed}.
+
+
+flush_docs(Target, DocList) when is_list(DocList) ->
     {ok, Errors} = couch_api_wrap:update_docs(
-        Db, Docs, [delay_commit], replicated_changes),
-    DbUri = couch_api_wrap:db_uri(Db),
+        Target, DocList, [delay_commit], replicated_changes),
+    DbUri = couch_api_wrap:db_uri(Target),
     lists:foreach(
        fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) ->
                 ?LOG_ERROR("Replicator: unauthorized to write document"
@@ -124,22 +298,22 @@ bulk_write_docs(#doc_acc{docs = Docs, wr
             (_) ->
                 ok
         end, Errors),
-    Acc#doc_acc{
-        wfail = Wf + length(Errors),
-        written = W + length(Docs) - length(Errors)
-    }.
+    {length(DocList) - length(Errors), length(Errors)};
+
+flush_docs(Target, Doc) ->
+    case couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
+    {ok, _} ->
+        {1, 0};
+    {error, <<"unauthorized">>} ->
+        ?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
+            [Doc#doc.id, couch_api_wrap:db_uri(Target)]),
+        {0, 1};
+    _ ->
+        {0, 1}
+    end.
 
 
-seq_done(nil, _Cp) ->
+seq_done(_Cp, nil) ->
     ok;
-seq_done(SeqDone, Cp) ->
+seq_done(Cp, SeqDone) ->
     ok = gen_server:cast(Cp, {seq_done, SeqDone}).
-
-
-send_stats(#doc_acc{read = R, written = W, wfail = Wf}, Cp) ->
-    Stats = #rep_stats{
-        docs_read = R,
-        docs_written = W,
-        doc_write_failures = Wf
-    },
-    ok = gen_server:cast(Cp, {add_stats, Stats}).



Mime
View raw message