couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [26/35] git commit: Use khash for tracking event listeners
Date Fri, 01 Aug 2014 09:10:22 GMT
Use khash for tracking event listeners

After doing some testing locally, it became apparent that ets is a bit of
a bottleneck when used as a bag with many duplicate keys. Theoretically,
this new approach could be accomplished by nesting ets tables, but the
ets table limit makes that approach untenable in the long run.

This replaces the use of ets with khash and uses a nested hash table
structure to store the pids registered for each database name.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/commit/8e6797a2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/tree/8e6797a2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/diff/8e6797a2

Branch: refs/heads/windsor-merge
Commit: 8e6797a2db00813d62d0bf3054153de6618022b3
Parents: 1deb3d4
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Sat May 11 15:51:42 2013 -0500
Committer: Robert Newson <rnewson@apache.org>
Committed: Wed Jul 30 17:46:56 2014 +0100

----------------------------------------------------------------------
 src/couch_event.erl          |  28 +++----
 src/couch_event_dist.erl     |  88 ----------------------
 src/couch_event_listener.erl |   4 +-
 src/couch_event_registry.erl | 142 -----------------------------------
 src/couch_event_server.erl   | 154 ++++++++++++++++++++++++++++++++++++++
 src/couch_event_sup2.erl     |  13 +---
 6 files changed, 168 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/8e6797a2/src/couch_event.erl
----------------------------------------------------------------------
diff --git a/src/couch_event.erl b/src/couch_event.erl
index 72926d6..9f8e501 100644
--- a/src/couch_event.erl
+++ b/src/couch_event.erl
@@ -26,17 +26,15 @@
     register/2,
     register_many/2,
     register_all/1,
-    unregister/2,
-    unregister_many/2,
-    unregister_all/1
+    unregister/1
 ]).
 
--define(REGISTRY, couch_event_registry).
--define(DIST, couch_event_dist).
+
+-define(SERVER, couch_event_server).
 
 
 notify(DbName, Event) ->
-    gen_server:cast(?DIST, {DbName, Event}).
+    gen_server:cast(?SERVER, {notify, DbName, Event}).
 
 
 listen(Module, Function, State, Options) ->
@@ -52,24 +50,16 @@ stop_listener(Pid) ->
 
 
 register(Pid, DbName) ->
-    gen_server:call(?REGISTRY, {register, Pid, [DbName]}).
+    gen_server:call(?SERVER, {register, Pid, [DbName]}).
 
 
 register_many(Pid, DbNames) when is_list(DbNames) ->
-    gen_server:call(?REGISTRY, {register, Pid, DbNames}).
+    gen_server:call(?SERVER, {register, Pid, DbNames}).
 
 
 register_all(Pid) ->
-    gen_server:call(?REGISTRY, {register, Pid, [all_dbs]}).
-
-
-unregister(Pid, DbName) ->
-    gen_server:call(?REGISTRY, {unregister, Pid, [DbName]}).
-
-
-unregister_many(Pid, DbNames) when is_list(DbNames) ->
-    gen_server:call(?REGISTRY, {unregister, Pid, DbNames}).
+    gen_server:call(?SERVER, {register, Pid, [all_dbs]}).
 
 
-unregister_all(Pid) ->
-    gen_server:call(?REGISTRY, {unregister, Pid}).
+unregister(Pid) ->
+    gen_server:call(?SERVER, {unregister, Pid}).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/8e6797a2/src/couch_event_dist.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_dist.erl b/src/couch_event_dist.erl
deleted file mode 100644
index 95aaf65..0000000
--- a/src/couch_event_dist.erl
+++ /dev/null
@@ -1,88 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_event_dist).
--behavior(gen_server).
-
-
--export([
-    start_link/0
-]).
-
--export([
-    init/1,
-    terminate/2,
-    handle_call/3,
-    handle_cast/2,
-    handle_info/2,
-    code_change/3
-]).
-
-
--include("couch_event_int.hrl").
-
-
--record(st, {
-    batch_size
-}).
-
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
-
-
-init(_) ->
-    {ok, #st{batch_size=25}}.
-
-
-terminate(_Reason, _St) ->
-    ok.
-
-
-handle_call(Msg, From, St) ->
-    couch_log:notice("~s ignoring call ~w from ~w", [?MODULE, Msg, From]),
-    {reply, ignored, St}.
-
-
-handle_cast({DbName, Event}, #st{batch_size=BS}=St) when is_binary(DbName) ->
-    margaret_counter:increment([couch_event, events_received]),
-    T1 = notify_clients(#client{dbname=DbName, _='_'}, BS, DbName, Event),
-    T2 = notify_clients(#client{dbname=all_dbs, _='_'}, BS, DbName, Event),
-    margaret_counter:increment([couch_event, events_delivered], T1 + T2),
-    {noreply, St};
-
-handle_cast(Msg, St) ->
-    couch_log:notice("~s ignoring cast ~w", [?MODULE, Msg]),
-    {noreply, St}.
-
-
-handle_info(Msg, St) ->
-    couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
-    {noreply, St}.
-
-
-code_change(_OldVsn, St, _Extra) ->
-    {ok, St}.
-
-
-notify_clients(Pattern, BatchSize, DbName, Event) ->
-    MSpec = [{Pattern, [], ['$_']}],
-    do_notify(ets:select(?REGISTRY_TABLE, MSpec, BatchSize), DbName, Event, 0).
-
-
-do_notify('$end_of_table', _DbName, _Event, Total) ->
-    Total;
-do_notify({Clients, Cont}, DbName, Event, Total) ->
-    lists:foreach(fun(#client{pid=Pid}) ->
-        Pid ! {'$couch_event', DbName, Event}
-    end, Clients),
-    do_notify(ets:select(Cont), DbName, Event, Total + length(Clients)).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/8e6797a2/src/couch_event_listener.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_listener.erl b/src/couch_event_listener.erl
index 04d5fc1..34db139 100644
--- a/src/couch_event_listener.erl
+++ b/src/couch_event_listener.erl
@@ -179,10 +179,10 @@ do_info(#st{module=Module, state=State}=St, Message) ->
 
 do_terminate(Reason, #st{module=Module, state=State}) ->
     % Order matters. We want to make sure Module:terminate/1
-    % is called even if couch_event:unregister_all/1 hangs
+    % is called even if couch_event:unregister/1 hangs
     % indefinitely.
     catch Module:terminate(Reason, State),
-    catch couch_event:unregister_all(self()),
+    catch couch_event:unregister(self()),
     Status = case Reason of
         normal -> normal;
         shutdown -> normal;

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/8e6797a2/src/couch_event_registry.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_registry.erl b/src/couch_event_registry.erl
deleted file mode 100644
index adea994..0000000
--- a/src/couch_event_registry.erl
+++ /dev/null
@@ -1,142 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_event_registry).
--behavior(gen_server).
-
-
--export([
-    start_link/0
-]).
-
--export([
-    init/1,
-    terminate/2,
-    handle_call/3,
-    handle_cast/2,
-    handle_info/2,
-    code_change/3
-]).
-
-
--include("couch_event_int.hrl").
-
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
-
-
-init(_) ->
-    RegistryOpts = [
-        protected,
-        named_table,
-        bag,
-        {keypos, #client.dbname}
-    ],
-    MonitorOpts = [
-        protected,
-        named_table,
-        set
-    ],
-    ets:new(?REGISTRY_TABLE, RegistryOpts),
-    ets:new(?MONITOR_TABLE, MonitorOpts),
-    {ok, nil}.
-
-
-terminate(_Reason, _St) ->
-    ok.
-
-
-handle_call({register, Pid, DbNames}, _From, St) ->
-    ToAdd = [#client{dbname=DBN, pid=Pid} || DBN <- DbNames],
-    ets:insert(?REGISTRY_TABLE, ToAdd),
-    case ets:member(?MONITOR_TABLE, Pid) of
-        true ->
-            ok;
-        false ->
-            Ref = erlang:monitor(process, Pid),
-            ets:insert(?MONITOR_TABLE, {Pid, Ref})
-    end,
-    {reply, ok, St};
-
-handle_call({unregister, Pid, DbNames}, _From, St) ->
-    % TODO: Check into a multi-pattern matchspec and the
-    % use of select_delete to see if that's faster.
-    lists:foreach(fun(DbName) ->
-        ets:match_delete(?REGISTRY_TABLE, pattern(DbName, Pid))
-    end, DbNames),
-    maybe_drop_monitor(Pid),
-    {reply, ok, St};
-
-handle_call({unregister_all, Pid}, _From, St) ->
-    ets:match_delete(?REGISTRY_TABLE, pattern(Pid)),
-    drop_monitor(Pid),
-    {reply, ok, St};
-
-handle_call(Msg, From, St) ->
-    couch_log:notice("~s ignoring call ~w from ~w", [?MODULE, Msg, From]),
-    {reply, ignored, St}.
-
-
-handle_cast(Msg, St) ->
-    couch_log:notice("~s ignoring cast ~w", [?MODULE, Msg]),
-    {noreply, St}.
-
-
-handle_info({'DOWN', _Ref, process, Pid, _Reason}, St) ->
-    ets:match_delete(?REGISTRY_TABLE, pattern(Pid)),
-    ets:delete(?REGISTRY_TABLE, Pid),
-    {norepy, St};
-
-handle_info(Msg, St) ->
-    couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
-    {noreply, St}.
-
-
-code_change(_OldVsn, St, _Extra) ->
-    {ok, St}.
-
-
-maybe_drop_monitor(Pid) ->
-    case ets:select_count(?REGISTRY_TABLE, mspec(Pid)) of
-        0 -> drop_monitor(Pid);
-        _ -> ok
-    end.
-
-
-drop_monitor(Pid) ->
-    case ets:lookup(?MONITOR_TABLE, Pid) of
-        [{Pid, Ref}] ->
-            ets:delete(?MONITOR_TABLE, Pid),
-            erlang:demonitor(Ref, [flush]);
-        [] ->
-            ok
-    end.
-
-
--compile({inline, [
-    mspec/1,
-    pattern/1,
-    pattern/2
-]}).
-
-
-mspec(Pid) ->
-    [{pattern(Pid), [], ['$_']}].
-
-
-pattern(Pid) ->
-    #client{pid=Pid, _='_'}.
-
-
-pattern(DbName, Pid) ->
-    #client{dbname=DbName, pid=Pid, _='_'}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/8e6797a2/src/couch_event_server.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_server.erl b/src/couch_event_server.erl
new file mode 100644
index 0000000..77e6ce3
--- /dev/null
+++ b/src/couch_event_server.erl
@@ -0,0 +1,154 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_server).
+-behavior(gen_server).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-include("couch_event_int.hrl").
+
+
+-record(st, {
+    by_pid,
+    by_dbname
+}).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+init(_) ->
+    {ok, ByPid} = khash:new(),
+    {ok, ByDbName} = khash:new(),
+    {ok, #st{
+        by_pid = ByPid,
+        by_dbname = ByDbName
+    }}.
+
+
+terminate(_Reason, _St) ->
+    ok.
+
+
+handle_call({register, Pid, NewDbNames}, _From, St) ->
+    case khash:get(St#st.by_pid, Pid) of
+        undefined ->
+            NewRef = erlang:monitor(process, Pid),
+            register(St, NewRef, Pid, NewDbNames);
+        {ReuseRef, OldDbNames} ->
+            unregister(St, Pid, OldDbNames),
+            register(St, ReuseRef, Pid, NewDbNames)
+    end,
+    {reply, ok, St};
+
+handle_call({unregister, Pid}, _From, St) ->
+    case khash:get(St#st.by_pid, Pid) of
+        undefined ->
+            {reply, not_registered, St};
+        {Ref, OldDbNames} ->
+            unregister(St, Pid, OldDbNames),
+            erlang:demonitor(Ref, [flush])
+    end,
+    {reply, ok, St};
+
+handle_call(Msg, From, St) ->
+    couch_log:notice("~s ignoring call ~w from ~w", [?MODULE, Msg, From]),
+    {reply, ignored, St}.
+
+
+handle_cast({notify, DbName, Event}, St) ->
+    notify_listeners(St#st.by_dbname, DbName, Event),
+    {noreply, St};
+
+handle_cast(Msg, St) ->
+    couch_log:notice("~s ignoring cast ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+handle_info({'DOWN', Ref, process, Pid, _Reason}, St) ->
+    case khash:get(St#st.by_pid, Pid) of
+        {Ref, OldDbNames} ->
+            unregister(St, Pid, OldDbNames);
+        undefined ->
+            ok
+    end,
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+notify_listeners(ByDbName, DbName, Event) ->
+    Msg = {'$couch_event', DbName, Event},
+    notify_listeners(khash:get(ByDbName, all_dbs), Msg),
+    notify_listeners(khash:get(ByDbName, DbName), Msg).
+
+
+notify_listeners(undefined, _) ->
+    ok;
+notify_listeners(Listeners, Msg) ->
+    khash:fold(Listeners, fun(Pid, _, _) -> Pid ! Msg, nil end, nil).
+
+
+register(St, Ref, Pid, DbNames) ->
+    khash:put(St#st.by_pid, Pid, {Ref, DbNames}),
+    lists:foreach(fun(DbName) ->
+        add_listener(St#st.by_dbname, DbName, Pid)
+    end, DbNames).
+
+
+add_listener(ByDbName, DbName, Pid) ->
+    case khash:lookup(ByDbName, DbName) of
+        {value, Listeners} ->
+            khash:put(Listeners, Pid, nil);
+        not_found ->
+            {ok, NewListeners} = khash:new(),
+            khash:put(NewListeners, Pid, nil),
+            khash:put(ByDbName, DbName, NewListeners)
+    end.
+
+
+unregister(St, Pid, OldDbNames) ->
+    ok = khash:del(St#st.by_pid, Pid),
+    lists:foreach(fun(DbName) ->
+        rem_listener(St#st.by_dbname, DbName, Pid)
+    end, OldDbNames).
+
+
+rem_listener(ByDbName, DbName, Pid) ->
+    {value, Listeners} = khash:lookup(ByDbName, DbName),
+    khash:del(Listeners, Pid),
+    Size = khash:size(Listeners),
+    if Size > 0 -> ok; true ->
+        khash:del(ByDbName, DbName)
+    end.
+            

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/8e6797a2/src/couch_event_sup2.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_sup2.erl b/src/couch_event_sup2.erl
index 1a74979..36fbe54 100644
--- a/src/couch_event_sup2.erl
+++ b/src/couch_event_sup2.erl
@@ -32,19 +32,12 @@ start_link() ->
 
 init(_) ->
     Children = [
-        {couch_event_registry,
-            {couch_event_registry, start_link, []},
+        {couch_event_server,
+            {couch_event_server, start_link, []},
             permanent,
             5000,
             worker,
-            [couch_event_registry]
-        },
-        {couch_event_dist,
-            {couch_event_dist, start_link, []},
-            permanent,
-            5000,
-            worker,
-            [couch_event_dist]
+            [couch_event_server]
         },
         {couch_event_os_sup,
             {couch_event_os_sup, start_link, []},


Mime
View raw message