couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [02/44] couch-replicator commit: updated refs/heads/63012-defensive to 1afa5ea
Date Tue, 07 Jun 2016 11:05:06 GMT
Cluster ownership module implementation.

Handles owner/2 calls and returns:
 `{ok, node()} | {error, no_owner} | {error, unstable}`

`unstable` means the cluster has received a nodeup or nodedown event recently
(within a configurable quiet period that defaults to 60 sec).

`no_owner` means ownership cannot be determined. It is usually returned when
doc_id is null, i.e. for replications posted to the _replicate endpoint.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/e36f2dac
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/e36f2dac
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/e36f2dac

Branch: refs/heads/63012-defensive
Commit: e36f2dacd5dc06698a40beedcb89fd4897938c78
Parents: bb34ad5
Author: Nick Vatamaniuc <vatamane@gmail.com>
Authored: Wed Apr 20 15:42:29 2016 -0400
Committer: Robert Newson <rnewson@apache.org>
Committed: Fri May 13 15:21:20 2016 +0100

----------------------------------------------------------------------
 src/couch_replicator_clustering.erl | 154 +++++++++++++++++++++++++++++++
 1 file changed, 154 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/e36f2dac/src/couch_replicator_clustering.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_clustering.erl b/src/couch_replicator_clustering.erl
new file mode 100644
index 0000000..8c761d7
--- /dev/null
+++ b/src/couch_replicator_clustering.erl
@@ -0,0 +1,154 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_clustering).
+-behaviour(gen_server).
+-behaviour(config_listener).
+
+% public API
+-export([start_link/0, owner/2, owner/1]).
+
+% gen_server callbacks
+-export([init/1, handle_call/3, handle_info/2, handle_cast/2,
+         code_change/3, terminate/2]).
+
+% config_listener callbacks
+-export([handle_config_change/5, handle_config_terminate/3]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+% Default quiet period. The cluster is reported as stable only after no
+% nodeup/nodedown event has been seen for longer than this. The value is
+% replaced at runtime when the "replicator"/"cluster_quiet_period" config
+% key changes.
+-define(DEFAULT_QUIET_PERIOD, 60). % seconds
+
+% gen_server state.
+-record(state, {
+    % os:timestamp() of the most recent nodeup/nodedown event, or of server
+    % start-up before any such event has been received.
+    last_change :: erlang:timestamp(),
+    % Seconds without membership changes required before is_stable is true.
+    quiet_period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer()
+}).
+
+
+% Starts the clustering gen_server and registers it locally under the module
+% name, which is how owner/2 reaches it via gen_server:call(?MODULE, ...).
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    InitArgs = [],
+    StartOpts = [],
+    gen_server:start_link(ServerName, ?MODULE, InitArgs, StartOpts).
+
+
+% owner/2 computes ownership for a {DbName, DocId} pair.
+%
+% Returns:
+%   {ok, no_owner}    - DocId is null. This happens when a replication was
+%                       posted to the _replicate endpoint rather than written
+%                       as a document in a *_replicator db, so there is no
+%                       document to assign an owner to.
+%   {error, unstable} - DbName is a shard ("shards/...") and cluster
+%                       membership changed within the quiet period (the
+%                       "replicator"/"cluster_quiet_period" setting, 60 s
+%                       by default).
+%   {ok, Node}        - otherwise. For shard names, Node is picked among the
+%                       live nodes that mem3 reports as hosting DocId's
+%                       shard; for any other DbName the local node is used.
+%
+% NOTE: a null DocId yields {ok, no_owner}, not an error tuple, so callers
+% must check for the no_owner atom before comparing the result to node().
+% The spec below reflects the values actually returned.
+%
+-spec owner(DbName :: binary(), DocId :: binary() | null) ->
+    {ok, node() | no_owner} | {error, unstable}.
+owner(_DbName, null) ->
+    {ok, no_owner};
+owner(<<"shards/", _/binary>> = DbName, DocId) ->
+    case gen_server:call(?MODULE, is_stable, infinity) of
+        false ->
+            {error, unstable};
+        true ->
+            {ok, owner_int(DbName, DocId)}
+    end;
+owner(_DbName, _DocId) ->
+    {ok, node()}.
+
+
+
+% owner/1 maps an arbitrary Key onto one of the currently connected nodes,
+% including the local one, without regard to shard placement.
+%
+% A non-binary Key is first serialized with term_to_binary/1. The binary is
+% hashed with CRC32, and the hash modulo the node count indexes into the
+% sorted node list. Sorting makes the result independent of the order in
+% which nodes/0 happens to list peers.
+%
+% History: this logic originally lived in chttpd as choose_node, used for
+% replications coming from the _replicate endpoint, and was invoked as
+%  choose_node([couch_util:get_value(<<"source">>, Props),
+%               couch_util:get_value(<<"target">>, Props)])
+%
+-spec owner(term()) -> node().
+owner(Key) when not is_binary(Key) ->
+    owner(term_to_binary(Key));
+owner(Key) ->
+    Members = lists:sort(nodes() ++ [node()]),
+    Slot = erlang:crc32(Key) rem length(Members),
+    lists:nth(Slot + 1, Members).
+
+
+% gen_server callbacks
+
+
+% Subscribes to nodeup/nodedown messages and to config change notifications.
+% last_change starts at the current time, so the cluster is reported as
+% unstable for one full quiet period after this server starts.
+% The configured quiet period is read here because handle_config_change/5
+% only sees later changes; without this, a value set before start-up would
+% be ignored and ?DEFAULT_QUIET_PERIOD used instead.
+init([]) ->
+    net_kernel:monitor_nodes(true),
+    ok = config:listen_for_changes(?MODULE, self()),
+    couch_log:debug("Initialized clustering gen_server ~w",[self()]),
+    {ok, #state{
+        last_change = os:timestamp(),
+        quiet_period = configured_quiet_period()
+    }}.
+
+
+% Reads "replicator"/"cluster_quiet_period" (seconds). Anything that is not a
+% positive integer falls back to ?DEFAULT_QUIET_PERIOD, mirroring the guard
+% handle_cast/2 applies to runtime updates.
+configured_quiet_period() ->
+    Default = integer_to_list(?DEFAULT_QUIET_PERIOD),
+    try list_to_integer(config:get("replicator", "cluster_quiet_period", Default)) of
+        QuietPeriod when QuietPeriod > 0 ->
+            QuietPeriod;
+        _ ->
+            ?DEFAULT_QUIET_PERIOD
+    catch
+        error:badarg ->
+            ?DEFAULT_QUIET_PERIOD
+    end.
+
+
+% is_stable: replies true once more than quiet_period seconds have elapsed
+% since the last nodeup/nodedown event (or server start), false otherwise.
+handle_call(is_stable, _From, State) ->
+    #state{last_change = LastChange, quiet_period = QuietPeriod} = State,
+    ElapsedUSec = timer:now_diff(os:timestamp(), LastChange),
+    % os:timestamp/0 follows the OS clock and is not monotonic; clamp a
+    % negative interval (clock stepped backwards) to zero.
+    ElapsedSec = erlang:max(ElapsedUSec, 0) / 1000000,
+    IsStable = ElapsedSec > QuietPeriod,
+    {reply, IsStable, State}.
+
+
+% set_quiet_period: installs a new quiet period in seconds. Only positive
+% integers are accepted; any other cast fails with function_clause.
+handle_cast({set_quiet_period, Seconds}, State)
+        when is_integer(Seconds) andalso Seconds > 0 ->
+    NewState = State#state{quiet_period = Seconds},
+    {noreply, NewState}.
+
+
+% Any nodeup/nodedown event restarts the quiet period.
+handle_info({nodeup, _Node}, State) ->
+    {noreply, State#state{last_change = os:timestamp()}};
+handle_info({nodedown, _Node}, State) ->
+    {noreply, State#state{last_change = os:timestamp()}};
+% Scheduled by handle_config_terminate/3: re-subscribe to config changes
+% from the server process itself.
+% NOTE(review): config appears to tie a subscription to the process that
+% calls listen_for_changes/2, so subscribing from a short-lived spawned
+% process would be undone as soon as that process exited; confirm against
+% the config application.
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, self()),
+    {noreply, State};
+% Log and drop stray messages instead of crashing the server.
+handle_info(Msg, State) ->
+    couch_log:notice("~p received unexpected message ~p", [?MODULE, Msg]),
+    {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+terminate(_Reason, _State) ->
+    ok.
+
+
+%% config_listener callbacks
+
+
+% Forwards changes of "replicator"/"cluster_quiet_period" to the server
+% (S is the server pid passed to config:listen_for_changes/2).
+% A deleted key restores ?DEFAULT_QUIET_PERIOD. NOTE(review): assumes config
+% reports deletions with the atom `deleted`; any other non-string value is
+% simply logged and ignored below.
+% Values that are not positive integers are logged and ignored. handle_cast/2
+% only accepts positive integers, and list_to_integer/1 raises on garbage,
+% so forwarding them unchecked would crash the server or this listener.
+handle_config_change("replicator", "cluster_quiet_period", deleted, _, S) ->
+    ok = gen_server:cast(S, {set_quiet_period, ?DEFAULT_QUIET_PERIOD}),
+    {ok, S};
+handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
+    case parse_quiet_period(V) of
+        {ok, QuietPeriod} ->
+            ok = gen_server:cast(S, {set_quiet_period, QuietPeriod});
+        error ->
+            couch_log:warning("~p ignoring invalid replicator.cluster_quiet_period ~p",
+                [?MODULE, V])
+    end,
+    {ok, S};
+handle_config_change(_, _, _, _, S) ->
+    {ok, S}.
+
+
+% Called when the config listener goes away. A `stop` reason needs no action.
+% Otherwise, ask the server (Self, the pid passed to listen_for_changes/2)
+% to re-subscribe after 5 s. If Self has died in the meantime,
+% erlang:send_after/3 cancels the timer automatically; a restarted server
+% has already subscribed again in init/1.
+handle_config_terminate(_, stop, _) -> ok;
+handle_config_terminate(Self, _, _) ->
+    erlang:send_after(5000, Self, restart_config_listener),
+    ok.
+
+
+%% Internal functions
+
+
+% Returns {ok, Seconds} when Value is the string form of a positive integer,
+% and error otherwise, including for non-list values.
+parse_quiet_period(Value) ->
+    try list_to_integer(Value) of
+        Seconds when Seconds > 0 ->
+            {ok, Seconds};
+        _ ->
+            error
+    catch
+        error:badarg ->
+            error
+    end.
+
+
+% Picks the owner of DocId's shard. Candidates are the nodes that mem3 lists
+% for the doc's shard, restricted to live nodes (the local node plus
+% connected peers). The sorted candidate list is reordered with
+% mem3_util:rotate_list/2 keyed on {DbName, DocId}, and the head is the
+% owner. Presumably that rotation is a deterministic hash, so nodes with the
+% same view of membership agree on the owner (confirm in mem3_util).
+%
+% If no hosting node is currently live there is no owner to return; fail
+% with an explicit reason rather than the opaque badarg from hd([]).
+owner_int(DbName, DocId) ->
+    Live = [node() | nodes()],
+    Nodes = [N || #shard{node=N} <- mem3:shards(mem3:dbname(DbName), DocId),
+                  lists:member(N, Live)],
+    case lists:sort(Nodes) of
+        [] ->
+            erlang:error({no_live_shard_owner, DbName, DocId});
+        Candidates ->
+            hd(mem3_util:rotate_list({DbName, DocId}, Candidates))
+    end.
+
+


Mime
View raw message