Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1585A200C38 for ; Wed, 15 Mar 2017 20:25:32 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 143C5160B95; Wed, 15 Mar 2017 19:25:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 41824160B60 for ; Wed, 15 Mar 2017 20:25:31 +0100 (CET) Received: (qmail 53290 invoked by uid 500); 15 Mar 2017 19:25:30 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 53281 invoked by uid 99); 15 Mar 2017 19:25:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Mar 2017 19:25:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 550C7DFBC9; Wed, 15 Mar 2017 19:25:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davisp@apache.org To: commits@couchdb.apache.org Date: Wed, 15 Mar 2017 19:25:30 -0000 Message-Id: <04a19e465b9b447a92941b722cfd4031@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/10] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to e4e8928 [Forced Update!] archived-at: Wed, 15 Mar 2017 19:25:32 -0000 Repository: couchdb-mem3 Updated Branches: refs/heads/COUCHDB-3326-clustered-purge 8b3a6b19a -> e4e892899 (forced update) Update handle_config_terminate API COUCHDB-3102 Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/432264ae Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/432264ae Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/432264ae Branch: refs/heads/COUCHDB-3326-clustered-purge Commit: 432264ae1f66ea7ad960352583899cb4940e2723 Parents: 15615b2 Author: ILYA Khlopotov Authored: Wed Aug 17 11:57:54 2016 -0700 Committer: ILYA Khlopotov Committed: Tue Aug 23 12:28:03 2016 -0700 ---------------------------------------------------------------------- src/mem3_shards.erl | 16 ++-- src/mem3_sync_event_listener.erl | 173 +++++++++++++++++++++++++--------- 2 files changed, 138 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/432264ae/src/mem3_shards.erl ---------------------------------------------------------------------- diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl index 8a1bb54..c7f33c6 100644 --- a/src/mem3_shards.erl +++ b/src/mem3_shards.erl @@ -12,7 +12,7 @@ -module(mem3_shards). -behaviour(gen_server). --vsn(2). +-vsn(3). -behaviour(config_listener). -export([init/1, terminate/2, code_change/3]). @@ -36,6 +36,7 @@ -define(DBS, mem3_dbs). -define(SHARDS, mem3_shards). -define(ATIMES, mem3_atimes). +-define(RELISTEN_DELAY, 5000). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -173,12 +174,10 @@ handle_config_change("mem3", "shards_db", _DbName, _, _) -> handle_config_change(_, _, _, _, _) -> {ok, nil}. -handle_config_terminate(_, stop, _) -> ok; -handle_config_terminate(_, _, _) -> - spawn(fun() -> - timer:sleep(5000), - config:listen_for_changes(?MODULE, nil) - end). +handle_config_terminate(_, stop, _) -> + ok; +handle_config_terminate(_Server, _Reason, _State) -> + erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener). init([]) -> ets:new(?SHARDS, [ @@ -235,6 +234,9 @@ handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) -> handle_info({start_listener, Seq}, St) -> {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), {noreply, St#st{changes_pid=NewPid}}; +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}; handle_info(_Msg, St) -> {noreply, St}. http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/432264ae/src/mem3_sync_event_listener.erl ---------------------------------------------------------------------- diff --git a/src/mem3_sync_event_listener.erl b/src/mem3_sync_event_listener.erl index ca058db..7859c31 100644 --- a/src/mem3_sync_event_listener.erl +++ b/src/mem3_sync_event_listener.erl @@ -12,7 +12,7 @@ -module(mem3_sync_event_listener). -behavior(couch_event_listener). --behavior(config_listener). +-vsn(1). -export([ start_link/0 @@ -26,13 +26,14 @@ handle_info/2 ]). --export([ - handle_config_change/5, - handle_config_terminate/3 -]). - -include_lib("mem3/include/mem3.hrl"). +-ifdef(TEST). +-define(RELISTEN_DELAY, 500). +-else. +-define(RELISTEN_DELAY, 5000). +-endif. + -record(state, { nodes, shards, @@ -59,7 +60,7 @@ start_link() -> couch_event_listener:start_link(?MODULE, [], [all_dbs]). init(_) -> - config:listen_for_changes(?MODULE, undefined), + ok = subscribe_for_config(), Delay = config:get_integer("mem3", "sync_delay", 5000), Frequency = config:get_integer("mem3", "sync_frequency", 500), Buckets = lists:duplicate(Delay div Frequency + 1, sets:new()), @@ -110,48 +111,32 @@ handle_cast(Msg, St) -> handle_info(timeout, St) -> maybe_push_shards(St); +handle_info({config_change, "mem3", "sync_delay", Value, _}, St) -> + set_config(set_delay, Value, "ignoring bad value for mem3.sync_delay"), + maybe_push_shards(St); +handle_info({config_change, "mem3", "sync_frequency", Value, _}, St) -> + set_config(set_frequency, Value, "ignoring bad value for mem3.sync_frequency"), + maybe_push_shards(St); +handle_info({gen_event_EXIT, _Handler, _Reason}, St) -> + erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener), + maybe_push_shards(St); +handle_info(restart_config_listener, St) -> + ok = subscribe_for_config(), + maybe_push_shards(St); +handle_info({get_state, Ref, Caller}, St) -> + Caller ! {Ref, St}, + {ok, St}; handle_info(Msg, St) -> couch_log:notice("unexpected info to mem3_sync_event_listener: ~p", [Msg]), maybe_push_shards(St). -handle_config_change("mem3", "sync_delay", Delay0, _, St) -> - try list_to_integer(Delay0) of - Delay1 -> - couch_event_listener:cast( - ?MODULE, - {set_delay, Delay1} - ) - catch error:badarg -> - couch_log:warning( - "ignoring bad value for mem3.sync_delay: ~p", - [Delay0] - ) - end, - {ok, St}; -handle_config_change("mem3", "sync_frequency", Frequency0, _, St) -> - try list_to_integer(Frequency0) of - Frequency1 -> - couch_event_listener:cast( - ?MODULE, - {set_frequency, Frequency1} - ) +set_config(Cmd, Value, Error) -> + try list_to_integer(Value) of + IntegerValue -> + couch_event_listener:cast(self(), {Cmd, IntegerValue}) catch error:badarg -> - couch_log:warning( - "ignoring bad value for mem3.sync_frequency: ~p", - [Frequency0] - ) - end, - {ok, St}; -handle_config_change(_, _, _, _, St) -> - {ok, St}. - -handle_config_terminate(_, stop, _) -> ok; -handle_config_terminate(_Server, _Reason, St) -> - Fun = fun() -> - timer:sleep(5000), - config:listen_for_changes(?MODULE, St) - end, - spawn(Fun). + couch_log:warning("~s: ~p", [Error, Value]) + end. bucket_shard(ShardName, [B|Bs]=Buckets0) -> case waiting(ShardName, Buckets0) of @@ -222,3 +207,103 @@ push_shard(ShardName) -> catch error:database_does_not_exist -> ok end. + +subscribe_for_config() -> + config:subscribe_for_changes([ + {"mem3", "sync_delay"}, + {"mem3", "sync_frequency"} + ]). + +-ifdef(TEST). +-include_lib("couch/include/couch_eunit.hrl"). + +setup() -> + ok = meck:new(couch_event, [passthrough]), + ok = meck:expect(couch_event, register_all, ['_'], ok), + + ok = meck:new(config_notifier, [passthrough]), + ok = meck:expect(config_notifier, handle_event, [ + {[{'_', '_', "error", '_'}, '_'], meck:raise(throw, raised_error)}, + {['_', '_'], meck:passthrough()} + ]), + + application:start(config), + {ok, Pid} = ?MODULE:start_link(), + erlang:unlink(Pid), + meck:wait(config_notifier, subscribe, '_', 1000), + Pid. + +teardown(Pid) -> + exit(Pid, shutdown), + application:stop(config), + (catch meck:unload(couch_event)), + (catch meck:unload(config_notifier)), + ok. + +subscribe_for_config_test_() -> + { + "Subscrive for configuration changes", + { + foreach, + fun setup/0, fun teardown/1, + [ + fun should_set_sync_delay/1, + fun should_set_sync_frequency/1, + fun should_restart_listener/1, + fun should_terminate/1 + ] + } + }. + +should_set_sync_delay(Pid) -> + ?_test(begin + config:set("mem3", "sync_delay", "123", false), + ?assertMatch(#state{delay = 123}, capture(Pid)), + ok + end). + +should_set_sync_frequency(Pid) -> + ?_test(begin + config:set("mem3", "sync_frequency", "456", false), + ?assertMatch(#state{frequency = 456}, capture(Pid)), + ok + end). + +should_restart_listener(Pid) -> + ?_test(begin + meck:reset(config_notifier), + config:set("mem3", "sync_frequency", "error", false), + + meck:wait(config_notifier, subscribe, '_', 1000), + ok + end). + +should_terminate(Pid) -> + ?_test(begin + ?assert(is_process_alive(Pid)), + + EventMgr = whereis(config_event), + + RestartFun = fun() -> exit(EventMgr, kill) end, + test_util:with_process_restart(config_event, RestartFun), + + ?assertNot(is_process_alive(EventMgr)), + ?assertNot(is_process_alive(Pid)), + ?assert(is_process_alive(whereis(config_event))), + ok + end). + +capture(Pid) -> + Ref = make_ref(), + WaitFun = fun() -> + Pid ! {get_state, Ref, self()}, + receive + {Ref, State} -> State + after 0 -> + wait + end + end, + test_util:wait(WaitFun). + + +-endif.