Retry when connection_closed is received during a streamed response

The changes_reader uses a streamed response. During the stream, a connection_closed error can be received because of timeouts or network issues. In that case we simply retry the request without consuming one of the limited retries. This is safe because a streamed response can only begin after a connection has been established. So if the network is truly down, the initial request itself will fail, and the code path will go through the normal retry clause, which decrements the number of remaining retries. This way we won't get stuck retrying forever if there is an actual network issue.

BugzId: 70400
COUCHDB-3010
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/2db0d7f2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/2db0d7f2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/2db0d7f2
Branch: refs/heads/master
Commit: 2db0d7f2a697c7e4aae9ca0ccb1658591a7579c5
Parents: 80e9578
Author: Tony Sun
Authored: Wed Jul 20 21:45:14 2016 -0700
Committer: Tony Sun
Committed: Wed Sep 21 11:39:39 2016 -0700
---------------------------------------------------------------------- src/couch_replicator_api_wrap.erl | 63 ++++++++++++++++------------ src/couch_replicator_changes_reader.erl | 5 ++- src/couch_replicator_httpc.erl | 3 ++ 3 files changed, 43 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2db0d7f2/src/couch_replicator_api_wrap.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl index ff6b00c..f22cac8 100644 --- a/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator_api_wrap.erl @@ -483,33 +483,42 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb, JsonDocIds = 
?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}), {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2} end, - send_req( - HttpDb, - [{method, Method}, {path, "_changes"}, {qs, QArgs}, - {headers, Headers}, {body, Body}, - {ibrowse_options, [{stream_to, {self(), once}}]}], - fun(200, _, DataStreamFun) -> - parse_changes_feed(Options, UserFun, DataStreamFun); - (405, _, _) when is_list(DocIds) -> - % CouchDB versions < 1.1.0 don't have the builtin _changes feed - % filter "_doc_ids" neither support POST - send_req(HttpDb, [{method, get}, {path, "_changes"}, - {qs, BaseQArgs}, {headers, Headers1}, - {ibrowse_options, [{stream_to, {self(), once}}]}], - fun(200, _, DataStreamFun2) -> - UserFun2 = fun(#doc_info{id = Id} = DocInfo) -> - case lists:member(Id, DocIds) of - true -> - UserFun(DocInfo); - false -> - ok - end; - (LastSeq) -> - UserFun(LastSeq) - end, - parse_changes_feed(Options, UserFun2, DataStreamFun2) - end) - end); + try + send_req( + HttpDb, + [{method, Method}, {path, "_changes"}, {qs, QArgs}, + {headers, Headers}, {body, Body}, + {ibrowse_options, [{stream_to, {self(), once}}]}], + fun(200, _, DataStreamFun) -> + parse_changes_feed(Options, UserFun, DataStreamFun); + (405, _, _) when is_list(DocIds) -> + % CouchDB versions < 1.1.0 don't have the builtin + % _changes feed filter "_doc_ids" neither support POST + send_req(HttpDb, [{method, get}, {path, "_changes"}, + {qs, BaseQArgs}, {headers, Headers1}, + {ibrowse_options, [{stream_to, {self(), once}}]}], + fun(200, _, DataStreamFun2) -> + UserFun2 = fun(#doc_info{id = Id} = DocInfo) -> + case lists:member(Id, DocIds) of + true -> + UserFun(DocInfo); + false -> + ok + end; + (LastSeq) -> + UserFun(LastSeq) + end, + parse_changes_feed(Options, UserFun2, + DataStreamFun2) + end) + end) + catch + exit:{http_request_failed, _, _, {error, {connection_closed, + mid_stream}}} -> + throw(retry_no_limit); + exit:{http_request_failed, _, _, _} = Error -> + throw({retry_limit, Error}) + end; 
changes_since(Db, Style, StartSeq, UserFun, Options) -> DocIds = get_value(doc_ids, Options), Selector = get_value(selector, Options), http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2db0d7f2/src/couch_replicator_changes_reader.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl index b7d18e0..bed318a 100644 --- a/src/couch_replicator_changes_reader.erl +++ b/src/couch_replicator_changes_reader.erl @@ -52,7 +52,10 @@ read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) -> throw:recurse -> LS = get(last_seq), read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1); - exit:{http_request_failed, _, _, _} = Error -> + throw:retry_no_limit -> + LS = get(last_seq), + read_changes(Parent, LS, Db, ChangesQueue, Options, Ts); + throw:{retry_limit, Error} -> couch_stats:increment_counter( [couch_replicator, changes_read_failures] ), http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2db0d7f2/src/couch_replicator_httpc.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl index 9a4cca4..366c325 100644 --- a/src/couch_replicator_httpc.erl +++ b/src/couch_replicator_httpc.erl @@ -180,6 +180,9 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) -> Ret = Callback(Ok, Headers, StreamDataFun), Ret catch + throw:{maybe_retry_req, connection_closed} -> + maybe_retry({connection_closed, mid_stream}, + Worker, HttpDb, Params); throw:{maybe_retry_req, Err} -> maybe_retry(Err, Worker, HttpDb, Params) end;