couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [12/15] git commit: Consume all ibrowse messages before stream_next
Date Thu, 08 Nov 2012 22:44:11 GMT
Consume all ibrowse messages before stream_next

The flow control in ibrowse's async response streams is tricky.  We call
stream_next to pull more data off the socket, but it seems that ibrowse
will sometimes split that data into multiple messages.  If we call
stream_next for each message we process, we end up with an overflowing
mailbox.

This patch changes the consumer so that it clears out the mailbox before
calling stream_next.


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/0c110e94
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/0c110e94
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/0c110e94

Branch: refs/heads/431-feature_cors
Commit: 0c110e94f8e4f3039fad5e98a76f190ada69e936
Parents: 789d26c
Author: Adam Kocolosk <kocolosk@apache.org>
Authored: Wed Oct 31 07:35:08 2012 -0400
Committer: Jan Lehnardt <jan@apache.org>
Committed: Thu Nov 8 23:37:33 2012 +0100

----------------------------------------------------------------------
 .../src/couch_replicator_httpc.erl                 |   25 ++++++++++----
 1 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/0c110e94/src/couch_replicator/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index 6804448..8773383 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -185,22 +185,33 @@ error_cause(Cause) ->
 
 
 stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
+    case accumulate_messages(ReqId, [], T + 500) of
+    {Data, ibrowse_async_response} ->
+        ibrowse:stream_next(ReqId),
+        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
+    {Data, ibrowse_async_response_end} ->
+        {Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
+    end.
+
+accumulate_messages(ReqId, Acc, Timeout) ->
     receive
     {ibrowse_async_response, ReqId, {error, Error}} ->
         throw({maybe_retry_req, Error});
     {ibrowse_async_response, ReqId, <<>>} ->
-        ibrowse:stream_next(ReqId),
-        stream_data_self(HttpDb, Params, Worker, ReqId, Cb);
+        accumulate_messages(ReqId, Acc, Timeout);
     {ibrowse_async_response, ReqId, Data} ->
-        ibrowse:stream_next(ReqId),
-        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
+        accumulate_messages(ReqId, [Data | Acc], 0);
     {ibrowse_async_response_end, ReqId} ->
-        {<<>>, fun() -> throw({maybe_retry_req, more_data_expected}) end}
-    after T + 500 ->
+        {iolist_to_binary(lists:reverse(Acc)), ibrowse_async_response_end}
+    after Timeout ->
         % Note: ibrowse should always reply with timeouts, but this doesn't
         % seem to be always true when there's a very high rate of requests
         % and many open connections.
-        throw({maybe_retry_req, timeout})
+        if Acc =:= [] ->
+            throw({maybe_retry_req, timeout});
+        true ->
+            {iolist_to_binary(lists:reverse(Acc)), ibrowse_async_response}
+        end
     end.
 
 


Mime
View raw message