Return-Path: Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: (qmail 32993 invoked from network); 15 Aug 2010 17:02:39 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 15 Aug 2010 17:02:39 -0000 Received: (qmail 18681 invoked by uid 500); 15 Aug 2010 17:02:39 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 18562 invoked by uid 500); 15 Aug 2010 17:02:38 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 18555 invoked by uid 99); 15 Aug 2010 17:02:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 15 Aug 2010 17:02:38 +0000 X-ASF-Spam-Status: No, hits=-1996.4 required=10.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 15 Aug 2010 17:02:36 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2E5492388A3B; Sun, 15 Aug 2010 17:01:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r985712 - in /couchdb/branches/new_replicator/src/couchdb: Makefile.am couch_api_wrap.erl couch_api_wrap.hrl couch_api_wrap_httpc.erl couch_db.hrl couch_replicate.erl Date: Sun, 15 Aug 2010 17:01:19 -0000 To: commits@couchdb.apache.org From: fdmanana@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100815170119.2E5492388A3B@eris.apache.org> Author: fdmanana Date: Sun Aug 15 17:01:18 2010 New Revision: 985712 URL: http://svn.apache.org/viewvc?rev=985712&view=rev Log: New replicator: 1) refactored couch_api_wrap to move HTTP request handling (through ibrowse) into a separate module; 2) added HTTP error handling (including handling redirect responses); 3) added missing error handling conditions in the replicator gen_server Added: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Modified: couchdb/branches/new_replicator/src/couchdb/Makefile.am couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl couchdb/branches/new_replicator/src/couchdb/couch_db.hrl couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Modified: couchdb/branches/new_replicator/src/couchdb/Makefile.am URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/Makefile.am?rev=985712&r1=985711&r2=985712&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/Makefile.am (original) +++ couchdb/branches/new_replicator/src/couchdb/Makefile.am Sun Aug 15 17:01:18 2010 @@ -83,6 +83,7 @@ source_files = \ couch_replication_notifier.erl \ couch_httpd_rep.erl \ couch_api_wrap.erl \ + couch_api_wrap_httpc.erl \ json_stream_parse.erl @@ -146,6 +147,7 @@ compiled_files = \ couch_replication_notifier.beam \ couch_httpd_rep.beam \ couch_api_wrap.beam \ + couch_api_wrap_httpc.beam \ json_stream_parse.beam # doc_base = \ Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=985712&r1=985711&r2=985712&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Sun Aug 15 17:01:18 2010 @@ -47,25 +47,29 @@ changes_since/5 ]). +-import(couch_api_wrap_httpc, [ + httpdb_setup/1, + send_req/3 + ]). + + db_open(Db, Options) -> db_open(Db, Options, false). db_open(#httpdb{} = Db, _Options, Create) -> - #httpdb{url=Url, oauth=OAuth, headers=Headers} = Db, - Headers2 = oauth_header(Url, [], put, OAuth) ++ Headers, + {ok, Db2} = httpdb_setup(Db), case Create of false -> ok; true -> - catch ibrowse:send_req(Url, Headers2, put) + send_req(Db2, [{method, put}], fun(_, _, _) -> ok end) end, - case (catch ibrowse:send_req(Url, Headers2, head)) of - {ok, "200", _, _} -> - {ok, Db}; - {ok, _Code, _, _} -> - % TODO deal with HTTP redirects - throw({db_not_found, ?l2b(Url)}) - end; + send_req(Db2, [{method, head}], + fun(200, _, _) -> + {ok, Db2}; + (_, _, _) -> + throw({db_not_found, ?l2b(Db2#httpdb.url)}) + end); db_open(DbName, Options, Create) -> case Create of false -> @@ -91,15 +95,11 @@ db_close(DbName) -> couch_db:close(DbName). -get_db_info(#httpdb{url=Url,oauth=OAuth,headers=Headers}) -> - Headers2 = oauth_header(Url, [], get, OAuth) ++ Headers, - case ibrowse:send_req(Url, Headers2, get, [], [ - {response_format,binary} - ], infinity) of - {ok, "200", _RespHeaders, Body} -> - {Props} = ?JSON_DECODE(Body), - {ok, [{couch_util:to_existing_atom(K), V} || {K, V} <- Props]} - end; +get_db_info(#httpdb{} = Db) -> + send_req(Db, [], + fun(200, _, {Props}) -> + {ok, [{couch_util:to_existing_atom(K), V} || {K, V} <- Props]} + end); get_db_info(Db) -> couch_db:get_db_info(Db). @@ -108,51 +108,41 @@ update_doc(Db, Doc, Options) -> update_doc(Db,Doc,Options,interactive_edit). -ensure_full_commit(#httpdb{url=Url, oauth=OAuth, headers=Headers}) -> - Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers, - #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), - {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port), - case ibrowse:send_req_direct(Worker, Url ++ "_ensure_full_commit", Headers2, - post, [], [{response_format, binary}], infinity) of - {ok, "201", _RespHeaders, Body} -> - catch ibrowse:stop_worker_process(Worker), - {Props} = ?JSON_DECODE(Body), - {ok, couch_util:get_value(<<"instance_start_time">>, Props)} - end; +ensure_full_commit(#httpdb{} = Db) -> + send_req( + Db, + [{method, post}, {path, "_ensure_full_commit"}, {direct, true}], + fun(201, _, {Props}) -> + {ok, couch_util:get_value(<<"instance_start_time">>, Props)} + end); ensure_full_commit(Db) -> couch_db:ensure_full_commit(Db). -get_missing_revs(#httpdb{url=Url, oauth=OAuth, headers=Headers}, IdRevs) -> - Json = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs], - Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers, - #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), - {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port), - case ibrowse:send_req_direct(Worker, Url ++ "_revs_diff", Headers2, post, - ?JSON_ENCODE({Json}), [{response_format, binary}], infinity) of - {ok, "200", _RespHeaders, Body} -> - catch ibrowse:stop_worker_process(Worker), - {JsonResults} = ?JSON_DECODE(Body), - ConvertToNativeFun = fun({Id, {Result}}) -> - { - Id, - couch_doc:parse_revs( +get_missing_revs(#httpdb{} = Db, IdRevs) -> + JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]}, + send_req( + Db, + [{method, post}, {path, "_revs_diff"}, {direct, true}, + {body, ?JSON_ENCODE(JsonBody)}], + fun(200, _, {Props}) -> + ConvertToNativeFun = fun({Id, {Result}}) -> + MissingRevs = couch_doc:parse_revs( couch_util:get_value(<<"missing">>, Result) ), - couch_doc:parse_revs( + PossibleAncestors = couch_doc:parse_revs( couch_util:get_value(<<"possible_ancestors">>, Result, []) - ) - } - end, - {ok, lists:map(ConvertToNativeFun, JsonResults)} - end; + ), + {Id, MissingRevs, PossibleAncestors} + end, + {ok, lists:map(ConvertToNativeFun, Props)} + end); get_missing_revs(Db, IdRevs) -> couch_db:get_missing_revs(Db, IdRevs). open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) -> - #httpdb{url=Url, oauth=OAuth, headers=Headers} = HttpDb, Self = self(), QArgs = [ {"revs", "true"}, @@ -165,26 +155,19 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re _ -> couch_util:url_encode(Id) end, - Headers2 = oauth_header(Url ++ IdEncoded, QArgs, get, OAuth) ++ - [{"accept", "multipart/mixed"} | Headers], Streamer = spawn_link(fun() -> - FullUrl = Url ++ IdEncoded ++ query_args_to_string(QArgs, []), - #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), - {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port), - {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, FullUrl, - Headers2, get, [], - [{response_format, binary}, {stream_to, {self(), once}}], - infinity), - - receive - {ibrowse_async_headers, ReqId, "200", RespHeaders} -> - CType = couch_util:get_value("Content-Type", RespHeaders), - couch_httpd:parse_multipart_request( - CType, - fun() -> stream_data_self(ReqId) end, - fun(Ev) -> mp_parse_mixed(Ev) end) - end, - catch ibrowse:stop_worker_process(Worker), + send_req( + HttpDb, + [{path, IdEncoded}, {qs, QArgs}, {direct, true}, + {ibrowse_options, [{stream_to, {self(), once}}]}, + {headers, [{"accept", "multipart/mixed"}]}], + fun(200, Headers, StreamDataFun) -> + CType = couch_util:get_value("Content-Type", Headers), + couch_httpd:parse_multipart_request( + CType, + StreamDataFun, + fun(Ev) -> mp_parse_mixed(Ev) end) + end), unlink(Self) end), receive_docs(Streamer, Fun, Acc); @@ -196,7 +179,6 @@ open_doc(Db, Id, Options, Fun) -> Fun(open_doc(Db, Id, Options)). open_doc(#httpdb{} = HttpDb, Id, Options) -> - #httpdb{url=Url, oauth=OAuth, headers=Headers} = HttpDb, QArgs = [ {"attachments", "true"}, {"revs", "true"} | @@ -208,44 +190,36 @@ open_doc(#httpdb{} = HttpDb, Id, Options _ -> couch_util:url_encode(Id) end, - Headers2 = oauth_header(Url ++ IdEncoded, QArgs, get, OAuth) ++ - [{"accept", "application/json, multipart/related"} | Headers], Self = self(), Streamer = spawn_link(fun() -> - FullUrl = Url ++ IdEncoded ++ query_args_to_string(QArgs, []), - #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), - {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port), - {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, FullUrl, - Headers2, get, [], - [{response_format, binary}, {stream_to, {self(), once}}], - infinity), - - receive - {ibrowse_async_headers, ReqId, Code, RespHeaders} -> - CType = couch_util:get_value("Content-Type", RespHeaders), - Self ! {self(), CType}, - case CType of - "application/json" -> - receive - {get, From} -> - EJson = json_stream_parse:to_ejson( - fun() -> stream_data_self(ReqId) end), - case Code of - "200" -> - Doc = couch_doc:from_json_obj(EJson), - From ! {data, self(), Doc}; - _ErrorCode -> - From ! {data, self(), EJson} - end - end; - "multipart/related;" ++ _ -> - couch_httpd:parse_multipart_request( - CType, - fun() -> stream_data_self(ReqId) end, - fun(Ev)-> couch_doc:mp_parse_doc(Ev, []) end) - end - end, - catch ibrowse:stop_worker_process(Worker), + send_req( + HttpDb, + [{headers, [{"accept", "application/json, multipart/related"}]}, + {path, IdEncoded}, {qs, QArgs}, {direct, true}, + {ibrowse_options, [{stream_to, {self(), once}}]}], + fun(Code, Headers, StreamDataFun) -> + CType = couch_util:get_value("Content-Type", Headers), + Self ! {self(), CType}, + case CType of + "application/json" -> + receive + {get, From} -> + EJson = json_stream_parse:to_ejson(StreamDataFun), + case Code of + 200 -> + Doc = couch_doc:from_json_obj(EJson), + From ! {data, self(), Doc}; + _ErrorCode -> + From ! {data, self(), EJson} + end + end; + "multipart/related;" ++ _ -> + couch_httpd:parse_multipart_request( + CType, + StreamDataFun, + fun(Ev)-> couch_doc:mp_parse_doc(Ev, []) end) + end + end), unlink(Self) end), receive @@ -264,91 +238,70 @@ open_doc(Db, Id, Options) -> couch_db:open_doc(Db, Id, Options). -update_doc(#httpdb{} = HttpDb, Doc, Options, Type) -> - #httpdb{url=Url, headers=Headers, oauth=OAuth} = HttpDb, +update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) -> QArgs = case Type of replicated_changes -> [{"new_edits", "false"}]; _ -> [] end ++ options_to_query_args(Options, []), - Boundary = couch_uuids:random(), JsonBytes = ?JSON_ENCODE( couch_doc:to_json_obj(Doc, [revs, attachments, follows | Options])), {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary, JsonBytes, Doc#doc.atts, false), - Self = self(), - Headers2 = case lists:member(delay_commit, Options) of + Headers = case lists:member(delay_commit, Options) of true -> [{"X-Couch-Full-Commit", "false"}]; false -> [] - end ++ [{"Content-Type", ?b2l(ContentType)}] ++ - oauth_header(Url, QArgs, put, OAuth) ++ Headers, - + end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}], + Self = self(), Ref = make_ref(), - % this streams the doc data to the ibrowse requester DocStreamer = spawn_link(fun() -> - couch_doc:doc_to_multi_part_stream(Boundary, - JsonBytes, Doc#doc.atts, + couch_doc:doc_to_multi_part_stream( + Boundary, JsonBytes, Doc#doc.atts, fun(Data) -> - receive {get_data, Ref, Pid} -> - Pid ! {data, Ref, Data} + receive {get_data, Ref, From} -> + From ! {data, Ref, Data} end - end, - false), - unlink(Self) + end, false), + unlink(Self) end), - #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), - {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port), SendFun = fun(0) -> eof; (LenLeft) when LenLeft > 0 -> DocStreamer ! {get_data, Ref, self()}, receive {data, Ref, Data} -> {ok, Data, LenLeft - iolist_size(Data)} + after HttpDb#httpdb.timeout -> + http_request_failed end end, - case ibrowse:send_req_direct(Worker, - Url ++ couch_util:url_encode(Doc#doc.id) ++ - query_args_to_string(QArgs, []), [{"Content-Length", Len} | Headers2], - put, {SendFun, Len}, [], infinity) of - {ok, [$2,$0, _], _RespHeaders, Body} -> - catch ibrowse:stop_worker_process(Worker), - {Props} = ?JSON_DECODE(Body), - {ok, couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props))} - end; + send_req( + HttpDb, + [{method, put}, {path, couch_util:url_encode(DocId)}, {direct, true}, + {qs, QArgs}, {headers, Headers}, {body, {SendFun, Len}}], + fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 -> + {ok, couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props))} + end); update_doc(Db, Doc, Options, Type) -> couch_db:update_doc(Db, Doc, Options, Type). changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Options) -> - #httpdb{url=Url, headers=Headers, oauth=OAuth} = HttpDb, - Url2 = Url ++ "_changes", QArgs = changes_q_args( [{"style", atom_to_list(Style)}, {"since", integer_to_list(StartSeq)}], Options), - Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers, - #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), - {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port), - - {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, - Url2 ++ query_args_to_string(QArgs, ""), Headers2, get, [], - [{response_format, binary}, {stream_to, {self(), once}}], infinity), - - DataFun = fun() -> - receive {ibrowse_async_headers, ReqId, "200", _Headers} -> - stream_data_self(ReqId) - end - end, - EventFun = fun(Ev) -> - changes_ev1(Ev, fun(DocInfo, _Acc) -> UserFun(DocInfo) end, []) - end, - try - json_stream_parse:events(DataFun, EventFun) - after - catch ibrowse:stop_worker_process(Worker) - end; + send_req( + HttpDb, + [{path, "_changes"}, {qs, QArgs}, {direct, true}, + {ibrowse_options, [{stream_to, {self(), once}}]}], + fun(200, _, DataStreamFun) -> + EventFun = fun(Ev) -> + changes_ev1(Ev, fun(DocInfo, _Acc) -> UserFun(DocInfo) end, []) + end, + json_stream_parse:events(DataStreamFun, EventFun) + end); changes_since(Db, Style, StartSeq, UserFun, Options) -> Args = #changes_args{ style = Style, @@ -426,12 +379,6 @@ options_to_query_args([{atts_since, Poss AncestorsJson = ?JSON_ENCODE(couch_doc:revs_to_strs(PossibleAncestors)), options_to_query_args(Rest, [{"atts_since", AncestorsJson} | Acc]). -query_args_to_string([], []) -> - ""; -query_args_to_string([], Acc) -> - "?" ++ string:join(lists:reverse(Acc), "&"); -query_args_to_string([{K, V} | Rest], Acc) -> - query_args_to_string(Rest, [(K ++ "=" ++ V) | Acc]). receive_docs(Streamer, UserFun, UserAcc) -> Streamer ! {get_headers, self()}, @@ -510,14 +457,6 @@ mp_parse_mixed(body_end) -> mp_parse_mixed(Next) end. -stream_data_self(ReqId) -> - ibrowse:stream_next(ReqId), - receive {ibrowse_async_response, ReqId, Data} -> - {Data, fun() -> stream_data_self(ReqId) end}; - {ibrowse_async_response_end, ReqId} -> - {<<>>, fun() -> stream_data_self(ReqId) end} - end. - changes_ev1(object_start, UserFun, UserAcc) -> fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end. @@ -555,24 +494,3 @@ json_to_doc_info({Props}) -> #rev_info{rev=Rev, deleted=Del} end, Changes), #doc_info{id=Id, high_seq=Seq, revs=RevsInfo}. - -oauth_header(_Url, _QS, _Action, nil) -> - []; -oauth_header(Url, QS, Action, OAuth) -> - Consumer = { - OAuth#oauth.consumer_key, - OAuth#oauth.consumer_secret, - OAuth#oauth.signature_method - }, - Method = case Action of - get -> "GET"; - post -> "POST"; - put -> "PUT"; - head -> "HEAD" - end, - Params = oauth:signed_params(Method, Url, QS, Consumer, - #oauth.token, - #oauth.token_secret), - [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}]. - - Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl?rev=985712&r1=985711&r2=985712&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl Sun Aug 15 17:01:18 2010 @@ -14,8 +14,9 @@ -record(httpdb, { url, - oauth=nil, - headers = [] + oauth = nil, + headers = [], + timeout = 30000 % milliseconds }). -record(oauth, { @@ -24,4 +25,4 @@ token_secret, consumer_secret, signature_method -}). \ No newline at end of file +}). Added: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl?rev=985712&view=auto ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl (added) +++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Sun Aug 15 17:01:18 2010 @@ -0,0 +1,226 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_api_wrap_httpc). + +-include("couch_db.hrl"). +-include("couch_api_wrap.hrl"). +-include("../ibrowse/ibrowse.hrl"). + +-export([httpdb_setup/1]). +-export([send_req/3]). + + +httpdb_setup(#httpdb{url = Url} = Db) -> + #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url), + ibrowse:set_max_sessions(Host, Port, max_sessions()), + ibrowse:set_max_pipeline_size(Host, Port, pipeline_size()), + {ok, Db}. + + +pipeline_size() -> + ?l2i(couch_config:get("replicator", "max_http_pipeline_size", "10")). + +max_sessions() -> + ?l2i(couch_config:get("replicator", "max_http_sessions", "10")). + + +send_req(HttpDb, Params, Callback) -> + #httpdb{headers = BaseHeaders, oauth = OAuth} = HttpDb, + Method = couch_util:get_value(method, Params, get), + Headers = couch_util:get_value(headers, Params, []), + Body = couch_util:get_value(body, Params, []), + IbrowseOptions = [ + {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout} + | couch_util:get_value(ibrowse_options, Params, []) + ], + Url = full_url(HttpDb, Params), + Headers2 = oauth_header(Url, [], Method, OAuth) ++ BaseHeaders ++ Headers, + {Response, Worker} = case couch_util:get_value(direct, Params, false) of + false -> + Resp = ibrowse:send_req( + Url, Headers2, Method, Body, IbrowseOptions, infinity), + {Resp, nil}; + true -> + #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url), + {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port), + Resp = ibrowse:send_req_direct( + Conn, Url, Headers2, Method, Body, IbrowseOptions, infinity), + {Resp, Conn} + end, + process_response(Response, Worker, HttpDb, Params, Callback). + + +process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) -> + process_stream_response(ReqId, Worker, HttpDb, Params, Callback); + +process_response(Resp, Worker, HttpDb, Params, Callback) -> + stop_worker(Worker), + case Resp of + {ok, Code, Headers, Body} -> + case ?l2i(Code) of + Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) -> + EJson = case Body of + <<>> -> + null; + Json -> + ?JSON_DECODE(Json) + end, + Callback(Ok, Headers, EJson); + R when R =:= 301 ; R =:= 302 -> + do_redirect(Headers, HttpDb, Params, Callback); + Error -> + report_error(nil, HttpDb, Params, {code, Error}) + end; + {error, Reason} -> + report_error(nil, HttpDb, Params, {reason, Reason}) + end. + + +process_stream_response(ReqId, Worker, HttpDb, Params, Callback) -> + receive + {ibrowse_async_headers, ReqId, Code, Headers} -> + case ?l2i(Code) of + Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) -> + StreamDataFun = fun() -> + stream_data_self(HttpDb, Params, Worker, ReqId) + end, + Ret = Callback(Ok, Headers, StreamDataFun), + stop_worker(Worker), + Ret; + R when R =:= 301 ; R =:= 302 -> + stop_worker(Worker), + do_redirect(Headers, HttpDb, Params, Callback); + Error -> + report_error(Worker, HttpDb, Params, {code, Error}) + end + after HttpDb#httpdb.timeout -> + report_error(Worker, HttpDb, Params, timeout) + end. + + +stop_worker(nil) -> + ok; +stop_worker(Worker) when is_pid(Worker) -> + unlink(Worker), + receive {'EXIT', Worker, _} -> ok after 0 -> ok end, + catch ibrowse:stop_worker_process(Worker). + + +report_error(Worker, #httpdb{timeout = Timeout} = HttpDb, Params, timeout) -> + report_error(Worker, HttpDb, Params, {timeout, Timeout}); + +report_error(Worker, HttpDb, Params, Error) -> + Method = string:to_upper( + atom_to_list(couch_util:get_value(method, Params, get))), + Url = strip_creds(full_url(HttpDb, Params)), + do_report_error(Url, Method, Error), + stop_worker(Worker), + exit({http_request_failed, Method, Url}). + + +do_report_error(FullUrl, Method, {reason, Reason}) -> + ?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~p", + [Method, FullUrl, Reason]); + +do_report_error(Url, Method, {code, Code}) -> + ?LOG_ERROR("Replicator, request ~s to ~p failed. The received " + "HTTP error code is ~p", [Method, Url, Code]); + +do_report_error(Url, Method, {timeout, Timeout}) -> + ?LOG_ERROR("Replicator, request ~s to ~p failed. Inactivity timeout " + " (~p milliseconds).", [Method, Url, Timeout]). + + +stream_data_self(HttpDb, Params, Worker, ReqId) -> + ibrowse:stream_next(ReqId), + receive {ibrowse_async_response, ReqId, Data} -> + {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId) end}; + {ibrowse_async_response_end, ReqId} -> + {<<>>, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId) end} + after HttpDb#httpdb.timeout -> + report_error(Worker, HttpDb, Params, timeout) + end. + + +full_url(#httpdb{url = BaseUrl}, Params) -> + Path = couch_util:get_value(path, Params, []), + QueryArgs = couch_util:get_value(qs, Params, []), + BaseUrl ++ Path ++ query_args_to_string(QueryArgs, []). + + +strip_creds(Url) -> + re:replace(Url, + "http(s)?://([^:]+):[^@]+@(.*)$", + "http\\1://\\2:*****@\\3", + [{return, list}]). + + +query_args_to_string([], []) -> + ""; +query_args_to_string([], Acc) -> + "?" ++ string:join(lists:reverse(Acc), "&"); +query_args_to_string([{K, V} | Rest], Acc) -> + query_args_to_string(Rest, [(K ++ "=" ++ V) | Acc]). + + +oauth_header(_Url, _QS, _Action, nil) -> + []; +oauth_header(Url, QS, Action, OAuth) -> + Consumer = { + OAuth#oauth.consumer_key, + OAuth#oauth.consumer_secret, + OAuth#oauth.signature_method + }, + Method = case Action of + get -> "GET"; + post -> "POST"; + put -> "PUT"; + head -> "HEAD" + end, + Params = oauth:signed_params(Method, Url, QS, Consumer, + #oauth.token, + #oauth.token_secret), + [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}]. + + +do_redirect(RespHeaders, #httpdb{url = OrigUrl} = HttpDb, Params, Callback) -> + RedirectUrl = redirect_url(RespHeaders, OrigUrl), + {HttpDb2, Params2} = after_redirect(RedirectUrl, HttpDb, Params), + send_req(HttpDb2, Params2, Callback). + + +redirect_url(RespHeaders, OrigUrl) -> + MochiHeaders = mochiweb_headers:make(RespHeaders), + RedUrl = mochiweb_headers:get_value("Location", MochiHeaders), + #url{ + host = Base, + port = Port, + path = Path, % includes query string + protocol = Proto + } = ibrowse_lib:parse_url(RedUrl), + #url{ + username = User, + password = Passwd + } = ibrowse_lib:parse_url(OrigUrl), + Creds = case is_list(User) andalso is_list(Passwd) of + true -> + User ++ ":" ++ Passwd ++ "@"; + false -> + [] + end, + atom_to_list(Proto) ++ "://" ++ Creds ++ Base ++ ":" ++ + integer_to_list(Port) ++ Path. + +after_redirect(RedirectUrl, HttpDb, Params) -> + Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)), + {HttpDb#httpdb{url = RedirectUrl}, Params2}. Modified: couchdb/branches/new_replicator/src/couchdb/couch_db.hrl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_db.hrl?rev=985712&r1=985711&r2=985712&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_db.hrl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_db.hrl Sun Aug 15 17:01:18 2010 @@ -23,6 +23,7 @@ -define(b2a(V), list_to_atom(binary_to_list(V))). -define(b2l(V), binary_to_list(V)). -define(l2b(V), list_to_binary(V)). +-define(l2i(V), list_to_integer(V)). -define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>). Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=985712&r1=985711&r2=985712&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Sun Aug 15 17:01:18 2010 @@ -23,6 +23,10 @@ -include("couch_db.hrl"). -include("couch_api_wrap.hrl"). +% Can't be greater than the maximum number of child restarts specified +% in couch_rep_sup.erl. +-define(MAX_RESTARTS, 3). + -record(stats, { missing_checked = 0, @@ -124,14 +128,22 @@ rep_result_listener(RepId) -> wait_for_result(RepId, Listener) -> - Result = receive + wait_for_result(RepId, Listener, ?MAX_RESTARTS). + +wait_for_result(RepId, Listener, RetriesLeft) -> + receive {finished, RepId, RepResult} -> + couch_replication_notifier:stop(Listener), {ok, RepResult}; {error, RepId, Reason} -> - {error, Reason} - end, - couch_replication_notifier:stop(Listener), - Result. + case RetriesLeft > 0 of + true -> + wait_for_result(RepId, Listener, RetriesLeft - 1); + false -> + couch_replication_notifier:stop(Listener), + {error, Reason} + end + end. end_replication({BaseId, Extension}) -> @@ -242,12 +254,6 @@ handle_info({add_stat, {StatPos, Val}}, NewStats = setelement(StatPos, Stats, Stat + Val), {noreply, State#rep_state{stats = NewStats}}; -handle_info(timed_checkpoint, State) -> - State2 = do_checkpoint(State), - NewTimer = erlang:start_timer(checkpoint_interval(State2), - self(), timed_checkpoint), - {noreply, State2#rep_state{timer = NewTimer}}; - handle_info(done, #rep_state{seqs_in_progress = SeqsInProgress} = State) -> % This means all the worker processes have completed their work. % Assert that all the seqs have been processed @@ -260,6 +266,7 @@ handle_info({'EXIT', Pid, normal}, #rep_ {noreply, State}; handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) -> + cancel_timer(State), ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]), {stop, changes_reader_died, State}; @@ -267,6 +274,7 @@ handle_info({'EXIT', Pid, normal}, #rep_ {noreply, St}; handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_finder=Pid} = St) -> + cancel_timer(St), ?LOG_ERROR("MissingRevsFinder process died with reason: ~p", [Reason]), {stop, missing_revs_finder_died, St}; @@ -274,6 +282,7 @@ handle_info({'EXIT', Pid, normal}, #rep_ {noreply, State}; handle_info({'EXIT', Pid, Reason}, #rep_state{doc_copier=Pid} = State) -> + cancel_timer(State), ?LOG_ERROR("DocCopier process died with reason: ~p", [Reason]), {stop, doc_copier_died, State}; @@ -281,6 +290,7 @@ handle_info({'EXIT', Pid, normal}, #rep_ {noreply, St}; handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_queue=Pid} = St) -> + cancel_timer(St), ?LOG_ERROR("MissingRevsQueue process died with reason: ~p", [Reason]), {stop, missing_revs_queue_died, St}; @@ -288,6 +298,7 @@ handle_info({'EXIT', Pid, normal}, #rep_ {noreply, State}; handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) -> + cancel_timer(State), ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]), {stop, changes_queue_died, State}. @@ -297,6 +308,10 @@ handle_call(Msg, _From, State) -> {stop, unexpected_sync_message, State}. +handle_cast(checkpoint, State) -> + State2 = do_checkpoint(State), + {noreply, State2#rep_state{timer = start_timer(State)}}; + handle_cast(Msg, State) -> ?LOG_ERROR("Replicator received an unexpected asynchronous call: ~p", [Msg]), {stop, unexpected_async_message, State}. @@ -319,23 +334,31 @@ terminate(Reason, #rep_state{rep_id = Re couch_replication_notifier:notify({error, RepId, Reason}). -terminate_cleanup(State) -> - #rep_state{ - missing_revs_queue = MissingRevsQueue, - changes_queue = ChangesQueue, - source = Source, - target = Target - } = State, - couch_work_queue:close(MissingRevsQueue), - couch_work_queue:close(ChangesQueue), +terminate_cleanup(#rep_state{source = Source, target = Target}) -> couch_api_wrap:db_close(Source), couch_api_wrap:db_close(Target). +start_timer(#rep_state{rep_options = Options} = State) -> + case couch_util:get_value(doc_ids, Options) of + undefined -> + After = checkpoint_interval(State), + case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of + {ok, Ref} -> + Ref; + Error -> + ?LOG_ERROR("Replicator, error scheduling checkpoint: ~p", [Error]), + nil + end; + _DocIdList -> + nil + end. + + cancel_timer(#rep_state{timer = nil}) -> ok; cancel_timer(#rep_state{timer = Timer}) -> - erlang:cancel_timer(Timer). + {ok, cancel} = timer:cancel(Timer). get_result(#rep_state{stats = Stats, rep_options = Options} = State) -> @@ -390,15 +413,7 @@ init_state({BaseId, _Ext} = RepId, Src, src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo) }, - State#rep_state{ - timer = case couch_util:get_value(doc_ids, Options) of - undefined -> - erlang:start_timer(checkpoint_interval(State), - self(), timed_checkpoint); - _DocIdList -> - nil - end - }. + State#rep_state{timer = start_timer(State)}. spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue, Options) -> @@ -598,6 +613,10 @@ commit_to_both(Source, Target) -> SourceStartTime = receive {SrcCommitPid, {ok, Timestamp}} -> + receive + {'EXIT', SrcCommitPid, normal} -> + ok + end, Timestamp; {'EXIT', SrcCommitPid, _} -> exit(replication_link_failure)