couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [1/4] mem3 commit: updated refs/heads/master to 0b70afb
Date Mon, 09 May 2016 21:29:18 GMT
Repository: couchdb-mem3
Updated Branches:
  refs/heads/master 699308f51 -> 0b70afb7c


Refactor mem3_sync events to dedicated module

COUCHDB-2984


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/d3ce2273
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/d3ce2273
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/d3ce2273

Branch: refs/heads/master
Commit: d3ce2273c0c1eba5b4107e7bb0a83aaa1736cc6a
Parents: 699308f
Author: Benjamin Anderson <b@banjiewen.net>
Authored: Sat Apr 9 20:55:58 2016 -0700
Committer: Benjamin Anderson <b@banjiewen.net>
Committed: Sat Apr 9 21:56:55 2016 -0700

----------------------------------------------------------------------
 src/mem3_sup.erl                 |  3 +-
 src/mem3_sync.erl                | 61 ++++---------------------
 src/mem3_sync_event_listener.erl | 85 +++++++++++++++++++++++++++++++++++
 3 files changed, 95 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/d3ce2273/src/mem3_sup.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sup.erl b/src/mem3_sup.erl
index 2662a7c..80b8ca3 100644
--- a/src/mem3_sup.erl
+++ b/src/mem3_sup.erl
@@ -23,7 +23,8 @@ init(_Args) ->
         child(mem3_nodes),
         child(mem3_sync_nodes), % Order important?
         child(mem3_sync),
-        child(mem3_shards)
+        child(mem3_shards),
+        child(mem3_sync_event_listener)
     ],
     {ok, {{one_for_one,10,1}, couch_epi:register_service(mem3_epi, Children)}}.
 

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/d3ce2273/src/mem3_sync.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync.erl b/src/mem3_sync.erl
index 28a8261..88f4ad4 100644
--- a/src/mem3_sync.erl
+++ b/src/mem3_sync.erl
@@ -17,28 +17,20 @@
     code_change/3]).
 
 -export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
-    remove_node/1, initial_sync/1, get_backlog/0]).
-
--export([handle_db_event/3]).
+    remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0,
+    shards_db/0, users_db/0, find_next_node/0]).
 
 -import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]).
 
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
--record(event_listener, {
-    nodes,
-    shards,
-    users
-}).
-
 -record(state, {
     active = [],
     count = 0,
     limit,
     dict = dict:new(),
-    waiting = queue:new(),
-    event_listener
+    waiting = queue:new()
 }).
 
 -record(job, {name, node, count=nil, pid=nil}).
@@ -70,13 +62,15 @@ push(_) ->
 remove_node(Node) ->
     gen_server:cast(?MODULE, {remove_node, Node}).
 
+remove_shard(Shard) ->
+    gen_server:cast(?MODULE, {remove_shard, Shard}).
+
 init([]) ->
     process_flag(trap_exit, true),
     Concurrency = config:get("mem3", "sync_concurrency", "10"),
     gen_event:add_handler(mem3_events, mem3_sync_event, []),
-    {ok, Pid} = start_event_listener(),
     initial_sync(),
-    {ok, #state{limit = list_to_integer(Concurrency), event_listener=Pid}}.
+    {ok, #state{limit = list_to_integer(Concurrency)}}.
 
 handle_call({push, Job}, From, State) ->
     handle_cast({push, Job#job{pid = From}}, State);
@@ -125,10 +119,6 @@ handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
         S =:= Shard],
     {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}.
 
-handle_info({'EXIT', Pid, _}, #state{event_listener=Pid} = State) ->
-    {ok, NewPid} = start_event_listener(),
-    {noreply, State#state{event_listener=NewPid}};
-
 handle_info({'EXIT', Active, normal}, State) ->
     handle_replication_exit(State, Active);
 
@@ -271,42 +261,7 @@ submit_replication_tasks(LocalNode, Live, Shards) ->
 sync_push(ShardName, N) ->
     gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity).
 
-start_event_listener() ->
-    State = #event_listener{
-        nodes = nodes_db(),
-        shards = shards_db(),
-        users = users_db()
-    },
-    couch_event:link_listener(?MODULE, handle_db_event, State, [all_dbs]).
-
-handle_db_event(NodesDb, updated, #event_listener{nodes = NodesDb} = St) ->
-    Nodes = mem3:nodes(),
-    Live = nodes(),
-    [?MODULE:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)],
-    {ok, St};
-handle_db_event(ShardsDb, updated, #event_listener{shards = ShardsDb} = St) ->
-    ?MODULE:push(ShardsDb, find_next_node()),
-    {ok, St};
-handle_db_event(UsersDb, updated, #event_listener{users = UsersDb} = St) ->
-    ?MODULE:push(UsersDb, find_next_node()),
-    {ok, St};
-handle_db_event(<<"shards/", _/binary>> = ShardName, updated, St) ->
-    try mem3:shards(mem3:dbname(ShardName)) of
-    Shards ->
-        Targets = [S || #shard{node=N, name=Name} = S <- Shards,
-            N =/= node(), Name =:= ShardName],
-        Live = nodes(),
-        [?MODULE:push(ShardName,N) || #shard{node=N} <- Targets,
-            lists:member(N, Live)]
-    catch error:database_does_not_exist ->
-        ok
-    end,
-    {ok, St};
-handle_db_event(<<"shards/", _:18/binary, _/binary>> =ShardName, deleted, St) ->
-    gen_server:cast(?MODULE, {remove_shard, ShardName}),
-    {ok, St};
-handle_db_event(_DbName, _Event, St) ->
-    {ok, St}.
+
 
 find_next_node() ->
     LiveNodes = [node()|nodes()],

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/d3ce2273/src/mem3_sync_event_listener.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync_event_listener.erl b/src/mem3_sync_event_listener.erl
new file mode 100644
index 0000000..7059347
--- /dev/null
+++ b/src/mem3_sync_event_listener.erl
@@ -0,0 +1,85 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync_event_listener).
+-behavior(couch_event_listener).
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_event/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+-include_lib("mem3/include/mem3.hrl").
+
+-record(state, {
+    nodes,
+    shards,
+    users
+}).
+
+start_link() ->
+    couch_event_listener:start_link(?MODULE, [], [all_dbs]).
+
+init(_) ->
+    State = #state{
+        nodes = mem3_sync:nodes_db(),
+        shards = mem3_sync:shards_db(),
+        users = mem3_sync:users_db()
+    },
+    {ok, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+handle_event(NodesDb, updated, #state{nodes = NodesDb} = St) ->
+    Nodes = mem3:nodes(),
+    Live = nodes(),
+    [mem3_sync:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)],
+    {ok, St};
+handle_event(ShardsDb, updated, #state{shards = ShardsDb} = St) ->
+    mem3_sync:push(ShardsDb, mem3_sync:find_next_node()),
+    {ok, St};
+handle_event(UsersDb, updated, #state{users = UsersDb} = St) ->
+    mem3_sync:push(UsersDb, mem3_sync:find_next_node()),
+    {ok, St};
+handle_event(<<"shards/", _/binary>> = ShardName, updated, St) ->
+    try mem3:shards(mem3:dbname(ShardName)) of
+    Shards ->
+        Targets = [S || #shard{node=N, name=Name} = S <- Shards,
+            N =/= node(), Name =:= ShardName],
+        Live = nodes(),
+        [mem3_sync:push(ShardName,N) || #shard{node=N} <- Targets,
+            lists:member(N, Live)]
+    catch error:database_does_not_exist ->
+        ok
+    end,
+    {ok, St};
+handle_event(<<"shards/", _:18/binary, _/binary>> = ShardName, deleted, St) ->
+    mem3_sync:remove_shard(ShardName),
+    {ok, St};
+handle_event(_DbName, _Event, St) ->
+    {ok, St}.
+
+handle_cast(Msg, St) ->
+    couch_log:notice("unexpected cast to mem3_sync_event_listener: ~p", [Msg]),
+    {ok, St}.
+
+handle_info(Msg, St) ->
+    couch_log:notice("unexpected info to mem3_sync_event_listener: ~p", [Msg]),
+    {ok, St}.


Mime
View raw message