couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [22/50] fabric commit: updated refs/heads/master to a71701c
Date Thu, 28 Aug 2014 12:20:57 GMT
Report number of pending changes in shard

Also refactor the _changes accumulator into a record.

BugzID: 24236


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/9de10969
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/9de10969
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/9de10969

Branch: refs/heads/master
Commit: 9de109699509ffb10987c3c190a0b2190efefb94
Parents: 161f088
Author: Adam Kocoloski <adam@cloudant.com>
Authored: Thu Oct 17 14:17:12 2013 -0400
Committer: Robert Newson <rnewson@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 47 +++++++++++++++++++++++++++++++++--------------
 1 file changed, 33 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9de10969/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index cbc913e..9b6c217 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -27,6 +27,14 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
+-record (cacc, {
+    db,
+    seq,
+    args,
+    options,
+    pending
+}).
+
 %% rpc endpoints
 %%  call to with_db will supply your M:F with a #db{} and then remaining args
 
@@ -44,11 +52,20 @@ changes(DbName, Options, StartVector, DbOptions) ->
         StartSeq = calculate_start_seq(Db, node(), StartVector),
         Enum = fun changes_enumerator/2,
         Opts = [{dir,Dir}],
-        Acc0 = {Db, StartSeq, Args, Options},
+        Acc0 = #cacc{
+          db = Db,
+          seq = StartSeq,
+          args = Args,
+          options = Options,
+          pending = couch_db:count_changes_since(Db, StartSeq)
+        },
         try
-            {ok, {_, LastSeq, _, _}} =
+            {ok, #cacc{seq=LastSeq, pending=Pending}} =
                 couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
-            rexi:reply({complete, [{seq, {LastSeq, uuid(Db)}}]})
+            rexi:reply({complete, [
+                {seq, {LastSeq, uuid(Db)}},
+                {pending, Pending}
+            ]})
         after
             couch_db:close(Db)
         end;
@@ -297,22 +314,24 @@ send(Key, Value, Acc) ->
         end
     end.
 
-changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq},
-        {Db, _OldSeq, Args, Options}) ->
-    {ok, {Db, Seq, Args, Options}};
-changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
-    #changes_args{
-        include_docs = IncludeDocs,
-        filter = Acc
-    } = Args,
+changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) ->
+    {ok, Acc#cacc{seq = Seq, pending = Acc#cacc.pending-1}};
+changes_enumerator(DocInfo, Acc) ->
+    #cacc{
+        db = Db,
+        args = #changes_args{include_docs = IncludeDocs, filter = Filter},
+        options = Options,
+        pending = Pending
+    } = Acc,
     Conflicts = proplists:get_value(conflicts, Options, false),
     #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo,
-    case [X || X <- couch_changes:filter(Db, DocInfo, Acc), X /= null] of
+    case [X || X <- couch_changes:filter(Db, DocInfo, Filter), X /= null] of
     [] ->
-        {ok, {Db, Seq, Args, Options}};
+        {ok, Acc#cacc{seq = Seq, pending = Pending-1}};
     Results ->
         Opts = if Conflicts -> [conflicts]; true -> [] end,
         ChangesRow = {change, [
+	    {pending, Pending-1},
             {seq, {Seq, uuid(Db)}},
             {id, Id},
             {changes, Results},
@@ -320,7 +339,7 @@ changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
             if IncludeDocs -> [doc_member(Db, DocInfo, Opts)]; true -> [] end
         ]},
         Go = rexi:sync_reply(ChangesRow),
-        {Go, {Db, Seq, Args, Options}}
+        {Go, Acc#cacc{seq = Seq, pending = Pending-1}}
     end.
 
 doc_member(Shard, DocInfo, Opts) ->


Mime
View raw message