Return-Path: Delivered-To: apmail-couchdb-dev-archive@www.apache.org Received: (qmail 99864 invoked from network); 25 Jan 2011 17:31:42 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 25 Jan 2011 17:31:42 -0000 Received: (qmail 76592 invoked by uid 500); 25 Jan 2011 17:31:41 -0000 Delivered-To: apmail-couchdb-dev-archive@couchdb.apache.org Received: (qmail 76549 invoked by uid 500); 25 Jan 2011 17:31:41 -0000 Mailing-List: contact dev-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list dev@couchdb.apache.org Received: (qmail 76541 invoked by uid 99); 25 Jan 2011 17:31:41 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Jan 2011 17:31:41 +0000 X-ASF-Spam-Status: No, hits=-0.7 required=10.0 tests=FREEMAIL_FROM,RCVD_IN_DNSWL_LOW,SPF_PASS,T_TO_NO_BRKTS_FREEMAIL X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of bchesneau@gmail.com designates 209.85.210.180 as permitted sender) Received: from [209.85.210.180] (HELO mail-iy0-f180.google.com) (209.85.210.180) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Jan 2011 17:31:32 +0000 Received: by iyj21 with SMTP id 21so5622140iyj.11 for ; Tue, 25 Jan 2011 09:31:11 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=gamma; h=domainkey-signature:mime-version:in-reply-to:references:date :message-id:subject:from:to:content-type:content-transfer-encoding; bh=Vti+Fb8poNsNuM1uDx5p0W/62ewL0jDsT+pE1J25Z40=; b=cgTlAR6yJ4b+wdyBnvpFfNzEbr4Acy+iENITKbYf9rIH4/LqfGzaFGEzvMFz0mI1vF nLAER6h2fMudUwtCb/y+2+213cC02I6SVhN3SvpZtJINn+frbVUqgiaDWcKWQagP5boL UKjxXbNjiIbDXLbvVIa85fuTAfwHatouvb7aw= DomainKey-Signature: a=rsa-sha1; c=nofws; d=gmail.com; s=gamma; h=mime-version:in-reply-to:references:date:message-id:subject:from:to :content-type:content-transfer-encoding; b=nIEQOd0mmZ5TNUKUNN8Nsh5R1f9rxmcf60C6Rd5dwr3KytivprfzP8ydOT2E++zeJP vK0WMAIepggf3gwfP8qkDNdtcH9Ksc0pJGxiS39T5hiDeSgDzmGXr5rgyF2aZ7KeWliV PXyBMxEUr+aFenRn0LlwaI/b6Btl2nGtnpmM0= MIME-Version: 1.0 Received: by 10.231.36.139 with SMTP id t11mr6924577ibd.91.1295976671266; Tue, 25 Jan 2011 09:31:11 -0800 (PST) Received: by 10.231.36.67 with HTTP; Tue, 25 Jan 2011 09:31:11 -0800 (PST) In-Reply-To: <20110124134611.89CBD238890A@eris.apache.org> References: <20110124134611.89CBD238890A@eris.apache.org> Date: Tue, 25 Jan 2011 18:31:11 +0100 Message-ID: Subject: Re: svn commit: r1062772 - /couchdb/trunk/src/couchdb/couch_rep_db_listener.erl From: Benoit Chesneau To: dev@couchdb.apache.org Content-Type: text/plain; charset=ISO-8859-1 Content-Transfer-Encoding: quoted-printable X-Virus-Checked: Checked by ClamAV on apache.org On Mon, Jan 24, 2011 at 2:46 PM, wrote: > Author: fdmanana > Date: Mon Jan 24 13:46:11 2011 > New Revision: 1062772 > > URL: http://svn.apache.org/viewvc?rev=3D1062772&view=3Drev > Log: > Refactoring of the replicator database listener > > Simpler implementation and more reliable behaviour when the replicator > database is deleted or changed on the fly. > > Modified: > =A0 =A0couchdb/trunk/src/couchdb/couch_rep_db_listener.erl > > Modified: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl > URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_= listener.erl?rev=3D1062772&r1=3D1062771&r2=3D1062772&view=3Ddiff > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D > --- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (original) > +++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Mon Jan 24 13:46:= 11 2011 > @@ -18,98 +18,113 @@ > > =A0-include("couch_db.hrl"). > > --define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id). > --define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id). > +-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id). > +-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id). > > =A0-record(state, { > =A0 =A0 changes_feed_loop =3D nil, > - =A0 =A0changes_queue =3D nil, > - =A0 =A0changes_processor =3D nil, > - =A0 =A0db_notifier =3D nil > + =A0 =A0db_notifier =3D nil, > + =A0 =A0rep_db_name =3D nil, > + =A0 =A0rep_start_pids =3D [] > =A0}). > > +-import(couch_util, [ > + =A0 =A0get_value/2, > + =A0 =A0get_value/3 > +]). > + > > =A0start_link() -> > =A0 =A0 gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). > > =A0init(_) -> > =A0 =A0 process_flag(trap_exit, true), > - =A0 =A0{ok, Queue} =3D couch_work_queue:new( > - =A0 =A0 =A0 =A0[{max_size, 1024 * 1024}, {max_items, 1000}]), > - =A0 =A0{ok, Processor} =3D changes_processor(Queue), > - =A0 =A0{ok, Loop} =3D changes_feed_loop(Queue), > + =A0 =A0?DOC_ID_TO_REP_ID =3D ets:new(?DOC_ID_TO_REP_ID, [named_table, s= et, private]), > + =A0 =A0?REP_ID_TO_DOC_ID =3D ets:new(?REP_ID_TO_DOC_ID, [named_table, s= et, private]), > =A0 =A0 Server =3D self(), > =A0 =A0 ok =3D couch_config:register( > - =A0 =A0 =A0 =A0fun("replicator", "db") -> > - =A0 =A0 =A0 =A0 =A0 =A0ok =3D gen_server:cast(Server, rep_db_changed) > + =A0 =A0 =A0 =A0fun("replicator", "db", NewName) -> > + =A0 =A0 =A0 =A0 =A0 =A0ok =3D gen_server:cast(Server, {rep_db_changed, = ?l2b(NewName)}) > =A0 =A0 =A0 =A0 end > =A0 =A0 ), > + =A0 =A0{Loop, RepDbName} =3D changes_feed_loop(), > =A0 =A0 {ok, #state{ > =A0 =A0 =A0 =A0 changes_feed_loop =3D Loop, > - =A0 =A0 =A0 =A0changes_queue =3D Queue, > - =A0 =A0 =A0 =A0changes_processor =3D Processor, > + =A0 =A0 =A0 =A0rep_db_name =3D RepDbName, > =A0 =A0 =A0 =A0 db_notifier =3D db_update_notifier()} > =A0 =A0 }. > > > +handle_call({rep_db_update, Change}, _From, State) -> > + =A0 =A0{reply, ok, process_update(State, Change)}; > + > =A0handle_call(Msg, From, State) -> > =A0 =A0 ?LOG_ERROR("Replicator DB listener received unexpected call ~p fr= om ~p", > =A0 =A0 =A0 =A0 [Msg, From]), > =A0 =A0 {stop, {error, {unexpected_call, Msg}}, State}. > > > -handle_cast(rep_db_changed, State) -> > - =A0 =A0#state{ > - =A0 =A0 =A0 =A0changes_feed_loop =3D Loop, > - =A0 =A0 =A0 =A0changes_queue =3D Queue > - =A0 =A0} =3D State, > - =A0 =A0catch unlink(Loop), > - =A0 =A0catch exit(Loop, rep_db_changed), > - =A0 =A0couch_work_queue:queue(Queue, stop_all_replications), > - =A0 =A0{ok, NewLoop} =3D changes_feed_loop(Queue), > - =A0 =A0{noreply, State#state{changes_feed_loop =3D NewLoop}}; > - > -handle_cast(rep_db_created, #state{changes_feed_loop =3D Loop} =3D State= ) -> > - =A0 =A0catch unlink(Loop), > - =A0 =A0catch exit(Loop, rep_db_changed), > - =A0 =A0{ok, NewLoop} =3D changes_feed_loop(State#state.changes_queue), > - =A0 =A0{noreply, State#state{changes_feed_loop =3D NewLoop}}; > +handle_cast({rep_db_changed, NewName}, > + =A0 =A0 =A0 =A0#state{rep_db_name =3D NewName} =3D State) -> > + =A0 =A0{noreply, State}; > + > +handle_cast({rep_db_changed, _NewName}, State) -> > + =A0 =A0{noreply, restart(State)}; > + > +handle_cast({rep_db_created, NewName}, > + =A0 =A0 =A0 =A0#state{rep_db_name =3D NewName} =3D State) -> > + =A0 =A0{noreply, State}; > + > +handle_cast({rep_db_created, _NewName}, State) -> > + =A0 =A0{noreply, restart(State)}; > > =A0handle_cast(Msg, State) -> > =A0 =A0 ?LOG_ERROR("Replicator DB listener received unexpected cast ~p", = [Msg]), > =A0 =A0 {stop, {error, {unexpected_cast, Msg}}, State}. > > + > =A0handle_info({'EXIT', From, normal}, #state{changes_feed_loop =3D From}= =3D State) -> > =A0 =A0 % replicator DB deleted > - =A0 =A0couch_work_queue:queue(State#state.changes_queue, stop_all_repli= cations), > - =A0 =A0{noreply, State#state{changes_feed_loop =3D nil}}; > + =A0 =A0{noreply, State#state{changes_feed_loop =3D nil, rep_db_name =3D= nil}}; > > =A0handle_info({'EXIT', From, Reason}, #state{db_notifier =3D From} =3D S= tate) -> > =A0 =A0 ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason])= , > =A0 =A0 {stop, {db_update_notifier_died, Reason}, State}; > > -handle_info({'EXIT', From, Reason}, #state{changes_processor =3D From} = =3D State) -> > - =A0 =A0?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [= Reason]), > - =A0 =A0{stop, {rep_db_changes_processor_died, Reason}, State}. > +handle_info({'EXIT', From, normal}, #state{rep_start_pids =3D Pids} =3D = State) -> > + =A0 =A0% one of the replication start processes terminated successfully > + =A0 =A0{noreply, State#state{rep_start_pids =3D Pids -- [From]}}; > + > +handle_info(Msg, State) -> > + =A0 =A0?LOG_ERROR("Replicator DB listener received unexpected message ~= p", [Msg]), > + =A0 =A0{stop, {unexpected_msg, Msg}, State}. > > > =A0terminate(_Reason, State) -> > =A0 =A0 #state{ > + =A0 =A0 =A0 =A0rep_start_pids =3D StartPids, > =A0 =A0 =A0 =A0 changes_feed_loop =3D Loop, > - =A0 =A0 =A0 =A0changes_queue =3D Queue > + =A0 =A0 =A0 =A0db_notifier =3D Notifier > =A0 =A0 } =3D State, > - =A0 =A0exit(Loop, stop), > - =A0 =A0% closing the queue will cause changes_processor to shutdown > - =A0 =A0couch_work_queue:close(Queue), > - =A0 =A0ok. > + =A0 =A0stop_all_replications(), > + =A0 =A0lists:foreach( > + =A0 =A0 =A0 =A0fun(Pid) -> > + =A0 =A0 =A0 =A0 =A0 =A0catch unlink(Pid), > + =A0 =A0 =A0 =A0 =A0 =A0catch exit(Pid, stop) > + =A0 =A0 =A0 =A0end, > + =A0 =A0 =A0 =A0[Loop | StartPids]), > + =A0 =A0true =3D ets:delete(?REP_ID_TO_DOC_ID), > + =A0 =A0true =3D ets:delete(?DOC_ID_TO_REP_ID), > + =A0 =A0couch_db_update_notifier:stop(Notifier). > > > =A0code_change(_OldVsn, State, _Extra) -> > =A0 =A0 {ok, State}. > > > -changes_feed_loop(ChangesQueue) -> > +changes_feed_loop() -> > =A0 =A0 {ok, RepDb} =3D couch_rep:ensure_rep_db_exists(), > + =A0 =A0Server =3D self(), > =A0 =A0 Pid =3D spawn_link( > =A0 =A0 =A0 =A0 fun() -> > =A0 =A0 =A0 =A0 =A0 =A0 ChangesFeedFun =3D couch_changes:handle_changes( > @@ -126,7 +141,8 @@ changes_feed_loop(ChangesQueue) -> > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 fun({change, Change, _}, _) -> > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 case has_valid_rep_id(Change) of > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 true -> > - =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0couch_work_queue:queue(C= hangesQueue, Change); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0ok =3D gen_server:call( > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Server, {rep_db_= update, Change}, infinity); > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 false -> > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 ok > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 end; > @@ -137,7 +153,15 @@ changes_feed_loop(ChangesQueue) -> > =A0 =A0 =A0 =A0 end > =A0 =A0 ), > =A0 =A0 couch_db:close(RepDb), > - =A0 =A0{ok, Pid}. > + =A0 =A0{Pid, couch_db:name(RepDb)}. > + > + > +has_valid_rep_id({Change}) -> > + =A0 =A0has_valid_rep_id(get_value(<<"id">>, Change)); > +has_valid_rep_id(<>) -> > + =A0 =A0false; > +has_valid_rep_id(_Else) -> > + =A0 =A0true. > > > =A0db_update_notifier() -> > @@ -146,121 +170,106 @@ db_update_notifier() -> > =A0 =A0 =A0 =A0 fun({created, DbName}) -> > =A0 =A0 =A0 =A0 =A0 =A0 case ?l2b(couch_config:get("replicator", "db", "_= replicator")) of > =A0 =A0 =A0 =A0 =A0 =A0 DbName -> > - =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0ok =3D gen_server:cast(Server, rep_db_cr= eated); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0ok =3D gen_server:cast(Server, {rep_db_c= reated, DbName}); > =A0 =A0 =A0 =A0 =A0 =A0 _ -> > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 ok > =A0 =A0 =A0 =A0 =A0 =A0 end; > =A0 =A0 =A0 =A0 (_) -> > + =A0 =A0 =A0 =A0 =A0 =A0% no need to handle the 'deleted' event - the ch= anges feed loop > + =A0 =A0 =A0 =A0 =A0 =A0% dies when the database is deleted > =A0 =A0 =A0 =A0 =A0 =A0 ok > =A0 =A0 =A0 =A0 end > =A0 =A0 ), > =A0 =A0 Notifier. > > > -changes_processor(ChangesQueue) -> > - =A0 =A0Pid =3D spawn_link( > - =A0 =A0 =A0 =A0fun() -> > - =A0 =A0 =A0 =A0 =A0 =A0ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, p= rivate]), > - =A0 =A0 =A0 =A0 =A0 =A0ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set= , private]), > - =A0 =A0 =A0 =A0 =A0 =A0consume_changes(ChangesQueue), > - =A0 =A0 =A0 =A0 =A0 =A0true =3D ets:delete(?REP_ID_TO_DOC_ID_MAP), > - =A0 =A0 =A0 =A0 =A0 =A0true =3D ets:delete(?DOC_TO_REP_ID_MAP) > - =A0 =A0 =A0 =A0end > - =A0 =A0), > - =A0 =A0{ok, Pid}. > - > - > -consume_changes(ChangesQueue) -> > - =A0 =A0case couch_work_queue:dequeue(ChangesQueue) of > - =A0 =A0closed -> > - =A0 =A0 =A0 =A0ok; > - =A0 =A0{ok, Changes} -> > - =A0 =A0 =A0 =A0lists:foreach(fun process_change/1, Changes), > - =A0 =A0 =A0 =A0consume_changes(ChangesQueue) > - =A0 =A0end. > - > - > -has_valid_rep_id({Change}) -> > - =A0 =A0has_valid_rep_id(couch_util:get_value(<<"id">>, Change)); > -has_valid_rep_id(<>) -> > - =A0 =A0false; > -has_valid_rep_id(_Else) -> > - =A0 =A0true. > +restart(#state{changes_feed_loop =3D Loop, rep_start_pids =3D StartPids}= =3D State) -> > + =A0 =A0stop_all_replications(), > + =A0 =A0lists:foreach( > + =A0 =A0 =A0 =A0fun(Pid) -> > + =A0 =A0 =A0 =A0 =A0 =A0catch unlink(Pid), > + =A0 =A0 =A0 =A0 =A0 =A0catch exit(Pid, rep_db_changed) > + =A0 =A0 =A0 =A0end, > + =A0 =A0 =A0 =A0[Loop | StartPids]), > + =A0 =A0{NewLoop, NewRepDbName} =3D changes_feed_loop(), > + =A0 =A0State#state{ > + =A0 =A0 =A0 =A0changes_feed_loop =3D NewLoop, > + =A0 =A0 =A0 =A0rep_db_name =3D NewRepDbName, > + =A0 =A0 =A0 =A0rep_start_pids =3D [] > + =A0 =A0}. > > -process_change(stop_all_replications) -> > - =A0 =A0?LOG_INFO("Stopping all ongoing replications because the replica= tor DB " > - =A0 =A0 =A0 =A0"was deleted or changed", []), > - =A0 =A0stop_all_replications(); > > -process_change({Change}) -> > - =A0 =A0{RepProps} =3D JsonRepDoc =3D couch_util:get_value(doc, Change), > - =A0 =A0DocId =3D couch_util:get_value(<<"_id">>, RepProps), > - =A0 =A0case couch_util:get_value(<<"deleted">>, Change, false) of > +process_update(State, {Change}) -> > + =A0 =A0{RepProps} =3D JsonRepDoc =3D get_value(doc, Change), > + =A0 =A0DocId =3D get_value(<<"_id">>, RepProps), > + =A0 =A0case get_value(<<"deleted">>, Change, false) of > =A0 =A0 true -> > - =A0 =A0 =A0 =A0rep_doc_deleted(DocId); > + =A0 =A0 =A0 =A0rep_doc_deleted(DocId), > + =A0 =A0 =A0 =A0State; > =A0 =A0 false -> > - =A0 =A0 =A0 =A0case couch_util:get_value(<<"_replication_state">>, RepP= rops) of > + =A0 =A0 =A0 =A0case get_value(<<"_replication_state">>, RepProps) of > =A0 =A0 =A0 =A0 <<"completed">> -> > - =A0 =A0 =A0 =A0 =A0 =A0replication_complete(DocId); > + =A0 =A0 =A0 =A0 =A0 =A0replication_complete(DocId), > + =A0 =A0 =A0 =A0 =A0 =A0State; > =A0 =A0 =A0 =A0 <<"error">> -> > - =A0 =A0 =A0 =A0 =A0 =A0stop_replication(DocId); > + =A0 =A0 =A0 =A0 =A0 =A0stop_replication(DocId), > + =A0 =A0 =A0 =A0 =A0 =A0State; > =A0 =A0 =A0 =A0 <<"triggered">> -> > - =A0 =A0 =A0 =A0 =A0 =A0maybe_start_replication(DocId, JsonRepDoc); > + =A0 =A0 =A0 =A0 =A0 =A0maybe_start_replication(State, DocId, JsonRepDoc= ); > =A0 =A0 =A0 =A0 undefined -> > - =A0 =A0 =A0 =A0 =A0 =A0maybe_start_replication(DocId, JsonRepDoc); > - =A0 =A0 =A0 =A0_ -> > - =A0 =A0 =A0 =A0 =A0 =A0?LOG_ERROR("Invalid value for the `_replication_= state` property" > - =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0" of the replication document `~s`", [Do= cId]) > + =A0 =A0 =A0 =A0 =A0 =A0maybe_start_replication(State, DocId, JsonRepDoc= ) > =A0 =A0 =A0 =A0 end > - =A0 =A0end, > - =A0 =A0ok. > + =A0 =A0end. > > > =A0rep_user_ctx({RepDoc}) -> > - =A0 =A0case couch_util:get_value(<<"user_ctx">>, RepDoc) of > + =A0 =A0case get_value(<<"user_ctx">>, RepDoc) of > =A0 =A0 undefined -> > =A0 =A0 =A0 =A0 #user_ctx{roles =3D [<<"_admin">>]}; > =A0 =A0 {UserCtx} -> > =A0 =A0 =A0 =A0 #user_ctx{ > - =A0 =A0 =A0 =A0 =A0 =A0name =3D couch_util:get_value(<<"name">>, UserCt= x, null), > - =A0 =A0 =A0 =A0 =A0 =A0roles =3D couch_util:get_value(<<"roles">>, User= Ctx, []) > + =A0 =A0 =A0 =A0 =A0 =A0name =3D get_value(<<"name">>, UserCtx, null), > + =A0 =A0 =A0 =A0 =A0 =A0roles =3D get_value(<<"roles">>, UserCtx, []) > =A0 =A0 =A0 =A0 } > =A0 =A0 end. > > > -maybe_start_replication(DocId, JsonRepDoc) -> > +maybe_start_replication(State, DocId, JsonRepDoc) -> > =A0 =A0 UserCtx =3D rep_user_ctx(JsonRepDoc), > =A0 =A0 {BaseId, _} =3D RepId =3D couch_rep:make_replication_id(JsonRepDo= c, UserCtx), > - =A0 =A0case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of > + =A0 =A0case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of > =A0 =A0 [] -> > - =A0 =A0 =A0 =A0true =3D ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocI= d}), > - =A0 =A0 =A0 =A0true =3D ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}), > - =A0 =A0 =A0 =A0spawn_link(fun() -> start_replication(JsonRepDoc, RepId,= UserCtx) end); > + =A0 =A0 =A0 =A0true =3D ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}), > + =A0 =A0 =A0 =A0true =3D ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}), > + =A0 =A0 =A0 =A0Pid =3D spawn_link(fun() -> > + =A0 =A0 =A0 =A0 =A0 =A0start_replication(JsonRepDoc, RepId, UserCtx) > + =A0 =A0 =A0 =A0end), > + =A0 =A0 =A0 =A0State#state{rep_start_pids =3D [Pid | State#state.rep_st= art_pids]}; > =A0 =A0 [{BaseId, DocId}] -> > - =A0 =A0 =A0 =A0ok; > + =A0 =A0 =A0 =A0State; > =A0 =A0 [{BaseId, OtherDocId}] -> > - =A0 =A0 =A0 =A0maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), Other= DocId) > + =A0 =A0 =A0 =A0?LOG_INFO("The replication specified by the document `~s= ` was already" > + =A0 =A0 =A0 =A0 =A0 =A0" triggered by the document `~s`", [DocId, Other= DocId]), > + =A0 =A0 =A0 =A0maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), > + =A0 =A0 =A0 =A0State > =A0 =A0 end. > > > -maybe_tag_rep_doc(DocId, {Props} =3D JsonRepDoc, RepId, OtherDocId) -> > - =A0 =A0case couch_util:get_value(<<"_replication_id">>, Props) of > +maybe_tag_rep_doc({Props} =3D JsonRepDoc, RepId) -> > + =A0 =A0case get_value(<<"_replication_id">>, Props) of > =A0 =A0 RepId -> > =A0 =A0 =A0 =A0 ok; > =A0 =A0 _ -> > - =A0 =A0 =A0 =A0?LOG_INFO("The replication specified by the document `~s= ` was already" > - =A0 =A0 =A0 =A0 =A0 =A0" triggered by the document `~s`", [DocId, Other= DocId]), > =A0 =A0 =A0 =A0 couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id= ">>, RepId}]) > =A0 =A0 end. > > > - > -start_replication({RepProps} =3D RepDoc, {Base, Ext} =3D RepId, UserCtx)= -> > +start_replication({RepProps} =3D RepDoc, {Base, _} =3D RepId, UserCtx) -= > > =A0 =A0 case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) = of > - =A0 =A0RepPid when is_pid(RepPid) -> > + =A0 =A0Pid when is_pid(Pid) -> > =A0 =A0 =A0 =A0 ?LOG_INFO("Document `~s` triggered replication `~s`", > - =A0 =A0 =A0 =A0 =A0 =A0[couch_util:get_value(<<"_id">>, RepProps), Base= ++ Ext]), > - =A0 =A0 =A0 =A0couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx); > + =A0 =A0 =A0 =A0 =A0 =A0[get_value(<<"_id">>, RepProps), pp_rep_id(RepId= )]), > + =A0 =A0 =A0 =A0couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); > =A0 =A0 Error -> > =A0 =A0 =A0 =A0 couch_rep:update_rep_doc( > =A0 =A0 =A0 =A0 =A0 =A0 RepDoc, > @@ -269,43 +278,54 @@ start_replication({RepProps} =3D RepDoc, { > =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 {<<"_replication_id">>, ?l2b(Base)} > =A0 =A0 =A0 =A0 =A0 =A0 ] > =A0 =A0 =A0 =A0 ), > - =A0 =A0 =A0 =A0?LOG_ERROR("Error starting replication `~s`: ~p", [Base = ++ Ext, Error]) > + =A0 =A0 =A0 =A0?LOG_ERROR("Error starting replication `~s`: ~p", [pp_re= p_id(RepId), Error]) > =A0 =A0 end. > > + > =A0rep_doc_deleted(DocId) -> > =A0 =A0 case stop_replication(DocId) of > - =A0 =A0{ok, {Base, Ext}} -> > + =A0 =A0{ok, RepId} -> > =A0 =A0 =A0 =A0 ?LOG_INFO("Stopped replication `~s` because replication d= ocument `~s`" > - =A0 =A0 =A0 =A0 =A0 =A0" was deleted", [Base ++ Ext, DocId]); > + =A0 =A0 =A0 =A0 =A0 =A0" was deleted", [pp_rep_id(RepId), DocId]); > =A0 =A0 none -> > =A0 =A0 =A0 =A0 ok > =A0 =A0 end. > > + > =A0replication_complete(DocId) -> > =A0 =A0 case stop_replication(DocId) of > - =A0 =A0{ok, {Base, Ext}} -> > + =A0 =A0{ok, RepId} -> > =A0 =A0 =A0 =A0 ?LOG_INFO("Replication `~s` finished (triggered by docume= nt `~s`)", > - =A0 =A0 =A0 =A0 =A0 =A0[Base ++ Ext, DocId]); > + =A0 =A0 =A0 =A0 =A0 =A0[pp_rep_id(RepId), DocId]); > =A0 =A0 none -> > =A0 =A0 =A0 =A0 ok > =A0 =A0 end. > > + > =A0stop_replication(DocId) -> > - =A0 =A0case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of > + =A0 =A0case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of > =A0 =A0 [{DocId, {BaseId, _} =3D RepId}] -> > =A0 =A0 =A0 =A0 couch_rep:end_replication(RepId), > - =A0 =A0 =A0 =A0true =3D ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId), > - =A0 =A0 =A0 =A0true =3D ets:delete(?DOC_TO_REP_ID_MAP, DocId), > + =A0 =A0 =A0 =A0true =3D ets:delete(?REP_ID_TO_DOC_ID, BaseId), > + =A0 =A0 =A0 =A0true =3D ets:delete(?DOC_ID_TO_REP_ID, DocId), > =A0 =A0 =A0 =A0 {ok, RepId}; > =A0 =A0 [] -> > =A0 =A0 =A0 =A0 none > =A0 =A0 end. > > + > =A0stop_all_replications() -> > + =A0 =A0?LOG_INFO("Stopping all ongoing replications because the replica= tor DB " > + =A0 =A0 =A0 =A0"was deleted or changed", []), > =A0 =A0 ets:foldl( > =A0 =A0 =A0 =A0 fun({_, RepId}, _) -> couch_rep:end_replication(RepId) en= d, > =A0 =A0 =A0 =A0 ok, > - =A0 =A0 =A0 =A0?DOC_TO_REP_ID_MAP > + =A0 =A0 =A0 =A0?DOC_ID_TO_REP_ID > =A0 =A0 ), > - =A0 =A0true =3D ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP), > - =A0 =A0true =3D ets:delete_all_objects(?DOC_TO_REP_ID_MAP). > + =A0 =A0true =3D ets:delete_all_objects(?REP_ID_TO_DOC_ID), > + =A0 =A0true =3D ets:delete_all_objects(?DOC_ID_TO_REP_ID). > + > + > +% pretty-print replication id > +pp_rep_id({Base, Extension}) -> > + =A0 =A0Base ++ Extension. > > > Is there any reason you are using named table here ? Why not just use ets ids ? Also why using macros ?