Return-Path: Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: (qmail 24475 invoked from network); 8 Mar 2011 18:50:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 8 Mar 2011 18:50:20 -0000 Received: (qmail 44125 invoked by uid 500); 8 Mar 2011 18:50:20 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 44069 invoked by uid 500); 8 Mar 2011 18:50:20 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 44062 invoked by uid 99); 8 Mar 2011 18:50:20 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Mar 2011 18:50:20 +0000 X-ASF-Spam-Status: No, hits=-1996.4 required=5.0 tests=ALL_TRUSTED,FS_REPLICA,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Mar 2011 18:50:15 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id F1EE92388A19; Tue, 8 Mar 2011 18:49:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1079483 - in /couchdb/trunk/src/couchdb: couch_replication_manager.erl couch_replicator.erl couch_replicator.hrl couch_replicator_utils.erl Date: Tue, 08 Mar 2011 18:49:53 -0000 To: commits@couchdb.apache.org From: fdmanana@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110308184953.F1EE92388A19@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: fdmanana Date: Tue Mar 8 18:49:53 2011 New Revision: 1079483 URL: http://svn.apache.org/viewvc?rev=1079483&view=rev Log: Adapt replication manager to the new replicator's code Now all the replication document management is done only by the couch_replication_manager module, instead of being split by this module and replication gen_servers. The code is also simpler now, since it uses the couch_replication_notifier gen_event. This is a pure refactoring, not adding any new behaviour. Modified: couchdb/trunk/src/couchdb/couch_replication_manager.erl couchdb/trunk/src/couchdb/couch_replicator.erl couchdb/trunk/src/couchdb/couch_replicator.hrl couchdb/trunk/src/couchdb/couch_replicator_utils.erl Modified: couchdb/trunk/src/couchdb/couch_replication_manager.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replication_manager.erl?rev=1079483&r1=1079482&r2=1079483&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_replication_manager.erl (original) +++ couchdb/trunk/src/couchdb/couch_replication_manager.erl Tue Mar 8 18:49:53 2011 @@ -18,23 +18,31 @@ -include("couch_db.hrl"). -include("couch_replicator.hrl"). +-include("couch_js_functions.hrl"). --define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id). --define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id). +-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id). +-define(REP_TO_STATE, couch_rep_id_to_rep_state). -define(INITIAL_WAIT, 5). +-record(rep_state, { + rep, + starting, + max_retries +}). + -import(couch_replicator_utils, [ - parse_rep_doc/2, - update_rep_doc/2 + parse_rep_doc/2 ]). -import(couch_util, [ get_value/2, - get_value/3 + get_value/3, + to_binary/1 ]). -record(state, { changes_feed_loop = nil, db_notifier = nil, + rep_notifier = nil, rep_db_name = nil, rep_start_pids = [], max_retries @@ -46,8 +54,8 @@ start_link() -> init(_) -> process_flag(trap_exit, true), - _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]), - _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]), + ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]), + ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]), Server = self(), ok = couch_config:register( fun("replicator", "db", NewName) -> @@ -62,6 +70,7 @@ init(_) -> changes_feed_loop = Loop, rep_db_name = RepDbName, db_notifier = db_update_notifier(), + rep_notifier = rep_notifier(), max_retries = list_to_integer( couch_config:get("replicator", "max_replication_retry_count", "10")) }}. @@ -70,26 +79,49 @@ init(_) -> handle_call({rep_db_update, Change}, _From, State) -> {reply, ok, process_update(State, Change)}; -handle_call({triggered, {BaseId, _}}, _From, State) -> - [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId), - true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}), +handle_call({rep_started, {BaseId, _Ext} = RepId}, _From, State) -> + #rep_state{rep = #rep{doc_id = DocId}} = RepState = rep_state(RepId), + true = ets:insert( + ?REP_TO_STATE, {RepId, RepState#rep_state{starting = false}}), + update_rep_doc(DocId, [ + {<<"_replication_state">>, <<"triggered">>}, + {<<"_replication_id">>, ?l2b(BaseId)}]), {reply, ok, State}; -handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) -> - DocId = get_value(<<"_id">>, Props), - [{DocId, {{BaseId, _} = RepId, MaxRetries}}] = ets:lookup( - ?DOC_ID_TO_REP_ID, DocId), +handle_call({rep_start_failure, Rep, Error}, _From, State) -> + #rep{id = {BaseId, _} = RepId, doc_id = DocId} = Rep, + #rep_state{max_retries = MaxRetries} = rep_state(RepId), ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using " "the document `~s`. Last error reason was: ~p", [pp_rep_id(RepId), MaxRetries, DocId, Error]), update_rep_doc( - RepDoc, + DocId, [{<<"_replication_state">>, <<"error">>}, {<<"_replication_id">>, ?l2b(BaseId)}]), - true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), - true = ets:delete(?DOC_ID_TO_REP_ID, DocId), + true = ets:delete(?REP_TO_STATE, RepId), + true = ets:delete(?DOC_TO_REP, DocId), {reply, ok, State}; +handle_call({rep_complete, RepId}, _From, State) -> + #rep_state{rep = #rep{doc_id = DocId}} = rep_state(RepId), + couch_replicator:cancel_replication(RepId), + true = ets:delete(?REP_TO_STATE, RepId), + true = ets:delete(?DOC_TO_REP, DocId), + ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", + [pp_rep_id(RepId), DocId]), + update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]), + {reply, ok, State}; + +handle_call({rep_error, RepId, Error}, _From, State) -> + #rep_state{rep = #rep{doc_id = DocId}} = rep_state(RepId), + couch_replicator:cancel_replication(RepId), + true = ets:delete(?REP_TO_STATE, RepId), + true = ets:delete(?DOC_TO_REP, DocId), + ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s", + [pp_rep_id(RepId), DocId, to_binary(error_reason(Error))]), + update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]), + {reply, ok, State}; + handle_call(Msg, From, State) -> ?LOG_ERROR("Replication manager received unexpected call ~p from ~p", [Msg, From]), @@ -124,6 +156,10 @@ handle_info({'EXIT', From, Reason}, #sta ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]), {stop, {db_update_notifier_died, Reason}, State}; +handle_info({'EXIT', From, Reason}, #state{rep_notifier = From} = State) -> + ?LOG_ERROR("Replication notifier died. Reason: ~p", [Reason]), + {stop, {rep_notifier_died, Reason}, State}; + handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) -> % one of the replication start processes terminated successfully {noreply, State#state{rep_start_pids = Pids -- [From]}}; @@ -146,8 +182,8 @@ terminate(_Reason, State) -> catch exit(Pid, stop) end, [Loop | StartPids]), - true = ets:delete(?REP_ID_TO_DOC_ID), - true = ets:delete(?DOC_ID_TO_REP_ID), + true = ets:delete(?REP_TO_STATE), + true = ets:delete(?DOC_TO_REP), couch_db_update_notifier:stop(Notifier). @@ -156,7 +192,7 @@ code_change(_OldVsn, State, _Extra) -> changes_feed_loop() -> - {ok, RepDb} = couch_replicator_utils:ensure_rep_db_exists(), + {ok, RepDb} = ensure_rep_db_exists(), Server = self(), Pid = spawn_link( fun() -> @@ -216,6 +252,31 @@ db_update_notifier() -> Notifier. +rep_notifier() -> + Server = self(), + {ok, Notifier} = couch_replication_notifier:start_link( + fun({finished, RepId, _CheckPointHistory}) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{} -> + % TODO: maybe add replication stats to the doc + ok = gen_server:call(Server, {rep_complete, RepId}, infinity) + end; + ({error, RepId, Error}) -> + case rep_state(RepId) of + nil -> + ok; + #rep_state{} -> + ok = gen_server:call( + Server, {rep_error, RepId, Error}, infinity) + end; + (_) -> + ok + end), + Notifier. + + restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) -> stop_all_replications(), lists:foreach( @@ -241,16 +302,12 @@ process_update(State, {Change}) -> State; false -> case get_value(<<"_replication_state">>, RepProps) of - <<"completed">> -> - replication_complete(DocId), - State; - <<"error">> -> - stop_replication(DocId), - State; + undefined -> + maybe_start_replication(State, DocId, JsonRepDoc); <<"triggered">> -> maybe_start_replication(State, DocId, JsonRepDoc); - undefined -> - maybe_start_replication(State, DocId, JsonRepDoc) + _ -> + State end end. @@ -267,101 +324,88 @@ rep_user_ctx({RepDoc}) -> end. -maybe_start_replication(#state{max_retries = MaxRetries} = State, - DocId, JsonRepDoc) -> - {ok, #rep{id = {BaseId, _} = RepId} = Rep} = - parse_rep_doc(JsonRepDoc, rep_user_ctx(JsonRepDoc)), - case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of - [] -> - true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}), - true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}), +maybe_start_replication(State, DocId, RepDoc) -> + {ok, #rep{id = {BaseId, _} = RepId} = Rep} = parse_rep_doc( + RepDoc, rep_user_ctx(RepDoc)), + case rep_state(RepId) of + nil -> + RepState = #rep_state{ + rep = Rep, + starting = true, + max_retries = State#state.max_retries + }, + true = ets:insert(?REP_TO_STATE, {RepId, RepState}), + true = ets:insert(?DOC_TO_REP, {DocId, RepId}), + ?LOG_INFO("Attempting to start replication `~s` (document `~s`).", + [pp_rep_id(RepId), DocId]), Server = self(), - Pid = spawn_link( - fun() -> start_replication(Server, Rep, MaxRetries) end), + Pid = spawn_link(fun() -> + start_replication(Server, Rep, State#state.max_retries) + end), State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; - [{BaseId, {DocId, _}}] -> + #rep_state{rep = #rep{doc_id = DocId}} -> State; - [{BaseId, {OtherDocId, false}}] -> + #rep_state{starting = false, rep = #rep{doc_id = OtherDocId}} -> ?LOG_INFO("The replication specified by the document `~s` was already" " triggered by the document `~s`", [DocId, OtherDocId]), - maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), + maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)), State; - [{BaseId, {OtherDocId, true}}] -> + #rep_state{starting = true, rep = #rep{doc_id = OtherDocId}} -> ?LOG_INFO("The replication specified by the document `~s` is already" " being triggered by the document `~s`", [DocId, OtherDocId]), - maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), + maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)), State end. -maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) -> - case get_value(<<"_replication_id">>, Props) of +maybe_tag_rep_doc(DocId, {RepProps}, RepId) -> + case get_value(<<"_replication_id">>, RepProps) of RepId -> ok; _ -> - update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}]) + update_rep_doc(DocId, [{<<"_replication_id">>, RepId}]) end. -start_replication(Server, #rep{id = RepId, doc = {RepProps}} = Rep, MaxRetries) -> +start_replication(Server, #rep{id = RepId, doc_id = DocId} = Rep, MaxRetries) -> case (catch couch_replicator:async_replicate(Rep)) of {ok, _} -> - ok = gen_server:call(Server, {triggered, RepId}, infinity), + ok = gen_server:call(Server, {rep_started, RepId}, infinity), ?LOG_INFO("Document `~s` triggered replication `~s`", - [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]); + [DocId, pp_rep_id(RepId)]); Error -> keep_retrying(Server, Rep, Error, ?INITIAL_WAIT, MaxRetries) end. keep_retrying(Server, Rep, Error, _Wait, 0) -> - ok = gen_server:call(Server, {restart_failure, Rep, Error}, infinity); + ok = gen_server:call(Server, {rep_start_failure, Rep, Error}, infinity); -keep_retrying(Server, #rep{doc = {RepProps}} = Rep, Error, Wait, RetriesLeft) -> - DocId = get_value(<<"_id">>, RepProps), +keep_retrying(Server, #rep{doc_id = DocId} = Rep, Error, Wait, RetriesLeft) -> ?LOG_ERROR("Error starting replication `~s` (document `~s`): ~p. " "Retrying in ~p seconds", [pp_rep_id(Rep), DocId, Error, Wait]), ok = timer:sleep(Wait * 1000), case (catch couch_replicator:async_replicate(Rep)) of {ok, _} -> - ok = gen_server:call(Server, {triggered, Rep#rep.id}, infinity), - [{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId), + ok = gen_server:call(Server, {rep_started, Rep#rep.id}, infinity), + #rep_state{max_retries = MaxRetries} = rep_state(Rep#rep.id), ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts", - [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]); + [DocId, pp_rep_id(Rep), MaxRetries - RetriesLeft + 1]); NewError -> keep_retrying(Server, Rep, NewError, Wait * 2, RetriesLeft - 1) end. rep_doc_deleted(DocId) -> - case stop_replication(DocId) of - {ok, RepId} -> + case ets:lookup(?DOC_TO_REP, DocId) of + [{DocId, RepId}] -> + couch_replicator:cancel_replication(RepId), + true = ets:delete(?REP_TO_STATE, RepId), + true = ets:delete(?DOC_TO_REP, DocId), ?LOG_INFO("Stopped replication `~s` because replication document `~s`" " was deleted", [pp_rep_id(RepId), DocId]); - none -> - ok - end. - - -replication_complete(DocId) -> - case stop_replication(DocId) of - {ok, RepId} -> - ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", - [pp_rep_id(RepId), DocId]); - none -> - ok - end. - - -stop_replication(DocId) -> - case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of - [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] -> - couch_replicator:cancel_replication(RepId), - true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), - true = ets:delete(?DOC_ID_TO_REP_ID, DocId), - {ok, RepId}; [] -> - none + ok end. @@ -369,12 +413,73 @@ stop_all_replications() -> ?LOG_INFO("Stopping all ongoing replications because the replicator" " database was deleted or changed", []), ets:foldl( - fun({_, {RepId, _}}, _) -> + fun({_, RepId}, _) -> couch_replicator:cancel_replication(RepId) end, - ok, ?DOC_ID_TO_REP_ID), - true = ets:delete_all_objects(?REP_ID_TO_DOC_ID), - true = ets:delete_all_objects(?DOC_ID_TO_REP_ID). + ok, ?DOC_TO_REP), + true = ets:delete_all_objects(?REP_TO_STATE), + true = ets:delete_all_objects(?DOC_TO_REP). + + +update_rep_doc(RepDocId, KVs) -> + {ok, RepDb} = ensure_rep_db_exists(), + try + {ok, LatestRepDoc} = couch_db:open_doc(RepDb, RepDocId, []), + update_rep_doc(RepDb, LatestRepDoc, KVs) + catch throw:conflict -> + % Shouldn't happen, as by default only the role _replicator can + % update replication documents. + ?LOG_ERROR("Conflict error when updating replication document `~s`." + " Retrying.", [RepDocId]), + ok = timer:sleep(5), + update_rep_doc(RepDocId, KVs) + after + couch_db:close(RepDb) + end. + +update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> + NewRepDocBody = lists:foldl( + fun({<<"_replication_state">> = K, _V} = KV, Body) -> + Body1 = lists:keystore(K, 1, Body, KV), + {Mega, Secs, _} = erlang:now(), + UnixTime = Mega * 1000000 + Secs, + lists:keystore( + <<"_replication_state_time">>, 1, Body1, + {<<"_replication_state_time">>, UnixTime}); + ({K, _V} = KV, Body) -> + lists:keystore(K, 1, Body, KV) + end, + RepDocBody, KVs), + % Might not succeed - when the replication doc is deleted right + % before this update (not an error, ignore). + couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, []). + + +ensure_rep_db_exists() -> + DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), + UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}, + case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}]) of + {ok, Db} -> + Db; + _Error -> + {ok, Db} = couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}]) + end, + ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), + {ok, Db}. + + +ensure_rep_ddoc_exists(RepDb, DDocID) -> + case couch_db:open_doc(RepDb, DDocID, []) of + {ok, _Doc} -> + ok; + _ -> + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocID}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} + ]}), + {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) + end. % pretty-print replication id @@ -382,3 +487,18 @@ pp_rep_id(#rep{id = RepId}) -> pp_rep_id(RepId); pp_rep_id({Base, Extension}) -> Base ++ Extension. + + +rep_state(RepId) -> + case ets:lookup(?REP_TO_STATE, RepId) of + [{RepId, RepState}] -> + RepState; + [] -> + nil + end. + + +error_reason({error, Reason}) -> + Reason; +error_reason(Reason) -> + Reason. Modified: couchdb/trunk/src/couchdb/couch_replicator.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1079483&r1=1079482&r2=1079483&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_replicator.erl (original) +++ couchdb/trunk/src/couchdb/couch_replicator.erl Tue Mar 8 18:49:53 2011 @@ -35,7 +35,6 @@ ]). -import(couch_replicator_utils, [ - update_rep_doc/2, start_db_compaction_notifier/2, stop_db_compaction_notifier/1 ]). @@ -241,8 +240,6 @@ do_init(#rep{options = Options, id = {Ba end, lists:seq(1, CopiersCount)), - maybe_set_triggered(Rep), - couch_task_status:add_task( "Replication", io_lib:format("`~s`: `~s` -> `~s`", @@ -439,10 +436,9 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -terminate(normal, #rep_state{rep_details = #rep{id = RepId, doc = RepDoc}, +terminate(normal, #rep_state{rep_details = #rep{id = RepId}, checkpoint_history = CheckpointHistory} = State) -> terminate_cleanup(State), - update_rep_doc(RepDoc, [{<<"_replication_state">>, <<"completed">>}]), couch_replication_notifier:notify({finished, RepId, CheckpointHistory}); terminate(shutdown, State) -> @@ -453,12 +449,11 @@ terminate(Reason, State) -> #rep_state{ source_name = Source, target_name = Target, - rep_details = #rep{id = {BaseId, Ext} = RepId, doc = RepDoc} + rep_details = #rep{id = {BaseId, Ext} = RepId} } = State, ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s", [BaseId ++ Ext, Source, Target, to_binary(Reason)]), terminate_cleanup(State), - update_rep_doc(RepDoc, [{<<"_replication_state">>, <<"error">>}]), couch_replication_notifier:notify({error, RepId, Reason}). @@ -811,20 +806,6 @@ sum_stats([Stats1 | RestStats]) -> Stats1, RestStats). -maybe_set_triggered(#rep{id = {BaseId, _}, doc = {RepProps} = RepDoc}) -> - case get_value(<<"_replication_state">>, RepProps) of - <<"triggered">> -> - ok; - _ -> - update_rep_doc( - RepDoc, - [ - {<<"_replication_state">>, <<"triggered">>}, - {<<"_replication_id">>, ?l2b(BaseId)} - ]) - end. - - db_monitor(#db{} = Db) -> couch_db:monitor(Db); db_monitor(_HttpDb) -> Modified: couchdb/trunk/src/couchdb/couch_replicator.hrl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.hrl?rev=1079483&r1=1079482&r2=1079483&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_replicator.hrl (original) +++ couchdb/trunk/src/couchdb/couch_replicator.hrl Tue Mar 8 18:49:53 2011 @@ -18,7 +18,7 @@ target, options, user_ctx, - doc + doc_id }). -record(rep_stats, { Modified: couchdb/trunk/src/couchdb/couch_replicator_utils.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_utils.erl?rev=1079483&r1=1079482&r2=1079483&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_replicator_utils.erl (original) +++ couchdb/trunk/src/couchdb/couch_replicator_utils.erl Tue Mar 8 18:49:53 2011 @@ -13,8 +13,6 @@ -module(couch_replicator_utils). -export([parse_rep_doc/2]). --export([update_rep_doc/2]). --export([ensure_rep_db_exists/0]). -export([open_db/1, close_db/1]). -export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]). -export([replication_id/2]). @@ -22,7 +20,6 @@ -include("couch_db.hrl"). -include("couch_api_wrap.hrl"). -include("couch_replicator.hrl"). --include("couch_js_functions.hrl"). -include("../ibrowse/ibrowse.hrl"). -import(couch_util, [ @@ -31,7 +28,7 @@ ]). -parse_rep_doc({Props} = RepObj, UserCtx) -> +parse_rep_doc({Props}, UserCtx) -> ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)), Options = make_options(Props), Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options), @@ -41,80 +38,11 @@ parse_rep_doc({Props} = RepObj, UserCtx) target = Target, options = Options, user_ctx = UserCtx, - doc = RepObj + doc_id = get_value(<<"_id">>, Props) }, {ok, Rep#rep{id = replication_id(Rep)}}. -update_rep_doc({Props} = _RepDoc, KVs) -> - case get_value(<<"_id">>, Props) of - undefined -> - ok; - RepDocId -> - {ok, RepDb} = ensure_rep_db_exists(), - case couch_db:open_doc(RepDb, RepDocId, []) of - {ok, LatestRepDoc} -> - update_rep_doc(RepDb, LatestRepDoc, KVs); - _ -> - ok - end, - couch_db:close(RepDb) - end. - -update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) -> - NewRepDocBody = lists:foldl( - fun({<<"_replication_state">> = K, _V} = KV, Body) -> - Body1 = lists:keystore(K, 1, Body, KV), - {Mega, Secs, _} = erlang:now(), - UnixTime = Mega * 1000000 + Secs, - lists:keystore( - <<"_replication_state_time">>, 1, - Body1, {<<"_replication_state_time">>, UnixTime}); - ({K, _V} = KV, Body) -> - lists:keystore(K, 1, Body, KV) - end, - RepDocBody, - KVs - ), - % might not succeed - when the replication doc is deleted right - % before this update (not an error) - couch_db:update_doc( - RepDb, - RepDoc#doc{body = {NewRepDocBody}}, - []). - - -ensure_rep_db_exists() -> - DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")), - Opts = [ - {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}, - sys_db - ], - case couch_db:open(DbName, Opts) of - {ok, Db} -> - Db; - _Error -> - {ok, Db} = couch_db:create(DbName, Opts) - end, - ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>), - {ok, Db}. - - -ensure_rep_ddoc_exists(RepDb, DDocID) -> - case couch_db:open_doc(RepDb, DDocID, []) of - {ok, _Doc} -> - ok; - _ -> - DDoc = couch_doc:from_json_obj({[ - {<<"_id">>, DDocID}, - {<<"language">>, <<"javascript">>}, - {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN} - ]}), - {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, []) - end, - ok. - - replication_id(#rep{options = Options} = Rep) -> BaseId = replication_id(Rep, ?REP_ID_VERSION), {BaseId, maybe_append_options([continuous, create_target], Options)}.