Return-Path: Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: (qmail 98496 invoked from network); 7 Dec 2010 20:57:59 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Dec 2010 20:57:59 -0000 Received: (qmail 50773 invoked by uid 500); 7 Dec 2010 20:57:59 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 50652 invoked by uid 500); 7 Dec 2010 20:57:59 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 50645 invoked by uid 99); 7 Dec 2010 20:57:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Dec 2010 20:57:59 +0000 X-ASF-Spam-Status: No, hits=-1996.4 required=10.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Dec 2010 20:57:54 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 176F823889E2; Tue, 7 Dec 2010 20:57:34 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1043196 - in /couchdb/branches/new_replicator: etc/couchdb/default.ini.tpl.in src/couchdb/Makefile.am src/couchdb/couch_api_wrap.erl src/couchdb/couch_api_wrap.hrl src/couchdb/couch_api_wrap_httpc.erl src/couchdb/couch_httpc_pool.erl Date: Tue, 07 Dec 2010 20:57:33 -0000 To: commits@couchdb.apache.org From: fdmanana@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101207205734.176F823889E2@eris.apache.org> Author: fdmanana Date: Tue Dec 7 20:57:33 2010 New Revision: 1043196 URL: http://svn.apache.org/viewvc?rev=1043196&view=rev Log: New replicator: added custom httpc connection pool manager. 
This avoids sharing http connections between different replications that share the same remote endpoints. Added: couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl Modified: couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in couchdb/branches/new_replicator/src/couchdb/Makefile.am couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Modified: couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in?rev=1043196&r1=1043195&r2=1043196&view=diff ============================================================================== --- couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in (original) +++ couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in Tue Dec 7 20:57:33 2010 @@ -118,8 +118,6 @@ compressible_types = text/*, application [replicator] ; should be at least 2 worker_processes = 10 -; the maximum number of TCP connections to use against a single server -max_connections_per_server = 100 ; set to true to validate peer certificates verify_ssl_certificates = false ; file containing a list of peer trusted certificates (PEM format) Modified: couchdb/branches/new_replicator/src/couchdb/Makefile.am URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/Makefile.am?rev=1043196&r1=1043195&r2=1043196&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/Makefile.am (original) +++ couchdb/branches/new_replicator/src/couchdb/Makefile.am Tue Dec 7 20:57:33 2010 @@ -41,6 +41,7 @@ source_files = \ couch_external_manager.erl \ couch_external_server.erl \ couch_file.erl \ + couch_httpc_pool.erl \ couch_httpd.erl \ couch_httpd_db.erl \ couch_httpd_auth.erl \ @@ -108,6 +109,7 @@ compiled_files = 
\ couch_external_manager.beam \ couch_external_server.beam \ couch_file.beam \ + couch_httpc_pool.beam \ couch_httpd.beam \ couch_httpd_db.beam \ couch_httpd_auth.beam \ Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1043196&r1=1043195&r2=1043196&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Dec 7 20:57:33 2010 @@ -97,8 +97,9 @@ db_open(DbName, Options, Create) -> throw({unauthorized, DbName}) end. -db_close(#httpdb{}) -> - ok; +db_close(#httpdb{httpc_pool = Pool}) -> + unlink(Pool), + ok = couch_httpc_pool:stop(Pool); db_close(DbName) -> couch_db:close(DbName). Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl?rev=1043196&r1=1043195&r2=1043196&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl Tue Dec 7 20:57:33 2010 @@ -20,7 +20,8 @@ proxy_options = [], ssl_options = [], retries = 10, - wait = 250 % milliseconds + wait = 250, % milliseconds + httpc_pool = nil }). 
-record(oauth, { Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl?rev=1043196&r1=1043195&r2=1043196&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Tue Dec 7 20:57:33 2010 @@ -29,20 +29,14 @@ -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})). -setup(#httpdb{url = Url} = Db) -> +setup(#httpdb{url = Url, httpc_pool = nil} = Db) -> #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url), - MaxSessions = list_to_integer( - couch_config:get("replicator", "max_connections_per_server", "100")), - ok = ibrowse:set_max_sessions(Host, Port, MaxSessions), - ok = ibrowse:set_max_pipeline_size(Host, Port, 1), - ok = couch_config:register( - fun("replicator", "max_connections_per_server", NewMax) -> - ok = ibrowse:set_max_sessions(Host, Port, list_to_integer(NewMax)) - end), - {ok, Db}. + {ok, Pid} = couch_httpc_pool:start_link(Host, Port), + {ok, Db#httpdb{httpc_pool = Pid}}. 
-send_req(#httpdb{headers = BaseHeaders} = HttpDb, Params, Callback) -> +send_req(#httpdb{headers = BaseHeaders, httpc_pool = Pool} = HttpDb, + Params, Callback) -> Method = get_value(method, Params, get), Headers = get_value(headers, Params, []) ++ BaseHeaders, Body = get_value(body, Params, []), @@ -63,20 +57,19 @@ send_req(#httpdb{headers = BaseHeaders} Url = full_url(HttpDb, Params), case get_value(direct, Params, false) of true -> - {ok, Worker} = ibrowse:spawn_link_worker_process(Url), + {ok, Pid} = ibrowse:spawn_link_worker_process(Url), + Worker = {direct, Pid}, Response = ibrowse:send_req_direct( - Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity); + Pid, Url, Headers2, Method, Body, IbrowseOptions, infinity); false -> - Worker = nil, - Response = ibrowse:send_req( - Url, Headers2, Method, Body, IbrowseOptions, infinity) + {ok, Worker} = couch_httpc_pool:get_worker(Pool), + Response = ibrowse:send_req_direct( + Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity) end, process_response(Response, Worker, HttpDb, Params, Callback). 
-process_response({error, retry_later}, _Worker, HttpDb, Params, Callback) -> - % this means that the config option "max_connections_per_server" should - % probably be increased +process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) -> ok = timer:sleep(?RETRY_LATER_WAIT), send_req(HttpDb, Params, Callback); @@ -89,7 +82,7 @@ process_response({ibrowse_req_id, ReqId} process_stream_response(ReqId, Worker, HttpDb, Params, Callback); process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) -> - stop_worker(Worker), + stop_worker(Worker, HttpDb), case list_to_integer(Code) of Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) -> EJson = case Body of @@ -118,7 +111,7 @@ process_stream_response(ReqId, Worker, H stream_data_self(HttpDb, Params, Worker, ReqId, Callback) end, Ret = Callback(Ok, Headers, StreamDataFun), - stop_worker(Worker), + stop_worker(Worker, HttpDb), Ret; R when R =:= 301 ; R =:= 302 ; R =:= 303 -> do_redirect(Worker, R, Headers, HttpDb, Params, Callback); @@ -130,12 +123,12 @@ process_stream_response(ReqId, Worker, H end. -stop_worker(nil) -> - ok; -stop_worker(Worker) when is_pid(Worker) -> +stop_worker({direct, Worker}, _HttpDb) -> unlink(Worker), receive {'EXIT', Worker, _} -> ok after 0 -> ok end, - catch ibrowse:stop_worker_process(Worker). + catch ibrowse:stop_worker_process(Worker); +stop_worker(Worker, #httpdb{httpc_pool = Pool}) -> + ok = couch_httpc_pool:release_worker(Pool, Worker). 
maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) -> @@ -143,7 +136,7 @@ maybe_retry(Error, Worker, #httpdb{retri maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb, Params, Cb) -> - stop_worker(Worker), + stop_worker(Worker, HttpDb), Method = string:to_upper(atom_to_list(get_value(method, Params, get))), Url = couch_util:url_strip_password(full_url(HttpDb, Params)), ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~p", @@ -162,7 +155,7 @@ report_error(Worker, HttpDb, Params, Err Method = string:to_upper(atom_to_list(get_value(method, Params, get))), Url = couch_util:url_strip_password(full_url(HttpDb, Params)), do_report_error(Url, Method, Error), - stop_worker(Worker), + stop_worker(Worker, HttpDb), exit({http_request_failed, Method, Url, Error}). @@ -231,7 +224,7 @@ oauth_header(#httpdb{url = BaseUrl, oaut do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) -> - stop_worker(Worker), + stop_worker(Worker, HttpDb), RedirectUrl = redirect_url(Headers, Url), {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params), send_req(HttpDb2, Params2, Cb). Added: couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl?rev=1043196&view=auto ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl (added) +++ couchdb/branches/new_replicator/src/couchdb/couch_httpc_pool.erl Tue Dec 7 20:57:33 2010 @@ -0,0 +1,117 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
% You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

% Pool of HTTP client worker processes for one {Host, Port} endpoint.
% Each pool is its own unregistered gen_server, so every caller of
% start_link/2 gets a private set of connections.
-module(couch_httpc_pool).
-behaviour(gen_server).

% public API
-export([start_link/2, stop/1, get_worker/1, release_worker/2]).

% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]).

-include("couch_db.hrl").

% Server state:
%   host, port - endpoint handed to every new worker
%   free       - idle workers, ready to be handed out
%   busy       - workers currently handed out to callers
%   shutdown   - true once stop/1 was requested while workers were still busy
-record(state, {
    host,
    port,
    free = [],
    busy = [],
    shutdown = false
}).


% Starts a pool for the given endpoint, linked to the calling process.
% Returns whatever gen_server:start_link/3 returns ({ok, Pid} on success).
start_link(Host, Port) ->
    InitArg = {Host, Port},
    gen_server:start_link(?MODULE, InitArg, []).


% Asks the pool to shut down. Blocks (no timeout) until the pool answers and
% crashes the caller with badmatch if the answer is anything but 'ok'.
stop(PoolPid) ->
    ok = gen_server:call(PoolPid, stop, infinity).


% Borrows a worker from the pool. Blocks (no timeout) and returns the pool's
% reply unchanged: {ok, WorkerPid}, or {error, shutting_down} once a shutdown
% has been requested.
get_worker(PoolPid) ->
    gen_server:call(PoolPid, get_worker, infinity).


% Gives a previously borrowed worker back to the pool. Blocks (no timeout) and
% crashes the caller with badmatch if the pool answers anything but 'ok'.
release_worker(PoolPid, WorkerPid) ->
    Request = {release_worker, WorkerPid},
    ok = gen_server:call(PoolPid, Request, infinity).


% gen_server callback. Exits are trapped so that the termination of a linked
% process arrives as an {'EXIT', Pid, Reason} message instead of killing the
% pool.
init({Host, Port}) ->
    process_flag(trap_exit, true),
    InitialState = #state{host = Host, port = Port},
    {ok, InitialState}.


% get_worker: hand a worker to the caller and remember it as busy.
% Refused with {error, shutting_down} once stop has been requested.
handle_call(get_worker, _From, #state{shutdown = true} = State) ->
    {reply, {error, shutting_down}, State};

handle_call(get_worker, _From,
    #state{free = [], busy = Busy, host = Host, port = Port} = State) ->
    % No idle worker available: start a new one for this endpoint.
    {ok, Worker} = ibrowse_http_client:start_link({Host, Port}),
    {reply, {ok, Worker}, State#state{busy = [Worker | Busy]}};

handle_call(get_worker, _From,
    #state{free = [Worker | RestFree], busy = Busy} = State) ->
    {reply, {ok, Worker}, State#state{free = RestFree, busy = [Worker | Busy]}};

% release_worker: take a worker back. Releasing a pid that is not currently
% marked busy is a no-op that still answers 'ok'.
handle_call({release_worker, Worker}, _From, #state{busy = Busy} = State) ->
    case lists:member(Worker, Busy) of
    false ->
        {reply, ok, State};
    true ->
        worker_released(Worker, State#state{busy = lists:delete(Worker, Busy)})
    end;

% stop: always answers 'ok'.
%  - already draining: nothing more to do;
%  - no busy workers: stop right away, terminate/2 stops the idle workers;
%  - busy workers outstanding: stop the idle ones now, refuse new get_worker
%    requests and stop for real once the last busy worker is released or dies.
handle_call(stop, _From, #state{shutdown = true} = State) ->
    {reply, ok, State};

handle_call(stop, _From, #state{busy = []} = State) ->
    {stop, normal, ok, State};

handle_call(stop, _From, #state{free = Free} = State) ->
    lists:foreach(fun stop_worker/1, Free),
    {reply, ok, State#state{free = [], shutdown = true}}.


% The pool has no asynchronous API; any cast is treated as a bug.
handle_cast(Msg, State) ->
    {stop, {unexpected_cast, Msg}, State}.


% A linked process died (init/1 enables trap_exit).
%  - busy worker: forget it; if it was the last one and a shutdown is pending,
%    the pool stops;
%  - idle worker: forget it;
%  - anything else: stop with {unknown_process_died, {Pid, Reason}}.
handle_info({'EXIT', From, Reason},
    #state{free = Free, busy = Busy, shutdown = Shutdown} = State) ->
    case Busy -- [From] of
    Busy ->
        case Free -- [From] of
        Free ->
            {stop, {unknown_process_died, {From, Reason}}, State};
        Free2 ->
            {noreply, State#state{free = Free2}}
        end;
    [] when Shutdown =:= true ->
        % Drop the dead pid from the state so terminate/2 does not try to
        % stop it again.
        {stop, normal, State#state{busy = []}};
    Busy2 ->
        {noreply, State#state{busy = Busy2}}
    end.


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


% Stop every worker the pool still knows about, whatever the reason for
% terminating. When the pool exits with reason 'normal' the exit signal does
% not terminate linked workers, so without this they would outlive the pool.
terminate(_Reason, #state{free = Free, busy = Busy}) ->
    lists:foreach(fun stop_worker/1, Free ++ Busy).


% Internal: decide what happens to a worker that was just removed from the
% busy list (State#state.busy no longer contains it).
worker_released(Worker, #state{shutdown = false, free = Free} = State) ->
    {reply, ok, State#state{free = [Worker | Free]}};
worker_released(Worker, #state{shutdown = true, busy = []} = State) ->
    % Last outstanding worker of a draining pool: stop it and the pool.
    stop_worker(Worker),
    {stop, normal, ok, State};
worker_released(Worker, #state{shutdown = true} = State) ->
    % Draining: the worker will never be handed out again, so stop it now
    % instead of parking it on the free list.
    stop_worker(Worker),
    {reply, ok, State}.


% Internal: stop a worker the pool no longer wants to track.
% The worker is unlinked first and any 'EXIT' message it may already have sent
% is flushed; otherwise that message would later reach handle_info/2 for a pid
% that is in neither list and stop the pool with unknown_process_died.
stop_worker(Worker) ->
    unlink(Worker),
    receive {'EXIT', Worker, _} -> ok after 0 -> ok end,
    try
        ibrowse_http_client:stop(Worker)
    catch
        % The worker may already be gone, which is the outcome we want.
        exit:_ -> ok
    end,
    ok.