couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kocol...@apache.org
Subject svn commit: r807320 - in /couchdb/trunk/src/couchdb: couch_rep.erl couch_rep_reader.erl
Date Mon, 24 Aug 2009 17:57:51 GMT
Author: kocolosk
Date: Mon Aug 24 17:57:51 2009
New Revision: 807320

URL: http://svn.apache.org/viewvc?rev=807320&view=rev
Log:
checkpoint at most once per 5 seconds

Modified:
    couchdb/trunk/src/couchdb/couch_rep.erl
    couchdb/trunk/src/couchdb/couch_rep_reader.erl

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=807320&r1=807319&r2=807320&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Mon Aug 24 17:57:51 2009
@@ -15,7 +15,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
     code_change/3]).
 
--export([replicate/2]).
+-export([replicate/2, checkpoint/1]).
 
 -include("couch_db.hrl").
 
@@ -28,6 +28,7 @@
     source,
     target,
     init_args,
+    checkpoint_scheduled = nil,
 
     start_seq,
     history,
@@ -73,6 +74,9 @@
         get_result(Server, PostBody, UserCtx)
     end.
 
+checkpoint(Server) ->
+    gen_server:cast(Server, do_checkpoint).
+
 get_result(Server, PostBody, UserCtx) ->
     try gen_server:call(Server, get_result, infinity) of
     retry -> replicate(PostBody, UserCtx);
@@ -137,6 +141,7 @@
         target = Target,
         init_args = InitArgs,
         stats = Stats,
+        checkpoint_scheduled = nil,
 
         start_seq = StartSeq,
         history = History,
@@ -154,19 +159,22 @@
     Listeners = State#state.listeners,
     {noreply, State#state{listeners=[From|Listeners]}}.
 
+handle_cast(do_checkpoint, State) ->
+    {noreply, do_checkpoint(State)};
+
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
     couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
-    {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+    {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
 
 handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
         when SourceSeq > N ->
     MissingRevs = State#state.missing_revs,
     ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
     couch_task_status:update("W Processed source update #~p", [SourceSeq]),
-    {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+    {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
 handle_info({writer_checkpoint, _}, State) ->
     {noreply, State};
 
@@ -190,6 +198,7 @@
 
 terminate(normal, State) ->
     #state{
+        checkpoint_scheduled = TRef,
         checkpoint_history = CheckpointHistory,
         committed_seq = NewSeq,
         listeners = Listeners,
@@ -197,7 +206,8 @@
         target = Target,
         stats = Stats,
         source_log = #doc{body={OldHistory}}
-    } = State,
+    } = do_checkpoint(State),
+    timer:cancel(TRef),
     couch_task_status:update("Finishing"),
     ets:delete(Stats),
     close_db(Target),
@@ -414,6 +424,18 @@
     {not_found, no_db_file} -> throw({db_not_found, DbName})
     end.
 
+schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
+    Server = self(),
+    case timer:apply_after(5000, couch_rep, checkpoint, [Server]) of
+    {ok, TRef} ->
+        State#state{checkpoint_scheduled = TRef};
+    Error ->
+        ?LOG_ERROR("tried to schedule a checkpoint but got ~p", [Error]),
+        State
+    end;
+schedule_checkpoint(State) ->
+    State.
+
 do_checkpoint(State) ->
     #state{
         source = Source,
@@ -466,6 +488,7 @@
     {TgtRevPos,TgtRevId} =
         update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
     State#state{
+        checkpoint_scheduled = nil,
         checkpoint_history = NewRepHistory,
         source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
         target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}

Modified: couchdb/trunk/src/couchdb/couch_rep_reader.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_reader.erl?rev=807320&r1=807319&r2=807320&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_reader.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_reader.erl Mon Aug 24 17:57:51 2009
@@ -130,7 +130,6 @@
         gen_server:reply(ReaderFrom, ok);
     true -> ok end,
     NewState = State#state{count=0, reader_from=nil, docs=queue:new()},
-    ?LOG_INFO("replying to next_docs with HighSeq ~p", [calculate_new_high_seq(State)]),
     {reply, {calculate_new_high_seq(State), queue:to_list(Docs)}, NewState}.
 
 handle_open_remote_doc(Id, Seq, Revs, From, #state{monitor_count=N} = State)



Mime
View raw message