Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BDC8D200C76 for ; Fri, 28 Apr 2017 23:36:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BB3E5160BB8; Fri, 28 Apr 2017 21:36:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E6803160BBD for ; Fri, 28 Apr 2017 23:36:03 +0200 (CEST) Received: (qmail 36399 invoked by uid 500); 28 Apr 2017 21:36:03 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 36384 invoked by uid 99); 28 Apr 2017 21:36:03 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Apr 2017 21:36:03 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 2D02F8183D; Fri, 28 Apr 2017 21:36:00 +0000 (UTC) Date: Fri, 28 Apr 2017 21:36:02 +0000 To: "commits@couchdb.apache.org" Subject: [couchdb] 02/09: Cluster ownership module implementation MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: vatamane@apache.org Reply-To: "commits@couchdb.apache.org" In-Reply-To: <149341536059.24892.9528088849326229128@gitbox.apache.org> References: <149341536059.24892.9528088849326229128@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: couchdb X-Git-Refname: refs/heads/63012-scheduler X-Git-Reftype: branch X-Git-Rev: 9895a734f1054eba2fe0cf582574adf28fdec369 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.3.dev Auto-Submitted: auto-generated Message-Id: 
<20170428213601.2D02F8183D@gitbox.apache.org> archived-at: Fri, 28 Apr 2017 21:36:04 -0000 This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch 63012-scheduler in repository https://gitbox.apache.org/repos/asf/couchdb.git commit 9895a734f1054eba2fe0cf582574adf28fdec369 Author: Nick Vatamaniuc AuthorDate: Wed Apr 20 15:42:29 2016 -0400 Cluster ownership module implementation This module maintains cluster membership information for replication and provides functions to check ownership of replication jobs. A cluster membership change is registered only after a configurable `cluster_quiet_period` interval has passed since the last node addition or removal. This is useful in cases of rolling node reboots in a cluster in order to avoid rescanning for membership changes after every node up and down event, and instead doing only one rescan at the very end. Jira: COUCHDB-3324 --- .../src/couch_replicator_clustering.erl | 243 +++++++++++++++++++++ 1 file changed, 243 insertions(+) diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl new file mode 100644 index 0000000..7618f24 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_clustering.erl @@ -0,0 +1,243 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +% Maintain cluster membership and stability notifications for replications.
+% Membership changes are broadcast through couch_replicator_notifier as
+% `{cluster, unstable}` events (a node went up or down) and `{cluster, stable}`
+% events (no membership change was seen for a full check interval).
+%
+% "Stable" means that no node has joined or left the cluster during the last
+% `QuietPeriod` seconds (`[replicator] cluster_quiet_period`). So that a
+% freshly started node settles quickly, a shorter `StartPeriod`
+% (`[replicator] cluster_start_period`) is used instead until this server has
+% been running for longer than `QuietPeriod`.
+%
+% The module also decides which node owns a replication, based on which live
+% nodes hold the shards of its _replicator database document.
+
+
+-module(couch_replicator_clustering).
+
+-behaviour(gen_server).
+-behaviour(config_listener).
+
+% Public API
+-export([
+    start_link/0,
+    owner/2,
+    owner/3,
+    is_stable/0,
+    link_cluster_event_listener/3
+]).
+
+% gen_server callbacks
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+% config_listener callbacks
+-export([
+    handle_config_change/5,
+    handle_config_terminate/3
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+% Both default periods are expressed in seconds.
+-define(DEFAULT_QUIET_PERIOD, 60).
+-define(DEFAULT_START_PERIOD, 5).
+% Delay, in milliseconds, before re-subscribing to config changes after the
+% config listener has been terminated.
+-define(RELISTEN_DELAY, 5000).
+
+-record(state, {
+    % when this server started; selects start_period vs. period in checks
+    start_time :: erlang:timestamp(),
+    % time of the most recent nodeup/nodedown event
+    last_change :: erlang:timestamp(),
+    % quiet period, in seconds
+    period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer(),
+    % shorter check interval used right after startup, in seconds
+    start_period = ?DEFAULT_START_PERIOD :: non_neg_integer(),
+    % reference of the pending stability-check timer
+    timer :: reference()
+}).
+
+
+% Start the singleton clustering server, registered locally under the module
+% name.
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+    Name = {local, ?MODULE},
+    gen_server:start_link(Name, ?MODULE, [], []).
+
+
+% Return the node that owns the replication defined by document DocId in the
+% replicator database DbName, or `unstable` when cluster membership changed
+% too recently for ownership to be decided reliably.
+%
+-spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable.
+owner(<<"shards/", _/binary>> = DbName, DocId) ->
+    % Ownership inside a clustered (sharded) database is only computed once
+    % membership has settled; otherwise the caller is told to retry later.
+    case is_stable() of
+        false ->
+            unstable;
+        true ->
+            owner_int(DbName, DocId)
+    end;
+owner(_DbName, _DocId) ->
+    % Databases whose name does not start with "shards/" are always owned by
+    % the local node.
+    node().
+
+
+% Direct ownership calculation: picks one node out of Nodes for
+% {DbName, DocId}. It does not read the shard map. The candidates go through
+% lists:usort/1 first, so the result does not depend on the order of (or
+% duplicates in) Nodes, and mem3_util:rotate_list/2 then chooses the owner
+% from the {DbName, DocId} key (hash based, per the original design notes).
+% Nodes must be non-empty: hd/1 fails on an empty list.
+-spec owner(binary(), binary(), [node()]) -> node().
+owner(DbName, DocId, Nodes) ->
+    hd(mem3_util:rotate_list({DbName, DocId}, lists:usort(Nodes))).
+
+
+% Ask the clustering server whether membership has been unchanged for longer
+% than the current check interval.
+-spec is_stable() -> true | false.
+is_stable() ->
+    gen_server:call(?MODULE, is_stable).
+
+
+% Start a notifier (via couch_replicator_notifier:start_link/1) whose callback
+% invokes erlang:apply(Mod, Fun, Args ++ [Event]) for every `{cluster, _}`
+% event and ignores all other events. Returns the notifier pid.
+-spec link_cluster_event_listener(atom(), atom(), list()) -> pid().
+link_cluster_event_listener(Mod, Fun, Args)
+        when is_atom(Mod), is_atom(Fun), is_list(Args) ->
+    CallbackFun =
+        fun(Event = {cluster, _}) -> erlang:apply(Mod, Fun, Args ++ [Event]);
+           (_) -> ok
+        end,
+    {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
+    Pid.
+
+
+% gen_server callbacks
+
+init([]) ->
+    net_kernel:monitor_nodes(true),
+    ok = config:listen_for_changes(?MODULE, nil),
+    % abs/1 keeps negative settings from producing a negative timer interval.
+    Period = abs(config:get_integer("replicator", "cluster_quiet_period",
+        ?DEFAULT_QUIET_PERIOD)),
+    StartPeriod = abs(config:get_integer("replicator", "cluster_start_period",
+        ?DEFAULT_START_PERIOD)),
+    couch_log:debug("Initialized clustering gen_server ~w", [self()]),
+    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
+    % One timestamp, taken before the timer is armed: start_time and
+    % last_change agree, and when the first check fires at least StartPeriod
+    % seconds have passed since last_change.
+    Now = os:timestamp(),
+    {ok, #state{
+        start_time = Now,
+        last_change = Now,
+        period = Period,
+        start_period = StartPeriod,
+        timer = new_timer(StartPeriod)
+    }}.
+
+
+terminate(_Reason, _State) ->
+    ok.
+
+
+handle_call(is_stable, _From, State) ->
+    {reply, is_stable(State), State}.
+
+
+% Runtime update of the quiet period (cast by handle_config_change/5). The
+% value is normalized with abs/1 exactly like the one read in init/1: Erlang
+% timer functions reject negative times with badarg, which would crash the
+% server at the next stability check.
+handle_cast({set_period, QuietPeriod}, State) when is_integer(QuietPeriod) ->
+    {noreply, State#state{period = abs(QuietPeriod)}}.
+% A node joined or left: the cluster is unstable until a full check interval
+% passes without further membership changes.
+handle_info({nodeup, Node}, State) ->
+    {noreply, cluster_changed(nodeup, Node, State)};
+
+handle_info({nodedown, Node}, State) ->
+    {noreply, cluster_changed(nodedown, Node, State)};
+
+% Only the timer whose reference is stored in the state is acted upon. A
+% timeout carrying any other reference comes from a timer that was replaced by
+% a later membership change but had already fired; acting on it would start a
+% second check chain and publish duplicate `{cluster, stable}` events.
+handle_info({timeout, Ref, stability_check}, #state{timer = Ref} = State) ->
+    case is_stable(State) of
+        true ->
+            couch_replicator_notifier:notify({cluster, stable}),
+            couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
+            couch_log:notice("~s : publish cluster `stable` event", [?MODULE]),
+            {noreply, State};
+        false ->
+            Timer = new_timer(interval(State)),
+            {noreply, State#state{timer = Timer}}
+    end;
+
+handle_info({timeout, _StaleRef, stability_check}, State) ->
+    {noreply, State};
+
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {noreply, State};
+
+% Unexpected messages are logged and dropped instead of crashing the server
+% with a function_clause error.
+handle_info(Msg, State) ->
+    couch_log:notice("~s : ignoring unexpected message ~p", [?MODULE, Msg]),
+    {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+%% Internal functions
+
+% Record a nodeup/nodedown event. The pending stability check is cancelled
+% and a fresh one is armed. `{cluster, unstable}` is published and the
+% stability gauge is reset to 0. last_change is taken before the new timer is
+% started, so that (barring wall-clock adjustments) a full interval has
+% elapsed since last_change when the timer fires.
+-spec cluster_changed(nodeup | nodedown, node(), #state{}) -> #state{}.
+cluster_changed(Type, Node, #state{timer = OldTimer} = State) ->
+    erlang:cancel_timer(OldTimer),
+    Now = os:timestamp(),
+    Timer = new_timer(interval(State)),
+    couch_replicator_notifier:notify({cluster, unstable}),
+    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
+    couch_log:notice("~s : ~s ~s, cluster unstable", [?MODULE, Type, Node]),
+    State#state{last_change = Now, timer = Timer}.
+
+
+% Arm a one-shot timer that delivers {timeout, Ref, stability_check} to this
+% process after IntervalSec seconds. Ref is returned so handle_info/2 can tell
+% the current timer apart from superseded ones.
+-spec new_timer(non_neg_integer()) -> reference().
+new_timer(IntervalSec) ->
+    erlang:start_timer(IntervalSec * 1000, self(), stability_check).
+
+
+% Interval, in seconds, between stability checks. While this server has been
+% up for no more than Period seconds, the shorter StartPeriod is used.
+% Afterwards the checks run every Period seconds.
+-spec interval(#state{}) -> non_neg_integer().
+interval(#state{period = Period, start_period = StartPeriod,
+        start_time = T0}) ->
+    case now_diff_sec(T0) > Period of
+        true ->
+            % Normal operation
+            Period;
+        false ->
+            % During startup
+            StartPeriod
+    end.
+
+
+-spec is_stable(#state{}) -> boolean().
+% true once strictly more than the current check interval (see interval/1)
+% has elapsed since the last nodeup/nodedown event.
+is_stable(#state{last_change = TS} = State) ->
+    now_diff_sec(TS) > interval(State).
+
+
+% Seconds elapsed since Time. The result is clamped to 0 if the OS clock went
+% backwards. Otherwise it is a float, because `/` is used, which keeps
+% sub-second precision for the comparisons in is_stable/1 and interval/1.
+-spec now_diff_sec(erlang:timestamp()) -> number().
+now_diff_sec(Time) ->
+    case timer:now_diff(os:timestamp(), Time) of
+        USec when USec < 0 ->
+            0;
+        USec ->
+            USec / 1000000
+    end.
+
+
+% Runtime updates of `[replicator] cluster_quiet_period`. The value is
+% normalized with abs/1, as init/1 does with the startup value, so a negative
+% setting can never produce a negative timer interval. A string that is not
+% an integer is logged and ignored instead of crashing the config listener.
+% Non-string values fall through to the catch-all clause and are ignored.
+handle_config_change("replicator", "cluster_quiet_period", V, _, S)
+        when is_list(V) ->
+    try list_to_integer(V) of
+        Period ->
+            ok = gen_server:cast(?MODULE, {set_period, abs(Period)})
+    catch
+        error:badarg ->
+            couch_log:error("~s : ignoring invalid cluster_quiet_period ~p",
+                [?MODULE, V])
+    end,
+    {ok, S};
+handle_config_change(_, _, _, _, S) ->
+    {ok, S}.
+
+
+% Called when the config listener goes away. Unless it was stopped on
+% purpose, ask this server to re-subscribe after ?RELISTEN_DELAY milliseconds
+% by sending it restart_config_listener.
+handle_config_terminate(_, stop, _) -> ok;
+handle_config_terminate(_S, _R, _St) ->
+    Pid = whereis(?MODULE),
+    erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
+
+
+% Owner of DocId, chosen among the live nodes (this node plus nodes()) that
+% appear in mem3:shards/2 for the clustered database ShardName belongs to.
+% Presumably these are the copies of the shard range holding DocId; confirm
+% against mem3.
+-spec owner_int(binary(), binary()) -> node().
+owner_int(ShardName, DocId) ->
+    DbName = mem3:dbname(ShardName),
+    Live = [node() | nodes()],
+    Shards = mem3:shards(DbName, DocId),
+    Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)],
+    owner(DbName, DocId, Nodes). -- To stop receiving notification emails like this one, please contact "commits@couchdb.apache.org" .