couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject couch-dbupdates commit: updated refs/heads/windsor-merge to b9acfa8
Date Sun, 17 Aug 2014 03:05:07 GMT
Repository: couchdb-couch-dbupdates
Updated Branches:
  refs/heads/windsor-merge [created] b9acfa82e


Rewrite couch_dbupdates to use couch_event


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-dbupdates/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-dbupdates/commit/b9acfa82
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-dbupdates/tree/b9acfa82
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-dbupdates/diff/b9acfa82

Branch: refs/heads/windsor-merge
Commit: b9acfa82ef047db2bbdcfeb1ba4dd25691c90e49
Parents: 690c8c2
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Sat Aug 16 22:04:35 2014 -0500
Committer: Paul J. Davis <paul.joseph.davis@gmail.com>
Committed: Sat Aug 16 22:04:35 2014 -0500

----------------------------------------------------------------------
 src/couch_dbupdates.erl       |  46 ----------------
 src/couch_dbupdates_httpd.erl | 106 ++++++++++++++++++++++++++-----------
 2 files changed, 74 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-dbupdates/blob/b9acfa82/src/couch_dbupdates.erl
----------------------------------------------------------------------
diff --git a/src/couch_dbupdates.erl b/src/couch_dbupdates.erl
deleted file mode 100644
index e37362f..0000000
--- a/src/couch_dbupdates.erl
+++ /dev/null
@@ -1,46 +0,0 @@
--module(couch_dbupdates).
-
--export([handle_dbupdates/3]).
-
-
-handle_dbupdates(Fun, Acc, Options) ->
-    NotifierPid = db_update_notifier(),
-    try
-        loop(Fun, Acc, Options)
-    after
-        couch_db_update_notifier:stop(NotifierPid)
-    end.
-
-
-loop(Fun, Acc, Options) ->
-    [{timeout, Timeout}, {heartbeat, Heartbeat}] = Options,
-    receive
-        {db_updated, Event} ->
-            case Fun(Event, Acc) of
-                {ok, Acc1} ->
-                    loop(Fun, Acc1, Options);
-                stop ->
-                    Fun(stop, Acc)
-
-            end
-    after Timeout ->
-        case Heartbeat of
-            true ->
-                case Fun(heartbeat, Acc) of
-                {ok, Acc1} ->
-                    loop(Fun, Acc1, Options);
-                stop ->
-                    Fun(stop, Acc)
-
-                end;
-            _ ->
-                Fun(stop, Acc)
-        end
-    end.
-
-db_update_notifier() ->
-    Self = self(),
-    {ok, Notifier} = couch_db_update_notifier:start_link(fun(Event) ->
-        Self ! {db_updated, Event}
-    end),
-    Notifier.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-dbupdates/blob/b9acfa82/src/couch_dbupdates_httpd.erl
----------------------------------------------------------------------
diff --git a/src/couch_dbupdates_httpd.erl b/src/couch_dbupdates_httpd.erl
index ec0c4d6..c27043f 100644
--- a/src/couch_dbupdates_httpd.erl
+++ b/src/couch_dbupdates_httpd.erl
@@ -4,17 +4,19 @@
 
 -include_lib("couch/include/couch_db.hrl").
 
--record(state, {resp, feed}).
+-record(st, {
+    resp,
+    feed,
+    heartbeat,
+    timeout
+}).
+
 
 handle_req(#httpd{method='GET'}=Req) ->
     ok = couch_httpd:verify_is_server_admin(Req),
     Qs = couch_httpd:qs(Req),
     Feed = proplists:get_value("feed", Qs, "longpoll"),
-
-    Timeout = list_to_integer(
-                proplists:get_value("timeout", Qs, "60000")
-    ),
-
+    Timeout = list_to_integer(proplists:get_value("timeout", Qs, "60000")),
     Heartbeat0 = proplists:get_value("heartbeat", Qs),
     Heartbeat = case {Feed, Heartbeat0} of
         {"longpoll", _} -> false;
@@ -22,8 +24,6 @@ handle_req(#httpd{method='GET'}=Req) ->
         _ -> true
     end,
 
-    Options = [{timeout, Timeout}, {heartbeat, Heartbeat}],
-
     {ok, Resp} = case Feed of
         "eventsource" ->
             Headers = [
@@ -35,35 +35,77 @@ handle_req(#httpd{method='GET'}=Req) ->
             couch_httpd:start_json_response(Req, 200)
     end,
 
-    State = #state{resp=Resp, feed=Feed},
-    couch_dbupdates:handle_dbupdates(fun handle_update/2,
-                                     State, Options).
+    St1 = #st{
+        resp = Resp,
+        feed = Feed,
+        heartbeat = Heartbeat,
+        timeout = Timeout
+    },
+    {ok, St2} = run(St, Timeout),
+    {ok, St2#st.resp};
 
 handle_req(Req, _Db) ->
     couch_httpd:send_method_not_allowed(Req, "GET").
 
-handle_update(stop, #state{resp=Resp}) ->
-    couch_httpd:end_json_response(Resp);
-handle_update(heartbeat, #state{resp=Resp}=State) ->
-    {ok, Resp1} = couch_httpd:send_chunk(Resp, "\n"),
-    {ok, State#state{resp=Resp1}};
-handle_update(Event, #state{resp=Resp, feed="eventsource"}=State) ->
-    EventObj = event_obj(Event),
-    {ok, Resp1} = couch_httpd:send_chunk(Resp, ["data: ",
-                                                ?JSON_ENCODE(EventObj),
-                                                "\n\n"]),
-    {ok, State#state{resp=Resp1}};
-handle_update(Event, #state{resp=Resp, feed="continuous"}=State) ->
+
+run(St, Timeout) ->
+    ok = couch_event:register_all(self()),
+    try
+        loop(St, Timeout)
+    after
+        ok = couch_event:unregister(self()),
+        drain_events()
+    end.
+
+
+loop(Timeout, St) ->
+    Event = receive
+        {'$couch_event', DbName, Event} ->
+            {DbName, Event};
+    after Timeout ->
+        timeout
+    end,
+    case handle_update(Event, St) of
+        {ok, NewSt} ->
+            loop(Timeout, NewSt);
+        {stop, NewSt} ->
+            {ok, NewSt}
+    end.
+
+
+handle_update(timeout, #st{heartbeat=true}=St) ->
+    {ok, Resp1} = couch_httpd:send_chunk(St#st.resp, "\n"),
+    {ok, St#st{resp=Resp1}};
+handle_update(timeout, #st{heartbeat=false}=St) ->
+    {ok, Resp1} = couch_httpd:end_json_response(St#st.resp),
+    {stop, St#st{resp=Resp1}};
+handle_update(Event, #st{feed="eventsource"}=St) ->
     EventObj = event_obj(Event),
-    {ok, Resp1} = couch_httpd:send_chunk(Resp, [?JSON_ENCODE(EventObj) |
-                            "\n"]),
-    {ok, State#state{resp=Resp1}};
-handle_update(Event, #state{resp=Resp, feed="longpoll"}) ->
+    Chunk = ["data: ", ?JSON_ENCODE(event_obj(Event)), "\n\n"],
+    {ok, Resp1} = couch_httpd:send_chunk(Resp, Chunk),
+    {ok St#st{resp=Resp1}};
+handle_update(Event, #st{resp=Resp, feed="continuous"}=St) ->
+    Chunk = [?JSON_ENCODE(event_obj(Event)), "\n"],
+    {ok, Resp1} = couch_httpd:send_chunk(St#st.resp, Chunk),
+    {ok, St#st{resp=Resp1}};
+handle_update(Event, #st{resp=Resp, feed="longpoll"}) ->
     {Props} = event_obj(Event),
     JsonObj = {[{<<"ok">>, true} | Props]},
-    couch_httpd:send_chunk(Resp, ?JSON_ENCODE(JsonObj)),
-    stop.
+    {ok, Resp1} = couch_httpd:send_chunk(St#st.resp, ?JSON_ENCODE(JsonObj)),
+    {ok, Resp2} = couch_httpd:end_json_response(Resp1),
+    {stop, St#st{resp=Resp2}}.
+
+
+event_obj({DbName, Event}) when is_atom(Event) ->
+    {[
+        {<<"type">>, couch_util:to_binary(Type)},
+        {<<"db_name">>, couch_util:to_binary(DbName)}
+    ]};
+event_obj({DbName, {ddoc_updated, DDocId}}) ->
+    {[
+        {<<"type">>, <<"ddoc_updated">>},
+        {<<"db_name">>, couch_util:to_binary(DbName)},
+        {<<"ddoc_id">>, couch_util:to_binary(DDocId)}
+    ]}.
+
 
-event_obj({Type, DbName}) ->
-    {[{<<"type">>, couch_util:to_binary(Type)},
-      {<<"db_name">>, couch_util:to_binary(DbName)}]}.


Mime
View raw message