couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r995528 - /couchdb/trunk/src/couchdb/couch_changes.erl
Date Thu, 09 Sep 2010 18:19:05 GMT
Author: fdmanana
Date: Thu Sep  9 18:19:05 2010
New Revision: 995528

URL: http://svn.apache.org/viewvc?rev=995528&view=rev
Log:
Refactor changes module to allow for accumulators with the callback (optional, doesn't break
public API).

Modified:
    couchdb/trunk/src/couchdb/couch_changes.erl

Modified: couchdb/trunk/src/couchdb/couch_changes.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_changes.erl?rev=995528&r1=995527&r2=995528&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_changes.erl (original)
+++ couchdb/trunk/src/couchdb/couch_changes.erl Thu Sep  9 18:19:05 2010
@@ -17,17 +17,18 @@
 
 %% @type Req -> #httpd{} | {json_req, JsonObj()}
 handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
-    Args = Args1#changes_args{filter=
-            make_filter_fun(Args1#changes_args.filter, Style, Req, Db)},
+    #changes_args{feed = Feed} = Args = Args1#changes_args{
+        filter = make_filter_fun(Args1#changes_args.filter, Style, Req, Db)
+    },
     StartSeq = case Args#changes_args.dir of
     rev ->
         couch_db:get_update_seq(Db);
     fwd ->
         Args#changes_args.since
     end,
-    if Args#changes_args.feed == "continuous" orelse
-        Args#changes_args.feed == "longpoll" ->
-        fun(Callback) ->
+    if Feed == "continuous" orelse Feed == "longpoll" ->
+        fun(CallbackAcc) ->
+            {Callback, UserAcc} = get_callback_acc(CallbackAcc),
             Self = self(),
             {ok, Notify} = couch_db_update_notifier:start_link(
                 fun({_, DbName}) when DbName == Db#db.name ->
@@ -36,12 +37,13 @@ handle_changes(#changes_args{style=Style
                     ok
                 end
             ),
-            start_sending_changes(Callback, Args#changes_args.feed),
+            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
             {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
             try
                 keep_sending_changes(
                     Args,
                     Callback,
+                    UserAcc2,
                     Db,
                     StartSeq,
                     <<"">>,
@@ -50,24 +52,31 @@ handle_changes(#changes_args{style=Style
                 )
             after
                 couch_db_update_notifier:stop(Notify),
-                get_rest_db_updated() % clean out any remaining update messages
+                get_rest_db_updated(ok) % clean out any remaining update messages
             end
         end;
     true ->
-        fun(Callback) ->
-            start_sending_changes(Callback, Args#changes_args.feed),
-            {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} =
+        fun(CallbackAcc) ->
+            {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+            {ok, {_, LastSeq, _Prepend, _, _, UserAcc3, _, _, _}} =
                 send_changes(
                     Args#changes_args{feed="normal"},
                     Callback,
+                    UserAcc2,
                     Db,
                     StartSeq,
-                    <<"">>
+                    <<>>
                 ),
-            end_sending_changes(Callback, LastSeq, Args#changes_args.feed)
+            end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
         end
     end.
 
+get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
+    Pair;
+get_callback_acc(Callback) when is_function(Callback, 2) ->
+    {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
+
 %% @type Req -> #httpd{} | {json_req, JsonObj()}
 make_filter_fun(FilterName, Style, Req, Db) ->
     case [list_to_binary(couch_httpd:unquote(Part))
@@ -128,21 +137,23 @@ get_changes_timeout(Args, Callback) ->
         infinity ->
             {infinity, fun() -> stop end};
         _ ->
-            {lists:min([DefaultTimeout, Timeout]), fun() -> stop end}
+            {lists:min([DefaultTimeout, Timeout]),
+                fun(UserAcc) -> {stop, UserAcc} end}
         end;
     true ->
-        {DefaultTimeout, fun() -> Callback(timeout, ResponseType), ok end};
+        {DefaultTimeout,
+            fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end};
     _ ->
         {lists:min([DefaultTimeout, Heartbeat]),
-            fun() -> Callback(timeout, ResponseType), ok end}
+            fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
     end.
 
-start_sending_changes(_Callback, "continuous") ->
-    ok;
-start_sending_changes(Callback, ResponseType) ->
-    Callback(start, ResponseType).
+start_sending_changes(_Callback, UserAcc, "continuous") ->
+    UserAcc;
+start_sending_changes(Callback, UserAcc, ResponseType) ->
+    Callback(start, ResponseType, UserAcc).
 
-send_changes(Args, Callback, Db, StartSeq, Prepend) ->
+send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend) ->
     #changes_args{
         style = Style,
         include_docs = IncludeDocs,
@@ -157,11 +168,11 @@ send_changes(Args, Callback, Db, StartSe
         StartSeq,
         fun changes_enumerator/2,
         [{dir, Dir}],
-        {Db, StartSeq, Prepend, FilterFun, Callback, ResponseType, Limit,
-            IncludeDocs}
+        {Db, StartSeq, Prepend, FilterFun, Callback, UserAcc, ResponseType,
+            Limit, IncludeDocs}
     ).
 
-keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
+keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout,
     TimeoutFun) ->
     #changes_args{
         feed = ResponseType,
@@ -169,16 +180,16 @@ keep_sending_changes(Args, Callback, Db,
         db_open_options = DbOptions
     } = Args,
     % ?LOG_INFO("send_changes start ~p",[StartSeq]),
-    {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(
-        Args#changes_args{dir=fwd}, Callback, Db, StartSeq, Prepend
+    {ok, {_, EndSeq, Prepend2, _, _, UserAcc2, _, NewLimit, _}} = send_changes(
+        Args#changes_args{dir=fwd}, Callback, UserAcc, Db, StartSeq, Prepend
     ),
     % ?LOG_INFO("send_changes last ~p",[EndSeq]),
     couch_db:close(Db),
     if Limit > NewLimit, ResponseType == "longpoll" ->
-        end_sending_changes(Callback, EndSeq, ResponseType);
+        end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
     true ->
-        case wait_db_updated(Timeout, TimeoutFun) of
-        updated ->
+        case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of
+        {updated, UserAcc3} ->
             % ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]),
             DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
             case couch_db:open(Db#db.name, DbOptions1) of
@@ -186,6 +197,7 @@ keep_sending_changes(Args, Callback, Db,
                 keep_sending_changes(
                     Args#changes_args{limit=NewLimit},
                     Callback,
+                    UserAcc3,
                     Db2,
                     EndSeq,
                     Prepend2,
@@ -193,19 +205,19 @@ keep_sending_changes(Args, Callback, Db,
                     TimeoutFun
                 );
             _Else ->
-                end_sending_changes(Callback, EndSeq, ResponseType)
+                end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
             end;
-        stop ->
+        {stop, UserAcc3} ->
             % ?LOG_INFO("wait_db_updated stop ~p",[{Db#db.name, EndSeq}]),
-            end_sending_changes(Callback, EndSeq, ResponseType)
+            end_sending_changes(Callback, UserAcc3, EndSeq, ResponseType)
         end
     end.
 
-end_sending_changes(Callback, EndSeq, ResponseType) ->
-    Callback({stop, EndSeq}, ResponseType).
+end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
+    Callback({stop, EndSeq}, ResponseType, UserAcc).
 
-changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous",
-    Limit, IncludeDocs}) ->
+changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, UserAcc,
+    "continuous", Limit, IncludeDocs}) ->
 
     #doc_info{id=Id, high_seq=Seq,
             revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo,
@@ -214,18 +226,18 @@ changes_enumerator(DocInfo, {Db, _, _, F
     Go = if Limit =< 1 -> stop; true -> ok end,
     case Results of
     [] ->
-        {Go, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit,
+        {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc, "continuous", Limit,
                 IncludeDocs}
         };
     _ ->
         ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
-        Callback({change, ChangesRow, <<"">>}, "continuous"),
-        {Go, {Db, Seq, nil, FilterFun, Callback, "continuous",  Limit - 1,
-                IncludeDocs}
+        UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc),
+        {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc2, "continuous",
+                Limit - 1, IncludeDocs}
         }
     end;
-changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, ResponseType,
-    Limit, IncludeDocs}) ->
+changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, UserAcc,
+    ResponseType, Limit, IncludeDocs}) ->
 
     #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}
         = DocInfo,
@@ -234,14 +246,14 @@ changes_enumerator(DocInfo, {Db, _, Prep
     Go = if Limit =< 1 -> stop; true -> ok end,
     case Results of
     [] ->
-        {Go, {Db, Seq, Prepend, FilterFun, Callback, ResponseType, Limit,
-                IncludeDocs}
+        {Go, {Db, Seq, Prepend, FilterFun, Callback, UserAcc, ResponseType,
+                Limit, IncludeDocs}
         };
     _ ->
         ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
-        Callback({change, ChangesRow, Prepend}, ResponseType),
-        {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, ResponseType, Limit -
1,
-                IncludeDocs}
+        UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+        {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, UserAcc2, ResponseType,
+                Limit - 1, IncludeDocs}
         }
     end.
 
@@ -257,16 +269,24 @@ deleted_item(true) -> [{<<"deleted">>, t
 deleted_item(_) -> [].
 
 % waits for a db_updated msg, if there are multiple msgs, collects them.
-wait_db_updated(Timeout, TimeoutFun) ->
-    receive db_updated -> get_rest_db_updated()
+wait_db_updated(Timeout, TimeoutFun, UserAcc) ->
+    receive
+    db_updated ->
+        get_rest_db_updated(UserAcc)
     after Timeout ->
-        case TimeoutFun() of
-        ok -> wait_db_updated(Timeout, TimeoutFun);
-        stop -> stop
+        {Go, UserAcc2} = TimeoutFun(UserAcc),
+        case Go of
+        ok ->
+            wait_db_updated(Timeout, TimeoutFun, UserAcc2);
+        stop ->
+            {stop, UserAcc2}
         end
     end.
 
-get_rest_db_updated() ->
-    receive db_updated -> get_rest_db_updated()
-    after 0 -> updated
+get_rest_db_updated(UserAcc) ->
+    receive
+    db_updated ->
+        get_rest_db_updated(UserAcc)
+    after 0 ->
+        {updated, UserAcc}
     end.



Mime
View raw message