couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1079483 - in /couchdb/trunk/src/couchdb: couch_replication_manager.erl couch_replicator.erl couch_replicator.hrl couch_replicator_utils.erl
Date Tue, 08 Mar 2011 18:49:53 GMT
Author: fdmanana
Date: Tue Mar  8 18:49:53 2011
New Revision: 1079483

URL: http://svn.apache.org/viewvc?rev=1079483&view=rev
Log:
Adapt replication manager to the new replicator's code

All replication document management is now done solely by the
couch_replication_manager module, instead of being split between this
module and the replication gen_servers. The code is also simpler now,
since it uses the couch_replication_notifier gen_event.

This is a pure refactoring; it does not add any new behaviour.


Modified:
    couchdb/trunk/src/couchdb/couch_replication_manager.erl
    couchdb/trunk/src/couchdb/couch_replicator.erl
    couchdb/trunk/src/couchdb/couch_replicator.hrl
    couchdb/trunk/src/couchdb/couch_replicator_utils.erl

Modified: couchdb/trunk/src/couchdb/couch_replication_manager.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replication_manager.erl?rev=1079483&r1=1079482&r2=1079483&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replication_manager.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replication_manager.erl Tue Mar  8 18:49:53 2011
@@ -18,23 +18,31 @@
 
 -include("couch_db.hrl").
 -include("couch_replicator.hrl").
+-include("couch_js_functions.hrl").
 
--define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
--define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
+-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
+-define(REP_TO_STATE, couch_rep_id_to_rep_state).
 -define(INITIAL_WAIT, 5).
 
+-record(rep_state, {
+    rep,
+    starting,
+    max_retries
+}).
+
 -import(couch_replicator_utils, [
-    parse_rep_doc/2,
-    update_rep_doc/2
+    parse_rep_doc/2
 ]).
 -import(couch_util, [
     get_value/2,
-    get_value/3
+    get_value/3,
+    to_binary/1
 ]).
 
 -record(state, {
     changes_feed_loop = nil,
     db_notifier = nil,
+    rep_notifier = nil,
     rep_db_name = nil,
     rep_start_pids = [],
     max_retries
@@ -46,8 +54,8 @@ start_link() ->
 
 init(_) ->
     process_flag(trap_exit, true),
-    _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]),
-    _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
+    ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
+    ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]),
     Server = self(),
     ok = couch_config:register(
         fun("replicator", "db", NewName) ->
@@ -62,6 +70,7 @@ init(_) ->
         changes_feed_loop = Loop,
         rep_db_name = RepDbName,
         db_notifier = db_update_notifier(),
+        rep_notifier = rep_notifier(),
         max_retries = list_to_integer(
             couch_config:get("replicator", "max_replication_retry_count", "10"))
     }}.
@@ -70,26 +79,49 @@ init(_) ->
 handle_call({rep_db_update, Change}, _From, State) ->
     {reply, ok, process_update(State, Change)};
 
-handle_call({triggered, {BaseId, _}}, _From, State) ->
-    [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId),
-    true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}),
+handle_call({rep_started, {BaseId, _Ext} = RepId}, _From, State) ->
+    #rep_state{rep = #rep{doc_id = DocId}} = RepState = rep_state(RepId),
+    true = ets:insert(
+        ?REP_TO_STATE, {RepId, RepState#rep_state{starting = false}}),
+    update_rep_doc(DocId, [
+        {<<"_replication_state">>, <<"triggered">>},
+        {<<"_replication_id">>, ?l2b(BaseId)}]),
     {reply, ok, State};
 
-handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
-    DocId = get_value(<<"_id">>, Props),
-    [{DocId, {{BaseId, _} = RepId, MaxRetries}}] = ets:lookup(
-        ?DOC_ID_TO_REP_ID, DocId),
+handle_call({rep_start_failure, Rep, Error}, _From, State) ->
+    #rep{id = {BaseId, _} = RepId, doc_id = DocId} = Rep,
+    #rep_state{max_retries = MaxRetries} = rep_state(RepId),
     ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
         "the document `~s`. Last error reason was: ~p",
         [pp_rep_id(RepId), MaxRetries, DocId, Error]),
     update_rep_doc(
-        RepDoc,
+        DocId,
         [{<<"_replication_state">>, <<"error">>},
             {<<"_replication_id">>, ?l2b(BaseId)}]),
-    true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
-    true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
+    true = ets:delete(?REP_TO_STATE, RepId),
+    true = ets:delete(?DOC_TO_REP, DocId),
     {reply, ok, State};
 
+handle_call({rep_complete, RepId}, _From, State) ->
+    #rep_state{rep = #rep{doc_id = DocId}} = rep_state(RepId),
+    couch_replicator:cancel_replication(RepId),
+    true = ets:delete(?REP_TO_STATE, RepId),
+    true = ets:delete(?DOC_TO_REP, DocId),
+    ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
+        [pp_rep_id(RepId), DocId]),
+    update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]),
+    {reply, ok, State};
+
+handle_call({rep_error, RepId, Error}, _From, State) ->
+    #rep_state{rep = #rep{doc_id = DocId}} = rep_state(RepId),
+    couch_replicator:cancel_replication(RepId),
+    true = ets:delete(?REP_TO_STATE, RepId),
+    true = ets:delete(?DOC_TO_REP, DocId),
+    ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s",
+        [pp_rep_id(RepId), DocId, to_binary(error_reason(Error))]),
+    update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]),
+     {reply, ok, State};
+
 handle_call(Msg, From, State) ->
     ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
         [Msg, From]),
@@ -124,6 +156,10 @@ handle_info({'EXIT', From, Reason}, #sta
     ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
     {stop, {db_update_notifier_died, Reason}, State};
 
+handle_info({'EXIT', From, Reason}, #state{rep_notifier = From} = State) ->
+    ?LOG_ERROR("Replication notifier died. Reason: ~p", [Reason]),
+    {stop, {rep_notifier_died, Reason}, State};
+
 handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
     % one of the replication start processes terminated successfully
     {noreply, State#state{rep_start_pids = Pids -- [From]}};
@@ -146,8 +182,8 @@ terminate(_Reason, State) ->
             catch exit(Pid, stop)
         end,
         [Loop | StartPids]),
-    true = ets:delete(?REP_ID_TO_DOC_ID),
-    true = ets:delete(?DOC_ID_TO_REP_ID),
+    true = ets:delete(?REP_TO_STATE),
+    true = ets:delete(?DOC_TO_REP),
     couch_db_update_notifier:stop(Notifier).
 
 
@@ -156,7 +192,7 @@ code_change(_OldVsn, State, _Extra) ->
 
 
 changes_feed_loop() ->
-    {ok, RepDb} = couch_replicator_utils:ensure_rep_db_exists(),
+    {ok, RepDb} = ensure_rep_db_exists(),
     Server = self(),
     Pid = spawn_link(
         fun() ->
@@ -216,6 +252,31 @@ db_update_notifier() ->
     Notifier.
 
 
+rep_notifier() ->
+    Server = self(),
+    {ok, Notifier} = couch_replication_notifier:start_link(
+        fun({finished, RepId, _CheckPointHistory}) ->
+            case rep_state(RepId) of
+            nil ->
+                ok;
+            #rep_state{} ->
+                % TODO: maybe add replication stats to the doc
+                ok = gen_server:call(Server, {rep_complete, RepId}, infinity)
+            end;
+        ({error, RepId, Error}) ->
+            case rep_state(RepId) of
+            nil ->
+                ok;
+            #rep_state{} ->
+                ok = gen_server:call(
+                    Server, {rep_error, RepId, Error}, infinity)
+            end;
+        (_) ->
+            ok
+        end),
+    Notifier.
+
+
 restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
     stop_all_replications(),
     lists:foreach(
@@ -241,16 +302,12 @@ process_update(State, {Change}) ->
         State;
     false ->
         case get_value(<<"_replication_state">>, RepProps) of
-        <<"completed">> ->
-            replication_complete(DocId),
-            State;
-        <<"error">> ->
-            stop_replication(DocId),
-            State;
+        undefined ->
+            maybe_start_replication(State, DocId, JsonRepDoc);
         <<"triggered">> ->
             maybe_start_replication(State, DocId, JsonRepDoc);
-        undefined ->
-            maybe_start_replication(State, DocId, JsonRepDoc)
+        _ ->
+           State
         end
     end.
 
@@ -267,101 +324,88 @@ rep_user_ctx({RepDoc}) ->
     end.
 
 
-maybe_start_replication(#state{max_retries = MaxRetries} = State,
-        DocId, JsonRepDoc) ->
-    {ok, #rep{id = {BaseId, _} = RepId} = Rep} =
-        parse_rep_doc(JsonRepDoc, rep_user_ctx(JsonRepDoc)),
-    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
-    [] ->
-        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
-        true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}),
+maybe_start_replication(State, DocId, RepDoc) ->
+    {ok, #rep{id = {BaseId, _} = RepId} = Rep} = parse_rep_doc(
+        RepDoc, rep_user_ctx(RepDoc)),
+    case rep_state(RepId) of
+    nil ->
+        RepState = #rep_state{
+            rep = Rep,
+            starting = true,
+            max_retries = State#state.max_retries
+        },
+        true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
+        true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
+        ?LOG_INFO("Attempting to start replication `~s` (document `~s`).",
+            [pp_rep_id(RepId), DocId]),
         Server = self(),
-        Pid = spawn_link(
-            fun() -> start_replication(Server, Rep, MaxRetries) end),
+        Pid = spawn_link(fun() ->
+           start_replication(Server, Rep, State#state.max_retries)
+        end),
         State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
-    [{BaseId, {DocId, _}}] ->
+    #rep_state{rep = #rep{doc_id = DocId}} ->
         State;
-    [{BaseId, {OtherDocId, false}}] ->
+    #rep_state{starting = false, rep = #rep{doc_id = OtherDocId}} ->
         ?LOG_INFO("The replication specified by the document `~s` was already"
             " triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
         State;
-    [{BaseId, {OtherDocId, true}}] ->
+    #rep_state{starting = true, rep = #rep{doc_id = OtherDocId}} ->
         ?LOG_INFO("The replication specified by the document `~s` is already"
             " being triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+        maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
         State
     end.
 
 
-maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
-    case get_value(<<"_replication_id">>, Props) of
+maybe_tag_rep_doc(DocId, {RepProps}, RepId) ->
+    case get_value(<<"_replication_id">>, RepProps) of
     RepId ->
         ok;
     _ ->
-        update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
+        update_rep_doc(DocId, [{<<"_replication_id">>, RepId}])
     end.
 
 
-start_replication(Server, #rep{id = RepId, doc = {RepProps}} = Rep, MaxRetries) ->
+start_replication(Server, #rep{id = RepId, doc_id = DocId} = Rep, MaxRetries) ->
     case (catch couch_replicator:async_replicate(Rep)) of
     {ok, _} ->
-        ok = gen_server:call(Server, {triggered, RepId}, infinity),
+        ok = gen_server:call(Server, {rep_started, RepId}, infinity),
         ?LOG_INFO("Document `~s` triggered replication `~s`",
-            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]);
+            [DocId, pp_rep_id(RepId)]);
     Error ->
         keep_retrying(Server, Rep, Error, ?INITIAL_WAIT, MaxRetries)
     end.
 
 
 keep_retrying(Server, Rep, Error, _Wait, 0) ->
-    ok = gen_server:call(Server, {restart_failure, Rep, Error}, infinity);
+    ok = gen_server:call(Server, {rep_start_failure, Rep, Error}, infinity);
 
-keep_retrying(Server, #rep{doc = {RepProps}} = Rep, Error, Wait, RetriesLeft) ->
-    DocId = get_value(<<"_id">>, RepProps),
+keep_retrying(Server, #rep{doc_id = DocId} = Rep, Error, Wait, RetriesLeft) ->
     ?LOG_ERROR("Error starting replication `~s` (document `~s`): ~p. "
         "Retrying in ~p seconds", [pp_rep_id(Rep), DocId, Error, Wait]),
     ok = timer:sleep(Wait * 1000),
     case (catch couch_replicator:async_replicate(Rep)) of
     {ok, _} ->
-        ok = gen_server:call(Server, {triggered, Rep#rep.id}, infinity),
-        [{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
+        ok = gen_server:call(Server, {rep_started, Rep#rep.id}, infinity),
+        #rep_state{max_retries = MaxRetries} = rep_state(Rep#rep.id),
         ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
-            [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]);
+            [DocId, pp_rep_id(Rep), MaxRetries - RetriesLeft + 1]);
     NewError ->
         keep_retrying(Server, Rep, NewError, Wait * 2, RetriesLeft - 1)
     end.
 
 
 rep_doc_deleted(DocId) ->
-    case stop_replication(DocId) of
-    {ok, RepId} ->
+    case ets:lookup(?DOC_TO_REP, DocId) of
+    [{DocId, RepId}] ->
+        couch_replicator:cancel_replication(RepId),
+        true = ets:delete(?REP_TO_STATE, RepId),
+        true = ets:delete(?DOC_TO_REP, DocId),
         ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
             " was deleted", [pp_rep_id(RepId), DocId]);
-    none ->
-        ok
-    end.
-
-
-replication_complete(DocId) ->
-    case stop_replication(DocId) of
-    {ok, RepId} ->
-        ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
-            [pp_rep_id(RepId), DocId]);
-    none ->
-        ok
-    end.
-
-
-stop_replication(DocId) ->
-    case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
-    [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] ->
-        couch_replicator:cancel_replication(RepId),
-        true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
-        true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
-        {ok, RepId};
     [] ->
-        none
+        ok
     end.
 
 
@@ -369,12 +413,73 @@ stop_all_replications() ->
     ?LOG_INFO("Stopping all ongoing replications because the replicator"
         " database was deleted or changed", []),
     ets:foldl(
-        fun({_, {RepId, _}}, _) ->
+        fun({_, RepId}, _) ->
             couch_replicator:cancel_replication(RepId)
         end,
-        ok, ?DOC_ID_TO_REP_ID),
-    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
-    true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).
+        ok, ?DOC_TO_REP),
+    true = ets:delete_all_objects(?REP_TO_STATE),
+    true = ets:delete_all_objects(?DOC_TO_REP).
+
+
+update_rep_doc(RepDocId, KVs) ->
+    {ok, RepDb} = ensure_rep_db_exists(),
+    try
+        {ok, LatestRepDoc} = couch_db:open_doc(RepDb, RepDocId, []),
+        update_rep_doc(RepDb, LatestRepDoc, KVs)
+    catch throw:conflict ->
+        % Shouldn't happen, as by default only the role _replicator can
+        % update replication documents.
+        ?LOG_ERROR("Conflict error when updating replication document `~s`."
+            " Retrying.", [RepDocId]),
+        ok = timer:sleep(5),
+        update_rep_doc(RepDocId, KVs)
+    after
+        couch_db:close(RepDb)
+    end.
+
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+    NewRepDocBody = lists:foldl(
+        fun({<<"_replication_state">> = K, _V} = KV, Body) ->
+                Body1 = lists:keystore(K, 1, Body, KV),
+                {Mega, Secs, _} = erlang:now(),
+                UnixTime = Mega * 1000000 + Secs,
+                lists:keystore(
+                    <<"_replication_state_time">>, 1, Body1,
+                    {<<"_replication_state_time">>, UnixTime});
+            ({K, _V} = KV, Body) ->
+                lists:keystore(K, 1, Body, KV)
+        end,
+        RepDocBody, KVs),
+    % Might not succeed - when the replication doc is deleted right
+    % before this update (not an error, ignore).
+    couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, []).
+
+
+ensure_rep_db_exists() ->
+    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
+    case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}]) of
+    {ok, Db} ->
+        Db;
+    _Error ->
+        {ok, Db} = couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}])
+    end,
+    ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+    {ok, Db}.
+
+
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+    case couch_db:open_doc(RepDb, DDocID, []) of
+    {ok, _Doc} ->
+        ok;
+    _ ->
+        DDoc = couch_doc:from_json_obj({[
+            {<<"_id">>, DDocID},
+            {<<"language">>, <<"javascript">>},
+            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+        ]}),
+        {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+     end.
 
 
 % pretty-print replication id
@@ -382,3 +487,18 @@ pp_rep_id(#rep{id = RepId}) ->
     pp_rep_id(RepId);
 pp_rep_id({Base, Extension}) ->
     Base ++ Extension.
+
+
+rep_state(RepId) ->
+    case ets:lookup(?REP_TO_STATE, RepId) of
+    [{RepId, RepState}] ->
+        RepState;
+    [] ->
+        nil
+    end.
+
+
+error_reason({error, Reason}) ->
+    Reason;
+error_reason(Reason) ->
+    Reason.

Modified: couchdb/trunk/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1079483&r1=1079482&r2=1079483&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.erl Tue Mar  8 18:49:53 2011
@@ -35,7 +35,6 @@
 ]).
 
 -import(couch_replicator_utils, [
-    update_rep_doc/2,
     start_db_compaction_notifier/2,
     stop_db_compaction_notifier/1
 ]).
@@ -241,8 +240,6 @@ do_init(#rep{options = Options, id = {Ba
         end,
         lists:seq(1, CopiersCount)),
 
-    maybe_set_triggered(Rep),
-
     couch_task_status:add_task(
         "Replication",
          io_lib:format("`~s`: `~s` -> `~s`",
@@ -439,10 +436,9 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-terminate(normal, #rep_state{rep_details = #rep{id = RepId, doc = RepDoc},
+terminate(normal, #rep_state{rep_details = #rep{id = RepId},
     checkpoint_history = CheckpointHistory} = State) ->
     terminate_cleanup(State),
-    update_rep_doc(RepDoc, [{<<"_replication_state">>, <<"completed">>}]),
     couch_replication_notifier:notify({finished, RepId, CheckpointHistory});
 
 terminate(shutdown, State) ->
@@ -453,12 +449,11 @@ terminate(Reason, State) ->
     #rep_state{
         source_name = Source,
         target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId, doc = RepDoc}
+        rep_details = #rep{id = {BaseId, Ext} = RepId}
     } = State,
     ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
         [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
     terminate_cleanup(State),
-    update_rep_doc(RepDoc, [{<<"_replication_state">>, <<"error">>}]),
     couch_replication_notifier:notify({error, RepId, Reason}).
 
 
@@ -811,20 +806,6 @@ sum_stats([Stats1 | RestStats]) ->
         Stats1, RestStats).
 
 
-maybe_set_triggered(#rep{id = {BaseId, _}, doc = {RepProps} = RepDoc}) ->
-    case get_value(<<"_replication_state">>, RepProps) of
-    <<"triggered">> ->
-        ok;
-    _ ->
-        update_rep_doc(
-            RepDoc,
-            [
-                {<<"_replication_state">>, <<"triggered">>},
-                {<<"_replication_id">>, ?l2b(BaseId)}
-            ])
-    end.
-
-
 db_monitor(#db{} = Db) ->
     couch_db:monitor(Db);
 db_monitor(_HttpDb) ->

Modified: couchdb/trunk/src/couchdb/couch_replicator.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.hrl?rev=1079483&r1=1079482&r2=1079483&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.hrl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.hrl Tue Mar  8 18:49:53 2011
@@ -18,7 +18,7 @@
     target,
     options,
     user_ctx,
-    doc
+    doc_id
 }).
 
 -record(rep_stats, {

Modified: couchdb/trunk/src/couchdb/couch_replicator_utils.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_utils.erl?rev=1079483&r1=1079482&r2=1079483&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_utils.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator_utils.erl Tue Mar  8 18:49:53 2011
@@ -13,8 +13,6 @@
 -module(couch_replicator_utils).
 
 -export([parse_rep_doc/2]).
--export([update_rep_doc/2]).
--export([ensure_rep_db_exists/0]).
 -export([open_db/1, close_db/1]).
 -export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
 -export([replication_id/2]).
@@ -22,7 +20,6 @@
 -include("couch_db.hrl").
 -include("couch_api_wrap.hrl").
 -include("couch_replicator.hrl").
--include("couch_js_functions.hrl").
 -include("../ibrowse/ibrowse.hrl").
 
 -import(couch_util, [
@@ -31,7 +28,7 @@
 ]).
 
 
-parse_rep_doc({Props} = RepObj, UserCtx) ->
+parse_rep_doc({Props}, UserCtx) ->
     ProxyParams = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
     Options = make_options(Props),
     Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options),
@@ -41,80 +38,11 @@ parse_rep_doc({Props} = RepObj, UserCtx)
         target = Target,
         options = Options,
         user_ctx = UserCtx,
-        doc = RepObj
+        doc_id = get_value(<<"_id">>, Props)
     },
     {ok, Rep#rep{id = replication_id(Rep)}}.
 
 
-update_rep_doc({Props} = _RepDoc, KVs) ->
-    case get_value(<<"_id">>, Props) of
-    undefined ->
-        ok;
-    RepDocId ->
-        {ok, RepDb} = ensure_rep_db_exists(),
-        case couch_db:open_doc(RepDb, RepDocId, []) of
-        {ok, LatestRepDoc} ->
-            update_rep_doc(RepDb, LatestRepDoc, KVs);
-        _ ->
-            ok
-        end,
-        couch_db:close(RepDb)
-    end.
-
-update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
-    NewRepDocBody = lists:foldl(
-        fun({<<"_replication_state">> = K, _V} = KV, Body) ->
-                Body1 = lists:keystore(K, 1, Body, KV),
-                {Mega, Secs, _} = erlang:now(),
-                UnixTime = Mega * 1000000 + Secs,
-                lists:keystore(
-                    <<"_replication_state_time">>, 1,
-                    Body1, {<<"_replication_state_time">>, UnixTime});
-            ({K, _V} = KV, Body) ->
-                lists:keystore(K, 1, Body, KV)
-        end,
-        RepDocBody,
-        KVs
-    ),
-    % might not succeed - when the replication doc is deleted right
-    % before this update (not an error)
-    couch_db:update_doc(
-        RepDb,
-        RepDoc#doc{body = {NewRepDocBody}},
-        []).
-
-
-ensure_rep_db_exists() ->
-    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
-    Opts = [
-        {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
-        sys_db
-    ],
-    case couch_db:open(DbName, Opts) of
-    {ok, Db} ->
-        Db;
-    _Error ->
-        {ok, Db} = couch_db:create(DbName, Opts)
-    end,
-    ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
-    {ok, Db}.
-
-
-ensure_rep_ddoc_exists(RepDb, DDocID) ->
-    case couch_db:open_doc(RepDb, DDocID, []) of
-    {ok, _Doc} ->
-        ok;
-    _ ->
-        DDoc = couch_doc:from_json_obj({[
-            {<<"_id">>, DDocID},
-            {<<"language">>, <<"javascript">>},
-            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
-        ]}),
-        {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
-    end,
-    ok.
-
-
 replication_id(#rep{options = Options} = Rep) ->
     BaseId = replication_id(Rep, ?REP_ID_VERSION),
     {BaseId, maybe_append_options([continuous, create_target], Options)}.



Mime
View raw message