couchdb-commits mailing list archives

From rnew...@apache.org
Subject [06/30] couch-replicator commit: updated refs/heads/63012-scheduler-dont-start-immediately to 6a913dc
Date Fri, 03 Jun 2016 15:17:30 GMT
Stitch everything together.

Use scheduler to spawn/kill replication jobs.

The replicator manager is now a replicator_manager_sup supervisor with 3
children (rest_for_one supervision):
  Clustering: ownership and cluster configuration
  Replicator manager: handles replication starts
  Multi-db change feed: handles finding all replicator docs

Old replicator eunit tests (166) still pass, but rep.py shows a failure.
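
For orientation, a minimal sketch of the rest_for_one supervision described
above (the module name rep_manager_sup_sketch is made up for illustration;
the child specs are abbreviated from the couch_replicator_manager_sup diff
below). With rest_for_one, a crash of an earlier child also restarts every
child started after it, so a clustering crash restarts the manager and the
multi-db change feed, while a change-feed crash leaves the other two running.

    %% Sketch only: illustrates the rest_for_one layout, not the committed module.
    -module(rep_manager_sup_sketch).
    -behaviour(supervisor).
    -export([start_link/0, init/1]).

    start_link() ->
        supervisor:start_link({local, ?MODULE}, ?MODULE, []).

    init([]) ->
        MdbChangesArgs = [<<"_replicator">>, couch_replicator_manager,
            couch_replicator_manager, [skip_ddocs]],
        Children = [
            {clustering, {couch_replicator_clustering, start_link, []},
                permanent, brutal_kill, worker, [couch_replicator_clustering]},
            {manager, {couch_replicator_manager, start_link, []},
                permanent, brutal_kill, worker, [couch_replicator_manager]},
            {mdb_changes, {couch_multidb_changes, start_link, MdbChangesArgs},
                permanent, brutal_kill, worker, [couch_multidb_changes]}
        ],
        {ok, {{rest_for_one, 10, 1}, Children}}.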


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/6718e25a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/6718e25a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/6718e25a

Branch: refs/heads/63012-scheduler-dont-start-immediately
Commit: 6718e25a4b68b6e30cc151bd0c7a6c681e695afb
Parents: 5218edf
Author: Nick Vatamaniuc <vatamane@gmail.com>
Authored: Mon May 16 04:35:39 2016 -0400
Committer: Nick Vatamaniuc <vatamane@gmail.com>
Committed: Mon May 16 04:35:39 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator_clustering.erl  |  63 ++++++++++---
 src/couch_replicator_manager.erl     | 147 +++++++++++-------------------
 src/couch_replicator_manager_sup.erl |  56 ++++++++++++
 src/couch_replicator_sup.erl         |  12 +--
 4 files changed, 161 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/6718e25a/src/couch_replicator_clustering.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_clustering.erl b/src/couch_replicator_clustering.erl
index 8c761d7..edb00d6 100644
--- a/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator_clustering.erl
@@ -31,7 +31,8 @@
 
 -record(state, {
     last_change :: erlang:timestamp(),
-    quiet_period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer()
+    quiet_period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer(),
+    timer :: timer:tref()
 }).
 
 
@@ -50,7 +51,7 @@ start_link() ->
 % been changing recently. Recency is a configuration parameter.
 %
 -spec owner(Dbname :: binary(), DocId :: binary() | null) ->
-    {ok, node()} | {error, no_owner} | {error, unstable}.
+    {ok, node()} | {ok, no_owner} | {error, unstable}.
 owner(_DbName, null) ->
     {ok, no_owner};
 owner(<<"shards/", _/binary>> = DbName, DocId) ->
@@ -92,19 +93,18 @@ owner(Key) ->
 init([]) ->
     net_kernel:monitor_nodes(true),
     ok = config:listen_for_changes(?MODULE, self()),
-    couch_log:debug("Initialized clustering gen_server ~w",[self()]),
-    {ok, #state{last_change = os:timestamp()}}.
+    Interval = config:get_integer("replicator", "cluster_quiet_period", 
+        ?DEFAULT_QUIET_PERIOD),
+    couch_log:debug("Initialized clustering gen_server ~w", [self()]),
+    {ok, #state{
+        last_change = os:timestamp(),
+        quiet_period = Interval,
+        timer = new_timer(Interval)
+    }}.
 
 
 handle_call(is_stable, _From, State) ->
-    % os:timestamp() results are not guaranteed to be monotonic
-    Sec = case timer:now_diff(os:timestamp(), State#state.last_change) of
-              USec when USec < 0 ->
-                  0;
-              USec when USec >= 0 ->
-                  USec / 1000000
-          end,
-    {reply, Sec > State#state.quiet_period, State}.
+    {reply, is_stable(State), State}.
 
 
 handle_cast({set_quiet_period, QuietPeriod}, State) when
@@ -113,11 +113,25 @@ handle_cast({set_quiet_period, QuietPeriod}, State) when
 
 
 handle_info({nodeup, _Node}, State) ->
-    {noreply, State#state{last_change = os:timestamp()}};
+    Ts = os:timestamp(),
+    Timer = new_timer(State#state.quiet_period),
+    {noreply, State#state{last_change = Ts, timer = Timer}};
 
 handle_info({nodedown, _Node}, State) ->
-    {noreply, State#state{last_change = os:timestamp()}}.
-
+    Ts = os:timestamp(),
+    Timer = new_timer(State#state.quiet_period),
+    {noreply, State#state{last_change = Ts, timer = Timer}};
+
+handle_info(rescan_check, State) ->
+    timer:cancel(State#state.timer),
+    case is_stable(State) of
+        true ->
+            trigger_rescan(),
+            {noreply, State};
+        false ->
+            Timer = new_timer(State#state.quiet_period),
+            {noreply, State#state{timer = Timer}}
+    end.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -129,6 +143,25 @@ terminate(_Reason, _State) ->
 
 %% Internal functions
 
+new_timer(Interval) ->
+    {ok, Timer} = timer:send_after(Interval, rescan_check),
+    Timer.
+
+trigger_rescan() ->
+    couch_log:notice("Triggering replicator rescan from ~p", [?MODULE]),
+    couch_replicator_manager_sup:restart_mdb_listener(),
+    ok.
+
+is_stable(State) ->
+    % os:timestamp() results are not guaranteed to be monotonic
+    Sec = case timer:now_diff(os:timestamp(), State#state.last_change) of
+        USec when USec < 0 ->
+            0;
+        USec when USec >= 0 ->
+            USec / 1000000
+    end,
+    Sec > State#state.quiet_period.
+
 
 handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
     ok = gen_server:cast(S, {set_quiet_period, list_to_integer(V)}),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/6718e25a/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 37bbb82..710edc5 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -50,15 +50,34 @@
 
 
 -record(state, {
-    mdb_listener = nil,
-    rep_start_pids = [],
-    live = []
+    rep_start_pids = []
 }).
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
+
+%%%%%% Multidb changes callbacks
+
+db_created(DbName, Server) ->
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+db_deleted(DbName, Server) ->
+    clean_up_replications(DbName),
+    Server.
+
+db_found(DbName, Server) ->
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+db_change(DbName, Change, Server) ->
+    ok = gen_server:call(Server, {rep_db_update, DbName, Change}, infinity),
+    Server.
+
+
+
 replication_started(#rep{id = RepId}) ->
     case rep_state(RepId) of
     nil ->
@@ -101,7 +120,7 @@ replication_error(#rep{id = RepId}, Error) ->
     nil ->
         ok;
     #rep{db_name = DbName, doc_id = DocId} ->
-        % NV: TODO: We might want to do something else instead of update doc on every error
+        % NV: TODO: later, perhaps don't update doc on each error
          couch_replicator_docs:update_doc_error(DbName, DocId, RepId, Error),
         ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
     end.
@@ -111,28 +130,28 @@ replication_error(#rep{id = RepId}, Error) ->
 continue(#rep{doc_id = null}) ->
     {true, no_owner};
 continue(#rep{id = RepId}) ->
-    Owner = gen_server:call(?MODULE, {owner, RepId}, infinity),
-    {node() == Owner, Owner}.
+    case rep_state(RepId) of
+    nil ->
+        {false, nonode};
+    #rep{db_name = DbName, doc_id = DocId} ->
+        case couch_replicator_clustering:owner(DbName, DocId) of
+        {ok, no_owner} ->
+            {true, no_owner};
+        {ok, Owner} ->
+            {node() == Owner, Owner};
+        {error, unstable} ->
+            {true, unstable}
+        end
+    end.
 
 
 init(_) ->
     process_flag(trap_exit, true),
-    net_kernel:monitor_nodes(true),
-    Live = [node() | nodes()],
     ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, public]),
     ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, public]),
     couch_replicator_docs:ensure_rep_db_exists(),
-    {ok, #state{mdb_listener = start_mdb_listener(), live = Live}}.
+    {ok, #state{}}.
 
-% NV: TODO: Use new cluster membership module here. Possible return value
-% could be 'unstable' in which case should keep old owner + possibly logging.
-handle_call({owner, RepId}, _From, State) ->
-    case rep_state(RepId) of
-    nil ->
-        {reply, nonode, State};
-    #rep{db_name = DbName, doc_id = DocId} ->
-        {reply, owner(DbName, DocId, State#state.live), State}
-    end;
 
 handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
     NewState = try
@@ -164,22 +183,6 @@ handle_cast(Msg, State) ->
     couch_log:error("Replication manager received unexpected cast ~p", [Msg]),
     {stop, {error, {unexpected_cast, Msg}}, State}.
 
-% NV: TODO: Remove when switching to new cluster membership module
-handle_info({nodeup, Node}, State) ->
-    couch_log:notice("Rescanning replicator dbs as ~s came up.", [Node]),
-    Live = lists:usort([Node | State#state.live]),
-    {noreply, rescan(State#state{live=Live})};
-
-% NV: TODO: Remove when switching to new cluster membership module
-handle_info({nodedown, Node}, State) ->
-    couch_log:notice("Rescanning replicator dbs ~s went down.", [Node]),
-    Live = State#state.live -- [Node],
-    {noreply, rescan(State#state{live=Live})};
-
-handle_info({'EXIT', From, Reason}, #state{mdb_listener = From} = State) ->
-    couch_log:error("Database update notifier died. Reason: ~p", [Reason]),
-    {stop, {db_update_notifier_died, Reason}, State};
-
 handle_info({'EXIT', From, Reason}, #state{rep_start_pids = Pids} = State) ->
     case lists:keytake(From, 2, Pids) of
         {value, {rep_start, From}, NewPids} ->
@@ -208,11 +211,7 @@ handle_info(Msg, State) ->
     {stop, {unexpected_msg, Msg}, State}.
 
 
-terminate(_Reason, State) ->
-    #state{
-        rep_start_pids = StartPids,
-        mdb_listener = Listener
-    } = State,
+terminate(_Reason, #state{rep_start_pids = StartPids}) ->
     stop_all_replications(),
     lists:foreach(
         fun({_Tag, Pid}) ->
@@ -221,63 +220,28 @@ terminate(_Reason, State) ->
         end,
         StartPids),
     true = ets:delete(?REP_TO_STATE),
-    true = ets:delete(?DOC_TO_REP),
-    catch unlink(Listener),
-    catch exit(Listener).
+    true = ets:delete(?DOC_TO_REP).
 
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-start_mdb_listener() ->
-    {ok, Pid} = couch_multidb_changes:start_link(
-        <<"_replicator">>, ?MODULE, self(), [skip_ddocs]),
-    Pid.
-
-
-%%%%%% multidb changes callbacks
-
-db_created(DbName, Server) ->
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
-
-db_deleted(DbName, Server) ->
-    clean_up_replications(DbName),
-    Server.
-
-db_found(DbName, Server) ->
-    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
-    Server.
-
-db_change(DbName, Change, Server) ->
-    ok = gen_server:call(Server, {rep_db_update, DbName, Change}, infinity),
-    Server.
-
-
-% NV: TODO: This will change when using the new clustering module.
-% Rescan should happend when cluster membership changes, clustering module
-% handles with an appropriate back-off. mdb_listener should probably live
-% in a supervisor and that supervisor should be asked to restart it.
-rescan(#state{mdb_listener = nil} = State) ->
-    State#state{mdb_listener = start_mdb_listener()};
-rescan(#state{mdb_listener = MPid} = State) ->
-    unlink(MPid),
-    exit(MPid, exit),
-    rescan(State#state{mdb_listener = nil}).
-
-
 process_update(State, DbName, {Change}) ->
     {RepProps} = JsonRepDoc = get_json_value(doc, Change),
     DocId = get_json_value(<<"_id">>, RepProps),
-    case {owner(DbName, DocId, State#state.live), get_json_value(deleted, Change, false)} of
+    Owner = couch_replicator_clustering:owner(DbName, DocId),
+    case {Owner, get_json_value(deleted, Change, false)} of
     {_, true} ->
         rep_doc_deleted(DbName, DocId),
         State;
-    {Owner, false} when Owner /= node() ->
+    {{ok, Owner}, false} when Owner /= node() ->
         couch_log:notice("Not starting '~s' as owner is ~s.", [DocId, Owner]),
         State;
-    {_Owner, false} ->
+    {{error, unstable}, false} ->
+        couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]),
+        State;
+    {{ok, _Owner}, false} ->
         couch_log:notice("Maybe starting '~s' as I'm the owner", [DocId]),
         case get_json_value(<<"_replication_state">>, RepProps) of
         undefined ->
@@ -298,15 +262,6 @@ process_update(State, DbName, {Change}) ->
     end.
 
 
-owner(<<"shards/", _/binary>> = DbName, DocId, Live) ->
-    Nodes = lists:sort([N || #shard{node=N} <- mem3:shards(mem3:dbname(DbName), DocId),
-			     lists:member(N, Live)]),
-    hd(mem3_util:rotate_list({DbName, DocId}, Nodes));
-owner(_DbName, _DocId, _Live) ->
-    node().
-
-
-
 maybe_start_replication(State, DbName, DocId, RepDoc) ->
     Rep0 = couch_replicator_docs:parse_rep_doc(RepDoc),
     #rep{id = {BaseId, _} = RepId} = Rep0,
@@ -344,7 +299,7 @@ start_replication(Rep) ->
     % temporarily add some random sleep here. To avoid repeated failed restarts in
     % a loop if source doc is broken
     timer:sleep(random:uniform(1000)),
-    case (catch couch_replicator:async_replicate(Rep)) of
+    case (catch couch_replicator_scheduler:add_job(Rep)) of
     {ok, _} ->
         ok;
     Error ->
@@ -362,7 +317,7 @@ replication_complete(DbName, DocId) ->
 rep_doc_deleted(DbName, DocId) ->
     case ets:lookup(?DOC_TO_REP, {DbName, DocId}) of
     [{{DbName, DocId}, RepId}] ->
-        couch_replicator:cancel_replication(RepId),
+        couch_replicator_scheduler:remove_job(RepId),
         true = ets:delete(?REP_TO_STATE, RepId),
         true = ets:delete(?DOC_TO_REP, {DbName, DocId}),
         couch_log:notice("Stopped replication `~s` because replication document `~s`"
@@ -396,7 +351,7 @@ stop_all_replications() ->
     couch_log:notice("Stopping all ongoing replications", []),
     ets:foldl(
         fun({_, RepId}, _) ->
-            couch_replicator:cancel_replication(RepId)
+            couch_replicator_scheduler:remove_job(RepId)
         end,
         ok, ?DOC_TO_REP),
     true = ets:delete_all_objects(?REP_TO_STATE),
@@ -406,7 +361,7 @@ stop_all_replications() ->
 clean_up_replications(DbName) ->
     ets:foldl(
         fun({{Name, DocId}, RepId}, _) when Name =:= DbName ->
-            couch_replicator:cancel_replication(RepId),
+            couch_replicator_scheduler:remove_job(RepId),
             ets:delete(?DOC_TO_REP,{Name, DocId}),
             ets:delete(?REP_TO_STATE, RepId);
            ({_,_}, _) ->
@@ -438,4 +393,4 @@ before_doc_update(Doc, Db) ->
     couch_replicator_docs:before_doc_update(Doc, Db).
 
 after_doc_read(Doc, Db) ->
-    couch_replicator_doc:after_doc_read(Doc, Db).
+    couch_replicator_docs:after_doc_read(Doc, Db).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/6718e25a/src/couch_replicator_manager_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager_sup.erl b/src/couch_replicator_manager_sup.erl
new file mode 100644
index 0000000..6564962
--- /dev/null
+++ b/src/couch_replicator_manager_sup.erl
@@ -0,0 +1,56 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_manager_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1]).
+-export([restart_mdb_listener/0]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init(_Args) ->
+    MdbChangesArgs = [
+        <<"_replicator">>,        % DbSuffix
+	couch_replicator_manager, % Module
+	couch_replicator_manager, % Context(Server)
+        [skip_ddocs]              % Options
+    ],
+    Children = [
+        {couch_replicator_clustering,
+            {couch_replicator_clustering, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_clustering]},
+        {couch_replicator_manager,
+            {couch_replicator_manager, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_manager]},
+        {couch_multidb_changes,
+            {couch_multidb_changes, start_link, MdbChangesArgs},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_multidb_changes]}
+    ],
+    {ok, {{rest_for_one,10,1}, Children}}.
+
+
+restart_mdb_listener() ->
+    ok = supervisor:terminate_child(?MODULE, couch_multidb_changes),
+    {ok, ChildPid} = supervisor:restart_child(?MODULE, couch_multidb_changes),
+    ChildPid.
+

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/6718e25a/src/couch_replicator_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_sup.erl b/src/couch_replicator_sup.erl
index b744a74..e67f20b 100644
--- a/src/couch_replicator_sup.erl
+++ b/src/couch_replicator_sup.erl
@@ -38,12 +38,12 @@ init(_Args) ->
             brutal_kill,
             worker,
             dynamic},
-        {couch_replicator_manager,
-            {couch_replicator_manager, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_manager]},
+        {couch_replicator_manager_sup,
+            {couch_replicator_manager_sup, start_link, []},
+            permanent,
+            infinity,
+            supervisor,
+            [couch_replicator_manager_sup]},
         {couch_replicator_job_sup,
             {couch_replicator_job_sup, start_link, []},
             permanent,

