Return-Path: Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: (qmail 30106 invoked from network); 12 Apr 2011 11:07:19 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 12 Apr 2011 11:07:19 -0000 Received: (qmail 95152 invoked by uid 500); 12 Apr 2011 11:07:19 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 94987 invoked by uid 500); 12 Apr 2011 11:07:18 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 94979 invoked by uid 99); 12 Apr 2011 11:07:17 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Apr 2011 11:07:17 +0000 X-ASF-Spam-Status: No, hits=-1996.4 required=5.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Apr 2011 11:07:14 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5384D23889E0; Tue, 12 Apr 2011 11:06:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1091371 - in /couchdb/trunk/src/couchdb: couch_replication_manager.erl couch_replicator.erl Date: Tue, 12 Apr 2011 11:06:53 -0000 To: commits@couchdb.apache.org From: fdmanana@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110412110653.5384D23889E0@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: fdmanana Date: Tue Apr 12 11:06:52 2011 New Revision: 1091371 URL: http://svn.apache.org/viewvc?rev=1091371&view=rev Log: Replication manager refactoring Update the state of replication documents outside the replication manager gen_server. This allows for a faster transition of replication states without adding substantial complexity, more or less similar to what is done in the 1.1.x branch. Modified: couchdb/trunk/src/couchdb/couch_replication_manager.erl couchdb/trunk/src/couchdb/couch_replicator.erl Modified: couchdb/trunk/src/couchdb/couch_replication_manager.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replication_manager.erl?rev=1091371&r1=1091370&r2=1091371&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_replication_manager.erl (original) +++ couchdb/trunk/src/couchdb/couch_replication_manager.erl Tue Apr 12 11:06:52 2011 @@ -13,6 +13,10 @@ -module(couch_replication_manager). -behaviour(gen_server). +% public API +-export([replication_started/1, replication_completed/1, replication_error/2]). + +% gen_server callbacks -export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]). -export([code_change/3, terminate/2]). @@ -28,8 +32,7 @@ rep, starting, retries_left, - max_retries, - last_error + max_retries }). -import(couch_replicator_utils, [ @@ -44,7 +47,6 @@ -record(state, { changes_feed_loop = nil, db_notifier = nil, - rep_notifier = nil, rep_db_name = nil, rep_start_pids = [], max_retries @@ -54,6 +56,44 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +replication_started(#rep{id = {BaseId, _} = RepId}) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{rep = #rep{doc_id = DocId}} -> + update_rep_doc(DocId, [ + {<<"_replication_state">>, <<"triggered">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), + ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity), + ?LOG_INFO("Document `~s` triggered replication `~s`", + [DocId, pp_rep_id(RepId)]) + end. + + +replication_completed(#rep{id = RepId}) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{rep = #rep{doc_id = DocId}} -> + update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]), + ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity), + ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", + [pp_rep_id(RepId), DocId]) + end. + + +replication_error(#rep{id = RepId}, Error) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{rep = #rep{doc_id = DocId}} -> + % TODO: maybe add error reason to replication document + update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]), + ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity) + end. + + init(_) -> process_flag(trap_exit, true), ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]), @@ -72,7 +112,6 @@ init(_) -> changes_feed_loop = Loop, rep_db_name = RepDbName, db_notifier = db_update_notifier(), - rep_notifier = rep_notifier(), max_retries = list_to_integer( couch_config:get("replicator", "max_replication_retry_count", "10")) }}. @@ -81,41 +120,22 @@ init(_) -> handle_call({rep_db_update, Change}, _From, State) -> {reply, ok, process_update(State, Change)}; -handle_call({rep_started, {BaseId, _Ext} = RepId}, _From, State) -> +handle_call({rep_started, RepId}, _From, State) -> case rep_state(RepId) of nil -> ok; - #rep_state{rep = #rep{doc_id = DocId}} = RepState -> + RepState -> true = ets:insert( - ?REP_TO_STATE, {RepId, RepState#rep_state{starting = false}}), - update_rep_doc(DocId, [ - {<<"_replication_state">>, <<"triggered">>}, - {<<"_replication_id">>, ?l2b(BaseId)}]), - ?LOG_INFO("Document `~s` triggered replication `~s`", - [DocId, pp_rep_id(RepId)]) + ?REP_TO_STATE, {RepId, RepState#rep_state{starting = false}}) end, {reply, ok, State}; handle_call({rep_complete, RepId}, _From, State) -> - case rep_state(RepId) of - nil -> - ok; - #rep_state{rep = #rep{doc_id = DocId}} -> - update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]) - end, + true = ets:delete(?REP_TO_STATE, RepId), {reply, ok, State}; handle_call({rep_error, RepId, Error}, _From, State) -> - case rep_state(RepId) of - nil -> - {reply, ok, State}; - #rep_state{rep = #rep{doc_id = DocId}} = RepState -> - NewRepState = RepState#rep_state{last_error = Error}, - true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}), - % TODO: maybe add error reason to replication document - update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]), - {reply, ok, State} - end; + {reply, ok, replication_error(State, RepId, Error)}; handle_call(Msg, From, State) -> ?LOG_ERROR("Replication manager received unexpected call ~p from ~p", @@ -151,10 +171,6 @@ handle_info({'EXIT', From, Reason}, #sta ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]), {stop, {db_update_notifier_died, Reason}, State}; -handle_info({'EXIT', From, Reason}, #state{rep_notifier = From} = State) -> - ?LOG_ERROR("Replication notifier died. Reason: ~p", [Reason]), - {stop, {rep_notifier_died, Reason}, State}; - handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) -> % one of the replication start processes terminated successfully {noreply, State#state{rep_start_pids = Pids -- [From]}}; @@ -168,8 +184,7 @@ terminate(_Reason, State) -> #state{ rep_start_pids = StartPids, changes_feed_loop = Loop, - db_notifier = DbNotifier, - rep_notifier = RepNotifier + db_notifier = DbNotifier } = State, stop_all_replications(), lists:foreach( @@ -180,7 +195,6 @@ terminate(_Reason, State) -> [Loop | StartPids]), true = ets:delete(?REP_TO_STATE), true = ets:delete(?DOC_TO_REP), - couch_replication_notifier:stop(RepNotifier), couch_db_update_notifier:stop(DbNotifier). @@ -249,31 +263,6 @@ db_update_notifier() -> Notifier. -rep_notifier() -> - Server = self(), - {ok, Notifier} = couch_replication_notifier:start_link( - fun({finished, RepId, _CheckPointHistory}) -> - case rep_state(RepId) of - nil -> - ok; - #rep_state{} -> - % TODO: maybe add replication stats to the doc - ok = gen_server:call(Server, {rep_complete, RepId}, infinity) - end; - ({error, RepId, Error}) -> - case rep_state(RepId) of - nil -> - ok; - #rep_state{} -> - ok = gen_server:call( - Server, {rep_error, RepId, Error}, infinity) - end; - (_) -> - ok - end), - Notifier. - - restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) -> stop_all_replications(), lists:foreach( @@ -306,8 +295,8 @@ process_update(State, {Change}) -> <<"completed">> -> replication_complete(DocId), State; - <<"error">> -> - replication_error(State, DocId) + _ -> + State end end. @@ -379,11 +368,13 @@ start_replication(Server, #rep{id = RepI replication_complete(DocId) -> case ets:lookup(?DOC_TO_REP, DocId) of [{DocId, RepId}] -> - couch_replicator:cancel_replication(RepId), - true = ets:delete(?REP_TO_STATE, RepId), - true = ets:delete(?DOC_TO_REP, DocId), - ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", - [pp_rep_id(RepId), DocId]); + case rep_state(RepId) of + nil -> + couch_replicator:cancel_replication(RepId); + #rep_state{} -> + ok + end, + true = ets:delete(?DOC_TO_REP, DocId); _ -> ok end. @@ -402,19 +393,18 @@ rep_doc_deleted(DocId) -> end. -replication_error(State, DocId) -> - case ets:lookup(?DOC_TO_REP, DocId) of - [{DocId, RepId}] -> - maybe_retry_replication(rep_state(RepId), State); - _ -> - State +replication_error(State, RepId, Error) -> + case rep_state(RepId) of + nil -> + State; + RepState -> + maybe_retry_replication(RepState, Error, State) end. -maybe_retry_replication(#rep_state{retries_left = 0} = RepState, State) -> +maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) -> #rep_state{ rep = #rep{id = RepId, doc_id = DocId}, - max_retries = MaxRetries, - last_error = Error + max_retries = MaxRetries } = RepState, couch_replicator:cancel_replication(RepId), true = ets:delete(?REP_TO_STATE, RepId), @@ -424,11 +414,10 @@ maybe_retry_replication(#rep_state{retri [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]), State; -maybe_retry_replication(RepState, State) -> +maybe_retry_replication(RepState, Error, State) -> #rep_state{ rep = #rep{id = RepId, doc_id = DocId} = Rep, - retries_left = RetriesLeft, - last_error = Error + retries_left = RetriesLeft } = RepState, NewRepState = RepState#rep_state{retries_left = RetriesLeft - 1}, true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}), Modified: couchdb/trunk/src/couchdb/couch_replicator.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1091371&r1=1091370&r2=1091371&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_replicator.erl (original) +++ couchdb/trunk/src/couchdb/couch_replicator.erl Tue Apr 12 11:06:52 2011 @@ -278,6 +278,8 @@ do_init(#rep{options = Options, id = {Ba ?LOG_DEBUG("Missing rev finder pids are: ~p", [MissingRevFinders]), ?LOG_DEBUG("Worker pids are: ~p", [Workers]), + couch_replication_manager:replication_started(Rep), + {ok, State#rep_state{ missing_revs_queue = MissingRevsQueue, changes_queue = ChangesQueue, @@ -443,10 +445,11 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -terminate(normal, #rep_state{rep_details = #rep{id = RepId}, +terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep, checkpoint_history = CheckpointHistory} = State) -> terminate_cleanup(State), - couch_replication_notifier:notify({finished, RepId, CheckpointHistory}); + couch_replication_notifier:notify({finished, RepId, CheckpointHistory}), + couch_replication_manager:replication_completed(Rep); terminate(shutdown, State) -> % cancelled replication throught ?MODULE:cancel_replication/1 @@ -456,12 +459,13 @@ terminate(Reason, State) -> #rep_state{ source_name = Source, target_name = Target, - rep_details = #rep{id = {BaseId, Ext} = RepId} + rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep } = State, ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s", [BaseId ++ Ext, Source, Target, to_binary(Reason)]), terminate_cleanup(State), - couch_replication_notifier:notify({error, RepId, Reason}). + couch_replication_notifier:notify({error, RepId, Reason}), + couch_replication_manager:replication_error(Rep, Reason). terminate_cleanup(State) ->