couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r985712 - in /couchdb/branches/new_replicator/src/couchdb: Makefile.am couch_api_wrap.erl couch_api_wrap.hrl couch_api_wrap_httpc.erl couch_db.hrl couch_replicate.erl
Date Sun, 15 Aug 2010 17:01:19 GMT
Author: fdmanana
Date: Sun Aug 15 17:01:18 2010
New Revision: 985712

URL: http://svn.apache.org/viewvc?rev=985712&view=rev
Log:
New replicator:
1) refactored couch_api_wrap to move HTTP request handling (through ibrowse) into a separate module;
2) added HTTP error handling (including handling redirect responses);
3) added missing error handling conditions in the replicator gen_server


Added:
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
Modified:
    couchdb/branches/new_replicator/src/couchdb/Makefile.am
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
    couchdb/branches/new_replicator/src/couchdb/couch_db.hrl
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl

Modified: couchdb/branches/new_replicator/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/Makefile.am?rev=985712&r1=985711&r2=985712&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/Makefile.am (original)
+++ couchdb/branches/new_replicator/src/couchdb/Makefile.am Sun Aug 15 17:01:18 2010
@@ -83,6 +83,7 @@ source_files = \
     couch_replication_notifier.erl \
     couch_httpd_rep.erl \
     couch_api_wrap.erl \
+    couch_api_wrap_httpc.erl \
     json_stream_parse.erl
 
 
@@ -146,6 +147,7 @@ compiled_files = \
     couch_replication_notifier.beam \
     couch_httpd_rep.beam \
     couch_api_wrap.beam \
+    couch_api_wrap_httpc.beam \
     json_stream_parse.beam
 
 # doc_base = \

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=985712&r1=985711&r2=985712&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Sun Aug 15 17:01:18 2010
@@ -47,25 +47,29 @@
     changes_since/5
     ]).
 
+-import(couch_api_wrap_httpc, [
+    httpdb_setup/1,
+    send_req/3
+    ]).
+
+
 db_open(Db, Options) ->
     db_open(Db, Options, false).
 
 db_open(#httpdb{} = Db, _Options, Create) ->
-    #httpdb{url=Url, oauth=OAuth, headers=Headers} = Db,
-    Headers2 = oauth_header(Url, [], put, OAuth) ++ Headers,
+    {ok, Db2} = httpdb_setup(Db),
     case Create of
     false ->
         ok;
     true ->
-        catch ibrowse:send_req(Url, Headers2, put)
+        send_req(Db2, [{method, put}], fun(_, _, _) -> ok end)
     end,
-    case (catch ibrowse:send_req(Url, Headers2, head)) of
-    {ok, "200", _, _} ->
-        {ok, Db};
-    {ok, _Code, _, _} ->
-        % TODO deal with HTTP redirects
-        throw({db_not_found, ?l2b(Url)})
-    end;
+    send_req(Db2, [{method, head}],
+        fun(200, _, _) ->
+            {ok, Db2};
+        (_, _, _) ->
+            throw({db_not_found, ?l2b(Db2#httpdb.url)})
+        end);
 db_open(DbName, Options, Create) ->
     case Create of
     false ->
@@ -91,15 +95,11 @@ db_close(DbName) ->
     couch_db:close(DbName).
 
 
-get_db_info(#httpdb{url=Url,oauth=OAuth,headers=Headers}) ->
-    Headers2 = oauth_header(Url, [], get, OAuth) ++ Headers,
-    case ibrowse:send_req(Url, Headers2, get, [], [ 
-           {response_format,binary}
-           ], infinity) of
-    {ok, "200", _RespHeaders, Body} ->
-        {Props} = ?JSON_DECODE(Body),
-        {ok, [{couch_util:to_existing_atom(K), V} || {K, V} <- Props]}
-    end;
+get_db_info(#httpdb{} = Db) ->
+    send_req(Db, [],
+        fun(200, _, {Props}) ->
+            {ok, [{couch_util:to_existing_atom(K), V} || {K, V} <- Props]}
+        end);
 get_db_info(Db) ->
     couch_db:get_db_info(Db).
 
@@ -108,51 +108,41 @@ update_doc(Db, Doc, Options) ->
     update_doc(Db,Doc,Options,interactive_edit).
 
 
-ensure_full_commit(#httpdb{url=Url, oauth=OAuth, headers=Headers}) ->
-    Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers,
-    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
-    {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
-    case ibrowse:send_req_direct(Worker, Url ++ "_ensure_full_commit", Headers2,
-            post, [], [{response_format, binary}], infinity) of
-    {ok, "201", _RespHeaders, Body} ->
-        catch ibrowse:stop_worker_process(Worker),
-        {Props} = ?JSON_DECODE(Body),
-       {ok, couch_util:get_value(<<"instance_start_time">>, Props)}
-    end;
+ensure_full_commit(#httpdb{} = Db) ->
+    send_req(
+        Db,
+        [{method, post}, {path, "_ensure_full_commit"}, {direct, true}],
+        fun(201, _, {Props}) ->
+            {ok, couch_util:get_value(<<"instance_start_time">>, Props)}
+        end);
 ensure_full_commit(Db) ->
     couch_db:ensure_full_commit(Db).
 
 
-get_missing_revs(#httpdb{url=Url, oauth=OAuth, headers=Headers}, IdRevs) ->
-    Json = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs],
-    Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers,
-    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
-    {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
-    case ibrowse:send_req_direct(Worker, Url ++ "_revs_diff", Headers2, post,
-            ?JSON_ENCODE({Json}), [{response_format, binary}], infinity) of
-    {ok, "200", _RespHeaders, Body} ->
-        catch ibrowse:stop_worker_process(Worker),
-        {JsonResults} = ?JSON_DECODE(Body),
-        ConvertToNativeFun = fun({Id, {Result}}) ->
-            {
-                Id,
-                couch_doc:parse_revs(
+get_missing_revs(#httpdb{} = Db, IdRevs) ->
+    JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]},
+    send_req(
+        Db,
+        [{method, post}, {path, "_revs_diff"}, {direct, true},
+            {body, ?JSON_ENCODE(JsonBody)}],
+        fun(200, _, {Props}) ->
+            ConvertToNativeFun = fun({Id, {Result}}) ->
+                MissingRevs = couch_doc:parse_revs(
                     couch_util:get_value(<<"missing">>, Result)
                 ),
-                couch_doc:parse_revs(
+                PossibleAncestors = couch_doc:parse_revs(
                     couch_util:get_value(<<"possible_ancestors">>, Result, [])
-                )
-            }
-        end,
-        {ok, lists:map(ConvertToNativeFun, JsonResults)}
-    end;
+                ),
+                {Id, MissingRevs, PossibleAncestors}
+            end,
+            {ok, lists:map(ConvertToNativeFun, Props)}
+        end);
 get_missing_revs(Db, IdRevs) ->
     couch_db:get_missing_revs(Db, IdRevs).
 
 
 
 open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
-    #httpdb{url=Url, oauth=OAuth, headers=Headers} = HttpDb,
     Self = self(),
     QArgs = [
         {"revs", "true"},
@@ -165,26 +155,19 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re
     _ ->
         couch_util:url_encode(Id)
     end,
-    Headers2 = oauth_header(Url ++ IdEncoded, QArgs, get, OAuth) ++
-        [{"accept", "multipart/mixed"} | Headers],
     Streamer = spawn_link(fun() ->
-            FullUrl = Url ++ IdEncoded ++ query_args_to_string(QArgs, []),
-            #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
-            {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
-            {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, FullUrl,
-                Headers2, get, [],
-                [{response_format, binary}, {stream_to, {self(), once}}],
-                infinity),
-
-            receive
-            {ibrowse_async_headers, ReqId, "200", RespHeaders} ->
-                CType = couch_util:get_value("Content-Type", RespHeaders),
-                couch_httpd:parse_multipart_request(
-                    CType,
-                    fun() -> stream_data_self(ReqId) end,
-                    fun(Ev) -> mp_parse_mixed(Ev) end)
-            end,
-            catch ibrowse:stop_worker_process(Worker),
+            send_req(
+                HttpDb,
+                [{path, IdEncoded}, {qs, QArgs}, {direct, true},
+                    {ibrowse_options, [{stream_to, {self(), once}}]},
+                    {headers, [{"accept", "multipart/mixed"}]}],
+                fun(200, Headers, StreamDataFun) ->
+                    CType = couch_util:get_value("Content-Type", Headers),
+                    couch_httpd:parse_multipart_request(
+                        CType,
+                        StreamDataFun,
+                        fun(Ev) -> mp_parse_mixed(Ev) end)
+                end),
             unlink(Self)
         end),
     receive_docs(Streamer, Fun, Acc);
@@ -196,7 +179,6 @@ open_doc(Db, Id, Options, Fun) ->
     Fun(open_doc(Db, Id, Options)).
 
 open_doc(#httpdb{} = HttpDb, Id, Options) ->
-    #httpdb{url=Url, oauth=OAuth, headers=Headers} = HttpDb,
     QArgs = [
         {"attachments", "true"},
         {"revs", "true"} |
@@ -208,44 +190,36 @@ open_doc(#httpdb{} = HttpDb, Id, Options
     _ ->
         couch_util:url_encode(Id)
     end,
-    Headers2 = oauth_header(Url ++ IdEncoded, QArgs, get, OAuth) ++
-        [{"accept", "application/json, multipart/related"} | Headers],
     Self = self(),
     Streamer = spawn_link(fun() ->
-            FullUrl = Url ++ IdEncoded ++ query_args_to_string(QArgs, []),
-            #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
-            {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
-            {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, FullUrl,
-                Headers2, get, [],
-                [{response_format, binary}, {stream_to, {self(), once}}],
-                infinity),
-
-            receive
-            {ibrowse_async_headers, ReqId, Code, RespHeaders} ->
-                CType = couch_util:get_value("Content-Type", RespHeaders),
-                Self ! {self(), CType},
-                case CType of
-                "application/json" ->
-                    receive
-                    {get, From} ->
-                        EJson = json_stream_parse:to_ejson(
-                            fun() -> stream_data_self(ReqId) end),
-                        case Code of
-                        "200" ->
-                            Doc = couch_doc:from_json_obj(EJson),
-                            From ! {data, self(), Doc};
-                        _ErrorCode ->
-                            From ! {data, self(), EJson}
-                         end
-                    end;
-                "multipart/related;" ++ _ ->
-                    couch_httpd:parse_multipart_request(
-                        CType,
-                        fun() -> stream_data_self(ReqId) end,
-                        fun(Ev)-> couch_doc:mp_parse_doc(Ev, []) end)
-                end
-            end,
-            catch ibrowse:stop_worker_process(Worker),
+            send_req(
+                HttpDb,
+                [{headers, [{"accept", "application/json, multipart/related"}]},
+                    {path, IdEncoded}, {qs, QArgs}, {direct, true},
+                    {ibrowse_options, [{stream_to, {self(), once}}]}],
+                fun(Code, Headers, StreamDataFun) ->
+                    CType = couch_util:get_value("Content-Type", Headers),
+                    Self ! {self(), CType},
+                    case CType of
+                    "application/json" ->
+                        receive
+                        {get, From} ->
+                            EJson = json_stream_parse:to_ejson(StreamDataFun),
+                            case Code of
+                            200 ->
+                                Doc = couch_doc:from_json_obj(EJson),
+                                From ! {data, self(), Doc};
+                            _ErrorCode ->
+                                From ! {data, self(), EJson}
+                            end
+                        end;
+                    "multipart/related;" ++ _ ->
+                        couch_httpd:parse_multipart_request(
+                            CType,
+                            StreamDataFun,
+                            fun(Ev)-> couch_doc:mp_parse_doc(Ev, []) end)
+                    end
+                end),
             unlink(Self)
         end),
     receive
@@ -264,91 +238,70 @@ open_doc(Db, Id, Options) ->
     couch_db:open_doc(Db, Id, Options).
 
 
-update_doc(#httpdb{} = HttpDb, Doc, Options, Type) ->
-    #httpdb{url=Url, headers=Headers, oauth=OAuth} = HttpDb,
+update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
     QArgs = case Type of
     replicated_changes ->
         [{"new_edits", "false"}];
     _ ->
         []
     end ++ options_to_query_args(Options, []),
-
     Boundary = couch_uuids:random(),
     JsonBytes = ?JSON_ENCODE(
         couch_doc:to_json_obj(Doc, [revs, attachments, follows | Options])),
     {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
         JsonBytes, Doc#doc.atts, false),
-    Self = self(),
-    Headers2 = case lists:member(delay_commit, Options) of 
+    Headers = case lists:member(delay_commit, Options) of
     true ->
         [{"X-Couch-Full-Commit", "false"}];
     false ->
         []
-    end ++ [{"Content-Type", ?b2l(ContentType)}] ++
-        oauth_header(Url, QArgs, put, OAuth) ++ Headers,
-
+    end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
+    Self = self(),
     Ref = make_ref(),
-    % this streams the doc data to the ibrowse requester
     DocStreamer = spawn_link(fun() ->
-        couch_doc:doc_to_multi_part_stream(Boundary,
-            JsonBytes, Doc#doc.atts,
+        couch_doc:doc_to_multi_part_stream(
+            Boundary, JsonBytes, Doc#doc.atts,
             fun(Data) ->
-                receive {get_data, Ref, Pid} ->
-                    Pid ! {data, Ref, Data}
+                receive {get_data, Ref, From} ->
+                    From ! {data, Ref, Data}
                 end
-            end,
-            false),
-            unlink(Self)
+            end, false),
+        unlink(Self)
     end),
-    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
-    {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
     SendFun = fun(0) ->
             eof;
         (LenLeft) when LenLeft > 0 ->
             DocStreamer ! {get_data, Ref, self()},
             receive {data, Ref, Data} ->
                 {ok, Data, LenLeft - iolist_size(Data)}
+            after HttpDb#httpdb.timeout ->
+                http_request_failed
             end
     end,
-    case ibrowse:send_req_direct(Worker,
-        Url ++ couch_util:url_encode(Doc#doc.id) ++
-        query_args_to_string(QArgs, []), [{"Content-Length", Len} | Headers2],
-        put, {SendFun, Len}, [], infinity) of
-    {ok, [$2,$0, _], _RespHeaders, Body} ->
-        catch ibrowse:stop_worker_process(Worker),
-        {Props} = ?JSON_DECODE(Body),
-        {ok, couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props))}
-    end;
+    send_req(
+        HttpDb,
+        [{method, put}, {path, couch_util:url_encode(DocId)}, {direct, true},
+            {qs, QArgs}, {headers, Headers}, {body, {SendFun, Len}}],
+        fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->
+            {ok, couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props))}
+        end);
 update_doc(Db, Doc, Options, Type) ->
     couch_db:update_doc(Db, Doc, Options, Type).
 
 changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Options) ->
-    #httpdb{url=Url, headers=Headers, oauth=OAuth} = HttpDb,
-    Url2 = Url ++ "_changes",
     QArgs = changes_q_args(
         [{"style", atom_to_list(Style)}, {"since", integer_to_list(StartSeq)}],
         Options),
-    Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers,        
-    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
-    {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
-
-    {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker,
-        Url2 ++ query_args_to_string(QArgs, ""), Headers2, get, [],
-        [{response_format, binary}, {stream_to, {self(), once}}], infinity),
-
-    DataFun = fun() ->
-        receive {ibrowse_async_headers, ReqId, "200", _Headers} ->
-            stream_data_self(ReqId)
-        end
-    end,
-    EventFun = fun(Ev) ->
-        changes_ev1(Ev, fun(DocInfo, _Acc) -> UserFun(DocInfo) end, [])
-    end,
-    try
-        json_stream_parse:events(DataFun, EventFun)
-    after
-        catch ibrowse:stop_worker_process(Worker)
-    end;
+    send_req(
+        HttpDb,
+        [{path, "_changes"}, {qs, QArgs}, {direct, true},
+            {ibrowse_options, [{stream_to, {self(), once}}]}],
+        fun(200, _, DataStreamFun) ->
+            EventFun = fun(Ev) ->
+                changes_ev1(Ev, fun(DocInfo, _Acc) -> UserFun(DocInfo) end, [])
+            end,
+            json_stream_parse:events(DataStreamFun, EventFun)
+        end);
 changes_since(Db, Style, StartSeq, UserFun, Options) ->
     Args = #changes_args{
         style = Style,
@@ -426,12 +379,6 @@ options_to_query_args([{atts_since, Poss
     AncestorsJson = ?JSON_ENCODE(couch_doc:revs_to_strs(PossibleAncestors)),
     options_to_query_args(Rest, [{"atts_since", AncestorsJson} | Acc]).
 
-query_args_to_string([], []) ->
-    "";
-query_args_to_string([], Acc) ->
-    "?" ++ string:join(lists:reverse(Acc), "&");
-query_args_to_string([{K, V} | Rest], Acc) ->
-    query_args_to_string(Rest, [(K ++ "=" ++ V) | Acc]).
 
 receive_docs(Streamer, UserFun, UserAcc) ->
     Streamer ! {get_headers, self()},
@@ -510,14 +457,6 @@ mp_parse_mixed(body_end) ->
         mp_parse_mixed(Next)
     end.
 
-stream_data_self(ReqId) ->
-    ibrowse:stream_next(ReqId),
-    receive {ibrowse_async_response, ReqId, Data} ->
-        {Data, fun() -> stream_data_self(ReqId) end};
-    {ibrowse_async_response_end, ReqId} ->
-        {<<>>, fun() -> stream_data_self(ReqId) end}
-    end.
-
 changes_ev1(object_start, UserFun, UserAcc) ->
     fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
 
@@ -555,24 +494,3 @@ json_to_doc_info({Props}) ->
             #rev_info{rev=Rev, deleted=Del}
         end, Changes),
     #doc_info{id=Id, high_seq=Seq, revs=RevsInfo}.
-
-oauth_header(_Url, _QS, _Action, nil) ->
-    [];
-oauth_header(Url, QS, Action, OAuth) ->
-    Consumer = {
-        OAuth#oauth.consumer_key,
-        OAuth#oauth.consumer_secret,
-        OAuth#oauth.signature_method
-    },
-    Method = case Action of
-    get -> "GET";
-    post -> "POST";
-    put -> "PUT";
-    head -> "HEAD"
-    end,
-    Params = oauth:signed_params(Method, Url, QS, Consumer, 
-        #oauth.token,
-        #oauth.token_secret),
-    [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}].
-
-

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl?rev=985712&r1=985711&r2=985712&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl Sun Aug 15 17:01:18 2010
@@ -14,8 +14,9 @@
 
 -record(httpdb, {
     url,
-    oauth=nil,
-    headers = []
+    oauth = nil,
+    headers = [],
+    timeout = 30000  % milliseconds
 }).
 
 -record(oauth, {
@@ -24,4 +25,4 @@
     token_secret,
     consumer_secret,
     signature_method
-}).
\ No newline at end of file
+}).

Added: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl?rev=985712&view=auto
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl (added)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Sun Aug 15 17:01:18 2010
@@ -0,0 +1,226 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_api_wrap_httpc).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-export([httpdb_setup/1]).
+-export([send_req/3]).
+
+
+httpdb_setup(#httpdb{url = Url} = Db) ->
+    #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
+    ibrowse:set_max_sessions(Host, Port, max_sessions()),
+    ibrowse:set_max_pipeline_size(Host, Port, pipeline_size()),
+    {ok, Db}.
+
+
+pipeline_size() ->
+    ?l2i(couch_config:get("replicator", "max_http_pipeline_size", "10")).
+
+max_sessions() ->
+    ?l2i(couch_config:get("replicator", "max_http_sessions", "10")).
+
+
+send_req(HttpDb, Params, Callback) ->
+    #httpdb{headers = BaseHeaders, oauth = OAuth} = HttpDb,
+    Method = couch_util:get_value(method, Params, get),
+    Headers = couch_util:get_value(headers, Params, []),
+    Body = couch_util:get_value(body, Params, []),
+    IbrowseOptions = [
+        {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout}
+        | couch_util:get_value(ibrowse_options, Params, [])
+    ],
+    Url = full_url(HttpDb, Params),
+    Headers2 = oauth_header(Url, [], Method, OAuth) ++ BaseHeaders ++ Headers,
+    {Response, Worker} = case couch_util:get_value(direct, Params, false) of
+    false ->
+        Resp = ibrowse:send_req(
+            Url, Headers2, Method, Body, IbrowseOptions, infinity),
+        {Resp, nil};
+    true ->
+        #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
+        {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
+        Resp = ibrowse:send_req_direct(
+            Conn, Url, Headers2, Method, Body, IbrowseOptions, infinity),
+        {Resp, Conn}
+    end,
+    process_response(Response, Worker, HttpDb, Params, Callback).
+
+
+process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
+    process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
+
+process_response(Resp, Worker, HttpDb, Params, Callback) ->
+    stop_worker(Worker),
+    case Resp of
+    {ok, Code, Headers, Body} ->
+        case ?l2i(Code) of
+        Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
+            EJson = case Body of
+            <<>> ->
+                null;
+            Json ->
+                ?JSON_DECODE(Json)
+            end,
+            Callback(Ok, Headers, EJson);
+        R when R =:= 301 ; R =:= 302 ->
+            do_redirect(Headers, HttpDb, Params, Callback);
+        Error ->
+            report_error(nil, HttpDb, Params, {code, Error})
+        end;
+    {error, Reason} ->
+        report_error(nil, HttpDb, Params, {reason, Reason})
+    end.
+
+
+process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
+    receive
+    {ibrowse_async_headers, ReqId, Code, Headers} ->
+        case ?l2i(Code) of
+        Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
+            StreamDataFun = fun() ->
+                stream_data_self(HttpDb, Params, Worker, ReqId)
+            end,
+            Ret = Callback(Ok, Headers, StreamDataFun),
+            stop_worker(Worker),
+            Ret;
+        R when R =:= 301 ; R =:= 302 ->
+            stop_worker(Worker),
+            do_redirect(Headers, HttpDb, Params, Callback);
+        Error ->
+            report_error(Worker, HttpDb, Params, {code, Error})
+        end
+    after HttpDb#httpdb.timeout ->
+        report_error(Worker, HttpDb, Params, timeout)
+    end.
+
+
+stop_worker(nil) ->
+    ok;
+stop_worker(Worker) when is_pid(Worker) ->
+    unlink(Worker),
+    receive {'EXIT', Worker, _} -> ok after 0 -> ok end,
+    catch ibrowse:stop_worker_process(Worker).
+
+
+report_error(Worker, #httpdb{timeout = Timeout} = HttpDb, Params, timeout) ->
+    report_error(Worker, HttpDb, Params, {timeout, Timeout});
+
+report_error(Worker, HttpDb, Params, Error) ->
+    Method = string:to_upper(
+        atom_to_list(couch_util:get_value(method, Params, get))),
+    Url = strip_creds(full_url(HttpDb, Params)),
+    do_report_error(Url, Method, Error),
+    stop_worker(Worker),
+    exit({http_request_failed, Method, Url}).
+
+
+do_report_error(FullUrl, Method, {reason, Reason}) ->
+    ?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~p",
+        [Method, FullUrl, Reason]);
+
+do_report_error(Url, Method, {code, Code}) ->
+    ?LOG_ERROR("Replicator, request ~s to ~p failed. The received "
+        "HTTP error code is ~p", [Method, Url, Code]);
+
+do_report_error(Url, Method, {timeout, Timeout}) ->
+    ?LOG_ERROR("Replicator, request ~s to ~p failed. Inactivity timeout "
+        " (~p milliseconds).", [Method, Url, Timeout]).
+
+
+stream_data_self(HttpDb, Params, Worker, ReqId) ->
+    ibrowse:stream_next(ReqId),
+    receive {ibrowse_async_response, ReqId, Data} ->
+        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId) end};
+    {ibrowse_async_response_end, ReqId} ->
+        {<<>>, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId) end}
+    after HttpDb#httpdb.timeout ->
+        report_error(Worker, HttpDb, Params, timeout)
+    end.
+
+
+full_url(#httpdb{url = BaseUrl}, Params) ->
+    Path = couch_util:get_value(path, Params, []),
+    QueryArgs = couch_util:get_value(qs, Params, []),
+    BaseUrl ++ Path ++ query_args_to_string(QueryArgs, []).
+
+
+strip_creds(Url) ->
+    re:replace(Url,
+        "http(s)?://([^:]+):[^@]+@(.*)$",
+        "http\\1://\\2:*****@\\3",
+        [{return, list}]).
+
+
+query_args_to_string([], []) ->
+    "";
+query_args_to_string([], Acc) ->
+    "?" ++ string:join(lists:reverse(Acc), "&");
+query_args_to_string([{K, V} | Rest], Acc) ->
+    query_args_to_string(Rest, [(K ++ "=" ++ V) | Acc]).
+
+
+oauth_header(_Url, _QS, _Action, nil) ->
+    [];
+oauth_header(Url, QS, Action, OAuth) ->
+    Consumer = {
+        OAuth#oauth.consumer_key,
+        OAuth#oauth.consumer_secret,
+        OAuth#oauth.signature_method
+    },
+    Method = case Action of
+    get -> "GET";
+    post -> "POST";
+    put -> "PUT";
+    head -> "HEAD"
+    end,
+    Params = oauth:signed_params(Method, Url, QS, Consumer,
+        #oauth.token,
+        #oauth.token_secret),
+    [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}].
+
+
+do_redirect(RespHeaders, #httpdb{url = OrigUrl} = HttpDb, Params, Callback) ->
+    RedirectUrl = redirect_url(RespHeaders, OrigUrl),
+    {HttpDb2, Params2} = after_redirect(RedirectUrl, HttpDb, Params),
+    send_req(HttpDb2, Params2, Callback).
+
+
+redirect_url(RespHeaders, OrigUrl) ->
+    MochiHeaders = mochiweb_headers:make(RespHeaders),
+    RedUrl = mochiweb_headers:get_value("Location", MochiHeaders),
+    #url{
+        host = Base,
+        port = Port,
+        path = Path,  % includes query string
+        protocol = Proto
+    } = ibrowse_lib:parse_url(RedUrl),
+    #url{
+        username = User,
+        password = Passwd
+    } = ibrowse_lib:parse_url(OrigUrl),
+    Creds = case is_list(User) andalso is_list(Passwd) of
+    true ->
+        User ++ ":" ++ Passwd ++ "@";
+    false ->
+        []
+    end,
+    atom_to_list(Proto) ++ "://" ++ Creds ++ Base ++ ":" ++
+        integer_to_list(Port) ++ Path.
+
+after_redirect(RedirectUrl, HttpDb, Params) ->
+    Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
+    {HttpDb#httpdb{url = RedirectUrl}, Params2}.

Modified: couchdb/branches/new_replicator/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_db.hrl?rev=985712&r1=985711&r2=985712&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_db.hrl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_db.hrl Sun Aug 15 17:01:18 2010
@@ -23,6 +23,7 @@
 -define(b2a(V), list_to_atom(binary_to_list(V))).
 -define(b2l(V), binary_to_list(V)).
 -define(l2b(V), list_to_binary(V)).
+-define(l2i(V), list_to_integer(V)).
 
 -define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>).
 

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=985712&r1=985711&r2=985712&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Sun Aug 15 17:01:18 2010
@@ -23,6 +23,10 @@
 -include("couch_db.hrl").
 -include("couch_api_wrap.hrl").
 
+% Can't be greater than the maximum number of child restarts specified
+% in couch_rep_sup.erl.
+-define(MAX_RESTARTS, 3).
+
 
 -record(stats, {
     missing_checked = 0,
@@ -124,14 +128,22 @@ rep_result_listener(RepId) ->
 
 
 wait_for_result(RepId, Listener) ->
-    Result = receive
+    wait_for_result(RepId, Listener, ?MAX_RESTARTS).
+
+wait_for_result(RepId, Listener, RetriesLeft) ->
+    receive
     {finished, RepId, RepResult} ->
+        couch_replication_notifier:stop(Listener),
         {ok, RepResult};
     {error, RepId, Reason} ->
-        {error, Reason}
-    end,
-    couch_replication_notifier:stop(Listener),
-    Result.
+        case RetriesLeft > 0 of
+        true ->
+            wait_for_result(RepId, Listener, RetriesLeft - 1);
+        false ->
+            couch_replication_notifier:stop(Listener),
+            {error, Reason}
+        end
+    end.
 
 
 end_replication({BaseId, Extension}) ->
@@ -242,12 +254,6 @@ handle_info({add_stat, {StatPos, Val}}, 
     NewStats = setelement(StatPos, Stats, Stat + Val),
     {noreply, State#rep_state{stats = NewStats}};
 
-handle_info(timed_checkpoint, State) ->
-    State2 = do_checkpoint(State),
-    NewTimer = erlang:start_timer(checkpoint_interval(State2),
-        self(), timed_checkpoint),
-    {noreply, State2#rep_state{timer = NewTimer}};
-
 handle_info(done, #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
     % This means all the worker processes have completed their work.
     % Assert that all the seqs have been processed
@@ -260,6 +266,7 @@ handle_info({'EXIT', Pid, normal}, #rep_
     {noreply, State};
 
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+    cancel_timer(State),
     ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
     {stop, changes_reader_died, State};
 
@@ -267,6 +274,7 @@ handle_info({'EXIT', Pid, normal}, #rep_
     {noreply, St};
 
 handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_finder=Pid} = St) ->
+    cancel_timer(St),
     ?LOG_ERROR("MissingRevsFinder process died with reason: ~p", [Reason]),
     {stop, missing_revs_finder_died, St};
 
@@ -274,6 +282,7 @@ handle_info({'EXIT', Pid, normal}, #rep_
     {noreply, State};
 
 handle_info({'EXIT', Pid, Reason}, #rep_state{doc_copier=Pid} = State) ->
+    cancel_timer(State),
     ?LOG_ERROR("DocCopier process died with reason: ~p", [Reason]),
     {stop, doc_copier_died, State};
 
@@ -281,6 +290,7 @@ handle_info({'EXIT', Pid, normal}, #rep_
     {noreply, St};
 
 handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_queue=Pid} = St) ->
+    cancel_timer(St),
     ?LOG_ERROR("MissingRevsQueue process died with reason: ~p", [Reason]),
     {stop, missing_revs_queue_died, St};
 
@@ -288,6 +298,7 @@ handle_info({'EXIT', Pid, normal}, #rep_
     {noreply, State};
 
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+    cancel_timer(State),
     ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
     {stop, changes_queue_died, State}.
 
@@ -297,6 +308,10 @@ handle_call(Msg, _From, State) ->
     {stop, unexpected_sync_message, State}.
 
 
+handle_cast(checkpoint, State) ->
+    State2 = do_checkpoint(State),
+    {noreply, State2#rep_state{timer = start_timer(State)}};
+
 handle_cast(Msg, State) ->
     ?LOG_ERROR("Replicator received an unexpected asynchronous call: ~p", [Msg]),
     {stop, unexpected_async_message, State}.
@@ -319,23 +334,31 @@ terminate(Reason, #rep_state{rep_id = Re
     couch_replication_notifier:notify({error, RepId, Reason}).
 
 
-terminate_cleanup(State) ->
-    #rep_state{
-        missing_revs_queue = MissingRevsQueue,
-        changes_queue = ChangesQueue,
-        source = Source,
-        target = Target
-    } = State,
-    couch_work_queue:close(MissingRevsQueue),
-    couch_work_queue:close(ChangesQueue),
+terminate_cleanup(#rep_state{source = Source, target = Target}) ->
     couch_api_wrap:db_close(Source),
     couch_api_wrap:db_close(Target).
 
 
+start_timer(#rep_state{rep_options = Options} = State) ->
+    case couch_util:get_value(doc_ids, Options) of
+    undefined ->
+        After = checkpoint_interval(State),
+        case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
+        {ok, Ref} ->
+            Ref;
+        Error ->
+            ?LOG_ERROR("Replicator, error scheduling checkpoint:  ~p", [Error]),
+            nil
+        end;
+    _DocIdList ->
+        nil
+    end.
+
+
 cancel_timer(#rep_state{timer = nil}) ->
     ok;
 cancel_timer(#rep_state{timer = Timer}) ->
-    erlang:cancel_timer(Timer).
+    {ok, cancel} = timer:cancel(Timer).
 
 
 get_result(#rep_state{stats = Stats, rep_options = Options} = State) ->
@@ -390,15 +413,7 @@ init_state({BaseId, _Ext} = RepId, Src, 
         src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
         tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo)
     },
-    State#rep_state{
-        timer = case couch_util:get_value(doc_ids, Options) of
-        undefined ->
-            erlang:start_timer(checkpoint_interval(State),
-                self(), timed_checkpoint);
-        _DocIdList ->
-            nil
-        end
-    }.
+    State#rep_state{timer = start_timer(State)}.
 
 
 spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue, Options) ->
@@ -598,6 +613,10 @@ commit_to_both(Source, Target) ->
     SourceStartTime =
     receive
     {SrcCommitPid, {ok, Timestamp}} ->
+        receive
+        {'EXIT', SrcCommitPid, normal} ->
+            ok
+        end,
         Timestamp;
     {'EXIT', SrcCommitPid, _} ->
         exit(replication_link_failure)



Mime
View raw message