couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bitdid...@apache.org
Subject git commit: Consume all ibrowse messages before stream_next
Date Wed, 31 Oct 2012 11:37:52 GMT
Updated Branches:
  refs/heads/master 88c52b232 -> 8ccf696f8


Consume all ibrowse messages before stream_next

The flow control in ibrowse's async response streams is tricky.  We call
stream_next to pull more data off the socket, but it seems that ibrowse
will sometimes split that data into multiple messages.  If we call
stream_next for each message we process, we end up with an overflowing
mailbox.

This patch changes the consumer so that it clears out the mailbox before
calling stream_next.


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/8ccf696f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/8ccf696f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/8ccf696f

Branch: refs/heads/master
Commit: 8ccf696f833a0f0a453d733807356501ae5b355e
Parents: 88c52b2
Author: Adam Kocoloski <kocolosk@apache.org>
Authored: Wed Oct 31 07:35:08 2012 -0400
Committer: Bob Dionne <bob@cloudant.com>
Committed: Wed Oct 31 07:35:08 2012 -0400

----------------------------------------------------------------------
 .../src/couch_replicator_httpc.erl                 |   25 ++++++++++----
 1 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/8ccf696f/src/couch_replicator/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index 6804448..8773383 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -185,22 +185,33 @@ error_cause(Cause) ->
 
 
 stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
+    case accumulate_messages(ReqId, [], T + 500) of
+    {Data, ibrowse_async_response} ->
+        ibrowse:stream_next(ReqId),
+        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
+    {Data, ibrowse_async_response_end} ->
+        {Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
+    end.
+
+accumulate_messages(ReqId, Acc, Timeout) ->
     receive
     {ibrowse_async_response, ReqId, {error, Error}} ->
         throw({maybe_retry_req, Error});
     {ibrowse_async_response, ReqId, <<>>} ->
-        ibrowse:stream_next(ReqId),
-        stream_data_self(HttpDb, Params, Worker, ReqId, Cb);
+        accumulate_messages(ReqId, Acc, Timeout);
     {ibrowse_async_response, ReqId, Data} ->
-        ibrowse:stream_next(ReqId),
-        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
+        accumulate_messages(ReqId, [Data | Acc], 0);
     {ibrowse_async_response_end, ReqId} ->
-        {<<>>, fun() -> throw({maybe_retry_req, more_data_expected}) end}
-    after T + 500 ->
+        {iolist_to_binary(lists:reverse(Acc)), ibrowse_async_response_end}
+    after Timeout ->
         % Note: ibrowse should always reply with timeouts, but this doesn't
         % seem to be always true when there's a very high rate of requests
         % and many open connections.
-        throw({maybe_retry_req, timeout})
+        if Acc =:= [] ->
+            throw({maybe_retry_req, timeout});
+        true ->
+            {iolist_to_binary(lists:reverse(Acc)), ibrowse_async_response}
+        end
     end.
 
 


Mime
View raw message