couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kocol...@apache.org
Subject [21/50] git commit: Replicator: update task status more often
Date Wed, 26 Oct 2011 18:05:33 GMT
Replicator: update task status more often

Instead of having workers report their stats only when they
finish a batch, allow them to report more frequently so that
the replication entry in _active_tasks is updated more often.
This avoids giving applications the false impression that the
replicator is stuck while workers are processing heavy batches.


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/d480f654
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/d480f654
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/d480f654

Branch: refs/heads/1319-large-headers-are-corrupted
Commit: d480f654f8a12ccd4ee0c0640290304d2ae0b054
Parents: 0dd0168
Author: Filipe David Manana <fdmanana@apache.org>
Authored: Mon Oct 3 11:50:04 2011 -0700
Committer: Filipe David Manana <fdmanana@apache.org>
Committed: Mon Oct 3 11:50:04 2011 -0700

----------------------------------------------------------------------
 src/couchdb/couch_replicator.erl        |   25 +---
 src/couchdb/couch_replicator_utils.erl  |   13 ++
 src/couchdb/couch_replicator_worker.erl |  197 +++++++++++++-------------
 3 files changed, 115 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/d480f654/src/couchdb/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_replicator.erl b/src/couchdb/couch_replicator.erl
index 24f87cb..efea373 100644
--- a/src/couchdb/couch_replicator.erl
+++ b/src/couchdb/couch_replicator.erl
@@ -385,6 +385,11 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
 handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
     {reply, {ok, Rep}, State};
 
+handle_call({add_stats, Stats}, From, State) ->
+    gen_server:reply(From, ok),
+    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
+    {noreply, State#rep_state{stats = NewStats}};
+
 handle_call({report_seq_done, Seq, StatsInc}, From,
     #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
         current_through_seq = ThroughSeq, stats = Stats} = State) ->
@@ -410,7 +415,7 @@ handle_call({report_seq_done, Seq, StatsInc}, From,
             NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
     SourceCurSeq = source_cur_seq(State),
     NewState = State#rep_state{
-        stats = sum_stats([Stats, StatsInc]),
+        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
         current_through_seq = NewThroughSeq,
         seqs_in_progress = NewSeqsInProgress,
         highest_seq_done = NewHighestDone,
@@ -888,24 +893,6 @@ has_session_id(SessionId, [{Props} | Rest]) ->
     end.
 
 
-sum_stats([Stats1 | RestStats]) ->
-    lists:foldl(
-        fun(Stats, Acc) ->
-            #rep_stats{
-                missing_checked = Stats#rep_stats.missing_checked +
-                    Acc#rep_stats.missing_checked,
-                missing_found = Stats#rep_stats.missing_found +
-                    Acc#rep_stats.missing_found,
-                docs_read = Stats#rep_stats.docs_read + Acc#rep_stats.docs_read,
-                docs_written = Stats#rep_stats.docs_written +
-                    Acc#rep_stats.docs_written,
-                doc_write_failures = Stats#rep_stats.doc_write_failures +
-                    Acc#rep_stats.doc_write_failures
-            }
-        end,
-        Stats1, RestStats).
-
-
 db_monitor(#db{} = Db) ->
     couch_db:monitor(Db);
 db_monitor(_HttpDb) ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/d480f654/src/couchdb/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_replicator_utils.erl b/src/couchdb/couch_replicator_utils.erl
index 905b794..3c61400 100644
--- a/src/couchdb/couch_replicator_utils.erl
+++ b/src/couchdb/couch_replicator_utils.erl
@@ -16,6 +16,7 @@
 -export([open_db/1, close_db/1]).
 -export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
 -export([replication_id/2]).
+-export([sum_stats/2]).
 
 -include("couch_db.hrl").
 -include("couch_api_wrap.hrl").
@@ -360,3 +361,15 @@ stop_db_compaction_notifier(nil) ->
     ok;
 stop_db_compaction_notifier(Notifier) ->
     couch_db_update_notifier:stop(Notifier).
+
+
+sum_stats(#rep_stats{} = S1, #rep_stats{} = S2) ->  % pairwise sum of the five counters below
+    #rep_stats{
+        missing_checked =
+            S1#rep_stats.missing_checked + S2#rep_stats.missing_checked,
+        missing_found = S1#rep_stats.missing_found + S2#rep_stats.missing_found,
+        docs_read = S1#rep_stats.docs_read + S2#rep_stats.docs_read,
+        docs_written = S1#rep_stats.docs_written + S2#rep_stats.docs_written,
+        doc_write_failures =
+            S1#rep_stats.doc_write_failures + S2#rep_stats.doc_write_failures
+    }.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/d480f654/src/couchdb/couch_replicator_worker.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_replicator_worker.erl b/src/couchdb/couch_replicator_worker.erl
index 18320cb..aa04dc5 100644
--- a/src/couchdb/couch_replicator_worker.erl
+++ b/src/couchdb/couch_replicator_worker.erl
@@ -29,6 +29,10 @@
 -define(DOC_BUFFER_LEN, 10).                 % for local targets, # of documents
 -define(MAX_BULK_ATT_SIZE, 64 * 1024).
 -define(MAX_BULK_ATTS_PER_DOC, 8).
+-define(STATS_DELAY, 10000000).              % 10 seconds (in microseconds)
+
+-define(inc_stat(StatPos, Stats, Inc),
+    setelement(StatPos, Stats, element(StatPos, Stats) + Inc)).
 
 -import(couch_replicator_utils, [
     open_db/1,
@@ -49,6 +53,7 @@
 }).
 
 -record(state, {
+    cp,
     loop,
     max_parallel_conns,
     source,
@@ -67,6 +72,7 @@
 
 start_link(Cp, #db{} = Source, Target, ChangesManager, _MaxConns) ->
     Pid = spawn_link(fun() ->
+        erlang:put(last_stats_report, now()),
         queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager)
     end),
     {ok, Pid};
@@ -82,7 +88,9 @@ init({Cp, Source, Target, ChangesManager, MaxConns}) ->
     LoopPid = spawn_link(fun() ->
         queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
     end),
+    erlang:put(last_stats_report, now()),
     State = #state{
+        cp = Cp,
         max_parallel_conns = MaxConns,
         loop = LoopPid,
         source = open_db(Source),
@@ -95,25 +103,18 @@ init({Cp, Source, Target, ChangesManager, MaxConns}) ->
     {ok, State}.
 
 
-handle_call({fetch_doc, {_Id, Revs, _PAs} = Params}, {Pid, _} = From,
+handle_call({fetch_doc, {_Id, _Revs, _PAs} = Params}, {Pid, _} = From,
     #state{loop = Pid, readers = Readers, pending_fetch = nil,
-        stats = Stats, source = Src, target = Tgt,
-        max_parallel_conns = MaxConns} = State) ->
-    Stats2 = Stats#rep_stats{
-        missing_checked = Stats#rep_stats.missing_checked + length(Revs),
-        missing_found = Stats#rep_stats.missing_found + length(Revs)
-    },
+        source = Src, target = Tgt, max_parallel_conns = MaxConns} = State) ->
     case length(Readers) of
     Size when Size < MaxConns ->
         Reader = spawn_doc_reader(Src, Tgt, Params),
         NewState = State#state{
-            stats = Stats2,
             readers = [Reader | Readers]
         },
         {reply, ok, NewState};
     _ ->
         NewState = State#state{
-            stats = Stats2,
             pending_fetch = {From, Params}
         },
         {noreply, NewState}
@@ -123,27 +124,11 @@ handle_call({batch_doc, Doc}, From, State) ->
     gen_server:reply(From, ok),
     {noreply, maybe_flush_docs(Doc, State)};
 
-handle_call({doc_flushed, true}, _From, #state{stats = Stats} = State) ->
-    NewStats = Stats#rep_stats{
-        docs_read = Stats#rep_stats.docs_read + 1,
-        docs_written = Stats#rep_stats.docs_written + 1
-    },
-    {reply, ok, State#state{stats = NewStats}};
-
-handle_call({doc_flushed, false}, _From, #state{stats = Stats} = State) ->
-    NewStats = Stats#rep_stats{
-        docs_read = Stats#rep_stats.docs_read + 1,
-        doc_write_failures = Stats#rep_stats.doc_write_failures + 1
-    },
-    {reply, ok, State#state{stats = NewStats}};
-
-handle_call({add_write_stats, Written, Failed}, _From,
-    #state{stats = Stats} = State) ->
-    NewStats = Stats#rep_stats{
-        docs_written = Stats#rep_stats.docs_written + Written,
-        doc_write_failures = Stats#rep_stats.doc_write_failures + Failed
-    },
-    {reply, ok, State#state{stats = NewStats}};
+handle_call({add_stats, IncStats}, From, #state{stats = Stats} = State) ->
+    gen_server:reply(From, ok),
+    NewStats = couch_replicator_utils:sum_stats(Stats, IncStats),
+    NewStats2 = maybe_report_stats(State#state.cp, NewStats),
+    {noreply, State#state{stats = NewStats2}};
 
 handle_call(flush, {Pid, _} = From,
     #state{loop = Pid, writer = nil, flush_waiter = nil,
@@ -236,62 +221,50 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
         ok;
     {changes, ChangesManager, Changes, ReportSeq} ->
         Target2 = open_db(Target),
-        {IdRevs, NotMissingCount} = find_missing(Changes, Target2),
+        {IdRevs, Stats0} = find_missing(Changes, Target2),
         case Source of
         #db{} ->
             Source2 = open_db(Source),
             Stats = local_process_batch(
-                IdRevs, Source2, Target2, #batch{}, #rep_stats{}),
+                IdRevs, Cp, Source2, Target2, #batch{}, Stats0),
             close_db(Source2);
         #httpdb{} ->
+            ok = gen_server:call(Parent, {add_stats, Stats0}, infinity),
             remote_process_batch(IdRevs, Parent),
             {ok, Stats} = gen_server:call(Parent, flush, infinity)
         end,
         close_db(Target2),
-        Stats2 = Stats#rep_stats{
-            missing_checked = Stats#rep_stats.missing_checked + NotMissingCount
-        },
-        ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats2}, infinity),
+        ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
+        erlang:put(last_stats_report, now()),
         ?LOG_DEBUG("Worker reported completion of seq ~p", [ReportSeq]),
         queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
     end.
 
 
-local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats) ->
+local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) ->
     Stats;
 
-local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
+local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
     case Target of
     #httpdb{} ->
         ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
     #db{} ->
         ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size])
     end,
-    {Written, WriteFailures} = flush_docs(Target, Docs),
-    Stats#rep_stats{
-        docs_written = Stats#rep_stats.docs_written + Written,
-        doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
-    };
-
-local_process_batch([IdRevs | Rest], Source, Target, Batch, Stats) ->
-    {_Id, Revs, _PAs} = IdRevs,
-    {ok, {_, DocList, Written0, WriteFailures0}} = fetch_doc(
-        Source, IdRevs, fun local_doc_handler/2, {Target, [], 0, 0}),
-    Read = length(DocList) + Written0 + WriteFailures0,
-    {Batch2, Written, WriteFailures} = lists:foldl(
-        fun(Doc, {Batch0, W0, F0}) ->
-            {Batch1, W, F} = maybe_flush_docs(Target, Batch0, Doc),
-            {Batch1, W0 + W, F0 + F}
+    Stats2 = flush_docs(Target, Docs),
+    Stats3 = couch_replicator_utils:sum_stats(Stats, Stats2),
+    local_process_batch([], Cp, Source, Target, #batch{}, Stats3);
+
+local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) ->
+    {ok, {_, DocList, Stats2, _}} = fetch_doc(
+        Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp}),
+    {Batch2, Stats3} = lists:foldl(
+        fun(Doc, {Batch0, Stats0}) ->
+            {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc),
+            {Batch1, couch_replicator_utils:sum_stats(Stats0, S)}
         end,
-        {Batch, Written0, WriteFailures0}, DocList),
-    Stats2 = Stats#rep_stats{
-        missing_checked = Stats#rep_stats.missing_checked + length(Revs),
-        missing_found = Stats#rep_stats.missing_found + length(Revs),
-        docs_read = Stats#rep_stats.docs_read + Read,
-        docs_written = Stats#rep_stats.docs_written + Written,
-        doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
-    },
-    local_process_batch(Rest, Source, Target, Batch2, Stats2).
+        {Batch, Stats2}, DocList),
+    local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3).
 
 
 remote_process_batch([], _Parent) ->
@@ -333,21 +306,24 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
     end.
 
 
-local_doc_handler({ok, Doc}, {Target, DocList, W, F}) ->
+local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
+    Stats2 = ?inc_stat(#rep_stats.docs_read, Stats, 1),
     case batch_doc(Doc) of
     true ->
-        {ok, {Target, [Doc | DocList], W, F}};
+        {ok, {Target, [Doc | DocList], Stats2, Cp}};
     false ->
         ?LOG_DEBUG("Worker flushing doc with attachments", []),
         Target2 = open_db(Target),
         Success = (flush_doc(Target2, Doc) =:= ok),
         close_db(Target2),
-        case Success of
+        Stats3 = case Success of
         true ->
-            {ok, {Target, DocList, W + 1, F}};
+            ?inc_stat(#rep_stats.docs_written, Stats2, 1);
         false ->
-            {ok, {Target, DocList, W, F + 1}}
-        end
+            ?inc_stat(#rep_stats.doc_write_failures, Stats2, 1)
+        end,
+        Stats4 = maybe_report_stats(Cp, Stats3),
+        {ok, {Target, DocList, Stats4, Cp}}
     end;
 local_doc_handler(_, Acc) ->
     {ok, Acc}.
@@ -361,17 +337,19 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
     % source. The data property of each attachment is a function that starts
     % streaming the attachment data from the remote source, therefore it's
     % convenient to call it ASAP to avoid ibrowse inactivity timeouts.
+    Stats = #rep_stats{docs_read = 1},
     ?LOG_DEBUG("Worker flushing doc with attachments", []),
     Target2 = open_db(Target),
     Success = (flush_doc(Target2, Doc) =:= ok),
-    ok = gen_server:call(Parent, {doc_flushed, Success}, infinity),
     close_db(Target2),
-    case Success of
+    {Result, Stats2} = case Success of
     true ->
-        {ok, Acc};
+        {{ok, Acc}, ?inc_stat(#rep_stats.docs_written, Stats, 1)};
     false ->
-        {skip, Acc}
-    end;
+        {{skip, Acc}, ?inc_stat(#rep_stats.doc_write_failures, Stats, 1)}
+    end,
+    ok = gen_server:call(Parent, {add_stats, Stats2}, infinity),
+    Result;
 remote_doc_handler(_, Acc) ->
     {ok, Acc}.
 
@@ -389,15 +367,15 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
     spawn_link(
         fun() ->
             Target2 = open_db(Target),
-            {Written, Failed} = flush_docs(Target2, DocList),
+            Stats = flush_docs(Target2, DocList),
             close_db(Target2),
-            ok = gen_server:call(
-                Parent, {add_write_stats, Written, Failed}, infinity)
+            ok = gen_server:call(Parent, {add_stats, Stats}, infinity)
         end).
 
 
 after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
     gen_server:reply(Waiter, {ok, Stats}),
+    erlang:put(last_stats_report, now()),
     State#state{
         stats = #rep_stats{},
         flush_waiter = nil,
@@ -406,40 +384,38 @@ after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
     }.
 
 
-maybe_flush_docs(Doc, #state{target = Target, batch = Batch,
-        stats = Stats} = State) ->
-    {Batch2, W, F} = maybe_flush_docs(Target, Batch, Doc),
-    Stats2 = Stats#rep_stats{
-        docs_read = Stats#rep_stats.docs_read + 1,
-        docs_written = Stats#rep_stats.docs_written + W,
-        doc_write_failures = Stats#rep_stats.doc_write_failures + F
-    },
-    State#state{
-        stats = Stats2,
-        batch = Batch2
-    }.
+maybe_flush_docs(Doc,State) ->
+    #state{
+        target = Target, batch = Batch,
+        stats = Stats, cp = Cp
+    } = State,
+    {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc),
+    Stats2 = couch_replicator_utils:sum_stats(Stats, WStats),
+    Stats3 = ?inc_stat(#rep_stats.docs_read, Stats2, 1),
+    Stats4 = maybe_report_stats(Cp, Stats3),
+    State#state{stats = Stats4, batch = Batch2}.
 
 
-maybe_flush_docs(#httpdb{} = Target,
-    #batch{docs = DocAcc, size = SizeAcc} = Batch, Doc) ->
+maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
+    #batch{docs = DocAcc, size = SizeAcc} = Batch,
     case batch_doc(Doc) of
     false ->
         ?LOG_DEBUG("Worker flushing doc with attachments", []),
         case flush_doc(Target, Doc) of
         ok ->
-            {Batch, 1, 0};
+            {Batch, #rep_stats{docs_written = 1}};
         _ ->
-            {Batch, 0, 1}
+            {Batch, #rep_stats{doc_write_failures = 1}}
         end;
     true ->
         JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
         case SizeAcc + iolist_size(JsonDoc) of
         SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE ->
             ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [SizeAcc2]),
-            {Written, Failed} = flush_docs(Target, [JsonDoc | DocAcc]),
-            {#batch{}, Written, Failed};
+            Stats = flush_docs(Target, [JsonDoc | DocAcc]),
+            {#batch{}, Stats};
         SizeAcc2 ->
-            {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, 0, 0}
+            {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, #rep_stats{}}
         end
     end;
 
@@ -447,10 +423,10 @@ maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
     case SizeAcc + 1 of
     SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
         ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [SizeAcc2]),
-        {Written, Failed} = flush_docs(Target, [Doc | DocAcc]),
-        {#batch{}, Written, Failed};
+        Stats = flush_docs(Target, [Doc | DocAcc]),
+        {#batch{}, Stats};
     SizeAcc2 ->
-        {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, 0, 0}
+        {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, #rep_stats{}}
     end.
 
 
@@ -465,7 +441,7 @@ batch_doc(#doc{atts = Atts}) ->
 
 
 flush_docs(_Target, []) ->
-    {0, 0};
+    #rep_stats{};
 
 flush_docs(Target, DocList) ->
     {ok, Errors} = couch_api_wrap:update_docs(
@@ -478,7 +454,10 @@ flush_docs(Target, DocList) ->
                 [get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
                     get_value(error, Props, ""), get_value(reason, Props, "")])
         end, Errors),
-    {length(DocList) - length(Errors), length(Errors)}.
+    #rep_stats{
+        docs_written = length(DocList) - length(Errors),
+        doc_write_failures = length(Errors)
+    }.
 
 flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
     try couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
@@ -517,4 +496,20 @@ find_missing(DocInfos, Target) ->
     MissingRevsCount = lists:foldl(
         fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,
         0, Missing),
-    {Missing, AllRevsCount - MissingRevsCount}.
+    Stats = #rep_stats{
+        missing_checked = AllRevsCount,
+        missing_found = MissingRevsCount
+    },
+    {Missing, Stats}.
+
+
+maybe_report_stats(Cp, Stats) ->  % send Stats to Cp once ?STATS_DELAY has elapsed, else keep them
+    Now = now(),
+    case timer:now_diff(Now, erlang:get(last_stats_report)) >= ?STATS_DELAY of  % now_diff(T2, T1) = T2 - T1
+    true ->
+        ok = gen_server:call(Cp, {add_stats, Stats}, infinity),
+        erlang:put(last_stats_report, Now),
+        #rep_stats{};  % Stats were handed to Cp; caller continues with a fresh record
+    false ->
+        Stats  % not enough time since the last report; caller keeps accumulating
+    end.


Mime
View raw message