Return-Path: Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: (qmail 12308 invoked from network); 15 Oct 2010 18:16:34 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 15 Oct 2010 18:16:34 -0000 Received: (qmail 19531 invoked by uid 500); 15 Oct 2010 18:16:34 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 19410 invoked by uid 500); 15 Oct 2010 18:16:33 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 19402 invoked by uid 99); 15 Oct 2010 18:16:32 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Oct 2010 18:16:32 +0000 X-ASF-Spam-Status: No, hits=-1996.4 required=10.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Oct 2010 18:16:32 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 12D0823889E1; Fri, 15 Oct 2010 18:15:36 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1023054 - in /couchdb/branches/new_replicator/src/couchdb: couch_api_wrap.erl couch_api_wrap_httpc.erl Date: Fri, 15 Oct 2010 18:15:36 -0000 To: commits@couchdb.apache.org From: fdmanana@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101015181536.12D0823889E1@eris.apache.org> Author: fdmanana Date: Fri Oct 15 18:15:35 2010 New Revision: 1023054 URL: http://svn.apache.org/viewvc?rev=1023054&view=rev Log: New replicator: small improvements for error handling. Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1023054&r1=1023053&r2=1023054&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Fri Oct 15 18:15:35 2010 @@ -164,11 +164,12 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re _ -> ?JSON_ENCODE(couch_doc:revs_to_strs(Revs)) end, - Self = self(), QArgs = [ {"revs", "true"}, {"open_revs", RevStr} | options_to_query_args(Options, []) ], + TrapExit = element(2, erlang:process_info(self(), trap_exit)), + process_flag(trap_exit, true), Streamer = spawn_link(fun() -> send_req( HttpDb, @@ -180,10 +181,13 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re get_value("Content-Type", Headers), StreamDataFun, fun(Ev) -> mp_parse_mixed(Ev) end) - end), - unlink(Self) + end) end), - receive_docs(Streamer, Fun, Acc); + Result = receive_docs(Streamer, Fun, Acc), + unlink(Streamer), + receive {'EXIT', Streamer, _} -> ok after 0 -> ok end, + process_flag(trap_exit, TrapExit), + Result; open_doc_revs(Db, Id, Revs, Options, Fun, Acc) -> {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options), {ok, lists:foldl(Fun, Acc, Results)}. @@ -234,29 +238,26 @@ update_doc(#httpdb{} = HttpDb, #doc{id = false -> [] end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}], - Self = self(), - Ref = make_ref(), + TrapExit = element(2, erlang:process_info(self(), trap_exit)), + process_flag(trap_exit, true), DocStreamer = spawn_link(fun() -> couch_doc:doc_to_multi_part_stream( Boundary, JsonBytes, Doc#doc.atts, fun(Data) -> - receive {get_data, Ref, From} -> - From ! {data, Ref, Data} + receive {get_data, From} -> + From ! {data, Data} end - end, false), - unlink(Self) + end, false) end), SendFun = fun(0) -> eof; (LenLeft) when LenLeft > 0 -> - DocStreamer ! {get_data, Ref, self()}, - receive {data, Ref, Data} -> + DocStreamer ! {get_data, self()}, + receive {data, Data} -> {ok, Data, LenLeft - iolist_size(Data)} - after HttpDb#httpdb.timeout -> - http_request_failed end end, - send_req( + Result = send_req( HttpDb, [{method, put}, {path, encode_doc_id(DocId)}, {qs, QArgs}, {headers, Headers}, {body, {SendFun, Len}}], @@ -264,7 +265,11 @@ update_doc(#httpdb{} = HttpDb, #doc{id = {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))}; (_, _, {Props}) -> {error, get_value(<<"error">>, Props)} - end); + end), + process_flag(trap_exit, TrapExit), + unlink(DocStreamer), + receive {'EXIT', DocStreamer, _} -> ok after 0 -> ok end, + Result; update_doc(Db, Doc, Options, Type) -> try couch_db:update_doc(Db, Doc, Options, Type) Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl?rev=1023054&r1=1023053&r2=1023054&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Fri Oct 15 18:15:35 2010 @@ -36,7 +36,7 @@ send_req(#httpdb{headers = BaseHeaders} end, IbrowseOptions = [ {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout}, - {socket_options, [{reuseaddr, true}]} + {socket_options, [{reuseaddr, true}, {keepalive, true}]} | get_value(ibrowse_options, Params, []) ++ HttpDb#httpdb.proxy_options ], Headers2 = oauth_header(HttpDb, Params) ++ BaseHeaders ++ Headers, @@ -64,12 +64,12 @@ process_response(Resp, Worker, HttpDb, P end, Callback(Ok, Headers, EJson); R when R =:= 301 ; R =:= 302 -> - do_redirect(Headers, HttpDb, Params, Callback); + do_redirect(Worker, Headers, HttpDb, Params, Callback); Error -> report_error(nil, HttpDb, Params, {code, Error}) end; - {error, Reason} -> - report_error(nil, HttpDb, Params, {reason, Reason}) + Error -> + report_error(nil, HttpDb, Params, {error, Error}) end. @@ -85,13 +85,12 @@ process_stream_response(ReqId, Worker, H stop_worker(Worker), Ret; R when R =:= 301 ; R =:= 302 -> - stop_worker(Worker), - do_redirect(Headers, HttpDb, Params, Callback); + do_redirect(Worker, Headers, HttpDb, Params, Callback); Error -> report_error(Worker, HttpDb, Params, {code, Error}) - end - after HttpDb#httpdb.timeout -> - report_error(Worker, HttpDb, Params, timeout) + end; + {ibrowse_async_response, ReqId, {error, _} = Error} -> + report_error(Worker, HttpDb, Params, Error) end. @@ -106,6 +105,9 @@ stop_worker(Worker) when is_pid(Worker) report_error(Worker, #httpdb{timeout = Timeout} = HttpDb, Params, timeout) -> report_error(Worker, HttpDb, Params, {timeout, Timeout}); +report_error(Worker, #httpdb{timeout = T} = Db, Params, {error, req_timedout}) -> + report_error(Worker, Db, Params, {timeout, T}); + report_error(Worker, HttpDb, Params, Error) -> Method = string:to_upper(atom_to_list(get_value(method, Params, get))), Url = couch_util:url_strip_password(full_url(HttpDb, Params)), @@ -114,9 +116,9 @@ report_error(Worker, HttpDb, Params, Err exit({http_request_failed, Method, Url, Error}). -do_report_error(FullUrl, Method, {reason, Reason}) -> +do_report_error(FullUrl, Method, {error, Error}) -> ?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~p", - [Method, FullUrl, Reason]); + [Method, FullUrl, Error]); do_report_error(Url, Method, {code, Code}) -> ?LOG_ERROR("Replicator, request ~s to ~p failed. The received " @@ -130,14 +132,14 @@ do_report_error(Url, Method, {timeout, T stream_data_self(HttpDb, Params, Worker, ReqId) -> ibrowse:stream_next(ReqId), receive - {ibrowse_async_response, ReqId, {error, Error}} -> - report_error(Worker, HttpDb, Params, {reason, Error}); + {ibrowse_async_response, ReqId, {error, _} = Error} -> + report_error(Worker, HttpDb, Params, {error, Error}); {ibrowse_async_response, ReqId, Data} -> {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId) end}; {ibrowse_async_response_end, ReqId} -> - {<<>>, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId) end} - after HttpDb#httpdb.timeout -> - report_error(Worker, HttpDb, Params, timeout) + {<<>>, fun() -> + report_error(Worker, HttpDb, Params, {error, more_data_expected}) + end} end. @@ -177,10 +179,11 @@ oauth_header(#httpdb{url = BaseUrl, oaut "OAuth " ++ oauth_uri:params_to_header_string(OAuthParams)}]. -do_redirect(RespHeaders, #httpdb{url = OrigUrl} = HttpDb, Params, Callback) -> - RedirectUrl = redirect_url(RespHeaders, OrigUrl), +do_redirect(Worker, RespHeaders, #httpdb{url = Url} = HttpDb, Params, Cb) -> + stop_worker(Worker), + RedirectUrl = redirect_url(RespHeaders, Url), {HttpDb2, Params2} = after_redirect(RedirectUrl, HttpDb, Params), - send_req(HttpDb2, Params2, Callback). + send_req(HttpDb2, Params2, Cb). redirect_url(RespHeaders, OrigUrl) ->