Author: fdmanana
Date: Tue Mar 8 18:49:53 2011
New Revision: 1079483
URL: http://svn.apache.org/viewvc?rev=1079483&view=rev
Log:
Adapt replication manager to the new replicator's code
Now all the replication document management is done only by the
couch_replication_manager module, instead of being split between this
module and the replication gen_servers. The code is also simpler now,
since it uses the couch_replication_notifier gen_event.
This is a pure refactoring, not adding any new behaviour.
Modified:
couchdb/trunk/src/couchdb/couch_replication_manager.erl
couchdb/trunk/src/couchdb/couch_replicator.erl
couchdb/trunk/src/couchdb/couch_replicator.hrl
couchdb/trunk/src/couchdb/couch_replicator_utils.erl
Modified: couchdb/trunk/src/couchdb/couch_replication_manager.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replication_manager.erl?rev=1079483&r1=1079482&r2=1079483&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replication_manager.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replication_manager.erl Tue Mar 8 18:49:53 2011
@@ -18,23 +18,31 @@
-include("couch_db.hrl").
-include("couch_replicator.hrl").
+-include("couch_js_functions.hrl").
--define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
--define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
+-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
+-define(REP_TO_STATE, couch_rep_id_to_rep_state).
-define(INITIAL_WAIT, 5).
+-record(rep_state, {
+ rep,
+ starting,
+ max_retries
+}).
+
-import(couch_replicator_utils, [
- parse_rep_doc/2,
- update_rep_doc/2
+ parse_rep_doc/2
]).
-import(couch_util, [
get_value/2,
- get_value/3
+ get_value/3,
+ to_binary/1
]).
-record(state, {
changes_feed_loop = nil,
db_notifier = nil,
+ rep_notifier = nil,
rep_db_name = nil,
rep_start_pids = [],
max_retries
@@ -46,8 +54,8 @@ start_link() ->
init(_) ->
process_flag(trap_exit, true),
- _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]),
- _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
+ ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
+ ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]),
Server = self(),
ok = couch_config:register(
fun("replicator", "db", NewName) ->
@@ -62,6 +70,7 @@ init(_) ->
changes_feed_loop = Loop,
rep_db_name = RepDbName,
db_notifier = db_update_notifier(),
+ rep_notifier = rep_notifier(),
max_retries = list_to_integer(
couch_config:get("replicator", "max_replication_retry_count", "10"))
}}.
@@ -70,26 +79,49 @@ init(_) ->
handle_call({rep_db_update, Change}, _From, State) ->
{reply, ok, process_update(State, Change)};
-handle_call({triggered, {BaseId, _}}, _From, State) ->
- [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId),
- true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}),
+handle_call({rep_started, {BaseId, _Ext} = RepId}, _From, State) ->
+ #rep_state{rep = #rep{doc_id = DocId}} = RepState = rep_state(RepId),
+ true = ets:insert(
+ ?REP_TO_STATE, {RepId, RepState#rep_state{starting = false}}),
+ update_rep_doc(DocId, [
+ {<<"_replication_state">>, <<"triggered">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
{reply, ok, State};
-handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
- DocId = get_value(<<"_id">>, Props),
- [{DocId, {{BaseId, _} = RepId, MaxRetries}}] = ets:lookup(
- ?DOC_ID_TO_REP_ID, DocId),
+handle_call({rep_start_failure, Rep, Error}, _From, State) ->
+ #rep{id = {BaseId, _} = RepId, doc_id = DocId} = Rep,
+ #rep_state{max_retries = MaxRetries} = rep_state(RepId),
?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
"the document `~s`. Last error reason was: ~p",
[pp_rep_id(RepId), MaxRetries, DocId, Error]),
update_rep_doc(
- RepDoc,
+ DocId,
[{<<"_replication_state">>, <<"error">>},
{<<"_replication_id">>, ?l2b(BaseId)}]),
- true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
- true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
{reply, ok, State};
+handle_call({rep_complete, RepId}, _From, State) ->
+ #rep_state{rep = #rep{doc_id = DocId}} = rep_state(RepId),
+ couch_replicator:cancel_replication(RepId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
+ ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
+ [pp_rep_id(RepId), DocId]),
+ update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]),
+ {reply, ok, State};
+
+handle_call({rep_error, RepId, Error}, _From, State) ->
+ #rep_state{rep = #rep{doc_id = DocId}} = rep_state(RepId),
+ couch_replicator:cancel_replication(RepId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
+ ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s",
+ [pp_rep_id(RepId), DocId, to_binary(error_reason(Error))]),
+ update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]),
+ {reply, ok, State};
+
handle_call(Msg, From, State) ->
?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
[Msg, From]),
@@ -124,6 +156,10 @@ handle_info({'EXIT', From, Reason}, #sta
?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
{stop, {db_update_notifier_died, Reason}, State};
+handle_info({'EXIT', From, Reason}, #state{rep_notifier = From} = State) ->
+ ?LOG_ERROR("Replication notifier died. Reason: ~p", [Reason]),
+ {stop, {rep_notifier_died, Reason}, State};
+
handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
% one of the replication start processes terminated successfully
{noreply, State#state{rep_start_pids = Pids -- [From]}};
@@ -146,8 +182,8 @@ terminate(_Reason, State) ->
catch exit(Pid, stop)
end,
[Loop | StartPids]),
- true = ets:delete(?REP_ID_TO_DOC_ID),
- true = ets:delete(?DOC_ID_TO_REP_ID),
+ true = ets:delete(?REP_TO_STATE),
+ true = ets:delete(?DOC_TO_REP),
couch_db_update_notifier:stop(Notifier).
@@ -156,7 +192,7 @@ code_change(_OldVsn, State, _Extra) ->
changes_feed_loop() ->
- {ok, RepDb} = couch_replicator_utils:ensure_rep_db_exists(),
+ {ok, RepDb} = ensure_rep_db_exists(),
Server = self(),
Pid = spawn_link(
fun() ->
@@ -216,6 +252,31 @@ db_update_notifier() ->
Notifier.
+rep_notifier() ->
+ Server = self(),
+ {ok, Notifier} = couch_replication_notifier:start_link(
+ fun({finished, RepId, _CheckPointHistory}) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{} ->
+ % TODO: maybe add replication stats to the doc
+ ok = gen_server:call(Server, {rep_complete, RepId}, infinity)
+ end;
+ ({error, RepId, Error}) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{} ->
+ ok = gen_server:call(
+ Server, {rep_error, RepId, Error}, infinity)
+ end;
+ (_) ->
+ ok
+ end),
+ Notifier.
+
+
restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
stop_all_replications(),
lists:foreach(
@@ -241,16 +302,12 @@ process_update(State, {Change}) ->
State;
false ->
case get_value(<<"_replication_state">>, RepProps) of
- <<"completed">> ->
- replication_complete(DocId),
- State;
- <<"error">> ->
- stop_replication(DocId),
- State;
+ undefined ->
+ maybe_start_replication(State, DocId, JsonRepDoc);
<<"triggered">> ->
maybe_start_replication(State, DocId, JsonRepDoc);
- undefined ->
- maybe_start_replication(State, DocId, JsonRepDoc)
+ _ ->
+ State
end
end.
@@ -267,101 +324,88 @@ rep_user_ctx({RepDoc}) ->
end.
-maybe_start_replication(#state{max_retries = MaxRetries} = State,
- DocId, JsonRepDoc) ->
- {ok, #rep{id = {BaseId, _} = RepId} = Rep} =
- parse_rep_doc(JsonRepDoc, rep_user_ctx(JsonRepDoc)),
- case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
- [] ->
- true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
- true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}),
+maybe_start_replication(State, DocId, RepDoc) ->
+ {ok, #rep{id = {BaseId, _} = RepId} = Rep} = parse_rep_doc(
+ RepDoc, rep_user_ctx(RepDoc)),
+ case rep_state(RepId) of
+ nil ->
+ RepState = #rep_state{
+ rep = Rep,
+ starting = true,
+ max_retries = State#state.max_retries
+ },
+ true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
+ true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
+ ?LOG_INFO("Attempting to start replication `~s` (document `~s`).",
+ [pp_rep_id(RepId), DocId]),
Server = self(),
- Pid = spawn_link(
- fun() -> start_replication(Server, Rep, MaxRetries) end),
+ Pid = spawn_link(fun() ->
+ start_replication(Server, Rep, State#state.max_retries)
+ end),
State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
- [{BaseId, {DocId, _}}] ->
+ #rep_state{rep = #rep{doc_id = DocId}} ->
State;
- [{BaseId, {OtherDocId, false}}] ->
+ #rep_state{starting = false, rep = #rep{doc_id = OtherDocId}} ->
?LOG_INFO("The replication specified by the document `~s` was already"
" triggered by the document `~s`", [DocId, OtherDocId]),
- maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+ maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
State;
- [{BaseId, {OtherDocId, true}}] ->
+ #rep_state{starting = true, rep = #rep{doc_id = OtherDocId}} ->
?LOG_INFO("The replication specified by the document `~s` is already"
" being triggered by the document `~s`", [DocId, OtherDocId]),
- maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+ maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
State
end.
-maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
- case get_value(<<"_replication_id">>, Props) of
+maybe_tag_rep_doc(DocId, {RepProps}, RepId) ->
+ case get_value(<<"_replication_id">>, RepProps) of
RepId ->
ok;
_ ->
- update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
+ update_rep_doc(DocId, [{<<"_replication_id">>, RepId}])
end.
-start_replication(Server, #rep{id = RepId, doc = {RepProps}} = Rep, MaxRetries) ->
+start_replication(Server, #rep{id = RepId, doc_id = DocId} = Rep, MaxRetries) ->
case (catch couch_replicator:async_replicate(Rep)) of
{ok, _} ->
- ok = gen_server:call(Server, {triggered, RepId}, infinity),
+ ok = gen_server:call(Server, {rep_started, RepId}, infinity),
?LOG_INFO("Document `~s` triggered replication `~s`",
- [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]);
+ [DocId, pp_rep_id(RepId)]);
Error ->
keep_retrying(Server, Rep, Error, ?INITIAL_WAIT, MaxRetries)
end.
keep_retrying(Server, Rep, Error, _Wait, 0) ->
- ok = gen_server:call(Server, {restart_failure, Rep, Error}, infinity);
+ ok = gen_server:call(Server, {rep_start_failure, Rep, Error}, infinity);
-keep_retrying(Server, #rep{doc = {RepProps}} = Rep, Error, Wait, RetriesLeft) ->
- DocId = get_value(<<"_id">>, RepProps),
+keep_retrying(Server, #rep{doc_id = DocId} = Rep, Error, Wait, RetriesLeft) ->
?LOG_ERROR("Error starting replication `~s` (document `~s`): ~p. "
"Retrying in ~p seconds", [pp_rep_id(Rep), DocId, Error, Wait]),
ok = timer:sleep(Wait * 1000),
case (catch couch_replicator:async_replicate(Rep)) of
{ok, _} ->
- ok = gen_server:call(Server, {triggered, Rep#rep.id}, infinity),
- [{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
+ ok = gen_server:call(Server, {rep_started, Rep#rep.id}, infinity),
+ #rep_state{max_retries = MaxRetries} = rep_state(Rep#rep.id),
?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
- [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]);
+ [DocId, pp_rep_id(Rep), MaxRetries - RetriesLeft + 1]);
NewError ->
keep_retrying(Server, Rep, NewError, Wait * 2, RetriesLeft - 1)
end.
rep_doc_deleted(DocId) ->
- case stop_replication(DocId) of
- {ok, RepId} ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [{DocId, RepId}] ->
+ couch_replicator:cancel_replication(RepId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
?LOG_INFO("Stopped replication `~s` because replication document `~s`"
" was deleted", [pp_rep_id(RepId), DocId]);
- none ->
- ok
- end.
-
-
-replication_complete(DocId) ->
- case stop_replication(DocId) of
- {ok, RepId} ->
- ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
- [pp_rep_id(RepId), DocId]);
- none ->
- ok
- end.
-
-
-stop_replication(DocId) ->
- case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
- [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] ->
- couch_replicator:cancel_replication(RepId),
- true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
- true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
- {ok, RepId};
[] ->
- none
+ ok
end.
@@ -369,12 +413,73 @@ stop_all_replications() ->
?LOG_INFO("Stopping all ongoing replications because the replicator"
" database was deleted or changed", []),
ets:foldl(
- fun({_, {RepId, _}}, _) ->
+ fun({_, RepId}, _) ->
couch_replicator:cancel_replication(RepId)
end,
- ok, ?DOC_ID_TO_REP_ID),
- true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
- true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).
+ ok, ?DOC_TO_REP),
+ true = ets:delete_all_objects(?REP_TO_STATE),
+ true = ets:delete_all_objects(?DOC_TO_REP).
+
+
+update_rep_doc(RepDocId, KVs) ->
+ {ok, RepDb} = ensure_rep_db_exists(),
+ try
+ {ok, LatestRepDoc} = couch_db:open_doc(RepDb, RepDocId, []),
+ update_rep_doc(RepDb, LatestRepDoc, KVs)
+ catch throw:conflict ->
+ % Shouldn't happen, as by default only the role _replicator can
+ % update replication documents.
+ ?LOG_ERROR("Conflict error when updating replication document `~s`."
+ " Retrying.", [RepDocId]),
+ ok = timer:sleep(5),
+ update_rep_doc(RepDocId, KVs)
+ after
+ couch_db:close(RepDb)
+ end.
+
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+ NewRepDocBody = lists:foldl(
+ fun({<<"_replication_state">> = K, _V} = KV, Body) ->
+ Body1 = lists:keystore(K, 1, Body, KV),
+ {Mega, Secs, _} = erlang:now(),
+ UnixTime = Mega * 1000000 + Secs,
+ lists:keystore(
+ <<"_replication_state_time">>, 1, Body1,
+ {<<"_replication_state_time">>, UnixTime});
+ ({K, _V} = KV, Body) ->
+ lists:keystore(K, 1, Body, KV)
+ end,
+ RepDocBody, KVs),
+ % Might not succeed - when the replication doc is deleted right
+ % before this update (not an error, ignore).
+ couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, []).
+
+
+ensure_rep_db_exists() ->
+ DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+ UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
+ case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}]) of
+ {ok, Db} ->
+ Db;
+ _Error ->
+ {ok, Db} = couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}])
+ end,
+ ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+ {ok, Db}.
+
+
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+ case couch_db:open_doc(RepDb, DDocID, []) of
+ {ok, _Doc} ->
+ ok;
+ _ ->
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocID},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+ ]}),
+ {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+ end.
% pretty-print replication id
@@ -382,3 +487,18 @@ pp_rep_id(#rep{id = RepId}) ->
pp_rep_id(RepId);
pp_rep_id({Base, Extension}) ->
Base ++ Extension.
+
+
+rep_state(RepId) ->
+ case ets:lookup(?REP_TO_STATE, RepId) of
+ [{RepId, RepState}] ->
+ RepState;
+ [] ->
+ nil
+ end.
+
+
+error_reason({error, Reason}) ->
+ Reason;
+error_reason(Reason) ->
+ Reason.
Modified: couchdb/trunk/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1079483&r1=1079482&r2=1079483&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.erl Tue Mar 8 18:49:53 2011
@@ -35,7 +35,6 @@
]).
-import(couch_replicator_utils, [
- update_rep_doc/2,
start_db_compaction_notifier/2,
stop_db_compaction_notifier/1
]).
@@ -241,8 +240,6 @@ do_init(#rep{options = Options, id = {Ba
end,
lists:seq(1, CopiersCount)),
- maybe_set_triggered(Rep),
-
couch_task_status:add_task(
"Replication",
io_lib:format("`~s`: `~s` -> `~s`",
@@ -439,10 +436,9 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-terminate(normal, #rep_state{rep_details = #rep{id = RepId, doc = RepDoc},
+terminate(normal, #rep_state{rep_details = #rep{id = RepId},
checkpoint_history = CheckpointHistory} = State) ->
terminate_cleanup(State),
- update_rep_doc(RepDoc, [{<<"_replication_state">>, <<"completed">>}]),
couch_replication_notifier:notify({finished, RepId, CheckpointHistory});
terminate(shutdown, State) ->
@@ -453,12 +449,11 @@ terminate(Reason, State) ->
#rep_state{
source_name = Source,
target_name = Target,
- rep_details = #rep{id = {BaseId, Ext} = RepId, doc = RepDoc}
+ rep_details = #rep{id = {BaseId, Ext} = RepId}
} = State,
?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
[BaseId ++ Ext, Source, Target, to_binary(Reason)]),
terminate_cleanup(State),
- update_rep_doc(RepDoc, [{<<"_replication_state">>, <<"error">>}]),
couch_replication_notifier:notify({error, RepId, Reason}).
@@ -811,20 +806,6 @@ sum_stats([Stats1 | RestStats]) ->
Stats1, RestStats).
-maybe_set_triggered(#rep{id = {BaseId, _}, doc = {RepProps} = RepDoc}) ->
- case get_value(<<"_replication_state">>, RepProps) of
- <<"triggered">> ->
- ok;
- _ ->
- update_rep_doc(
- RepDoc,
- [
- {<<"_replication_state">>, <<"triggered">>},
- {<<"_replication_id">>, ?l2b(BaseId)}
- ])
- end.
-
-
db_monitor(#db{} = Db) ->
couch_db:monitor(Db);
db_monitor(_HttpDb) ->
Modified: couchdb/trunk/src/couchdb/couch_replicator.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.hrl?rev=1079483&r1=1079482&r2=1079483&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.hrl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.hrl Tue Mar 8 18:49:53 2011
@@ -18,7 +18,7 @@
target,
options,
user_ctx,
- doc
+ doc_id
}).
-record(rep_stats, {
Modified: couchdb/trunk/src/couchdb/couch_replicator_utils.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_utils.erl?rev=1079483&r1=1079482&r2=1079483&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_utils.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator_utils.erl Tue Mar 8 18:49:53 2011
@@ -13,8 +13,6 @@
-module(couch_replicator_utils).
-export([parse_rep_doc/2]).
--export([update_rep_doc/2]).
--export([ensure_rep_db_exists/0]).
-export([open_db/1, close_db/1]).
-export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
-export([replication_id/2]).
@@ -22,7 +20,6 @@
-include("couch_db.hrl").
-include("couch_api_wrap.hrl").
-include("couch_replicator.hrl").
--include("couch_js_functions.hrl").
-include("../ibrowse/ibrowse.hrl").
-import(couch_util, [
@@ -31,7 +28,7 @@
]).
-parse_rep_doc({Props} = RepObj, UserCtx) ->
+parse_rep_doc({Props}, UserCtx) ->
ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
Options = make_options(Props),
Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options),
@@ -41,80 +38,11 @@ parse_rep_doc({Props} = RepObj, UserCtx)
target = Target,
options = Options,
user_ctx = UserCtx,
- doc = RepObj
+ doc_id = get_value(<<"_id">>, Props)
},
{ok, Rep#rep{id = replication_id(Rep)}}.
-update_rep_doc({Props} = _RepDoc, KVs) ->
- case get_value(<<"_id">>, Props) of
- undefined ->
- ok;
- RepDocId ->
- {ok, RepDb} = ensure_rep_db_exists(),
- case couch_db:open_doc(RepDb, RepDocId, []) of
- {ok, LatestRepDoc} ->
- update_rep_doc(RepDb, LatestRepDoc, KVs);
- _ ->
- ok
- end,
- couch_db:close(RepDb)
- end.
-
-update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
- NewRepDocBody = lists:foldl(
- fun({<<"_replication_state">> = K, _V} = KV, Body) ->
- Body1 = lists:keystore(K, 1, Body, KV),
- {Mega, Secs, _} = erlang:now(),
- UnixTime = Mega * 1000000 + Secs,
- lists:keystore(
- <<"_replication_state_time">>, 1,
- Body1, {<<"_replication_state_time">>, UnixTime});
- ({K, _V} = KV, Body) ->
- lists:keystore(K, 1, Body, KV)
- end,
- RepDocBody,
- KVs
- ),
- % might not succeed - when the replication doc is deleted right
- % before this update (not an error)
- couch_db:update_doc(
- RepDb,
- RepDoc#doc{body = {NewRepDocBody}},
- []).
-
-
-ensure_rep_db_exists() ->
- DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
- Opts = [
- {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
- sys_db
- ],
- case couch_db:open(DbName, Opts) of
- {ok, Db} ->
- Db;
- _Error ->
- {ok, Db} = couch_db:create(DbName, Opts)
- end,
- ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
- {ok, Db}.
-
-
-ensure_rep_ddoc_exists(RepDb, DDocID) ->
- case couch_db:open_doc(RepDb, DDocID, []) of
- {ok, _Doc} ->
- ok;
- _ ->
- DDoc = couch_doc:from_json_obj({[
- {<<"_id">>, DDocID},
- {<<"language">>, <<"javascript">>},
- {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
- ]}),
- {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
- end,
- ok.
-
-
replication_id(#rep{options = Options} = Rep) ->
BaseId = replication_id(Rep, ?REP_ID_VERSION),
{BaseId, maybe_append_options([continuous, create_target], Options)}.
|