couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From beno...@apache.org
Subject git commit: add Server-Sent Events protocol to db changes API. close #COUCHDB-986
Date Wed, 16 May 2012 05:36:15 GMT
Updated Branches:
  refs/heads/master af7441d8d -> 093d2aa65


add Server-Sent Events protocol to db changes API. close #COUCHDB-986

This patch add support for the new specification of w3c by adding a new
feed type named `eventsource`:

http://www.w3.org/TR/2009/WD-eventsource-20090423/

This patch is based on @indutny patch with edits.


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/093d2aa6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/093d2aa6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/093d2aa6

Branch: refs/heads/master
Commit: 093d2aa6544546a95f6133f1db3c4f4179793f3c
Parents: af7441d
Author: benoitc <benoitc@apache.org>
Authored: Wed May 16 07:30:19 2012 +0200
Committer: benoitc <benoitc@apache.org>
Committed: Wed May 16 07:30:19 2012 +0200

----------------------------------------------------------------------
 share/www/script/test/changes.js |   28 ++++++++++++++++++++++++++++
 src/couchdb/couch_changes.erl    |   15 ++++++++++-----
 src/couchdb/couch_httpd_db.erl   |   24 ++++++++++++++++++++++--
 3 files changed, 60 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/093d2aa6/share/www/script/test/changes.js
----------------------------------------------------------------------
diff --git a/share/www/script/test/changes.js b/share/www/script/test/changes.js
index 19e22fd..c529b21 100644
--- a/share/www/script/test/changes.js
+++ b/share/www/script/test/changes.js
@@ -139,6 +139,34 @@ couchTests.changes = function(debug) {
     // otherwise we'll continue to receive heartbeats forever
     xhr.abort();
 
+    // test Server Sent Event (eventsource)
+    if (window.EventSource) {
+      var source = new EventSource(
+              "/test_suite_db/_changes?feed=eventsource");
+      var results = [];
+      var sourceListener = function(e) {
+        var data = JSON.parse(e.data);
+        results.push(data);
+
+      };
+
+      source.addEventListener('message', sourceListener , false);
+      
+      waitForSuccess(function() {
+        if (results.length != 3) 
+          throw "bad seq, try again";
+      });
+      
+      source.removeEventListener('message', sourceListener, false);
+
+      T(results[0].seq == 1);
+      T(results[0].id == "foo");
+    
+      T(results[1].seq == 2);
+      T(results[1].id == "bar");
+      T(results[1].changes[0].rev == docBar._rev);
+    }
+
     // test longpolling
     xhr = CouchDB.newXhr();
 

http://git-wip-us.apache.org/repos/asf/couchdb/blob/093d2aa6/src/couchdb/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl
index aec7873..85c9e54 100644
--- a/src/couchdb/couch_changes.erl
+++ b/src/couchdb/couch_changes.erl
@@ -63,7 +63,8 @@ handle_changes(Args1, Req, Db0) ->
         put(last_changes_heartbeat, now())
     end,
 
-    if Feed == "continuous" orelse Feed == "longpoll" ->
+    case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of
+    true ->
         fun(CallbackAcc) ->
             {Callback, UserAcc} = get_callback_acc(CallbackAcc),
             Self = self(),
@@ -89,7 +90,7 @@ handle_changes(Args1, Req, Db0) ->
                 get_rest_db_updated(ok) % clean out any remaining update messages
             end
         end;
-    true ->
+    false ->
         fun(CallbackAcc) ->
             {Callback, UserAcc} = get_callback_acc(CallbackAcc),
             UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
@@ -261,7 +262,9 @@ get_changes_timeout(Args, Callback) ->
             fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
     end.
 
-start_sending_changes(_Callback, UserAcc, "continuous") ->
+start_sending_changes(_Callback, UserAcc, ResponseType)
+        when ResponseType =:= "continuous"
+        orelse ResponseType =:= "eventsource" ->
     UserAcc;
 start_sending_changes(Callback, UserAcc, ResponseType) ->
     Callback(start, ResponseType, UserAcc).
@@ -434,7 +437,9 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
 end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
     Callback({stop, EndSeq}, ResponseType, UserAcc).
 
-changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) ->
+changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc)
+        when ResponseType =:= "continuous"
+        orelse ResponseType =:= "eventsource" ->
     #changes_acc{
         filter = FilterFun, callback = Callback,
         user_acc = UserAcc, limit = Limit, db = Db,
@@ -456,7 +461,7 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc)
->
         end;
     _ ->
         ChangesRow = changes_row(Results, DocInfo, Acc),
-        UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc),
+        UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
         reset_heartbeat(),
         {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}
     end;

http://git-wip-us.apache.org/repos/asf/couchdb/blob/093d2aa6/src/couchdb/couch_httpd_db.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index de39b9e..0920014 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -76,14 +76,23 @@ handle_changes_req1(Req, #db{name=DbName}=Db) ->
 
 handle_changes_req2(Req, Db) ->
     MakeCallback = fun(Resp) ->
-        fun({change, Change, _}, "continuous") ->
+        fun({change, {ChangeProp}=Change, _}, "eventsource") ->
+            Seq = proplists:get_value(<<"seq">>, ChangeProp),
+            send_chunk(Resp, ["data: ", ?JSON_ENCODE(Change),
+                              "\n", "id: ", ?JSON_ENCODE(Seq),
+                              "\n\n"]);
+        ({change, Change, _}, "continuous") ->
             send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]);
         ({change, Change, Prepend}, _) ->
             send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]);
+        (start, "eventsource") ->
+            ok;
         (start, "continuous") ->
             ok;
         (start, _) ->
             send_chunk(Resp, "{\"results\":[\n");
+        ({stop, _EndSeq}, "eventsource") ->
+            end_json_response(Resp);
         ({stop, EndSeq}, "continuous") ->
             send_chunk(
                 Resp,
@@ -118,6 +127,15 @@ handle_changes_req2(Req, Db) ->
                 end
             )
         end;
+    "eventsource" ->
+        Headers = [
+            {"Content-Type", "text/event-stream"},
+            {"Cache-Control", "no-cache"}
+        ],
+        {ok, Resp} = couch_httpd:start_json_response(Req, 200, Headers),
+        fun(FeedChangesFun) ->
+            FeedChangesFun(MakeCallback(Resp))
+        end;
     _ ->
         % "longpoll" or "continuous"
         {ok, Resp} = couch_httpd:start_json_response(Req, 200),
@@ -1097,13 +1115,15 @@ parse_doc_query(Req) ->
 
 parse_changes_query(Req) ->
     lists:foldl(fun({Key, Value}, Args) ->
-        case {Key, Value} of
+        case {string:to_lower(Key), Value} of
         {"feed", _} ->
             Args#changes_args{feed=Value};
         {"descending", "true"} ->
             Args#changes_args{dir=rev};
         {"since", _} ->
             Args#changes_args{since=list_to_integer(Value)};
+        {"last-event-id", _} ->
+            Args#changes_args{since=list_to_integer(Value)};
         {"limit", _} ->
             Args#changes_args{limit=list_to_integer(Value)};
         {"style", _} ->


Mime
View raw message