Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9C8F8115D1 for ; Sat, 6 Sep 2014 11:16:52 +0000 (UTC) Received: (qmail 85309 invoked by uid 500); 6 Sep 2014 11:16:52 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 85123 invoked by uid 500); 6 Sep 2014 11:16:52 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 85109 invoked by uid 99); 6 Sep 2014 11:16:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Sep 2014 11:16:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DC97DA0C4BF; Sat, 6 Sep 2014 11:16:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rnewson@apache.org To: commits@couchdb.apache.org Date: Sat, 06 Sep 2014 11:16:52 -0000 Message-Id: <1d75b27ad41c4a2698eb99bafa7a8f63@git.apache.org> In-Reply-To: <3874b550e1484ed181d9230eabfedcdd@git.apache.org> References: <3874b550e1484ed181d9230eabfedcdd@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] couch commit: updated refs/heads/master to db58e79 Fix bugs with couch_proc_manager limits This fixes the couch_proc_manager limit counting by rearranging the increment and decrements when processes are created and destroyed. It ensures that each time we remove a process from the ets table that we decrement appropriately. For incrementing, things are a bit more complicated in that we need to increment before inserting to the table. 
This is so that our hard limit applies even if one of our asynchronous spawn calls is opening a new process. This is accomplished by incrementing the counter and storing the async open call information in a new ets table. If the open is successful the counter is left untouched. If the open fails then we need to decrement the counter. This also simplifies starting waiting clients when a process is either returned, exits, or fails to start by isolating the logic and calling it in each place as necessary. Closes COUCHDB-2321 Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/db58e794 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/db58e794 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/db58e794 Branch: refs/heads/master Commit: db58e794f937a52b6b61c964942e56afa7d03d8b Parents: 28a7f57 Author: Paul J. Davis Authored: Fri Sep 5 17:35:14 2014 -0500 Committer: Paul J. Davis Committed: Fri Sep 5 18:18:59 2014 -0500 ---------------------------------------------------------------------- src/couch_proc_manager.erl | 484 +++++++++++++++++++++++----------------- 1 file changed, 276 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/db58e794/src/couch_proc_manager.erl ---------------------------------------------------------------------- diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl index 9232fd7..8f29c00 100644 --- a/src/couch_proc_manager.erl +++ b/src/couch_proc_manager.erl @@ -39,12 +39,16 @@ -include_lib("couch/include/couch_db.hrl"). +-define(PROCS, couch_proc_manager_procs). +-define(WAITERS, couch_proc_manager_waiters). +-define(OPENING, couch_proc_manager_opening). + -record(state, { - tab, config, - proc_counts, - waiting, - threshold_ts + counts, + threshold_ts, + hard_limit, + soft_limit }). 
-record(client, { @@ -58,7 +62,7 @@ -record(proc_int, { pid, lang, - client = nil, + client, ddoc_keys = [], prompt_fun, set_timeout_fun, @@ -66,51 +70,70 @@ t0 = os:timestamp() }). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + get_proc_count() -> gen_server:call(?MODULE, get_proc_count). + get_stale_proc_count() -> gen_server:call(?MODULE, get_stale_proc_count). + reload() -> - gen_server:call(?MODULE, bump_threshold_ts). + gen_server:call(?MODULE, set_threshold_ts). + terminate_stale_procs() -> gen_server:call(?MODULE, terminate_stale_procs). + init([]) -> process_flag(trap_exit, true), - ok = config:listen_for_changes(?MODULE, nil), + ok = config:listen_for_changes(?MODULE, undefined), + + TableOpts = [public, named_table, ordered_set], + ets:new(?PROCS, TableOpts ++ [{keypos, #proc_int.pid}]), + ets:new(?WAITERS, TableOpts ++ [{keypos, #client.timestamp}]), + ets:new(?OPENING, [public, named_table, set]), + {ok, #state{ - tab = ets:new(procs, [ordered_set, {keypos, #proc_int.pid}]), config = get_proc_config(), - proc_counts = dict:new(), - waiting = ets:new(couch_proc_manage_waiting, - [ordered_set, {keypos, #client.timestamp}]) + counts = dict:new(), + threshold_ts = os:timestamp(), + hard_limit = get_hard_limit(), + soft_limit = get_soft_limit() }}. -handle_call(get_table, _From, State) -> - {reply, State#state.tab, State}; + +terminate(_Reason, _State) -> + ets:foldl(fun(#proc_int{pid=P}, _) -> + couch_util:shutdown_sync(P) + end, 0, ?PROCS), + ok. 
+ handle_call(get_proc_count, _From, State) -> - {reply, ets:info(State#state.tab, size), State}; + NumProcs = ets:info(?PROCS, size), + NumOpening = ets:info(?OPENING, size), + {reply, NumProcs + NumOpening, State}; handle_call(get_stale_proc_count, _From, State) -> - #state{tab = Tab, threshold_ts = T0} = State, + #state{threshold_ts = T0} = State, MatchSpec = [{#proc_int{t0='$1', _='_'}, [{'<', '$1', T0}], [true]}], - {reply, ets:select_count(Tab, MatchSpec), State}; + {reply, ets:select_count(?PROCS, MatchSpec), State}; handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) -> {ClientPid, _} = From, - Lang = couch_util:to_binary( - couch_util:get_value(<<"language">>, Props, <<"javascript">>)), + LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>), + Lang = couch_util:to_binary(LangStr), IterFun = fun(Proc, Acc) -> case lists:member(DDocKey, Proc#proc_int.ddoc_keys) of true -> - {stop, assign_proc(State#state.tab, ClientPid, Proc)}; + {stop, assign_proc(ClientPid, Proc)}; false -> {ok, Acc} end @@ -118,7 +141,7 @@ handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) -> TeachFun = fun(Proc0, Acc) -> try {ok, Proc1} = teach_ddoc(DDoc, DDocKey, Proc0), - {stop, assign_proc(State#state.tab, ClientPid, Proc1)} + {stop, assign_proc(ClientPid, Proc1)} catch _:_ -> {ok, Acc} end @@ -129,60 +152,76 @@ handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) -> handle_call({get_proc, Lang}, From, State) -> {ClientPid, _} = From, IterFun = fun(Proc, _Acc) -> - {stop, assign_proc(State#state.tab, ClientPid, Proc)} + {stop, assign_proc(ClientPid, Proc)} end, Client = #client{from=From, lang=couch_util:to_binary(Lang)}, find_proc(State, Client, [IterFun]); -handle_call({ret_proc, #proc{client=Ref, lang=Lang0} = Proc}, _From, State) -> +handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) -> erlang:demonitor(Ref, [flush]), - Lang = couch_util:to_binary(Lang0), - % We need to check if the process is 
alive here, as the client could be - % handing us a #proc{} with a dead one. We would have already removed the - % #proc_int{} from our own table, so the alternative is to do a lookup in the - % table before the insert. Don't know which approach is cheaper. - {reply, true, return_proc(State, Proc#proc{lang=Lang})}; - -handle_call(bump_threshold_ts, _From, #state{tab = Tab} = State) -> - FoldFun = fun(#proc_int{client = nil, pid = Pid}, _) -> - remove_proc(Tab, Pid); - (_, _) -> - ok + NewState = case ets:lookup(?PROCS, Proc#proc.pid) of + [#proc_int{}=ProcInt] -> + return_proc(State, ProcInt); + [] -> + % Proc must've died and we already + % cleared it out of the table in + % the handle_info clause. + State + end, + {reply, true, NewState}; + +handle_call(set_threshold_ts, _From, State) -> + FoldFun = fun + (#proc_int{client = undefined} = Proc, StateAcc) -> + remove_proc(StateAcc, Proc); + (_, StateAcc) -> + StateAcc end, - ets:foldl(FoldFun, nil, Tab), - {reply, ok, State#state{threshold_ts = os:timestamp()}}; + NewState = ets:foldl(FoldFun, State, ?PROCS), + {reply, ok, NewState#state{threshold_ts = os:timestamp()}}; -handle_call(terminate_stale_procs, _From, State) -> - #state{tab = Tab, threshold_ts = T0} = State, - MatchHead = #proc_int{pid = '$1', t0 = '$2', _ = '_'}, - MatchSpec = [{MatchHead, [{'<', '$2', T0}], ['$1']}], - lists:foreach(fun(P) -> remove_proc(Tab,P) end, ets:select(Tab, MatchSpec)), - {reply, ok, State}; +handle_call(terminate_stale_procs, _From, #state{threshold_ts = Ts1} = State) -> + FoldFun = fun + (#proc_int{client = undefined, t0 = Ts2} = Proc, StateAcc) -> + case Ts1 > Ts2 of + true -> + remove_proc(StateAcc, Proc); + false -> + StateAcc + end; + (_, StateAcc) -> + StateAcc + end, + NewState = ets:foldl(FoldFun, State, ?PROCS), + {reply, ok, NewState}; handle_call(_Call, _From, State) -> {reply, ignored, State}. 
-handle_cast({os_proc_idle, Pid}, #state{tab=Tab, proc_counts=Counts}=State0) -> - Limit = list_to_integer( - config:get("query_server_config", "os_process_soft_limit", "100")), - State = case ets:lookup(Tab, Pid) of - [#proc_int{client=nil, lang=Lang}] -> + +handle_cast({os_proc_idle, Pid}, #state{counts=Counts}=State) -> + NewState = case ets:lookup(?PROCS, Pid) of + [#proc_int{client=undefined, lang=Lang}=Proc] -> case dict:find(Lang, Counts) of - {ok, Count} when Count > Limit -> + {ok, Count} when Count >= State#state.soft_limit -> ?LOG_INFO("Closing idle OS Process: ~p", [Pid]), - remove_proc(Tab, Pid), - State0#state{ - proc_counts=dict:update_counter(Lang, -1, Counts) - }; + remove_proc(State, Proc); {ok, _} -> - State0 + State end; _ -> - State0 + State end, - {noreply, State}; + {noreply, NewState}; + handle_cast(reload_config, State) -> - {noreply, State#state{config = get_proc_config()}}; + NewState = State#state{ + config = get_proc_config(), + hard_limit = get_hard_limit(), + soft_limit = get_soft_limit() + }, + {noreply, flush_waiters(NewState)}; + handle_cast(_Msg, State) -> {noreply, State}. 
@@ -190,38 +229,37 @@ handle_cast(_Msg, State) -> handle_info(shutdown, State) -> {stop, shutdown, State}; -handle_info({'EXIT', _, {ok, Proc0, {ClientPid,_} = From}}, State) -> +handle_info({'EXIT', Pid, {spawn_ok, Proc0, {ClientPid,_} = From}}, State) -> + ets:delete(?OPENING, Pid), link(Proc0#proc_int.pid), - Proc = assign_proc(State#state.tab, ClientPid, Proc0), + Proc = assign_proc(ClientPid, Proc0), gen_server:reply(From, {ok, Proc, State#state.config}), {noreply, State}; +handle_info({'EXIT', Pid, spawn_error}, State) -> + [{Pid, #client{lang=Lang}}] = ets:lookup(?OPENING, Pid), + ets:delete(?OPENING, Pid), + NewState = State#state{ + counts = dict:update_counter(Lang, -1, State#state.counts) + }, + {noreply, flush_waiters(NewState, Lang)}; + handle_info({'EXIT', Pid, Reason}, State) -> - #state{proc_counts=Counts, waiting=Waiting} = State, ?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]), - MaybeProc = ets:lookup(State#state.tab, Pid), - ets:delete(State#state.tab, Pid), - case MaybeProc of - [#proc_int{lang=Lang}] -> - case get_waiting_client(Waiting, Lang) of - nil -> - {noreply, State#state{ - proc_counts=dict:update_counter(Lang, -1, Counts) - }}; - Client -> - spawn_link(?MODULE, new_proc, [Client]), - {noreply, State} - end; + case ets:lookup(?PROCS, Pid) of + [#proc_int{} = Proc] -> + NewState = remove_proc(State, Proc), + {noreply, flush_waiters(NewState, Proc#proc_int.lang)}; [] -> {noreply, State} end; handle_info({'DOWN', Ref, _, _, _Reason}, State0) -> - case ets:match_object(State0#state.tab, #proc_int{client=Ref, _='_'}) of - [] -> - {noreply, State0}; - [#proc_int{} = Proc] -> - {noreply, return_proc(State0, Proc)} + case ets:match_object(?PROCS, #proc_int{client=Ref, _='_'}) of + [#proc_int{} = Proc] -> + {noreply, return_proc(State0, Proc)}; + [] -> + {noreply, State0} end; handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) -> @@ -229,31 +267,32 @@ handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, 
State) -> {noreply, State}; handle_info(restart_config_listener, State) -> - ok = config:listen_for_changes(?MODULE, nil), - {noreply, State}; + ok = config:listen_for_changes(?MODULE, undefined), + % Reload our config in case it changed in the last + % five seconds. + handle_cast(reload, State); handle_info(_Msg, State) -> {noreply, State}. -terminate(_Reason, #state{tab=Tab}) -> - ets:foldl(fun(#proc_int{pid=P}, _) -> couch_util:shutdown_sync(P) end, 0, Tab), - ok. code_change(_OldVsn, #state{}=State, _Extra) -> {ok, State}. + handle_config_change("query_server_config", _, _, _, _) -> gen_server:cast(?MODULE, reload_config), - {ok, nil}; + {ok, undefined}; handle_config_change(_, _, _, _, _) -> - {ok, nil}. - -find_proc(State, Client, [Fun|FindFuns]) -> - try iter_procs(State#state.tab, Client#client.lang, Fun, nil) of - {not_found, _} -> - find_proc(State, Client, FindFuns); - {ok, Proc} -> - {reply, {ok, Proc, State#state.config}, State} + {ok, undefined}. + + +find_proc(State, Client, [Fun | FindFuns]) -> + try iter_procs(Client#client.lang, Fun, undefined) of + {not_found, _} -> + find_proc(State, Client, FindFuns); + {ok, Proc} -> + {reply, {ok, Proc, State#state.config}, State} catch error:Reason -> ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]), {reply, {error, Reason}, State} @@ -261,62 +300,96 @@ find_proc(State, Client, [Fun|FindFuns]) -> find_proc(State, Client, []) -> {noreply, maybe_spawn_proc(State, Client)}. 
-iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) -> - iter_procs(Tab, list_to_binary(Lang), Fun, Acc); -iter_procs(Tab, Lang, Fun, Acc) -> - Pattern = #proc_int{lang=Lang, client=nil, _='_'}, + +iter_procs(Lang, Fun, Acc) when is_binary(Lang) -> + Pattern = #proc_int{lang=Lang, client=undefined, _='_'}, MSpec = [{Pattern, [], ['$_']}], - case ets:select_reverse(Tab, MSpec, 25) of + case ets:select_reverse(?PROCS, MSpec, 25) of '$end_of_table' -> {not_found, Acc}; Continuation -> - iter_procs(Continuation, Fun, Acc) + iter_procs_int(Continuation, Fun, Acc) end. -iter_procs({[], Continuation0}, Fun, Acc) -> + +iter_procs_int({[], Continuation0}, Fun, Acc) -> case ets:select_reverse(Continuation0) of '$end_of_table' -> {not_found, Acc}; Continuation1 -> - iter_procs(Continuation1, Fun, Acc) + iter_procs_int(Continuation1, Fun, Acc) end; -iter_procs({[Proc | Rest], Continuation}, Fun, Acc0) -> +iter_procs_int({[Proc | Rest], Continuation}, Fun, Acc0) -> case Fun(Proc, Acc0) of {ok, Acc1} -> - iter_procs({Rest, Continuation}, Fun, Acc1); + iter_procs_int({Rest, Continuation}, Fun, Acc1); {stop, Acc1} -> {ok, Acc1} end. + +maybe_spawn_proc(State, Client) -> + case dict:find(Client#client.lang, State#state.counts) of + {ok, Count} when Count >= State#state.hard_limit -> + add_waiting_client(Client), + State; + _ -> + spawn_proc(State, Client) + end. + + +spawn_proc(State, Client) -> + Pid = spawn_link(?MODULE, new_proc, [Client]), + ets:insert(?OPENING, {Pid, Client}), + Counts = State#state.counts, + Lang = Client#client.lang, + State#state{ + counts = dict:update_counter(Lang, 1, Counts) + }. 
+ + new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) -> #client{from=From, lang=Lang} = Client, - case new_proc_int(From, Lang) of - {ok, Proc} -> - exit({ok, Proc, From}); - Error -> - gen_server:reply(From, {error, Error}) - end; + Resp = try + case new_proc_int(From, Lang) of + {ok, Proc} -> + {spawn_ok, Proc, From}; + Error -> + gen_server:reply(From, {error, Error}), + spawn_error + end + catch _:_ -> + spawn_error + end, + exit(Resp); new_proc(Client) -> #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client, - case new_proc_int(From, Lang) of - {ok, NewProc} -> - case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of - {ok, Proc} -> - exit({ok, Proc, From}); - {error, Reason} -> - gen_server:reply(From, {error, Reason}) - end; - Error -> - gen_server:reply(From, {error, Error}) - end. + Resp = try + case new_proc_int(From, Lang) of + {ok, NewProc} -> + case teach_ddoc(DDoc, DDocKey, NewProc) of + {ok, Proc} -> + {spawn_ok, Proc, From}; + {error, Reason} -> + gen_server:reply(From, {error, Reason}), + spawn_error + end; + Error -> + gen_server:reply(From, {error, Error}), + spawn_error + end + catch _:_ -> + spawn_error + end, + exit(Resp). + new_proc_int(From, Lang) when is_binary(Lang) -> - new_proc_int(From, binary_to_list(Lang)); -new_proc_int(From, Lang) when is_list(Lang) -> - case config:get("query_servers", Lang) of + LangStr = binary_to_list(Lang), + case config:get("query_servers", LangStr) of undefined -> - case config:get("native_query_servers", Lang) of + case config:get("native_query_servers", LangStr) of undefined -> gen_server:reply(From, {unknown_query_language, Lang}); SpecStr -> @@ -329,23 +402,6 @@ new_proc_int(From, Lang) when is_list(Lang) -> make_proc(Pid, Lang, couch_os_process) end. 
-proc_with_ddoc(DDoc, DDocKey, Procs) -> - Filter = fun(#proc_int{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end, - case lists:dropwhile(Filter, Procs) of - [DDocProc|_] -> - {ok, DDocProc}; - [] -> - teach_any_proc(DDoc, DDocKey, Procs) - end. - -teach_any_proc(DDoc, DDocKey, [Proc|Rest]) -> - try - teach_ddoc(DDoc, DDocKey, Proc) - catch _:_ -> - teach_any_proc(DDoc, DDocKey, Rest) - end; -teach_any_proc(_, _, []) -> - {error, noproc}. teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Keys}=Proc) -> % send ddoc over the wire @@ -360,9 +416,10 @@ teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Keys}=Proc) -> % add ddoc to the proc {ok, Proc#proc_int{ddoc_keys=[DDocKey|Keys2]}}. -make_proc(Pid, Lang, Mod) -> + +make_proc(Pid, Lang, Mod) when is_binary(Lang) -> Proc = #proc_int{ - lang = couch_util:to_binary(Lang), + lang = Lang, pid = Pid, prompt_fun = {Mod, prompt}, set_timeout_fun = {Mod, set_timeout}, @@ -371,97 +428,98 @@ make_proc(Pid, Lang, Mod) -> unlink(Pid), {ok, Proc}. -assign_proc(Tab, ClientPid, #proc_int{client=nil}=Proc0) when is_pid(ClientPid) -> - Proc = Proc0#proc_int{client = erlang:monitor(process, ClientPid)}, - ets:insert(Tab, Proc), + +assign_proc(Pid, #proc_int{client=undefined}=Proc0) when is_pid(Pid) -> + Proc = Proc0#proc_int{client = erlang:monitor(process, Pid)}, + ets:insert(?PROCS, Proc), export_proc(Proc); -assign_proc(Tab, #client{}=Client, #proc_int{client=nil}=Proc) -> +assign_proc(#client{}=Client, #proc_int{client=undefined}=Proc) -> {Pid, _} = Client#client.from, - assign_proc(Tab, Pid, Proc). + assign_proc(Pid, Proc). + -return_proc(#state{} = State, #proc{} = Proc) -> - case ets:lookup(State#state.tab, Proc#proc.pid) of - [#proc_int{}=ProcInt] -> - return_proc(State, ProcInt); - [] -> - % Proc must've died and we already - % cleared it out of the table in - % the handle_info clause. 
- ok - end; return_proc(#state{} = State, #proc_int{} = ProcInt) -> - #state{tab = Tab, waiting = Waiting, threshold_ts = T0} = State, #proc_int{pid = Pid, lang = Lang} = ProcInt, - case is_process_alive(Pid) of true -> - case get_waiting_client(Waiting, Lang) of - nil -> - if ProcInt#proc_int.t0 < T0 -> - remove_proc(Tab, Pid); - true -> - gen_server:cast(Pid, garbage_collect), - ets:insert(Tab, ProcInt#proc_int{client=nil}) - end, - State; - #client{}=Client -> - From = Client#client.from, - assign_proc(Tab, Client, ProcInt#proc_int{client=nil}), - gen_server:reply(From, {ok, ProcInt, State#state.config}), + NewState = case is_process_alive(Pid) of true -> + case ProcInt#proc_int.t0 < State#state.threshold_ts of + true -> + remove_proc(State, Pid); + false -> + gen_server:cast(Pid, garbage_collect), + true = ets:update_element(?PROCS, Pid, [ + {#proc_int.client, undefined} + ]), State end; false -> - ets:delete(Tab, Pid), - case get_waiting_client(Waiting, Lang) of - nil -> - State; - #client{}=Client -> - maybe_spawn_proc(State, Client) - end - end. + remove_proc(State, ProcInt) + end, + flush_waiters(NewState, Lang). + -remove_proc(Tab, Pid) -> - ets:delete(Tab, Pid), - case is_process_alive(Pid) of true -> - unlink(Pid), - gen_server:cast(Pid, stop); +remove_proc(State, #proc_int{}=Proc) -> + ets:delete(?PROCS, Proc#proc_int.pid), + case is_process_alive(Proc#proc_int.pid) of true -> + unlink(Proc#proc_int.pid), + gen_server:cast(Proc#proc_int.pid, stop); false -> ok - end. + end, + Counts = State#state.counts, + Lang = Proc#proc_int.lang, + State#state{ + counts = dict:update_counter(Lang, -1, Counts) + }. + -spec export_proc(#proc_int{}) -> #proc{}. export_proc(#proc_int{} = ProcInt) -> - [_ | Data] = lists:sublist(tuple_to_list(ProcInt), record_info(size, proc)), + ProcIntList = tuple_to_list(ProcInt), + ProcLen = record_info(size, proc), + [_ | Data] = lists:sublist(ProcIntList, ProcLen), list_to_tuple([proc | Data]). 
-maybe_spawn_proc(State, Client) -> - #state{proc_counts=Counts, waiting=Waiting} = State, - #client{lang=Lang} = Client, - Limit = list_to_integer(config:get( - "query_server_config", "os_process_limit", "100")), - case dict:find(Lang, Counts) of - {ok, Limit} -> - add_waiting_client(Waiting, Client), - State; - _ -> - spawn_link(?MODULE, new_proc, [Client]), - State#state{ - proc_counts=dict:update_counter(Lang, 1, Counts) - } + +flush_waiters(State) -> + dict:fold(fun(Lang, Count, StateAcc) -> + case Count < State#state.hard_limit of + true -> + flush_waiters(StateAcc, Lang); + false -> + StateAcc + end + end, State, State#state.counts). + + +flush_waiters(State, Lang) -> + case dict:fetch(Lang, State#state.counts) of + Count when Count < State#state.hard_limit -> + case get_waiting_client(Lang) of + #client{} = Client -> + NewState = spawn_proc(State, Client), + flush_waiters(NewState, Lang); + undefined -> + State + end; + _ -> + State end. -add_waiting_client(Tab, Client) -> - ets:insert(Tab, Client#client{timestamp=os:timestamp()}). -get_waiting_client(Tab, Lang) when is_list(Lang) -> - get_waiting_client(Tab, couch_util:to_binary(Lang)); -get_waiting_client(Tab, Lang) -> - case ets:match_object(Tab, #client{lang=Lang, _='_'}, 1) of +add_waiting_client(Client) -> + ets:insert(?WAITERS, Client#client{timestamp=os:timestamp()}). + + +get_waiting_client(Lang) -> + case ets:match_object(?WAITERS, #client{lang=Lang, _='_'}, 1) of '$end_of_table' -> - nil; + undefined; {[#client{}=Client], _} -> - ets:delete(Tab, Client#client.timestamp), + ets:delete(?WAITERS, Client#client.timestamp), Client end. + get_proc_config() -> Limit = config:get("query_server_config", "reduce_limit", "true"), Timeout = config:get("couchdb", "os_process_timeout", "5000"), @@ -469,3 +527,13 @@ get_proc_config() -> {<<"reduce_limit">>, list_to_atom(Limit)}, {<<"timeout">>, list_to_integer(Timeout)} ]}. 
+ + +get_hard_limit() -> + LimStr = config:get("query_server_config", "os_process_limit", "100"), + list_to_integer(LimStr). + + +get_soft_limit() -> + LimStr = config:get("query_server_config", "os_process_soft_limit", "100"), + list_to_integer(LimStr).