couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1144203 - in /couchdb/trunk: etc/couchdb/default.ini.tpl.in src/couchdb/couch_api_wrap.hrl src/couchdb/couch_api_wrap_httpc.erl src/couchdb/couch_httpc_pool.erl src/couchdb/couch_replicator.erl src/couchdb/couch_replicator_utils.erl
Date Fri, 08 Jul 2011 09:15:51 GMT
Author: fdmanana
Date: Fri Jul  8 09:15:49 2011
New Revision: 1144203

URL: http://svn.apache.org/viewvc?rev=1144203&view=rev
Log:
Simplify replicator's http connection pool

Dropped support for pipelining, making it less tied to
ibrowse and much simpler and shorter. In practice the
pipelining doesn't offer significant gains and it's
problematic on slow/congested networks (an error
in one request will cause all subsequent requests in
the same connection to be retried).


Modified:
    couchdb/trunk/etc/couchdb/default.ini.tpl.in
    couchdb/trunk/src/couchdb/couch_api_wrap.hrl
    couchdb/trunk/src/couchdb/couch_api_wrap_httpc.erl
    couchdb/trunk/src/couchdb/couch_httpc_pool.erl
    couchdb/trunk/src/couchdb/couch_replicator.erl
    couchdb/trunk/src/couchdb/couch_replicator_utils.erl

Modified: couchdb/trunk/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/trunk/etc/couchdb/default.ini.tpl.in?rev=1144203&r1=1144202&r2=1144203&view=diff
==============================================================================
--- couchdb/trunk/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/trunk/etc/couchdb/default.ini.tpl.in Fri Jul  8 09:15:49 2011
@@ -163,10 +163,8 @@ worker_processes = 4
 ; With lower batch sizes checkpoints are done more frequently. Lower batch sizes
 ; also reduce the total amount of used RAM memory.
 worker_batch_size = 500
-; Maximum number of HTTP connections and pipeline size (for each connection)
-; per replication. These two settings have more impact on pull replications.
+; Maximum number of HTTP connections per replication.
 http_connections = 20
-http_pipeline_size = 1
 ; HTTP connection timeout per replication.
 ; Even for very fast/reliable networks it might need to be increased if a remote
 ; database is too busy.

Modified: couchdb/trunk/src/couchdb/couch_api_wrap.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_api_wrap.hrl?rev=1144203&r1=1144202&r2=1144203&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_api_wrap.hrl (original)
+++ couchdb/trunk/src/couchdb/couch_api_wrap.hrl Fri Jul  8 09:15:49 2011
@@ -24,8 +24,7 @@
     retries = 10,
     wait = 250,         % milliseconds
     httpc_pool = nil,
-    http_connections,
-    http_pipeline_size
+    http_connections
 }).
 
 -record(oauth, {

Modified: couchdb/trunk/src/couchdb/couch_api_wrap_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_api_wrap_httpc.erl?rev=1144203&r1=1144202&r2=1144203&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_api_wrap_httpc.erl (original)
+++ couchdb/trunk/src/couchdb/couch_api_wrap_httpc.erl Fri Jul  8 09:15:49 2011
@@ -28,15 +28,8 @@
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
 
 
-setup(#httpdb{httpc_pool = nil, url = Url, ibrowse_options = IbrowseOptions,
-    http_connections = MaxConns, http_pipeline_size = PipeSize} = Db) ->
-    HttpcPoolOptions = [
-        {ssl_options, get_value(ssl_options, IbrowseOptions, [])},
-        {max_piped_connections, MaxConns},
-        {pipeline_size, PipeSize}
-    ],
-    {ok, Pid} = couch_httpc_pool:start_link(
-        ibrowse_lib:parse_url(Url), HttpcPoolOptions),
+setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
+    {ok, Pid} = couch_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
     {ok, Db#httpdb{httpc_pool = Pid}}.
 
 
@@ -56,25 +49,11 @@ send_ibrowse_req(#httpdb{headers = BaseH
     Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
     Url = full_url(HttpDb, Params),
     Body = get_value(body, Params, []),
-    {_Type, WorkerPid} = Worker =
     case get_value(path, Params) of
     "_changes" ->
-        {ok, Pid} = ibrowse:spawn_link_worker_process(Url),
-        {ibrowse_direct, Pid};
+        {ok, Worker} = ibrowse:spawn_link_worker_process(Url);
     _ ->
-        % Direct means no usage of HTTP pipeline. As section 8.1.2.2 of
-        % RFC 2616 says, clients should not pipeline non-idempotent requests.
-        % Let the caller explicitly say which requests are not idempotent.
-        % For e.g. POSTs against "/some_db/_revs_diff" are idempotent
-        % (despite the verb not being GET).
-        case get_value(direct, Params, false) of
-        true ->
-            {ok, Pid} = couch_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool),
-            {direct, Pid};
-        false ->
-            Pid = get_piped_worker(HttpDb),
-            {piped, Pid}
-        end
+        {ok, Worker} = couch_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool)
     end,
     IbrowseOptions = [
         {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout} |
@@ -82,26 +61,11 @@ send_ibrowse_req(#httpdb{headers = BaseH
             HttpDb#httpdb.ibrowse_options)
     ],
     Response = ibrowse:send_req_direct(
-        WorkerPid, Url, Headers2, Method, Body, IbrowseOptions, infinity),
+        Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity),
     {Worker, Response}.
 
 
-wait_retry() ->
-    ok = timer:sleep(random:uniform(40) + 10).
-
-
-get_piped_worker(#httpdb{httpc_pool = Pool} = HttpDb) ->
-    case couch_httpc_pool:get_piped_worker(Pool) of
-    {ok, Worker} ->
-        Worker;
-    retry_later ->
-        wait_retry(),
-        get_piped_worker(HttpDb)
-    end.
-
-
 process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
-    wait_retry(),
     send_req(HttpDb, Params, Callback);
 
 process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
@@ -113,7 +77,7 @@ process_response({ibrowse_req_id, ReqId}
     process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
 
 process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
-    stop_worker(Worker, HttpDb),
+    release_worker(Worker, HttpDb),
     case list_to_integer(Code) of
     Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
         EJson = case Body of
@@ -144,7 +108,7 @@ process_stream_response(ReqId, Worker, H
             ibrowse:stream_next(ReqId),
             try
                 Ret = Callback(Ok, Headers, StreamDataFun),
-                stop_worker(Worker, HttpDb),
+                release_worker(Worker, HttpDb),
                 clean_mailbox_req(ReqId),
                 Ret
             catch throw:{maybe_retry_req, Err} ->
@@ -177,14 +141,8 @@ clean_mailbox_req(ReqId) ->
     end.
 
 
-stop_worker({ibrowse_direct, Worker}, _HttpDb) ->
-    unlink(Worker),
-    receive {'EXIT', Worker, _} -> ok after 0 -> ok end,
-    catch ibrowse:stop_worker_process(Worker);
-stop_worker({direct, Worker}, #httpdb{httpc_pool = Pool}) ->
-    ok = couch_httpc_pool:release_worker(Pool, Worker);
-stop_worker({piped, _Worker}, _HttpDb) ->
-    ok.
+release_worker(Worker, #httpdb{httpc_pool = Pool}) ->
+    ok = couch_httpc_pool:release_worker(Pool, Worker).
 
 
 maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
@@ -192,7 +150,7 @@ maybe_retry(Error, Worker, #httpdb{retri
 
 maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
     Params, Cb) ->
-    stop_worker(Worker, HttpDb),
+    release_worker(Worker, HttpDb),
     Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
     ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~s",
@@ -205,7 +163,7 @@ report_error(Worker, HttpDb, Params, Err
     Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
     do_report_error(Url, Method, Error),
-    stop_worker(Worker, HttpDb),
+    release_worker(Worker, HttpDb),
     exit({http_request_failed, Method, Url, Error}).
 
 
@@ -281,7 +239,7 @@ oauth_header(#httpdb{url = BaseUrl, oaut
 
 
 do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
-    stop_worker(Worker, HttpDb),
+    release_worker(Worker, HttpDb),
     RedirectUrl = redirect_url(Headers, Url),
     {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
     send_req(HttpDb2, Params2, Cb).

Modified: couchdb/trunk/src/couchdb/couch_httpc_pool.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpc_pool.erl?rev=1144203&r1=1144202&r2=1144203&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpc_pool.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpc_pool.erl Fri Jul  8 09:15:49 2011
@@ -10,23 +10,11 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
-% This module is similar to Ibrowse's ibrowse_lb.erl (load balancer) module.
-% The main differences are:
-%
-% 1) Several HTTP connection pools can be spawned. This is important for
-%    replications, as each replication can have its own pool which allows
-%    for better error isolation - connections (and their pipelines) are not
-%    shared between different replications;
-%
-% 2) The caller can request both pipelined connections and non-pipelined
-%    connections.
-%
 -module(couch_httpc_pool).
 -behaviour(gen_server).
 
 % public API
 -export([start_link/2, stop/1]).
--export([get_piped_worker/1]).
 -export([get_worker/1, release_worker/2]).
 
 % gen_server API
@@ -34,7 +22,6 @@
 -export([code_change/3, terminate/2]).
 
 -include("couch_db.hrl").
--include("../ibrowse/ibrowse.hrl").
 
 -import(couch_util, [
     get_value/2,
@@ -43,117 +30,88 @@
 
 -record(state, {
     url,
-    ssl_options,
-    max_piped_workers,
-    pipeline_size,
-    piped_workers,
-    used_piped_workers = 0,
-    free_workers = [],  % free workers (connections) without pipeline
-    busy_workers = []   % busy workers (connections) without pipeline
+    limit,                  % max # of workers allowed
+    free = [],              % free workers (connections)
+    busy = [],              % busy workers (connections)
+    waiting = queue:new()   % blocked clients waiting for a worker
 }).
 
 
-start_link(BaseUrl, Options) ->
-    gen_server:start_link(?MODULE, {BaseUrl, Options}, []).
+start_link(Url, Options) ->
+    gen_server:start_link(?MODULE, {Url, Options}, []).
 
 
 stop(Pool) ->
     ok = gen_server:call(Pool, stop, infinity).
 
 
-get_piped_worker(Pool) ->
-    gen_server:call(Pool, get_piped_worker, infinity).
-
-
 get_worker(Pool) ->
-    gen_server:call(Pool, get_worker, infinity).
+    {ok, _Worker} = gen_server:call(Pool, get_worker, infinity).
 
 
-% Only workers without a pipeline need to be released.
 release_worker(Pool, Worker) ->
-    ok = gen_server:call(Pool, {release_worker, Worker}, infinity).
+    ok = gen_server:cast(Pool, {release_worker, Worker}).
 
 
-init({BaseUrl, Options}) ->
+init({Url, Options}) ->
     process_flag(trap_exit, true),
     State = #state{
-        url = BaseUrl,
-        ssl_options = get_value(ssl_options, Options, []),
-        pipeline_size = get_value(pipeline_size, Options),
-        max_piped_workers = get_value(max_piped_connections, Options),
-        piped_workers = ets:new(httpc_pool, [ordered_set, public])
+        url = Url,
+        limit = get_value(max_connections, Options)
     },
     {ok, State}.
 
 
-handle_call(get_piped_worker, _From,
-    #state{piped_workers = WorkersEts, max_piped_workers = Max,
-        used_piped_workers = Used, url = Url,
-        ssl_options = SslOptions} = State) when Used < Max ->
-    {ok, Worker} = ibrowse_http_client:start_link({WorkersEts, Url,
-        {SslOptions, SslOptions =/= []}}),
-    true = ets:insert(WorkersEts, {{1, Worker}, []}),
-    {reply, {ok, Worker}, State#state{used_piped_workers = Used + 1}};
-
-handle_call(get_piped_worker, _From,
-    #state{piped_workers = WorkersEts, pipeline_size = PipeSize} = State) ->
-    case ets:first(WorkersEts) of
-	{NumSessions, Worker} when NumSessions < PipeSize ->
-	    true = ets:delete(WorkersEts, {NumSessions, Worker}),
-	    true = ets:insert(WorkersEts, {{NumSessions + 1, Worker}, []}),
-        {reply, {ok, Worker}, State};
-	_ ->
-	    {reply, retry_later, State}
-    end;
-
-handle_call(get_worker, _From, #state{
-        free_workers = [], busy_workers = Busy,
-        url = #url{host = Host, port = Port}} = State) ->
-    {ok, Worker} = ibrowse_http_client:start_link({Host, Port}),
-    {reply, {ok, Worker}, State#state{busy_workers = [Worker | Busy]}};
-
-handle_call(get_worker, _From, #state{
-        free_workers = [Worker | RestFree], busy_workers = Busy} = State) ->
-    {reply, {ok, Worker}, State#state{
-        busy_workers = [Worker | Busy], free_workers = RestFree}};
-
-handle_call({release_worker, Worker}, _From, #state{
-        free_workers = Free, busy_workers = Busy} = State) ->
-    case Busy -- [Worker] of
-    Busy ->
-        {reply, ok, State};
-    Busy2 ->
-        {reply, ok, State#state{
-            busy_workers = Busy2, free_workers = [Worker | Free]}}
+handle_call(get_worker, From, #state{waiting = Waiting} = State) ->
+    #state{url = Url, limit = Limit, busy = Busy, free = Free} = State,
+    case length(Busy) >= Limit of
+    true ->
+        {noreply, State#state{waiting = queue:in(From, Waiting)}};
+    false ->
+        case Free of
+        [] ->
+           {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
+           Free2 = Free;
+        [Worker | Free2] ->
+           ok
+        end,
+        NewState = State#state{free = Free2, busy = [Worker | Busy]},
+        {reply, {ok, Worker}, NewState}
     end;
 
-handle_call(stop, _From, #state{piped_workers = WorkersEts,
-        free_workers = Free, busy_workers = Busy} = State) ->
-    ets:foldl(
-        fun({{_, W}, _}, _) -> ibrowse_http_client:stop(W) end, ok, WorkersEts),
-    lists:foreach(fun ibrowse_http_client:stop/1, Free),
-    lists:foreach(fun ibrowse_http_client:stop/1, Busy),
+handle_call(stop, _From, State) ->
     {stop, normal, ok, State}.
 
 
-handle_cast(Msg, State) ->
-    {stop, {unexpected_cast, Msg}, State}.
+handle_cast({release_worker, Worker}, #state{waiting = Waiting} = State) ->
+    case queue:out(Waiting) of
+    {empty, Waiting2} ->
+        Busy2 = State#state.busy -- [Worker],
+        Free2 = [Worker | State#state.free];
+    {{value, From}, Waiting2} ->
+        gen_server:reply(From, {ok, Worker}),
+        Busy2 = State#state.busy,
+        Free2 = State#state.free
+    end,
+    NewState = State#state{
+        busy = Busy2,
+        free = Free2,
+        waiting = Waiting2
+    },
+    {noreply, NewState}.
 
 
-handle_info({'EXIT', Pid, _Reason}, #state{
-        piped_workers = WorkersEts, used_piped_workers = Used,
-        busy_workers = Busy, free_workers = Free} = State) ->
+handle_info({'EXIT', Pid, _Reason}, #state{busy = Busy, free = Free} = State) ->
     case Free -- [Pid] of
     Free ->
         case Busy -- [Pid] of
         Busy ->
-            true = ets:match_delete(WorkersEts, {{'_', Pid}, '_'}),
-            {noreply, State#state{used_piped_workers = Used - 1}};
+            {noreply, State};
         Busy2 ->
-            {noreply, State#state{busy_workers = Busy2}}
+            {noreply, State#state{busy = Busy2}}
         end;
     Free2 ->
-        {noreply, State#state{free_workers = Free2}}
+        {noreply, State#state{free = Free2}}
     end.
 
 
@@ -161,5 +119,7 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-terminate(_Reason, _State) ->
-    ok.
+terminate(_Reason, State) ->
+    lists:foreach(fun ibrowse_http_client:stop/1, State#state.free),
+    lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy).
+

Modified: couchdb/trunk/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1144203&r1=1144202&r2=1144203&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.erl Fri Jul  8 09:15:49 2011
@@ -244,14 +244,11 @@ do_init(#rep{options = Options, id = {Ba
         lists:seq(1, RevFindersCount)),
     % This starts the doc copy processes. They fetch documents from the
     % MissingRevsQueue and copy them from the source to the target database.
-    MaxHttpConns = get_value(http_connections, Options),
-    HttpPipeSize = get_value(http_pipeline_size, Options),
-    MaxParallelConns = lists:max(
-        [((MaxHttpConns * HttpPipeSize) div CopiersCount) - 1, 1]),
+    MaxConns = get_value(http_connections, Options),
     Workers = lists:map(
         fun(_) ->
             {ok, Pid} = couch_replicator_doc_copier:start_link(
-                self(), Source, Target, MissingRevsQueue, MaxParallelConns),
+                self(), Source, Target, MissingRevsQueue, MaxConns),
             Pid
         end,
         lists:seq(1, CopiersCount)),
@@ -275,11 +272,11 @@ do_init(#rep{options = Options, id = {Ba
     ?LOG_INFO("Replication `~p` is using:~n"
         "~c~p worker processes~n"
         "~ca worker batch size of ~p~n"
-        "~c~p HTTP connections, each with a pipeline size of ~p~n"
+        "~c~p HTTP connections~n"
         "~ca connection timeout of ~p milliseconds~n"
         "~csocket options are: ~s~s",
-        [BaseId ++ Ext, $\t, CopiersCount, $\t, BatchSize, $\t, MaxHttpConns,
-            HttpPipeSize, $\t, get_value(connection_timeout, Options),
+        [BaseId ++ Ext, $\t, CopiersCount, $\t, BatchSize, $\t,
+            MaxConns, $\t, get_value(connection_timeout, Options),
             $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
             case StartSeq of
             ?LOWEST_SEQ ->

Modified: couchdb/trunk/src/couchdb/couch_replicator_utils.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_utils.erl?rev=1144203&r1=1144202&r2=1144203&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_utils.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator_utils.erl Fri Jul  8 09:15:49 2011
@@ -186,8 +186,7 @@ parse_rep_db({Props}, ProxyParams, Optio
             [{socket_options, get_value(socket_options, Options)} |
                 ProxyParams ++ ssl_params(Url)]),
         timeout = get_value(connection_timeout, Options),
-        http_connections = get_value(http_connections, Options),
-        http_pipeline_size = get_value(http_pipeline_size, Options)
+        http_connections = get_value(http_connections, Options)
     };
 parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
     parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
@@ -213,7 +212,6 @@ make_options(Props) ->
     DefWorkers = couch_config:get("replicator", "worker_processes", "4"),
     DefBatchSize = couch_config:get("replicator", "worker_batch_size", "500"),
     DefConns = couch_config:get("replicator", "http_connections", "20"),
-    DefPipeSize = couch_config:get("replicator", "http_pipeline_size", "1"),
     DefTimeout = couch_config:get("replicator", "connection_timeout", "30000"),
     {ok, DefSocketOptions} = couch_util:parse_term(
         couch_config:get("replicator", "socket_options",
@@ -221,7 +219,6 @@ make_options(Props) ->
     lists:ukeymerge(1, Options, [
         {connection_timeout, list_to_integer(DefTimeout)},
         {http_connections, list_to_integer(DefConns)},
-        {http_pipeline_size, list_to_integer(DefPipeSize)},
         {socket_options, DefSocketOptions},
         {worker_batch_size, list_to_integer(DefBatchSize)},
         {worker_processes, list_to_integer(DefWorkers)}
@@ -251,8 +248,6 @@ convert_options([{<<"worker_batch_size">
     [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
 convert_options([{<<"http_connections">>, V} | R]) ->
     [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_pipeline_size">>, V} | R]) ->
-    [{http_pipeline_size, couch_util:to_integer(V)} | convert_options(R)];
 convert_options([{<<"connection_timeout">>, V} | R]) ->
     [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
 convert_options([{<<"socket_options">>, V} | R]) ->



Mime
View raw message