couchdb-commits mailing list archives

From rnew...@apache.org
Subject [10/44] couch-replicator commit: updated refs/heads/63012-defensive to 1afa5ea
Date Tue, 07 Jun 2016 11:05:14 GMT
Remove extra process spawn and retry on error from rep. manager

 * Removed spawning an extra process to start replications.

 * Removed retrying on error (the scheduler will handle that better).

As a result, the replication manager ends up holding no state of its own.
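
For quick reference, the core of the change as it appears in the diff below: `maybe_start_replication/3` now calls `start_replication/1` directly, which hands the job to `couch_replicator_scheduler:add_job/1` synchronously, and the gen_server's state becomes `nil`. This is a condensed excerpt, not a standalone module; see the full diff for context.

```
%% Excerpted from the diff below: the replication job is handed to the
%% scheduler synchronously; add_job errors are only logged here, since
%% retrying is now the scheduler's responsibility.
-spec start_replication(#rep{}) -> ok.
start_replication(Rep) ->
    case couch_replicator_scheduler:add_job(Rep) of
    ok ->
        ok;
    {error, already_added} ->
        couch_log:error("replicator scheduler add_job ~p was already added", [Rep])
    end.
```

With no linked starter processes left to supervise, the `trap_exit` flag and the `'EXIT'` clause in `handle_info/2` are removed as well.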

The `rep.py` tooling shows that replications still trigger and replicate data:

```
: rep.replicate_1_to_n_then_check_replication(1)
 > created  1 dbs with prefix rdyno_src_
 > created  1 dbs with prefix rdyno_tgt_
updating documents
 > _replicator rdyno_0001_0001 : 4-c28c7737ed9f472ad286c63f5652908a
waiting for replication documents to trigger
 > retrying function wait_to_trigger
 > function wait_to_trigger succeded after 10.030 +/- 10.0  sec.
all replication documents triggered
>>> update cycle 0  <<<
 > waiting for target  1
 > function wait_till_dbs_equal succeded after 0.022 +/- 1.0  sec.
 > waiting to propagate changes from  1 to 1  : 0.037 sec.
```


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/a1ef86dd
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/a1ef86dd
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/a1ef86dd

Branch: refs/heads/63012-defensive
Commit: a1ef86dde1313bc11a0b6c58873abc13c2e81412
Parents: 46362d3
Author: Nick Vatamaniuc <vatamane@gmail.com>
Authored: Tue May 17 19:28:48 2016 -0400
Committer: Nick Vatamaniuc <vatamane@gmail.com>
Committed: Wed May 18 10:58:16 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 151 +++++++---------------------------
 1 file changed, 32 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/a1ef86dd/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 63d625e..08d659e 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -49,9 +49,6 @@
 -define(REP_TO_STATE, couch_rep_id_to_rep_state).
 
 
--record(state, {
-    rep_start_pids = []
-}).
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -126,8 +123,7 @@ replication_error(#rep{id = RepId}, Error) ->
         ok;
     #rep{db_name = DbName, doc_id = DocId} ->
         % NV: TODO: later, perhaps don't update doc on each error
-        couch_replicator_docs:update_doc_error(DbName, DocId, RepId, Error),
-        ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
+        couch_replicator_docs:update_doc_error(DbName, DocId, RepId, Error)
     end.
 
 
@@ -153,123 +149,81 @@ continue(#rep{id = RepId}) ->
 
 
 init(_) ->
-    process_flag(trap_exit, true),
     ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, public]),
     ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, public]),
     couch_replicator_docs:ensure_rep_db_exists(),
-    {ok, #state{}}.
+    {ok, nil}.
 
 
 handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
-    NewState = try
-        process_update(State, DbName, Change)
+    try
+        process_update(DbName, Change)
     catch
     _Tag:Error ->
         {RepProps} = get_json_value(doc, ChangeProps),
         DocId = get_json_value(<<"_id">>, RepProps),
-        couch_replicator_docs:update_doc_process_error(DbName, DocId, Error),
-        State
+        couch_replicator_docs:update_doc_process_error(DbName, DocId, Error)
     end,
-    {reply, ok, NewState};
+    {reply, ok, State};
 
 
 handle_call({rep_complete, RepId}, _From, State) ->
     true = ets:delete(?REP_TO_STATE, RepId),
-    {reply, ok, State};
-
-handle_call({rep_error, RepId, Error}, _From, State) ->
-    {reply, ok, replication_error(State, RepId, Error)};
-
-handle_call(Msg, From, State) ->
-    couch_log:error("Replication manager received unexpected call ~p from ~p",
-        [Msg, From]),
-    {stop, {error, {unexpected_call, Msg}}, State}.
+    {reply, ok, State}.
 
 
 handle_cast(Msg, State) ->
     couch_log:error("Replication manager received unexpected cast ~p", [Msg]),
     {stop, {error, {unexpected_cast, Msg}}, State}.
 
-handle_info({'EXIT', From, Reason}, #state{rep_start_pids = Pids} = State) ->
-    case lists:keytake(From, 2, Pids) of
-        {value, {rep_start, From}, NewPids} ->
-            if Reason == normal -> ok; true ->
-                Fmt = "~s : Known replication pid ~w died :: ~w",
-                couch_log:error(Fmt, [?MODULE, From, Reason])
-            end,
-            {noreply, State#state{rep_start_pids = NewPids}};
-        false when Reason == normal ->
-            {noreply, State};
-        false ->
-            Fmt = "~s : Unknown pid ~w died :: ~w",
-            couch_log:error(Fmt, [?MODULE, From, Reason]),
-            {stop, {unexpected_exit, From, Reason}, State}
-    end;
-
-handle_info({'DOWN', _Ref, _, _, _}, State) ->
-    % From a db monitor created by a replication process. Ignore.
-    {noreply, State};
-
-handle_info(shutdown, State) ->
-    {stop, shutdown, State};
 
 handle_info(Msg, State) ->
     couch_log:error("Replication manager received unexpected message ~p", [Msg]),
     {stop, {unexpected_msg, Msg}, State}.
 
 
-terminate(_Reason, #state{rep_start_pids = StartPids}) ->
-    stop_all_replications(),
-    lists:foreach(
-        fun({_Tag, Pid}) ->
-            catch unlink(Pid),
-            catch exit(Pid, stop)
-        end,
-        StartPids),
-    true = ets:delete(?REP_TO_STATE),
-    true = ets:delete(?DOC_TO_REP).
+
+terminate(_Reason, _State) ->
+    ok.
 
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
--spec process_update(#state{}, binary(), tuple()) -> #state{}.
-process_update(State, DbName, {Change}) ->
+-spec process_update(binary(), tuple()) -> ok.
+process_update(DbName, {Change}) ->
     {RepProps} = JsonRepDoc = get_json_value(doc, Change),
     DocId = get_json_value(<<"_id">>, RepProps),
     OwnerRes = couch_replicator_clustering:owner(DbName, DocId),
     case {OwnerRes, get_json_value(deleted, Change, false)} of
     {_, true} ->
-        rep_doc_deleted(DbName, DocId),
-        State;
+        rep_doc_deleted(DbName, DocId);
     {{ok, Owner}, false} when Owner /= node() ->
-        couch_log:notice("Not starting '~s' as owner is ~s.", [DocId, Owner]),
-        State;
+        couch_log:notice("Not starting '~s' as owner is ~s.", [DocId, Owner]);
     {{error, unstable}, false} ->
-	couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]),
-	State;
+	couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
     {{ok,_Owner}, false} ->
         couch_log:notice("Maybe starting '~s' as I'm the owner", [DocId]),
         case get_json_value(<<"_replication_state">>, RepProps) of
         undefined ->
-            maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+            maybe_start_replication(DbName, DocId, JsonRepDoc);
         <<"triggered">> ->
-            maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+            maybe_start_replication(DbName, DocId, JsonRepDoc);
         <<"completed">> ->
-            replication_complete(DbName, DocId),
-            State;
+            replication_complete(DbName, DocId);
         <<"error">> ->
             case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
             [] ->
-                maybe_start_replication(State, DbName, DocId, JsonRepDoc);
+                maybe_start_replication(DbName, DocId, JsonRepDoc);
             _ ->
-                State
+                ok
             end
         end
-    end.
+    end,
+    ok.
 
--spec maybe_start_replication(#state{}, binary(), binary(), tuple()) -> #state{}.
-maybe_start_replication(State, DbName, DocId, RepDoc) ->
+-spec maybe_start_replication(binary(), binary(), tuple()) -> ok.
+maybe_start_replication(DbName, DocId, RepDoc) ->
     Rep0 = couch_replicator_docs:parse_rep_doc(RepDoc),
     #rep{id = {BaseId, _} = RepId} = Rep0,
     Rep = Rep0#rep{db_name = DbName},
@@ -279,18 +233,15 @@ maybe_start_replication(State, DbName, DocId, RepDoc) ->
         true = ets:insert(?DOC_TO_REP, {{DbName, DocId}, RepId}),
         couch_log:notice("Attempting to start replication `~s` (document `~s`).",
             [pp_rep_id(RepId), DocId]),
-        Pid = spawn_link(?MODULE, start_replication, [Rep]),
-        State#state{
-            rep_start_pids = [{rep_start, Pid} | State#state.rep_start_pids]
-        };
+        ok = start_replication(Rep);
     #rep{doc_id = DocId} ->
-        State;
+        ok;
     #rep{db_name = DbName, doc_id = OtherDocId} ->
         couch_log:notice("The replication specified by the document `~s` already started"
             " triggered by the document `~s`", [DocId, OtherDocId]),
-        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
-        State
-    end.
+        maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId))
+    end,
+    ok.
 
 -spec maybe_tag_rep_doc(binary(), binary(), tuple(), binary()) -> ok.
 maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
@@ -303,16 +254,11 @@ maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
 
 -spec start_replication(#rep{}) -> ok.
 start_replication(Rep) ->
-    % NV: TODO: Removed splay and back-off sleep on error. Instead to replace that
-    % temporarily add some random sleep here. To avoid repeated failed restarts in
-    % a loop if source doc is broken
-    timer:sleep(random:uniform(1000)),
-    case (catch couch_replicator_scheduler:add_job(Rep)) of
+    case couch_replicator_scheduler:add_job(Rep) of
     ok ->
         ok;
-    {error, Error} ->
-        couch_log:error("replicator scheduler add_job ~p failed: ~p", [Rep, Error]),
-        replication_error(Rep, Error)
+    {error, already_added} ->
+        couch_log:error("replicator scheduler add_job ~p was already added", [Rep])
     end.
 
 -spec replication_complete(binary(), binary()) -> ok.
@@ -339,39 +285,6 @@ rep_doc_deleted(DbName, DocId) ->
         ok
     end.
 
-replication_error(State, RepId, Error) ->
-    case rep_state(RepId) of
-    nil ->
-        State;
-    RepState ->
-        maybe_retry_replication(RepState, Error, State)
-    end.
-
--spec maybe_retry_replication(#rep{}, any(), #state{}) -> #state{}.
-maybe_retry_replication(#rep{id = RepId, doc_id = DocId} = Rep, Error, State) ->
-    ErrorBinary = couch_replicator_utils:rep_error_to_binary(Error),
-    couch_log:error("Error in replication `~s` (triggered by `~s`): ~s",
-        [pp_rep_id(RepId), DocId, ErrorBinary]),
-    % NV: TODO: Removed repeated failed restarts handling. Will do that some
-    % other way in scheduler code
-    Pid = spawn_link(?MODULE, start_replication, [Rep]),
-    State#state{
-        rep_start_pids = [{rep_start, Pid} | State#state.rep_start_pids]
-    }.
-
-
--spec stop_all_replications() -> ok.
-stop_all_replications() ->
-    couch_log:notice("Stopping all ongoing replications", []),
-    ets:foldl(
-        fun({_, RepId}, _) ->
-            couch_replicator_scheduler:remove_job(RepId)
-        end,
-        ok, ?DOC_TO_REP),
-    true = ets:delete_all_objects(?REP_TO_STATE),
-    true = ets:delete_all_objects(?DOC_TO_REP),
-    ok.
-
 
 -spec clean_up_replications(binary()) -> ok.
 clean_up_replications(DbName) ->

