couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bitdid...@apache.org
Subject [2/2] git commit: Ensure heartbeats are not skipped
Date Mon, 28 Nov 2011 11:50:57 GMT
Ensure heartbeats are not skipped

In both the normal and continuous feeds, filter functions may return
false. When that happens no change row is sent for the document, so no
heartbeat was emitted either. This patch ensures heartbeats are not skipped.

Jira-1289


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/bcbcb427
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/bcbcb427
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/bcbcb427

Branch: refs/heads/master
Commit: bcbcb4270665c2bfb620435b1fa736bc337c619d
Parents: ffd7112
Author: Bob Dionne <bitdiddle@apache.org>
Authored: Fri Nov 25 10:44:15 2011 -0500
Committer: Bob Dionne <bitdiddle@apache.org>
Committed: Mon Nov 28 06:13:14 2011 -0500

----------------------------------------------------------------------
 src/couchdb/couch_changes.erl |  139 ++++++++++++++++++++++++------------
 1 files changed, 92 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/bcbcb427/src/couchdb/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl
index 267f3d7..6858d77 100644
--- a/src/couchdb/couch_changes.erl
+++ b/src/couchdb/couch_changes.erl
@@ -29,7 +29,9 @@
     resp_type,
     limit,
     include_docs,
-    conflicts
+    conflicts,
+    timeout,
+    timeout_fun
 }).
 
 %% @type Req -> #httpd{} | {json_req, JsonObj()}
@@ -49,6 +51,14 @@ handle_changes(Args1, Req, Db) ->
     fwd ->
         Since
     end,
+    % begin timer to deal with heartbeat when filter function fails
+    case Args#changes_args.heartbeat of
+    undefined ->
+        erlang:erase(last_changes_heartbeat);
+    Val when is_integer(Val); Val =:= true ->
+        put(last_changes_heartbeat, now())
+    end,
+
     if Feed == "continuous" orelse Feed == "longpoll" ->
         fun(CallbackAcc) ->
             {Callback, UserAcc} = get_callback_acc(CallbackAcc),
@@ -62,16 +72,12 @@ handle_changes(Args1, Req, Db) ->
             ),
             UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
             {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+            Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
+                             <<"">>, Timeout, TimeoutFun),
             try
                 keep_sending_changes(
-                    Args,
-                    Callback,
-                    UserAcc2,
-                    Db,
-                    StartSeq,
-                    <<"">>,
-                    Timeout,
-                    TimeoutFun,
+                    Args#changes_args{dir=fwd},
+                    Acc0,
                     true)
             after
                 couch_db_update_notifier:stop(Notify),
@@ -82,14 +88,13 @@ handle_changes(Args1, Req, Db) ->
         fun(CallbackAcc) ->
             {Callback, UserAcc} = get_callback_acc(CallbackAcc),
             UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+            {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+            Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
+                             UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun),
             {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
                 send_changes(
                     Args#changes_args{feed="normal"},
-                    Callback,
-                    UserAcc2,
-                    Db,
-                    StartSeq,
-                    <<>>,
+                    Acc0,
                     true),
             end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
         end
@@ -255,18 +260,15 @@ start_sending_changes(_Callback, UserAcc, "continuous") ->
 start_sending_changes(Callback, UserAcc, ResponseType) ->
     Callback(start, ResponseType, UserAcc).
 
-send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, FirstRound) ->
+build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) ->
     #changes_args{
         include_docs = IncludeDocs,
         conflicts = Conflicts,
         limit = Limit,
         feed = ResponseType,
-        dir = Dir,
-        filter = FilterName,
-        filter_args = FilterArgs,
         filter_fun = FilterFun
     } = Args,
-    Acc0 = #changes_acc{
+    #changes_acc{
         db = Db,
         seq = StartSeq,
         prepend = Prepend,
@@ -276,8 +278,21 @@ send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, FirstRound) ->
         resp_type = ResponseType,
         limit = Limit,
         include_docs = IncludeDocs,
-        conflicts = Conflicts
-    },
+        conflicts = Conflicts,
+        timeout = Timeout,
+        timeout_fun = TimeoutFun
+    }.
+
+send_changes(Args, Acc0, FirstRound) ->
+    #changes_args{
+        dir = Dir,
+        filter = FilterName,
+        filter_args = FilterArgs
+    } = Args,
+    #changes_acc{
+        db = Db,
+        seq = StartSeq
+    } = Acc0,
     case FirstRound of
     true ->
         case FilterName of
@@ -367,8 +382,7 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
     end.
 
 
-keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout,
-    TimeoutFun, FirstRound) ->
+keep_sending_changes(Args, Acc0, FirstRound) ->
     #changes_args{
         feed = ResponseType,
         limit = Limit,
@@ -377,13 +391,10 @@ keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout,
 
     {ok, ChangesAcc} = send_changes(
         Args#changes_args{dir=fwd},
-        Callback,
-        UserAcc,
-        Db,
-        StartSeq,
-        Prepend,
+        Acc0,
         FirstRound),
     #changes_acc{
+        db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun,
         seq = EndSeq, prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit
     } = ChangesAcc,
 
@@ -392,28 +403,25 @@ keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout,
         end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
     true ->
         case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of
-        {updated, UserAcc3} ->
-            % ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]),
+        {updated, UserAcc4} ->
             DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
             case couch_db:open(Db#db.name, DbOptions1) of
             {ok, Db2} ->
                 keep_sending_changes(
-                    Args#changes_args{limit=NewLimit},
-                    Callback,
-                    UserAcc3,
-                    Db2,
-                    EndSeq,
-                    Prepend2,
-                    Timeout,
-                    TimeoutFun,
-                    false
-                );
+                  Args#changes_args{limit=NewLimit},
+                  ChangesAcc#changes_acc{
+                    db = Db2,
+                    user_acc = UserAcc4,
+                    seq = EndSeq,
+                    prepend = Prepend2,
+                    timeout = Timeout,
+                    timeout_fun = TimeoutFun},
+                  false);
             _Else ->
                 end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
             end;
-        {stop, UserAcc3} ->
-            % ?LOG_INFO("wait_db_updated stop ~p",[{Db#db.name, EndSeq}]),
-            end_sending_changes(Callback, UserAcc3, EndSeq, ResponseType)
+        {stop, UserAcc4} ->
+            end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
         end
     end.
 
@@ -423,24 +431,34 @@ end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
 changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) ->
     #changes_acc{
         filter = FilterFun, callback = Callback,
-        user_acc = UserAcc, limit = Limit, db = Db
+        user_acc = UserAcc, limit = Limit, db = Db,
+        timeout = Timeout, timeout_fun = TimeoutFun
     } = Acc,
     #doc_info{high_seq = Seq} = DocInfo,
     Results0 = FilterFun(Db, DocInfo),
     Results = [Result || Result <- Results0, Result /= null],
+    %% TODO: I'm thinking this should be < 1 and not =< 1
     Go = if Limit =< 1 -> stop; true -> ok end,
     case Results of
     [] ->
-        {Go, Acc#changes_acc{seq = Seq}};
+        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+        case Done of
+        stop ->
+            {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
+        ok ->
+            {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
+        end;
     _ ->
         ChangesRow = changes_row(Results, DocInfo, Acc),
         UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc),
+        reset_heartbeat(),
         {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}
     end;
 changes_enumerator(DocInfo, Acc) ->
     #changes_acc{
         filter = FilterFun, callback = Callback, prepend = Prepend,
-        user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db
+        user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
+        timeout = Timeout, timeout_fun = TimeoutFun
     } = Acc,
     #doc_info{high_seq = Seq} = DocInfo,
     Results0 = FilterFun(Db, DocInfo),
@@ -448,10 +466,17 @@ changes_enumerator(DocInfo, Acc) ->
     Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
     case Results of
     [] ->
-        {Go, Acc#changes_acc{seq = Seq}};
+        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+        case Done of
+        stop ->
+            {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
+        ok ->
+            {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
+        end;
     _ ->
         ChangesRow = changes_row(Results, DocInfo, Acc),
         UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+        reset_heartbeat(),
         {Go, Acc#changes_acc{
             seq = Seq, prepend = <<",\n">>,
             user_acc = UserAcc2, limit = Limit - 1}}
@@ -504,3 +529,23 @@ get_rest_db_updated(UserAcc) ->
     after 0 ->
         {updated, UserAcc}
     end.
+
+reset_heartbeat() ->
+    put(last_changes_heartbeat,now()).
+
+maybe_heartbeat(Timeout, TimeoutFun, Acc) ->
+    Now = now(),
+    Before = get(last_changes_heartbeat),
+    case Before of
+    undefined ->
+        {ok, Acc};
+    _ ->
+        case timer:now_diff(Now, Before) div 1000 >= Timeout of
+        true ->
+            Acc2 = TimeoutFun(Acc),
+            put(last_changes_heartbeat, Now),
+            Acc2;
+        false ->
+            {ok, Acc}
+        end
+    end.


Mime
View raw message