couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [10/50] mem3 commit: updated refs/heads/master to 64c0c74
Date Thu, 28 Aug 2014 12:22:30 GMT
Update to use the new couch_event application


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/8f9f58f8
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/8f9f58f8
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/8f9f58f8

Branch: refs/heads/master
Commit: 8f9f58f87678f589be884f42c7ec92762534583e
Parents: f9c2276
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Tue Apr 23 17:26:30 2013 -0500
Committer: Robert Newson <rnewson@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3.app.src  |  3 ++-
 src/mem3_sync.erl | 70 ++++++++++++++++++++++++++++----------------------
 2 files changed, 41 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/8f9f58f8/src/mem3.app.src
----------------------------------------------------------------------
diff --git a/src/mem3.app.src b/src/mem3.app.src
index ee8ba56..79f2119 100644
--- a/src/mem3.app.src
+++ b/src/mem3.app.src
@@ -45,6 +45,7 @@
         mochiweb,
         couch,
         rexi,
-        couch_log
+        couch_log,
+        couch_event
     ]}
 ]}.

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/8f9f58f8/src/mem3_sync.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync.erl b/src/mem3_sync.erl
index 579c515..d85cb2f 100644
--- a/src/mem3_sync.erl
+++ b/src/mem3_sync.erl
@@ -18,6 +18,8 @@
 -export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
     remove_node/1, initial_sync/1, get_backlog/0]).
 
+-export([handle_db_event/3]).
+
 -import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]).
 
 -include_lib("mem3/include/mem3.hrl").
@@ -29,7 +31,7 @@
     limit,
     dict = dict:new(),
     waiting = queue:new(),
-    update_notifier
+    event_listener
 }).
 
 -record(job, {name, node, count=nil, pid=nil}).
@@ -65,9 +67,9 @@ init([]) ->
     process_flag(trap_exit, true),
     Concurrency = config:get("mem3", "sync_concurrency", "10"),
     gen_event:add_handler(mem3_events, mem3_sync_event, []),
-    {ok, Pid} = start_update_notifier(),
+    {ok, Pid} = start_event_listener(),
     initial_sync(),
-    {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
+    {ok, #state{limit = list_to_integer(Concurrency), event_listener=Pid}}.
 
 handle_call({push, Job}, From, State) ->
     handle_cast({push, Job#job{pid = From}}, State);
@@ -116,9 +118,9 @@ handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
         S =:= Shard],
     {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}.
 
-handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
-    {ok, NewPid} = start_update_notifier(),
-    {noreply, State#state{update_notifier=NewPid}};
+handle_info({'EXIT', Pid, _}, #state{event_listener=Pid} = State) ->
+    {ok, NewPid} = start_event_listener(),
+    {noreply, State#state{event_listener=NewPid}};
 
 handle_info({'EXIT', Active, normal}, State) ->
     handle_replication_exit(State, Active);
@@ -262,32 +264,38 @@ submit_replication_tasks(LocalNode, Live, Shards) ->
 sync_push(ShardName, N) ->
     gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity).
 
-start_update_notifier() ->
-    Db1 = nodes_db(),
-    Db2 = shards_db(),
-    Db3 = users_db(),
-    couch_db_update_notifier:start_link(fun
-    ({updated, Db}) when Db == Db1 ->
-        Nodes = mem3:nodes(),
+start_event_listener() ->
+    State = {nodes_db(), shards_db(), users_db()},
+    couch_event:link_listener(?MODULE, handle_db_event, State, [all_dbs]).
+
+handle_db_event(NodesDb, updated, {NodesDb, _, _}=St) ->
+    Nodes = mem3:nodes(),
+    Live = nodes(),
+    [?MODULE:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)],
+    {ok, St};
+handle_db_event(ShardsDb, updated, {_, ShardsDb, _}=St) ->
+    ?MODULE:push(ShardsDb, find_next_node()),
+    {ok, St};
+handle_db_event(UsersDb, updated, {_, _, UsersDb}=St) ->
+    ?MODULE:push(UsersDb, find_next_node()),
+    {ok, St};
+handle_db_event(<<"shards/", _/binary>> = ShardName, updated, St) ->
+    try mem3:shards(mem3:dbname(ShardName)) of
+    Shards ->
+        Targets = [S || #shard{node=N, name=Name} = S <- Shards,
+            N =/= node(), Name =:= ShardName],
         Live = nodes(),
-        [?MODULE:push(Db1, N) || N <- Nodes, lists:member(N, Live)];
-    ({updated, Db}) when Db == Db2; Db == Db3 ->
-        ?MODULE:push(Db, find_next_node());
-    ({updated, <<"shards/", _/binary>> = ShardName}) ->
-        % TODO deal with split/merged partitions by comparing keyranges
-        try mem3:shards(mem3:dbname(ShardName)) of
-        Shards ->
-            Targets = [S || #shard{node=N, name=Name} = S <- Shards,
-                N =/= node(), Name =:= ShardName],
-            Live = nodes(),
-            [?MODULE:push(ShardName,N) || #shard{node=N} <- Targets,
-                lists:member(N, Live)]
-        catch error:database_does_not_exist ->
-            ok
-        end;
-    ({deleted, <<"shards/", _:18/binary, _/binary>> = ShardName}) ->
-        gen_server:cast(?MODULE, {remove_shard, ShardName});
-    (_) -> ok end).
+        [?MODULE:push(ShardName,N) || #shard{node=N} <- Targets,
+            lists:member(N, Live)]
+    catch error:database_does_not_exist ->
+        ok
+    end,
+    {ok, St};
+handle_db_event(<<"shards/", _:18/binary, _/binary>> =ShardName, deleted, St) ->
+    gen_server:cast(?MODULE, {remove_shard, ShardName}),
+    {ok, St};
+handle_db_event(_DbName, _Event, St) ->
+    {ok, St}.
 
 find_next_node() ->
     LiveNodes = [node()|nodes()],


Mime
View raw message