couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1035320 - in /couchdb/branches/new_replicator: etc/couchdb/default.ini.tpl.in src/couchdb/couch_api_wrap.erl src/couchdb/couch_api_wrap.hrl src/couchdb/couch_api_wrap_httpc.erl src/couchdb/couch_replicator.erl
Date Mon, 15 Nov 2010 15:51:36 GMT
Author: fdmanana
Date: Mon Nov 15 15:51:36 2010
New Revision: 1035320

URL: http://svn.apache.org/viewvc?rev=1035320&view=rev
Log:
New replicator: reuse TCP connections and retry failed requests.

Modified:
    couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl

Modified: couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in?rev=1035320&r1=1035319&r2=1035320&view=diff
==============================================================================
--- couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in Mon Nov 15 15:51:36 2010
@@ -118,3 +118,5 @@ compressible_types = text/*, application
 [replicator]
 ; should be at least 2
 worker_processes = 10
+; the maximum number of TCP connections to use against a single server
+max_connections_per_server = 100
\ No newline at end of file

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1035320&r1=1035319&r2=1035320&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Mon Nov 15 15:51:36 2010
@@ -17,16 +17,6 @@
 %
 % Notes:
 % Many options and apis aren't yet supported here, they are added as needed.
-%
-% This file neesds a lot of work to "robustify" the common failures, and
-% convert the json errors back to Erlang style errors.
-%
-% Also, we open a new connection for every HTTP call, to avoid the
-% problems when requests are pipelined over a single connection and earlier
-% requests that fail and disconnect don't cause network errors for other
-% requests. This should eventually be optimized so each process has it's own
-% connection that's kept alive between requests.
-%
 
 -include("couch_db.hrl").
 -include("couch_api_wrap.hrl").
@@ -71,7 +61,8 @@ db_uri(#db{name = Name}) ->
 db_open(Db, Options) ->
     db_open(Db, Options, false).
 
-db_open(#httpdb{} = Db, _Options, Create) ->
+db_open(#httpdb{} = Db1, _Options, Create) ->
+    {ok, Db} = couch_api_wrap_httpc:setup(Db1),
     case Create of
     false ->
         ok;
@@ -169,14 +160,18 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re
                     {ibrowse_options, [{stream_to, {self(), once}}]},
                     {headers, [{"accept", "multipart/mixed"}]}],
                 fun(200, Headers, StreamDataFun) ->
+                    Self ! started,
                     couch_httpd:parse_multipart_request(
                         get_value("Content-Type", Headers),
                         StreamDataFun,
-                        fun(Ev) -> mp_parse_mixed(Ev) end)
+                        fun mp_parse_mixed/1)
                 end),
             unlink(Self)
         end),
-    receive_docs(Streamer, Fun, Acc);
+    receive
+    started ->
+        receive_docs_loop(Streamer, Fun, Acc)
+    end;
 open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
     {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
     {ok, lists:foldl(Fun, Acc, Results)}.
@@ -227,29 +222,11 @@ update_doc(#httpdb{} = HttpDb, #doc{id =
     false ->
         []
     end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
-    Self = self(),
-    DocStreamer = spawn_link(fun() ->
-        couch_doc:doc_to_multi_part_stream(
-            Boundary, JsonBytes, Doc#doc.atts,
-            fun(Data) ->
-                receive {get_data, From} ->
-                    From ! {data, Data}
-                end
-            end, false),
-        unlink(Self)
-    end),
-    SendFun = fun(0) ->
-            eof;
-        (LenLeft) when LenLeft > 0 ->
-            DocStreamer ! {get_data, self()},
-            receive {data, Data} ->
-                {ok, Data, LenLeft - iolist_size(Data)}
-            end
-    end,
+    Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
     send_req(
         HttpDb,
         [{method, put}, {path, encode_doc_id(DocId)},
-            {qs, QArgs}, {headers, Headers}, {body, {SendFun, Len}}],
+            {qs, QArgs}, {headers, Headers}, {body, Body}],
         fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->
                 {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
             (_, _, {Props}) ->
@@ -318,7 +295,7 @@ changes_since(#httpdb{} = HttpDb, Style,
         % Shouldn't be infinity, but somehow if it's not, issues arise
         % frequently with ibrowse.
         HttpDb#httpdb{timeout = infinity},
-        [{path, "_changes"}, {qs, QArgs},
+        [{path, "_changes"}, {qs, QArgs}, {direct, true},
             {ibrowse_options, [{stream_to, {self(), once}}]}],
         fun(200, _, DataStreamFun) ->
             case couch_util:get_value(continuous, Options, false) of
@@ -450,10 +427,20 @@ atts_since_arg(UrlLen, [PA | Rest], Acc)
     end.
 
 
+receive_docs_loop(Streamer, Fun, Acc) ->
+    try
+        receive_docs(Streamer, Fun, Acc)
+    catch
+    throw:restart ->
+        receive_docs_loop(Streamer, Fun, Acc)
+    end.
+
 receive_docs(Streamer, UserFun, UserAcc) ->
     Streamer ! {get_headers, self()},
     receive
-    {headers, Headers} ->    
+    started ->
+        throw(restart);
+    {headers, Headers} ->
         case get_value("content-type", Headers) of
         {"multipart/related", _} = ContentType ->
             case couch_doc:doc_from_multi_part_stream(ContentType, 
@@ -481,16 +468,20 @@ receive_docs(Streamer, UserFun, UserAcc)
 receive_all(Streamer, Acc)->
     Streamer ! {next_bytes, self()},
     receive
+    started ->
+        throw(restart);
     {body_bytes, Bytes} ->
         receive_all(Streamer, [Bytes | Acc]);
     body_done ->
         lists:reverse(Acc)
-     end.
+    end.
 
 
 receive_doc_data(Streamer)->    
     Streamer ! {next_bytes, self()},
     receive
+    started ->
+        throw(restart);
     {body_bytes, Bytes} ->
         {Bytes, fun() -> receive_doc_data(Streamer) end};
     body_done ->
@@ -611,3 +602,34 @@ bulk_results_to_errors(_Docs, Results, r
             end
         end,
         [], Results)).
+
+
+stream_doc({JsonBytes, Atts, Boundary, Len}) ->
+    case erlang:erase({doc_streamer, Boundary}) of
+    Pid when is_pid(Pid) ->
+        unlink(Pid),
+        exit(Pid, kill);
+    _ ->
+        ok
+    end,
+    Self = self(),
+    DocStreamer = spawn_link(fun() ->
+        couch_doc:doc_to_multi_part_stream(
+            Boundary, JsonBytes, Atts,
+            fun(Data) ->
+                receive {get_data, From} ->
+                    From ! {data, Data}
+                end
+            end, false),
+        unlink(Self)
+    end),
+    erlang:put({doc_streamer, Boundary}, DocStreamer),
+    {ok, <<>>, {Len, Boundary}};
+stream_doc({0, Id}) ->
+    erlang:erase({doc_streamer, Id}),
+    eof;
+stream_doc({LenLeft, Id}) when LenLeft > 0 ->
+    erlang:get({doc_streamer, Id}) ! {get_data, self()},
+    receive {data, Data} ->
+        {ok, Data, {LenLeft - iolist_size(Data), Id}}
+    end.

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl?rev=1035320&r1=1035319&r2=1035320&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl Mon Nov 15 15:51:36 2010
@@ -16,8 +16,10 @@
     url,
     oauth = nil,
     headers = [],
-    timeout = 30000,  % milliseconds
-    proxy_options = []
+    timeout = 30000,    % milliseconds
+    proxy_options = [],
+    retries = 10,
+    wait = 250          % milliseconds
 }).
 
 -record(oauth, {

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl?rev=1035320&r1=1035319&r2=1035320&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Mon Nov 15 15:51:36 2010
@@ -16,6 +16,7 @@
 -include("couch_api_wrap.hrl").
 -include("../ibrowse/ibrowse.hrl").
 
+-export([setup/1]).
 -export([send_req/3]).
 -export([full_url/2]).
 
@@ -24,6 +25,21 @@
     get_value/3
     ]).
 
+-define(RETRY_LATER_WAIT, 100).
+
+
+setup(#httpdb{url = Url} = Db) ->
+    #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
+    MaxSessions = list_to_integer(
+        couch_config:get("replicator", "max_connections_per_server", "100")),
+    ok = ibrowse:set_max_sessions(Host, Port, MaxSessions),
+    ok = ibrowse:set_max_pipeline_size(Host, Port, 1),
+    ok = couch_config:register(
+        fun("replicator", "max_connections_per_server", NewMax) ->
+            ok = ibrowse:set_max_sessions(Host, Port, list_to_integer(NewMax))
+        end),
+    {ok, Db}.
+
 
 send_req(#httpdb{headers = BaseHeaders} = HttpDb, Params, Callback) ->
     Method = get_value(method, Params, get),
@@ -43,36 +59,52 @@ send_req(#httpdb{headers = BaseHeaders} 
     ],
     Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
     Url = full_url(HttpDb, Params),
-    {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
-    Response = ibrowse:send_req_direct(
-            Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity),
+    case get_value(direct, Params, false) of
+    true ->
+        {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
+        Response = ibrowse:send_req_direct(
+            Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity);
+    false ->
+        Worker = nil,
+        Response = ibrowse:send_req(
+            Url, Headers2, Method, Body, IbrowseOptions, infinity)
+    end,
     process_response(Response, Worker, HttpDb, Params, Callback).
 
 
+process_response({error, retry_later}, _Worker, HttpDb, Params, Callback) ->
+    % this means that the config option "max_connections_per_server" should
+    % probably be increased
+    ok = timer:sleep(?RETRY_LATER_WAIT),
+    send_req(HttpDb, Params, Callback);
+
+process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
+    % ibrowse worker terminated because remote peer closed the socket
+    % -> not an error
+    send_req(HttpDb, Params, Cb);
+
 process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
     process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
 
-process_response(Resp, Worker, HttpDb, Params, Callback) ->
+process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
     stop_worker(Worker),
-    case Resp of
-    {ok, Code, Headers, Body} ->
-        case ?l2i(Code) of
-        Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
-            EJson = case Body of
-            <<>> ->
-                null;
-            Json ->
-                ?JSON_DECODE(Json)
-            end,
-            Callback(Ok, Headers, EJson);
-        R when R =:= 301 ; R =:= 302 ->
-            do_redirect(Worker, Headers, HttpDb, Params, Callback);
-        Error ->
-            report_error(nil, HttpDb, Params, {code, Error})
-        end;
+    case list_to_integer(Code) of
+    Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
+        EJson = case Body of
+        <<>> ->
+            null;
+        Json ->
+            ?JSON_DECODE(Json)
+        end,
+        Callback(Ok, Headers, EJson);
+    R when R =:= 301 ; R =:= 302 ->
+        do_redirect(Worker, Headers, HttpDb, Params, Callback);
     Error ->
-        report_error(nil, HttpDb, Params, {error, Error})
-    end.
+        maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
+    end;
+
+process_response(Error, Worker, HttpDb, Params, Callback) ->
+    maybe_retry(Error, Worker, HttpDb, Params, Callback).
 
 
 process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
@@ -81,7 +113,7 @@ process_stream_response(ReqId, Worker, H
         case ?l2i(Code) of
         Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
             StreamDataFun = fun() ->
-                stream_data_self(HttpDb, Params, Worker, ReqId)
+                stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
             end,
             Ret = Callback(Ok, Headers, StreamDataFun),
             stop_worker(Worker),
@@ -92,7 +124,7 @@ process_stream_response(ReqId, Worker, H
             report_error(Worker, HttpDb, Params, {code, Error})
         end;
     {ibrowse_async_response, ReqId, {error, _} = Error} ->
-        report_error(Worker, HttpDb, Params, Error)
+        maybe_retry(Error, Worker, HttpDb, Params, Callback)
     end.
 
 
@@ -104,6 +136,20 @@ stop_worker(Worker) when is_pid(Worker) 
     catch ibrowse:stop_worker_process(Worker).
 
 
+maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
+    report_error(Worker, HttpDb, Params, {error, Error});
+
+maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
+    Params, Cb) ->
+    stop_worker(Worker),
+    Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
+    Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
+    ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~p",
+        [Method, Url, Wait / 1000, Error]),
+    ok = timer:sleep(Wait),
+    send_req(HttpDb#httpdb{retries = Retries - 1, wait = Wait * 2}, Params, Cb).
+
+
 report_error(Worker, #httpdb{timeout = Timeout} = HttpDb, Params, timeout) ->
     report_error(Worker, HttpDb, Params, {timeout, Timeout});
 
@@ -131,13 +177,13 @@ do_report_error(Url, Method, {timeout, T
         " (~p milliseconds).", [Method, Url, Timeout]).
 
 
-stream_data_self(HttpDb, Params, Worker, ReqId) ->
+stream_data_self(HttpDb, Params, Worker, ReqId, Cb) ->
     ibrowse:stream_next(ReqId),
     receive
     {ibrowse_async_response, ReqId, {error, _} = Error} ->
-        report_error(Worker, HttpDb, Params, {error, Error});
+        maybe_retry(Error, Worker, HttpDb, Params, Cb);
     {ibrowse_async_response, ReqId, Data} ->
-        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId) end};
+        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
     {ibrowse_async_response_end, ReqId} ->
         {<<>>, fun() ->
             report_error(Worker, HttpDb, Params, {error, more_data_expected})

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1035320&r1=1035319&r2=1035320&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Mon Nov 15 15:51:36 2010
@@ -28,10 +28,6 @@
     get_value/3
 ]).
 
-% Can't be greater than the maximum number of child restarts specified
-% in couch_rep_sup.erl.
--define(MAX_RESTARTS, 3).
-
 % maximum number of elements (per iteration) that each missing
 % revs finder process fetches from the missing revs queue
 -define(REV_BATCH_SIZE, 1000).
@@ -177,19 +173,11 @@ rep_result_listener(RepId) ->
 
 
 wait_for_result(RepId) ->
-    wait_for_result(RepId, ?MAX_RESTARTS).
-
-wait_for_result(RepId, RetriesLeft) ->
     receive
     {finished, RepId, RepResult} ->
         {ok, RepResult};
     {error, RepId, Reason} ->
-        case RetriesLeft > 0 of
-        true ->
-            wait_for_result(RepId, RetriesLeft - 1);
-        false ->
-            {error, Reason}
-        end
+        {error, Reason}
     end.
 
 



Mime
View raw message