couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [16/44] couch-replicator commit: updated refs/heads/63012-defensive to 1afa5ea
Date Tue, 07 Jun 2016 11:05:20 GMT
Improve clustering module & rescan handling

Now there is a grace period during startup where nodes are expected to be added
to the cluster. During that period a shorter quiet time applies (5 seconds by
default). This allows replicator change feeds to start quicker, instead of
waiting for more than a minute.

Instead of directly manipulating the couch_multidb_changes process from the
supervisor, add a separate process (couch_replicator_db_changes) which handles
rescanning in a cleaner way.

Rescanning is triggered by an event from clustering module. Events are sent
through the `replication` gen_event (`couch_replicator_notifier` module).

The previous commit forgot to actually create the local `_replicator` db. This
commit fixes that.

Also added more dialyzer types.

Eunit tests (171 still pass). Replications checked via `rep` script still work.

```
In [1]: rep.replicate_1_to_n_then_check_replication(100)
creating rdyno_src_0001
...
 > function wait_to_trigger succeded after 10.509 +/- 10.0  sec.
all replication documents triggered
>>> update cycle 0  <<<
 > waiting for target  1
 > retrying function wait_till_dbs_equal
 > function wait_till_dbs_equal succeded after 6.414 +/- 1.0  sec.
...
> waiting to propagate changes from  1 to 100  : 48.636 sec.
 ```


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/c5ff4e6f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/c5ff4e6f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/c5ff4e6f

Branch: refs/heads/63012-defensive
Commit: c5ff4e6fff8e56baca96f3eb623b9f3179aaab6e
Parents: 6746aad
Author: Nick Vatamaniuc <vatamane@gmail.com>
Authored: Fri May 27 11:44:14 2016 -0400
Committer: Nick Vatamaniuc <vatamane@gmail.com>
Committed: Fri May 27 11:44:14 2016 -0400

----------------------------------------------------------------------
 src/couch_multidb_changes.erl       |   2 +-
 src/couch_replicator.erl            |   2 +
 src/couch_replicator_clustering.erl | 125 ++++++++++++++++++-------------
 src/couch_replicator_db_changes.erl | 102 +++++++++++++++++++++++++
 src/couch_replicator_sup.erl        |  32 +++-----
 5 files changed, 187 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/c5ff4e6f/src/couch_multidb_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_multidb_changes.erl b/src/couch_multidb_changes.erl
index 16e12d4..e543c9b 100644
--- a/src/couch_multidb_changes.erl
+++ b/src/couch_multidb_changes.erl
@@ -145,7 +145,7 @@ handle_cast({resume_scan, DbName}, #state{pids=Pids, tid=Ets} = State)
->
 
 
 handle_info({'EXIT', From, normal}, #state{scanner = From} = State) ->
-    couch_log:info("multidb_changes ~p scanner pid exited ~p",[State#state.suffix, From]),
+    couch_log:debug("multidb_changes ~p scanner pid exited ~p",[State#state.suffix, From]),
     {noreply, State#state{scanner=nil}};
 
 handle_info({'EXIT', From, Reason}, #state{scanner = From} = State) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/c5ff4e6f/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 82eff26..a72ba00 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -50,7 +50,9 @@ replicate(PostBody, Ctx) ->
 
 % This is called from supervisor. Must respect supervisor protocol so
 % it returns `ignore`.
+-spec ensure_rep_db_exists() -> ignore.
 ensure_rep_db_exists() ->
+    {ok, _Db} = couch_replicator_docs:ensure_rep_db_exists(),
     couch_log:notice("~p : created local _replicator database", [?MODULE]),
     ignore.
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/c5ff4e6f/src/couch_replicator_clustering.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_clustering.erl b/src/couch_replicator_clustering.erl
index 6c8cb1f..08be9db 100644
--- a/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator_clustering.erl
@@ -10,12 +10,26 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
+
+% Maintain cluster membership and stability notifications for replications.
+% On changes to cluster membership, broadcast events to `replication` gen_event.
+% Listeners will get `{cluster, stable}` or `{cluster, unstable}` events.
+%
+% Cluster stability is defined as "there have been no nodes added or removed in
+% last `QuietPeriod` seconds". QuietPeriod value is configurable. To ensure a
+% speedier startup, during initialization there is a shorter StartupQuietPeriod in
+% effect (also configurable).
+%
+% This module is also in charge of calculating ownership of replications based on
+% where their _repicator db documents shards live.
+
+
 -module(couch_replicator_clustering).
 -behaviour(gen_server).
 -behaviour(config_listener).
 
 % public API
--export([start_link/0, owner/2, owner/1]).
+-export([start_link/0, owner/2, is_stable/0]).
 
 % gen_server callbacks
 -export([init/1, handle_call/3, handle_info/2, handle_cast/2,
@@ -28,10 +42,13 @@
 -include_lib("mem3/include/mem3.hrl").
 
 -define(DEFAULT_QUIET_PERIOD, 60). % seconds
+-define(DEFAULT_START_PERIOD, 5). % seconds
 
 -record(state, {
+    start_time :: erlang:timestamp(),
     last_change :: erlang:timestamp(),
-    quiet_period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer(),
+    period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer(),
+    start_period = ?DEFAULT_START_PERIOD :: non_neg_integer(),
     timer :: timer:tref()
 }).
 
@@ -51,8 +68,7 @@ start_link() ->
 owner(_DbName, null) ->
     no_owner;
 owner(<<"shards/", _/binary>> = DbName, DocId) ->
-    IsStable = gen_server:call(?MODULE, is_stable, infinity),
-    case IsStable of
+    case is_stable() of
         false ->
             unstable;
         true ->
@@ -62,40 +78,27 @@ owner(_DbName, _DocId) ->
     node().
 
 
-
-% owner/1 function computes ownership based on the single
-% input Key parameter. It will uniformly distribute this Key
-% across the list of current live nodes in the cluster without
-% regard to shard ownership.
-%
-% Originally this function was used in chttpd for replications
-% coming from _replicate endpoint. It was called choose_node
-% and was called like this:
-%  choose_node([couch_util:get_value(<<"source">>, Props),
-%               couch_util:get_value(<<"target">>, Props)])
-%
--spec owner(term()) -> node().
-owner(Key) when is_binary(Key) ->
-    Checksum = erlang:crc32(Key),
-    Nodes = lists:sort([node() | nodes()]),
-    lists:nth(1 + Checksum rem length(Nodes), Nodes);
-owner(Key) ->
-    owner(term_to_binary(Key)).
+-spec is_stable() -> true | false.
+is_stable() ->
+    gen_server:call(?MODULE, is_stable, infinity).
 
 
 % gen_server callbacks
 
-
 init([]) ->
     net_kernel:monitor_nodes(true),
     ok = config:listen_for_changes(?MODULE, self()),
-    Interval = config:get_integer("replicator", "cluster_quiet_period", 
-        ?DEFAULT_QUIET_PERIOD),
+    Period = abs(config:get_integer("replicator", "cluster_quiet_period",
+        ?DEFAULT_QUIET_PERIOD)),
+    StartPeriod = abs(config:get_integer("replicator", "cluster_start_period",
+        ?DEFAULT_START_PERIOD)),
     couch_log:debug("Initialized clustering gen_server ~w", [self()]),
     {ok, #state{
+        start_time = os:timestamp(),
         last_change = os:timestamp(),
-        quiet_period = Interval,
-        timer = new_timer(Interval)
+        period = Period,
+        start_period = StartPeriod,
+        timer = new_timer(StartPeriod)
     }}.
 
 
@@ -103,29 +106,32 @@ handle_call(is_stable, _From, State) ->
     {reply, is_stable(State), State}.
 
 
-handle_cast({set_quiet_period, QuietPeriod}, State) when
+handle_cast({set_period, QuietPeriod}, State) when
     is_integer(QuietPeriod), QuietPeriod > 0 ->
-    {noreply, State#state{quiet_period = QuietPeriod}}.
+    {noreply, State#state{period = QuietPeriod}}.
 
 
-handle_info({nodeup, _Node}, State) ->
-    Ts = os:timestamp(),
-    Timer = new_timer(State#state.quiet_period),
-    {noreply, State#state{last_change = Ts, timer = Timer}};
+handle_info({nodeup, Node}, State) ->
+    Timer = new_timer(interval(State)),
+    couch_replicator_notifier:notify({cluster, unstable}),
+    couch_log:notice("~s : nodeup ~s, cluster unstable", [?MODULE, Node]),
+    {noreply, State#state{last_change = os:timestamp(), timer = Timer}};
 
-handle_info({nodedown, _Node}, State) ->
-    Ts = os:timestamp(),
-    Timer = new_timer(State#state.quiet_period),
-    {noreply, State#state{last_change = Ts, timer = Timer}};
+handle_info({nodedown, Node}, State) ->
+    Timer = new_timer(interval(State)),
+    couch_replicator_notifier:notify({cluster, unstable}),
+    couch_log:notice("~s : nodedown ~s, cluster unstable", [?MODULE, Node]),
+    {noreply, State#state{last_change = os:timestamp(), timer = Timer}};
 
-handle_info(rescan_check, State) ->
+handle_info(stability_check, State) ->
    timer:cancel(State#state.timer),
    case is_stable(State) of
        true ->
-	   trigger_rescan(),
+	   couch_replicator_notifier:notify({cluster, stable}),
+           couch_log:notice("~s : publishing cluster `stable` event", [?MODULE]),
            {noreply, State};
        false ->
-	   Timer = new_timer(State#state.quiet_period),
+	   Timer = new_timer(interval(State)),
 	   {noreply, State#state{timer = Timer}}
    end.
 
@@ -139,28 +145,41 @@ terminate(_Reason, _State) ->
 
 %% Internal functions
 
+-spec new_timer(non_neg_integer()) -> timer:tref().
 new_timer(Interval) ->
-    {ok, Timer} = timer:send_after(Interval, rescan_check),
+    {ok, Timer} = timer:send_after(Interval, stability_check),
     Timer.
 
-trigger_rescan() ->
-    couch_log:notice("Triggering replicator rescan from ~p", [?MODULE]),
-    couch_replicator_sup:restart_mdb_listener(),
-    ok.
 
-is_stable(State) ->
-    % os:timestamp() results are not guaranteed to be monotonic
-    Sec = case timer:now_diff(os:timestamp(), State#state.last_change) of
+-spec interval(#state{}) -> non_neg_integer().
+interval(#state{period = Period, start_period = Period0, start_time = T0}) ->
+    case now_diff_sec(T0) > Period of
+        true ->
+            % Normal operation
+            Period;
+        false ->
+            % During startup
+            Period0
+    end.
+
+
+-spec is_stable(#state{}) -> boolean().
+is_stable(#state{period = Period, last_change = TS} = State) ->
+    now_diff_sec(TS) > interval(State).
+
+
+-spec now_diff_sec(erlang:timestamp()) -> non_neg_integer().
+now_diff_sec(Time) ->
+    case timer:now_diff(os:timestamp(), Time) of
         USec when USec < 0 ->
             0;
         USec when USec >= 0 ->
              USec / 1000000
-    end,
-    Sec > State#state.quiet_period.
+    end.
 
 
 handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
-    ok = gen_server:cast(S, {set_quiet_period, list_to_integer(V)}),
+    ok = gen_server:cast(S, {set_period, list_to_integer(V)}),
     {ok, S};
 handle_config_change(_, _, _, _, S) ->
     {ok, S}.
@@ -173,7 +192,7 @@ handle_config_terminate(Self, _, _) ->
         config:listen_for_changes(?MODULE, Self)
     end).
 
-
+-spec owner_int(binary(), binary()) -> node().
 owner_int(DbName, DocId) ->
     Live = [node() | nodes()],
     Nodes = [N || #shard{node=N} <- mem3:shards(mem3:dbname(DbName), DocId),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/c5ff4e6f/src/couch_replicator_db_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_db_changes.erl b/src/couch_replicator_db_changes.erl
new file mode 100644
index 0000000..bc91697
--- /dev/null
+++ b/src/couch_replicator_db_changes.erl
@@ -0,0 +1,102 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_db_changes).
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+
+-record(state, {
+   event_listener :: pid(),
+   mdb_changes :: pid() | nil
+}).
+
+
+-spec start_link() ->
+    {ok, pid()} | ignore | {error, any()}.
+start_link() ->
+    gen_server:start_link(?MODULE, [], []).
+
+
+init([]) ->
+    EvtPid = start_link_cluster_event_listener(),
+    State = #state{event_listener = EvtPid, mdb_changes = nil},
+    case couch_replicator_clustering:is_stable() of
+        true ->
+            {ok, restart_mdb_changes(State)};
+        false ->
+            {ok, State}
+    end.
+
+
+handle_call(_Msg, _From, State) ->
+    {noreply, State}.
+
+
+handle_cast({cluster, unstable}, State) ->
+    {noreply, stop_mdb_changes(State)};
+
+handle_cast({cluster, stable}, State) ->
+    {noreply, restart_mdb_changes(State)}.
+
+
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+
+terminate(_Reason, _State) ->
+    ok.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+-spec restart_mdb_changes(#state{}) -> #state{}.
+restart_mdb_changes(#state{mdb_changes = nil} = State) ->
+    Suffix = <<"_replicator">>,
+    CallbackMod = couch_replicator_doc_processor,
+    Options = [skip_ddocs],
+    {ok, Pid} = couch_multidb_changes:start_link(Suffix, CallbackMod, nil, Options),
+    couch_log:notice("Started replicator db changes listener ~p", [Pid]),
+    State#state{mdb_changes = Pid};
+
+restart_mdb_changes(#state{mdb_changes = _Pid} = State) ->
+    restart_mdb_changes(stop_mdb_changes(State)).
+
+
+-spec stop_mdb_changes(#state{}) -> #state{}.
+stop_mdb_changes(#state{mdb_changes = nil} = State) ->
+    State;
+
+stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
+    couch_log:notice("Stopping replicator db changes listener ~p", [Pid]),
+    unlink(Pid),
+    exit(Pid, kill),
+    State#state{mdb_changes = nil}.
+
+
+-spec start_link_cluster_event_listener() -> pid().
+start_link_cluster_event_listener() ->
+    Server = self(),
+    CallbackFun =
+        fun(Event = {cluster, _}) -> gen_server:cast(Server, Event);
+           (_) -> ok
+        end,
+    {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
+    Pid.
+
+

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/c5ff4e6f/src/couch_replicator_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_sup.erl b/src/couch_replicator_sup.erl
index 6510604..5c5c709 100644
--- a/src/couch_replicator_sup.erl
+++ b/src/couch_replicator_sup.erl
@@ -13,31 +13,25 @@
 
 -module(couch_replicator_sup).
 -behaviour(supervisor).
--export([start_link/0, init/1, restart_mdb_listener/0]).
+-export([start_link/0, init/1]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init(_Args) ->
-    MdbChangesArgs = [
-        <<"_replicator">>,               % DbSuffix
-        couch_replicator_doc_processor,  % Module
-        nil,                             % Callback context
-        [skip_ddocs]                     % Options
-    ],
     Children = [
-       {couch_replicator_clustering,
-            {couch_replicator_clustering, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_clustering]},
         {couch_replication_event,
             {gen_event, start_link, [{local, couch_replication}]},
             permanent,
             brutal_kill,
             worker,
             dynamic},
+       {couch_replicator_clustering,
+            {couch_replicator_clustering, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_replicator_clustering]},
         {couch_replicator_scheduler_sup,
             {couch_replicator_scheduler_sup, start_link, []},
             permanent,
@@ -51,7 +45,7 @@ init(_Args) ->
             worker,
             [couch_replicator_scheduler]},
         {couch_replicator,
-            % This is simple function call which does not create a process
+            % This is a simple function call which does not create a process
             % but returns `ignore`. It is used to make sure each node
             % a local `_replicator` database.
             {couch_replicator, ensure_rep_db_exists, []},
@@ -59,17 +53,11 @@ init(_Args) ->
             brutal_kill,
             worker,
             [couch_replicator]},
-        {couch_multidb_changes,
-            {couch_multidb_changes, start_link, MdbChangesArgs},
+        {couch_replicator_db_changes,
+            {couch_replicator_db_changes, start_link, []},
             permanent,
             brutal_kill,
             worker,
             [couch_multidb_changes]}
     ],
     {ok, {{rest_for_one,10,1}, Children}}.
-
-
-restart_mdb_listener() ->
-    ok = supervisor:terminate_child(?MODULE, couch_multidb_changes),
-    {ok, ChildPid} = supervisor:restart_child(?MODULE, couch_multidb_changes),
-    ChildPid.


Mime
View raw message