Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ACD6F112AB for ; Fri, 1 Aug 2014 09:09:57 +0000 (UTC) Received: (qmail 10968 invoked by uid 500); 1 Aug 2014 09:09:57 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 10771 invoked by uid 500); 1 Aug 2014 09:09:57 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 10745 invoked by uid 99); 1 Aug 2014 09:09:57 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Aug 2014 09:09:57 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3CEEE9BCB44; Fri, 1 Aug 2014 09:09:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rnewson@apache.org To: commits@couchdb.apache.org Date: Fri, 01 Aug 2014 09:09:58 -0000 Message-Id: <828174262e6849e9861004c370bafc1c@git.apache.org> In-Reply-To: <7ec92bc8976347e8be5a76bf4eac0b9b@git.apache.org> References: <7ec92bc8976347e8be5a76bf4eac0b9b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/35] git commit: Initial implementation Initial implementation There are a few places we could be more efficient in managing the ets table for event listeners but this simple approach should be able to give us a rough idea on how fast we can pump messages through this sort of design. 
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/commit/cc616a77 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/tree/cc616a77 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/diff/cc616a77 Branch: refs/heads/windsor-merge Commit: cc616a77717730ac4dddc2940aff51f584385e6f Parents: 73b38e8 Author: Paul J. Davis Authored: Mon Apr 22 15:03:47 2013 -0500 Committer: Robert Newson Committed: Wed Jul 30 17:35:08 2014 +0100 ---------------------------------------------------------------------- .gitignore | 2 + src/couch_event.app.src | 22 +++++ src/couch_event.erl | 45 ++++++++++ src/couch_event_app.erl | 27 ++++++ src/couch_event_dist.erl | 83 +++++++++++++++++++ src/couch_event_int.hrl | 19 +++++ src/couch_event_listener.erl | 169 ++++++++++++++++++++++++++++++++++++++ src/couch_event_registry.erl | 109 ++++++++++++++++++++++++ src/couch_event_sup2.erl | 51 ++++++++++++ 9 files changed, 527 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1204ed7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +deps/ +ebin/ http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event.app.src ---------------------------------------------------------------------- diff --git a/src/couch_event.app.src b/src/couch_event.app.src new file mode 100644 index 0000000..17fbafd --- /dev/null +++ b/src/couch_event.app.src @@ -0,0 +1,22 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% The registered list names the processes this application registers
+% locally: the supervisor (couch_event_sup2) and its two workers. The
+% name couch_event_sup is deliberately not claimed here because it
+% belongs to the supervisor contained in the couch app.
+{application, couch_event, [
+    {description, "Event notification system for Apache CouchDB"},
+    {vsn, git},
+    {registered, [
+        couch_event_sup2,
+        couch_event_registry,
+        couch_event_dist
+    ]},
+    {applications, [kernel, stdlib, couch_log, config]},
+    {mod, {couch_event_app, []}}
+]}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event.erl
----------------------------------------------------------------------
diff --git a/src/couch_event.erl b/src/couch_event.erl
new file mode 100644
index 0000000..eaa4c88
--- /dev/null
+++ b/src/couch_event.erl
@@ -0,0 +1,45 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event).
+
+-export([
+    register/2,
+    register_all/1,
+    unregister/2,
+    unregister_all/1,
+    notify/2
+]).
+
+
+% Locally registered names of the two servers this API talks to.
+-define(REGISTRY, couch_event_registry).
+-define(DIST, couch_event_dist).
+
+
+% Subscribe Pid to events for DbName. This is a synchronous call to the
+% registry server.
+register(Pid, DbName) ->
+    gen_server:call(?REGISTRY, {register, Pid, DbName}).
+
+
+% Subscribe Pid to events for every database. The subscription is stored
+% under the atom all_dbs in place of a database name.
+register_all(Pid) ->
+    gen_server:call(?REGISTRY, {register, Pid, all_dbs}).
+
+
+% Remove the subscription of Pid for DbName. This is a synchronous call
+% to the registry server.
+unregister(Pid, DbName) ->
+    gen_server:call(?REGISTRY, {unregister, Pid, DbName}).
+
+
+% Remove every subscription held by Pid. The request tag must be
+% unregister_all: that is the only two-element request the registry
+% handles, anything else is logged and answered with `ignored`.
+unregister_all(Pid) ->
+    gen_server:call(?REGISTRY, {unregister_all, Pid}).
+
+
+% Publish Event for DbName. This is an asynchronous cast to the
+% distributor, which only accepts binary database names.
+notify(DbName, Event) ->
+    gen_server:cast(?DIST, {DbName, Event}).
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_app.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_app.erl b/src/couch_event_app.erl
new file mode 100644
index 0000000..3a8341b
--- /dev/null
+++ b/src/couch_event_app.erl
@@ -0,0 +1,27 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_app).
+-behavior(application).
+
+-export([
+    start/2,
+    stop/1
+]).
+
+
+% Application start callback: boots the top-level supervisor.
+start(_StartType, _StartArgs) ->
+    couch_event_sup2:start_link().
+
+
+% Application stop callback: nothing to clean up.
+stop(_State) ->
+    ok.
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_dist.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_dist.erl b/src/couch_event_dist.erl
new file mode 100644
index 0000000..7f99a2d
--- /dev/null
+++ b/src/couch_event_dist.erl
@@ -0,0 +1,83 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.
You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_dist).
+-behavior(gen_server).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-include("couch_event_int.hrl").
+
+
+% batch_size: upper bound on the number of registry rows fetched per
+% ETS traversal step.
+-record(st, {
+    batch_size
+}).
+
+
+% Start the distributor, registered locally under the module name.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+init(_) ->
+    {ok, #st{batch_size=25}}.
+
+
+terminate(_Reason, _St) ->
+    ok.
+
+
+% No calls are supported: log the request and answer `ignored`.
+handle_call(Msg, From, St) ->
+    couch_log:notice("~s ignoring call ~w from ~w", [?MODULE, Msg, From]),
+    {reply, ignored, St}.
+
+
+% Fan an event out, first to the listeners registered for DbName and
+% then to the listeners registered under all_dbs.
+%
+% #client{..., _='_'} is a match pattern, not a match specification, so
+% the table is walked with ets:match_object/3. ets:select/3 requires a
+% match specification and raises badarg when handed a bare pattern.
+handle_cast({DbName, Event}, #st{batch_size=BS}=St) when is_binary(DbName) ->
+    P1 = #client{dbname=DbName, _='_'},
+    notify_clients(ets:match_object(?REGISTRY_TABLE, P1, BS), DbName, Event),
+    P2 = #client{dbname=all_dbs, _='_'},
+    notify_clients(ets:match_object(?REGISTRY_TABLE, P2, BS), DbName, Event),
+    {noreply, St};
+
+handle_cast(Msg, St) ->
+    couch_log:notice("~s ignoring cast ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+handle_info(Msg, St) ->
+    couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+% Send {'$couch_event', DbName, Event} to every client in the current
+% batch, then resume the traversal from its continuation until the
+% table reports '$end_of_table'.
+notify_clients('$end_of_table', _DbName, _Event) ->
+    ok;
+notify_clients({Clients, Cont}, DbName, Event) ->
+    lists:foreach(fun(#client{pid=Pid}) ->
+        Pid ! {'$couch_event', DbName, Event}
+    end, Clients),
+    notify_clients(ets:match_object(Cont), DbName, Event).
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_int.hrl ---------------------------------------------------------------------- diff --git a/src/couch_event_int.hrl b/src/couch_event_int.hrl new file mode 100644 index 0000000..dc4739e --- /dev/null +++ b/src/couch_event_int.hrl @@ -0,0 +1,19 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-define(REGISTRY_TABLE, couch_event_registry). + +-record(client, { + dbname, + pid, + ref +}). http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_listener.erl ---------------------------------------------------------------------- diff --git a/src/couch_event_listener.erl b/src/couch_event_listener.erl new file mode 100644 index 0000000..c665036 --- /dev/null +++ b/src/couch_event_listener.erl @@ -0,0 +1,169 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_event_listener). +-behavior(gen_server). 
+
+
+-export([
+    start/3,
+    start/4,
+    start_link/3,
+    start_link/4
+]).
+
+-export([
+    behaviour_info/1
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+% Callbacks a listener module must export. Each entry is a
+% {Name, Arity} pair. handle_event has arity 3 because it is invoked
+% as Mod:handle_event(DbName, Event, St).
+behaviour_info(callbacks) ->
+    [
+        {init,1},
+        {terminate,2},
+        {handle_event,3}
+    ];
+behaviour_info(_) ->
+    undefined.
+
+
+% start/3,4 and start_link/3,4 mirror the gen_server functions of the
+% same name. This module is the gen_server callback module and the
+% listener module Mod is carried inside the init argument.
+start(Mod, Args, Options) ->
+    gen_server:start(?MODULE, {Mod, Args}, Options).
+
+
+start(Name, Mod, Args, Options) ->
+    gen_server:start(Name, ?MODULE, {Mod, Args}, Options).
+
+
+start_link(Mod, Args, Options) ->
+    gen_server:start_link(?MODULE, {Mod, Args}, Options).
+
+
+start_link(Name, Mod, Args, Options) ->
+    gen_server:start_link(Name, ?MODULE, {Mod, Args}, Options).
+
+
+% Delegate to Mod:init/1. On success the gen_server state becomes
+% {Mod, St}, and every other callback in this module relies on that
+% shape.
+init({Mod, Args}) ->
+    case Mod:init(Args) of
+        {ok, St} ->
+            {ok, {Mod, St}};
+        {ok, St, Timeout} ->
+            {ok, {Mod, St}, Timeout};
+        {stop, Reason} ->
+            {stop, Reason};
+        ignore ->
+            ignore;
+        Else ->
+            erlang:error({bad_return, Else})
+    end.
+
+
+terminate(Reason, {Mod, St}) ->
+    Mod:terminate(Reason, St).
+
+
+% Forward a call to Mod:handle_call/3 when the listener exports it,
+% re-wrapping the returned state as {Mod, NewState}. Without that
+% export the server replies invalid_call and stops. The state handed
+% back must still be {Mod, St}, otherwise terminate/2 cannot match it.
+handle_call(Msg, From, {Mod, St}) ->
+    case erlang:function_exported(Mod, handle_call, 3) of
+        true ->
+            case Mod:handle_call(Msg, From, St) of
+                {reply, Reply, NewState} ->
+                    {reply, Reply, {Mod, NewState}};
+                {reply, Reply, NewState, Timeout} ->
+                    {reply, Reply, {Mod, NewState}, Timeout};
+                {noreply, NewState} ->
+                    {noreply, {Mod, NewState}};
+                {noreply, NewState, Timeout} ->
+                    {noreply, {Mod, NewState}, Timeout};
+                {stop, Reason, Reply, NewState} ->
+                    {stop, Reason, Reply, {Mod, NewState}};
+                {stop, Reason, NewState} ->
+                    {stop, Reason, {Mod, NewState}};
+                Else ->
+                    erlang:error({bad_return, Else})
+            end;
+        false ->
+            {stop, {invalid_call, Msg}, invalid_call, {Mod, St}}
+    end.
+
+
+% Forward a cast to Mod:handle_cast/2 when the listener exports it.
+% Otherwise stop, keeping the {Mod, St} wrapper so terminate/2 can
+% still match the state.
+handle_cast(Msg, {Mod, St}) ->
+    case erlang:function_exported(Mod, handle_cast, 2) of
+        true ->
+            case Mod:handle_cast(Msg, St) of
+                {noreply, NewState} ->
+                    {noreply, {Mod, NewState}};
+                {noreply, NewState, Timeout} ->
+                    {noreply, {Mod, NewState}, Timeout};
+                {stop, Reason, NewState} ->
+                    {stop, Reason, {Mod, NewState}};
+                Else ->
+                    erlang:error({bad_return, Else})
+            end;
+        false ->
+            {stop, {invalid_cast, Msg}, {Mod, St}}
+    end.
+
+
+% Event messages are routed to the mandatory Mod:handle_event/3
+% callback.
+handle_info({'$couch_event', DbName, Event}, {Mod, St}) ->
+    case Mod:handle_event(DbName, Event, St) of
+        {noreply, NewState} ->
+            {noreply, {Mod, NewState}};
+        {noreply, NewState, Timeout} ->
+            {noreply, {Mod, NewState}, Timeout};
+        {stop, Reason, NewState} ->
+            {stop, Reason, {Mod, NewState}};
+        Else ->
+            erlang:error({bad_return, Else})
+    end;
+
+% Any other message goes to the optional Mod:handle_info/2. The BIF
+% that tests for the export is erlang:function_exported/3.
+handle_info(Msg, {Mod, St}) ->
+    case erlang:function_exported(Mod, handle_info, 2) of
+        true ->
+            case Mod:handle_info(Msg, St) of
+                {noreply, NewState} ->
+                    {noreply, {Mod, NewState}};
+                {noreply, NewState, Timeout} ->
+                    {noreply, {Mod, NewState}, Timeout};
+                {stop, Reason, NewState} ->
+                    {stop, Reason, {Mod, NewState}};
+                Else ->
+                    erlang:error({bad_return, Else})
+            end;
+        false ->
+            {stop, {invalid_info, Msg}, {Mod, St}}
+    end.
+
+
+% Delegate to the optional Mod:code_change/3. Without it the state is
+% kept unchanged.
+code_change(OldVsn, {Mod, St}, Extra) ->
+    case erlang:function_exported(Mod, code_change, 3) of
+        true ->
+            case Mod:code_change(OldVsn, St, Extra) of
+                {ok, NewState} ->
+                    {ok, {Mod, NewState}};
+                {error, Reason} ->
+                    {error, Reason};
+                Else ->
+                    erlang:error({bad_return, Else})
+            end;
+        false ->
+            {ok, {Mod, St}}
+    end.
+
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_registry.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_registry.erl b/src/couch_event_registry.erl
new file mode 100644
index 0000000..65fa160
--- /dev/null
+++ b/src/couch_event_registry.erl
@@ -0,0 +1,109 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_registry).
+-behavior(gen_server).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-include("couch_event_int.hrl").
+
+
+% Start the registry, registered locally under the module name.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+% Create the named registry table: a protected bag keyed on the dbname
+% field of #client{}. The server itself keeps no state (nil).
+init(_) ->
+    ets:new(
+        ?REGISTRY_TABLE,
+        [protected, named_table, bag, {keypos, #client.dbname}]
+    ),
+    {ok, nil}.
+
+
+terminate(_Reason, _St) ->
+    ok.
+
+
+% Register Pid for DbName. The pid is monitored, and the monitor
+% reference is stored alongside it in the registry table.
+handle_call({register, Pid, DbName}, _From, St) ->
+    Client = #client{
+        dbname = DbName,
+        pid = Pid,
+        ref = erlang:monitor(process, Pid)
+    },
+    ets:insert(?REGISTRY_TABLE, Client),
+    {reply, ok, St};
+
+% Drop every registration of Pid for DbName. The table is a bag and
+% each registration carries its own monitor reference, so registering
+% the same pid twice leaves two rows; all of them are removed.
+handle_call({unregister, Pid, DbName}, _From, St) ->
+    Pattern = #client{dbname=DbName, pid=Pid, _='_'},
+    remove_clients(ets:match_object(?REGISTRY_TABLE, Pattern)),
+    {reply, ok, St};
+
+% Drop every registration of Pid, whatever it was registered for.
+handle_call({unregister_all, Pid}, _From, St) ->
+    Pattern = #client{pid=Pid, _='_'},
+    remove_clients(ets:match_object(?REGISTRY_TABLE, Pattern)),
+    {reply, ok, St};
+
+% Unknown requests are logged and answered with `ignored`. No timeout
+% is returned: a 0 timeout would deliver `timeout` to handle_info/2,
+% whose catch-all clause would log it and re-arm the timeout forever.
+handle_call(Msg, From, St) ->
+    couch_log:notice("~s ignoring call ~w from ~w", [?MODULE, Msg, From]),
+    {reply, ignored, St}.
+
+
+handle_cast(Msg, St) ->
+    couch_log:notice("~s ignoring cast ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+% A monitored client died: delete the row that carries this pid and
+% monitor reference.
+handle_info({'DOWN', Ref, process, Pid, _Reason}, St) ->
+    Pattern = #client{pid=Pid, ref=Ref, _='_'},
+    ets:match_delete(?REGISTRY_TABLE, Pattern),
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+% Demonitor each client (flushing any pending 'DOWN' message) and
+% delete its row from the registry table.
+remove_clients(Clients) ->
+    lists:foreach(fun(#client{ref=Ref}=Cli) ->
+        erlang:demonitor(Ref, [flush]),
+        % I wonder if match_delete/2 is faster
+        % than repeated calls to delete_object.
+        ets:delete_object(?REGISTRY_TABLE, Cli)
+    end, Clients).
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_sup2.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_sup2.erl b/src/couch_event_sup2.erl
new file mode 100644
index 0000000..039b897
--- /dev/null
+++ b/src/couch_event_sup2.erl
@@ -0,0 +1,51 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.
You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% This is named couch_event_sup2 to avoid
+% naming collisions with the couch_event_sup
+% module contained in the couch app. When
+% that supervisor is removed we'll be free
+% to rename this one.
+
+-module(couch_event_sup2).
+-behavior(supervisor).
+
+
+-export([
+    start_link/0,
+    init/1
+]).
+
+
+% Start the supervisor, registered locally under the module name.
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, nil).
+
+
+% Supervise the registry and the distributor as permanent workers under
+% a one_for_one strategy (at most 5 restarts in 10 seconds). The last
+% element of each child spec is the list of callback modules and has to
+% name the module the child actually runs.
+init(_) ->
+    Children = [
+        {couch_event_registry,
+            {couch_event_registry, start_link, []},
+            permanent,
+            5000,
+            worker,
+            [couch_event_registry]
+        },
+        {couch_event_dist,
+            {couch_event_dist, start_link, []},
+            permanent,
+            5000,
+            worker,
+            [couch_event_dist]
+        }
+    ],
+    {ok, {{one_for_one, 5, 10}, Children}}.
+