couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [2/2] couch commit: updated refs/heads/master to db58e79
Date Sat, 06 Sep 2014 11:16:52 GMT
Fix bugs with couch_proc_manager limits

This fixes the couch_proc_manager limit counting by rearranging the increment and decrements
when processes are created and destroyed. It ensures that each time we remove a process from
the ets table that we decrement appropriately.

For incrementing, things are a bit more complicated in that we need to increment before inserting
to the table. This is so that our hard limit applies even if one of our asynchronous spawn
calls is opening a new process. This is accomplished by incrementing the counter and storing
the async open call information in a new ets table. If the open is successful the counter
is left untouched. If the open fails then we need to decrement the counter.

This also simplifies starting waiting clients when a processes is either returned, exits,
or fails to start by isolating the logic and calling it in each place as necessary.

Closes COUCHDB-2321


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/db58e794
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/db58e794
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/db58e794

Branch: refs/heads/master
Commit: db58e794f937a52b6b61c964942e56afa7d03d8b
Parents: 28a7f57
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Fri Sep 5 17:35:14 2014 -0500
Committer: Paul J. Davis <paul.joseph.davis@gmail.com>
Committed: Fri Sep 5 18:18:59 2014 -0500

----------------------------------------------------------------------
 src/couch_proc_manager.erl | 484 +++++++++++++++++++++++-----------------
 1 file changed, 276 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/db58e794/src/couch_proc_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl
index 9232fd7..8f29c00 100644
--- a/src/couch_proc_manager.erl
+++ b/src/couch_proc_manager.erl
@@ -39,12 +39,16 @@
 
 -include_lib("couch/include/couch_db.hrl").
 
+-define(PROCS, couch_proc_manager_procs).
+-define(WAITERS, couch_proc_manager_waiters).
+-define(OPENING, couch_proc_manager_opening).
+
 -record(state, {
-    tab,
     config,
-    proc_counts,
-    waiting,
-    threshold_ts
+    counts,
+    threshold_ts,
+    hard_limit,
+    soft_limit
 }).
 
 -record(client, {
@@ -58,7 +62,7 @@
 -record(proc_int, {
     pid,
     lang,
-    client = nil,
+    client,
     ddoc_keys = [],
     prompt_fun,
     set_timeout_fun,
@@ -66,51 +70,70 @@
     t0 = os:timestamp()
 }).
 
+
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
+
 get_proc_count() ->
     gen_server:call(?MODULE, get_proc_count).
 
+
 get_stale_proc_count() ->
     gen_server:call(?MODULE, get_stale_proc_count).
 
+
 reload() ->
-    gen_server:call(?MODULE, bump_threshold_ts).
+    gen_server:call(?MODULE, set_threshold_ts).
+
 
 terminate_stale_procs() ->
     gen_server:call(?MODULE, terminate_stale_procs).
 
+
 init([]) ->
     process_flag(trap_exit, true),
-    ok = config:listen_for_changes(?MODULE, nil),
+    ok = config:listen_for_changes(?MODULE, undefined),
+
+    TableOpts = [public, named_table, ordered_set],
+    ets:new(?PROCS, TableOpts ++ [{keypos, #proc_int.pid}]),
+    ets:new(?WAITERS, TableOpts ++ [{keypos, #client.timestamp}]),
+    ets:new(?OPENING, [public, named_table, set]),
+
     {ok, #state{
-        tab = ets:new(procs, [ordered_set, {keypos, #proc_int.pid}]),
         config = get_proc_config(),
-        proc_counts = dict:new(),
-        waiting = ets:new(couch_proc_manage_waiting,
-                [ordered_set, {keypos, #client.timestamp}])
+        counts = dict:new(),
+        threshold_ts = os:timestamp(),
+        hard_limit = get_hard_limit(),
+        soft_limit = get_soft_limit()
     }}.
 
-handle_call(get_table, _From, State) ->
-    {reply, State#state.tab, State};
+
+terminate(_Reason, _State) ->
+    ets:foldl(fun(#proc_int{pid=P}, _) ->
+        couch_util:shutdown_sync(P)
+    end, 0, ?PROCS),
+    ok.
+
 
 handle_call(get_proc_count, _From, State) ->
-    {reply, ets:info(State#state.tab, size), State};
+    NumProcs = ets:info(?PROCS, size),
+    NumOpening = ets:info(?OPENING, size),
+    {reply, NumProcs + NumOpening, State};
 
 handle_call(get_stale_proc_count, _From, State) ->
-    #state{tab = Tab, threshold_ts = T0} = State,
+    #state{threshold_ts = T0} = State,
     MatchSpec = [{#proc_int{t0='$1', _='_'}, [{'<', '$1', T0}], [true]}],
-    {reply, ets:select_count(Tab, MatchSpec), State};
+    {reply, ets:select_count(?PROCS, MatchSpec), State};
 
 handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
     {ClientPid, _} = From,
-    Lang = couch_util:to_binary(
-            couch_util:get_value(<<"language">>, Props, <<"javascript">>)),
+    LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    Lang = couch_util:to_binary(LangStr),
     IterFun = fun(Proc, Acc) ->
         case lists:member(DDocKey, Proc#proc_int.ddoc_keys) of
             true ->
-                {stop, assign_proc(State#state.tab, ClientPid, Proc)};
+                {stop, assign_proc(ClientPid, Proc)};
             false ->
                 {ok, Acc}
         end
@@ -118,7 +141,7 @@ handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State)
->
     TeachFun = fun(Proc0, Acc) ->
         try
             {ok, Proc1} = teach_ddoc(DDoc, DDocKey, Proc0),
-            {stop, assign_proc(State#state.tab, ClientPid, Proc1)}
+            {stop, assign_proc(ClientPid, Proc1)}
         catch _:_ ->
             {ok, Acc}
         end
@@ -129,60 +152,76 @@ handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State)
->
 handle_call({get_proc, Lang}, From, State) ->
     {ClientPid, _} = From,
     IterFun = fun(Proc, _Acc) ->
-        {stop, assign_proc(State#state.tab, ClientPid, Proc)}
+        {stop, assign_proc(ClientPid, Proc)}
     end,
     Client = #client{from=From, lang=couch_util:to_binary(Lang)},
     find_proc(State, Client, [IterFun]);
 
-handle_call({ret_proc, #proc{client=Ref, lang=Lang0} = Proc}, _From, State) ->
+handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
     erlang:demonitor(Ref, [flush]),
-    Lang = couch_util:to_binary(Lang0),
-    % We need to check if the process is alive here, as the client could be
-    % handing us a #proc{} with a dead one.  We would have already removed the
-    % #proc_int{} from our own table, so the alternative is to do a lookup in the
-    % table before the insert.  Don't know which approach is cheaper.
-    {reply, true, return_proc(State, Proc#proc{lang=Lang})};
-
-handle_call(bump_threshold_ts, _From, #state{tab = Tab} = State) ->
-    FoldFun = fun(#proc_int{client = nil, pid = Pid}, _) ->
-        remove_proc(Tab, Pid);
-    (_, _) ->
-        ok
+    NewState = case ets:lookup(?PROCS, Proc#proc.pid) of
+        [#proc_int{}=ProcInt] ->
+            return_proc(State, ProcInt);
+        [] ->
+            % Proc must've died and we already
+            % cleared it out of the table in
+            % the handle_info clause.
+            State
+    end,
+    {reply, true, NewState};
+
+handle_call(set_threshold_ts, _From, State) ->
+    FoldFun = fun
+        (#proc_int{client = undefined} = Proc, StateAcc) ->
+            remove_proc(StateAcc, Proc);
+        (_, StateAcc) ->
+            StateAcc
     end,
-    ets:foldl(FoldFun, nil, Tab),
-    {reply, ok, State#state{threshold_ts = os:timestamp()}};
+    NewState = ets:foldl(FoldFun, State, ?PROCS),
+    {reply, ok, NewState#state{threshold_ts = os:timestamp()}};
 
-handle_call(terminate_stale_procs, _From, State) ->
-    #state{tab = Tab, threshold_ts = T0} = State,
-    MatchHead = #proc_int{pid = '$1', t0 = '$2', _ = '_'},
-    MatchSpec = [{MatchHead, [{'<', '$2', T0}], ['$1']}],
-    lists:foreach(fun(P) -> remove_proc(Tab,P) end, ets:select(Tab, MatchSpec)),
-    {reply, ok, State};
+handle_call(terminate_stale_procs, _From, #state{threshold_ts = Ts1} = State) ->
+    FoldFun = fun
+        (#proc_int{client = undefined, t0 = Ts2} = Proc, StateAcc) ->
+            case Ts1 > Ts2 of
+                true ->
+                    remove_proc(StateAcc, Proc);
+                false ->
+                    StateAcc
+            end;
+        (_, StateAcc) ->
+            StateAcc
+    end,
+    NewState = ets:foldl(FoldFun, State, ?PROCS),
+    {reply, ok, NewState};
 
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
 
-handle_cast({os_proc_idle, Pid}, #state{tab=Tab, proc_counts=Counts}=State0) ->
-    Limit = list_to_integer(
-            config:get("query_server_config", "os_process_soft_limit", "100")),
-    State = case ets:lookup(Tab, Pid) of
-        [#proc_int{client=nil, lang=Lang}] ->
+
+handle_cast({os_proc_idle, Pid}, #state{counts=Counts}=State) ->
+    NewState = case ets:lookup(?PROCS, Pid) of
+        [#proc_int{client=undefined, lang=Lang}=Proc] ->
             case dict:find(Lang, Counts) of
-                {ok, Count} when Count > Limit ->
+                {ok, Count} when Count >= State#state.soft_limit ->
                     ?LOG_INFO("Closing idle OS Process: ~p", [Pid]),
-                    remove_proc(Tab, Pid),
-                    State0#state{
-                        proc_counts=dict:update_counter(Lang, -1, Counts)
-                    };
+                    remove_proc(State, Proc);
                 {ok, _} ->
-                    State0
+                    State
             end;
         _ ->
-            State0
+            State
     end,
-    {noreply, State};
+    {noreply, NewState};
+
 handle_cast(reload_config, State) ->
-    {noreply, State#state{config = get_proc_config()}};
+    NewState = State#state{
+        config = get_proc_config(),
+        hard_limit = get_hard_limit(),
+        soft_limit = get_soft_limit()
+    },
+    {noreply, flush_waiters(NewState)};
+
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
@@ -190,38 +229,37 @@ handle_cast(_Msg, State) ->
 handle_info(shutdown, State) ->
     {stop, shutdown, State};
 
-handle_info({'EXIT', _, {ok, Proc0, {ClientPid,_} = From}}, State) ->
+handle_info({'EXIT', Pid, {spawn_ok, Proc0, {ClientPid,_} = From}}, State) ->
+    ets:delete(?OPENING, Pid),
     link(Proc0#proc_int.pid),
-    Proc = assign_proc(State#state.tab, ClientPid, Proc0),
+    Proc = assign_proc(ClientPid, Proc0),
     gen_server:reply(From, {ok, Proc, State#state.config}),
     {noreply, State};
 
+handle_info({'EXIT', Pid, spawn_error}, State) ->
+    [{Pid, #client{lang=Lang}}] = ets:lookup(?OPENING, Pid),
+    ets:delete(?OPENING, Pid),
+    NewState = State#state{
+        counts = dict:update_counter(Lang, -1, State#state.counts)
+    },
+    {noreply, flush_waiters(NewState, Lang)};
+
 handle_info({'EXIT', Pid, Reason}, State) ->
-    #state{proc_counts=Counts, waiting=Waiting} = State,
     ?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
-    MaybeProc = ets:lookup(State#state.tab, Pid),
-    ets:delete(State#state.tab, Pid),
-    case MaybeProc of
-        [#proc_int{lang=Lang}] ->
-            case get_waiting_client(Waiting, Lang) of
-                nil ->
-                    {noreply, State#state{
-                        proc_counts=dict:update_counter(Lang, -1, Counts)
-                    }};
-                Client ->
-                    spawn_link(?MODULE, new_proc, [Client]),
-                    {noreply, State}
-            end;
+    case ets:lookup(?PROCS, Pid) of
+        [#proc_int{} = Proc] ->
+            NewState = remove_proc(State, Proc),
+            {noreply, flush_waiters(NewState, Proc#proc_int.lang)};
         [] ->
             {noreply, State}
     end;
 
 handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
-    case ets:match_object(State0#state.tab, #proc_int{client=Ref, _='_'}) of
-    [] ->
-        {noreply, State0};
-    [#proc_int{} = Proc] ->
-        {noreply, return_proc(State0, Proc)}
+    case ets:match_object(?PROCS, #proc_int{client=Ref, _='_'}) of
+        [#proc_int{} = Proc] ->
+            {noreply, return_proc(State0, Proc)};
+        [] ->
+            {noreply, State0}
     end;
 
 handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
@@ -229,31 +267,32 @@ handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State)
->
     {noreply, State};
 
 handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    {noreply, State};
+    ok = config:listen_for_changes(?MODULE, undefined),
+    % Reload our config in case it changed in the last
+    % five seconds.
+    handle_cast(reload, State);
 
 handle_info(_Msg, State) ->
     {noreply, State}.
 
-terminate(_Reason, #state{tab=Tab}) ->
-    ets:foldl(fun(#proc_int{pid=P}, _) -> couch_util:shutdown_sync(P) end, 0, Tab),
-    ok.
 
 code_change(_OldVsn, #state{}=State, _Extra) ->
     {ok, State}.
 
+
 handle_config_change("query_server_config", _, _, _, _) ->
     gen_server:cast(?MODULE, reload_config),
-    {ok, nil};
+    {ok, undefined};
 handle_config_change(_, _, _, _, _) ->
-    {ok, nil}.
-
-find_proc(State, Client, [Fun|FindFuns]) ->
-    try iter_procs(State#state.tab, Client#client.lang, Fun, nil) of
-    {not_found, _} ->
-        find_proc(State, Client, FindFuns);
-    {ok, Proc} ->
-        {reply, {ok, Proc, State#state.config}, State}
+    {ok, undefined}.
+
+
+find_proc(State, Client, [Fun | FindFuns]) ->
+    try iter_procs(Client#client.lang, Fun, undefined) of
+        {not_found, _} ->
+            find_proc(State, Client, FindFuns);
+        {ok, Proc} ->
+            {reply, {ok, Proc, State#state.config}, State}
     catch error:Reason ->
         ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
         {reply, {error, Reason}, State}
@@ -261,62 +300,96 @@ find_proc(State, Client, [Fun|FindFuns]) ->
 find_proc(State, Client, []) ->
     {noreply, maybe_spawn_proc(State, Client)}.
 
-iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) ->
-    iter_procs(Tab, list_to_binary(Lang), Fun, Acc);
-iter_procs(Tab, Lang, Fun, Acc) ->
-    Pattern = #proc_int{lang=Lang, client=nil, _='_'},
+
+iter_procs(Lang, Fun, Acc) when is_binary(Lang) ->
+    Pattern = #proc_int{lang=Lang, client=undefined, _='_'},
     MSpec = [{Pattern, [], ['$_']}],
-    case ets:select_reverse(Tab, MSpec, 25) of
+    case ets:select_reverse(?PROCS, MSpec, 25) of
         '$end_of_table' ->
             {not_found, Acc};
         Continuation ->
-            iter_procs(Continuation, Fun, Acc)
+            iter_procs_int(Continuation, Fun, Acc)
     end.
 
-iter_procs({[], Continuation0}, Fun, Acc) ->
+
+iter_procs_int({[], Continuation0}, Fun, Acc) ->
     case ets:select_reverse(Continuation0) of
         '$end_of_table' ->
             {not_found, Acc};
         Continuation1 ->
-            iter_procs(Continuation1, Fun, Acc)
+            iter_procs_int(Continuation1, Fun, Acc)
     end;
-iter_procs({[Proc | Rest], Continuation}, Fun, Acc0) ->
+iter_procs_int({[Proc | Rest], Continuation}, Fun, Acc0) ->
     case Fun(Proc, Acc0) of
         {ok, Acc1} ->
-            iter_procs({Rest, Continuation}, Fun, Acc1);
+            iter_procs_int({Rest, Continuation}, Fun, Acc1);
         {stop, Acc1} ->
             {ok, Acc1}
     end.
 
+
+maybe_spawn_proc(State, Client) ->
+    case dict:find(Client#client.lang, State#state.counts) of
+    {ok, Count} when Count >= State#state.hard_limit ->
+        add_waiting_client(Client),
+        State;
+    _ ->
+        spawn_proc(State, Client)
+    end.
+
+
+spawn_proc(State, Client) ->
+    Pid = spawn_link(?MODULE, new_proc, [Client]),
+    ets:insert(?OPENING, {Pid, Client}),
+    Counts = State#state.counts,
+    Lang = Client#client.lang,
+    State#state{
+        counts = dict:update_counter(Lang, 1, Counts)
+    }.
+
+
 new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
     #client{from=From, lang=Lang} = Client,
-    case new_proc_int(From, Lang) of
-    {ok, Proc} ->
-        exit({ok, Proc, From});
-    Error ->
-        gen_server:reply(From, {error, Error})
-    end;
+    Resp = try
+        case new_proc_int(From, Lang) of
+            {ok, Proc} ->
+                {spawn_ok, Proc, From};
+            Error ->
+                gen_server:reply(From, {error, Error}),
+                spawn_error
+        end
+    catch _:_ ->
+        spawn_error
+    end,
+    exit(Resp);
 
 new_proc(Client) ->
     #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
-    case new_proc_int(From, Lang) of
-    {ok, NewProc} ->
-        case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of
-        {ok, Proc} ->
-            exit({ok, Proc, From});
-        {error, Reason} ->
-            gen_server:reply(From, {error, Reason})
-        end;
-    Error ->
-        gen_server:reply(From, {error, Error})
-    end.
+    Resp = try
+        case new_proc_int(From, Lang) of
+        {ok, NewProc} ->
+            case teach_ddoc(DDoc, DDocKey, NewProc) of
+            {ok, Proc} ->
+                {spawn_ok, Proc, From};
+            {error, Reason} ->
+                gen_server:reply(From, {error, Reason}),
+                spawn_error
+            end;
+        Error ->
+            gen_server:reply(From, {error, Error}),
+            spawn_error
+        end
+    catch _:_ ->
+        spawn_error
+    end,
+    exit(Resp).
+
 
 new_proc_int(From, Lang) when is_binary(Lang) ->
-    new_proc_int(From, binary_to_list(Lang));
-new_proc_int(From, Lang) when is_list(Lang) ->
-    case config:get("query_servers", Lang) of
+    LangStr = binary_to_list(Lang),
+    case config:get("query_servers", LangStr) of
     undefined ->
-        case config:get("native_query_servers", Lang) of
+        case config:get("native_query_servers", LangStr) of
         undefined ->
             gen_server:reply(From, {unknown_query_language, Lang});
         SpecStr ->
@@ -329,23 +402,6 @@ new_proc_int(From, Lang) when is_list(Lang) ->
         make_proc(Pid, Lang, couch_os_process)
     end.
 
-proc_with_ddoc(DDoc, DDocKey, Procs) ->
-    Filter = fun(#proc_int{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
-    case lists:dropwhile(Filter, Procs) of
-    [DDocProc|_] ->
-        {ok, DDocProc};
-    [] ->
-        teach_any_proc(DDoc, DDocKey, Procs)
-    end.
-
-teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
-    try
-        teach_ddoc(DDoc, DDocKey, Proc)
-    catch _:_ ->
-        teach_any_proc(DDoc, DDocKey, Rest)
-    end;
-teach_any_proc(_, _, []) ->
-    {error, noproc}.
 
 teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Keys}=Proc) ->
     % send ddoc over the wire
@@ -360,9 +416,10 @@ teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Keys}=Proc)
->
     % add ddoc to the proc
     {ok, Proc#proc_int{ddoc_keys=[DDocKey|Keys2]}}.
 
-make_proc(Pid, Lang, Mod) ->
+
+make_proc(Pid, Lang, Mod) when is_binary(Lang) ->
     Proc = #proc_int{
-        lang = couch_util:to_binary(Lang),
+        lang = Lang,
         pid = Pid,
         prompt_fun = {Mod, prompt},
         set_timeout_fun = {Mod, set_timeout},
@@ -371,97 +428,98 @@ make_proc(Pid, Lang, Mod) ->
     unlink(Pid),
     {ok, Proc}.
 
-assign_proc(Tab, ClientPid, #proc_int{client=nil}=Proc0) when is_pid(ClientPid) ->
-    Proc = Proc0#proc_int{client = erlang:monitor(process, ClientPid)},
-    ets:insert(Tab, Proc),
+
+assign_proc(Pid, #proc_int{client=undefined}=Proc0) when is_pid(Pid) ->
+    Proc = Proc0#proc_int{client = erlang:monitor(process, Pid)},
+    ets:insert(?PROCS, Proc),
     export_proc(Proc);
-assign_proc(Tab, #client{}=Client, #proc_int{client=nil}=Proc) ->
+assign_proc(#client{}=Client, #proc_int{client=undefined}=Proc) ->
     {Pid, _} = Client#client.from,
-    assign_proc(Tab, Pid, Proc).
+    assign_proc(Pid, Proc).
+
 
-return_proc(#state{} = State, #proc{} = Proc) ->
-    case ets:lookup(State#state.tab, Proc#proc.pid) of
-        [#proc_int{}=ProcInt] ->
-            return_proc(State, ProcInt);
-        [] ->
-            % Proc must've died and we already
-            % cleared it out of the table in
-            % the handle_info clause.
-            ok
-    end;
 return_proc(#state{} = State, #proc_int{} = ProcInt) ->
-    #state{tab = Tab, waiting = Waiting, threshold_ts = T0} = State,
     #proc_int{pid = Pid, lang = Lang} = ProcInt,
-    case is_process_alive(Pid) of true ->
-        case get_waiting_client(Waiting, Lang) of
-            nil ->
-                if ProcInt#proc_int.t0 < T0 ->
-                    remove_proc(Tab, Pid);
-                true ->
-                    gen_server:cast(Pid, garbage_collect),
-                    ets:insert(Tab, ProcInt#proc_int{client=nil})
-                end,
-                State;
-            #client{}=Client ->
-                From = Client#client.from,
-                assign_proc(Tab, Client, ProcInt#proc_int{client=nil}),
-                gen_server:reply(From, {ok, ProcInt, State#state.config}),
+    NewState = case is_process_alive(Pid) of true ->
+        case ProcInt#proc_int.t0 < State#state.threshold_ts of
+            true ->
+                remove_proc(State, Pid);
+            false ->
+                gen_server:cast(Pid, garbage_collect),
+                true = ets:update_element(?PROCS, Pid, [
+                    {#proc_int.client, undefined}
+                ]),
                 State
         end;
     false ->
-        ets:delete(Tab, Pid),
-        case get_waiting_client(Waiting, Lang) of
-            nil ->
-                State;
-            #client{}=Client ->
-                maybe_spawn_proc(State, Client)
-        end
-    end.
+        remove_proc(State, ProcInt)
+    end,
+    flush_waiters(NewState, Lang).
+
 
-remove_proc(Tab, Pid) ->
-    ets:delete(Tab, Pid),
-    case is_process_alive(Pid) of true ->
-        unlink(Pid),
-        gen_server:cast(Pid, stop);
+remove_proc(State, #proc_int{}=Proc) ->
+    ets:delete(?PROCS, Proc#proc_int.pid),
+    case is_process_alive(Proc#proc_int.pid) of true ->
+        unlink(Proc#proc_int.pid),
+        gen_server:cast(Proc#proc_int.pid, stop);
     false ->
         ok
-    end.
+    end,
+    Counts = State#state.counts,
+    Lang = Proc#proc_int.lang,
+    State#state{
+        counts = dict:update_counter(Lang, -1, Counts)
+    }.
+
 
 -spec export_proc(#proc_int{}) -> #proc{}.
 export_proc(#proc_int{} = ProcInt) ->
-    [_ | Data] = lists:sublist(tuple_to_list(ProcInt), record_info(size, proc)),
+    ProcIntList = tuple_to_list(ProcInt),
+    ProcLen = record_info(size, proc),
+    [_ | Data] = lists:sublist(ProcIntList, ProcLen),
     list_to_tuple([proc | Data]).
 
-maybe_spawn_proc(State, Client) ->
-    #state{proc_counts=Counts, waiting=Waiting} = State,
-    #client{lang=Lang} = Client,
-    Limit = list_to_integer(config:get(
-                "query_server_config", "os_process_limit", "100")),
-    case dict:find(Lang, Counts) of
-    {ok, Limit} ->
-        add_waiting_client(Waiting, Client),
-        State;
-    _ ->
-        spawn_link(?MODULE, new_proc, [Client]),
-        State#state{
-            proc_counts=dict:update_counter(Lang, 1, Counts)
-        }
+
+flush_waiters(State) ->
+    dict:fold(fun(Lang, Count, StateAcc) ->
+        case Count < State#state.hard_limit of
+            true ->
+                flush_waiters(StateAcc, Lang);
+            false ->
+                StateAcc
+        end
+    end, State, State#state.counts).
+
+
+flush_waiters(State, Lang) ->
+    case dict:fetch(Lang, State#state.counts) of
+        Count when Count < State#state.hard_limit ->
+            case get_waiting_client(Lang) of
+                #client{} = Client ->
+                    NewState = spawn_proc(State, Client),
+                    flush_waiters(NewState, Lang);
+                undefined ->
+                    State
+            end;
+        _ ->
+            State
     end.
 
-add_waiting_client(Tab, Client) ->
-    ets:insert(Tab, Client#client{timestamp=os:timestamp()}).
 
-get_waiting_client(Tab, Lang) when is_list(Lang) ->
-    get_waiting_client(Tab, couch_util:to_binary(Lang));
-get_waiting_client(Tab, Lang) ->
-    case ets:match_object(Tab, #client{lang=Lang, _='_'}, 1) of
+add_waiting_client(Client) ->
+    ets:insert(?WAITERS, Client#client{timestamp=os:timestamp()}).
+
+
+get_waiting_client(Lang) ->
+    case ets:match_object(?WAITERS, #client{lang=Lang, _='_'}, 1) of
         '$end_of_table' ->
-            nil;
+            undefined;
         {[#client{}=Client], _} ->
-            ets:delete(Tab, Client#client.timestamp),
+            ets:delete(?WAITERS, Client#client.timestamp),
             Client
     end.
 
+
 get_proc_config() ->
     Limit = config:get("query_server_config", "reduce_limit", "true"),
     Timeout = config:get("couchdb", "os_process_timeout", "5000"),
@@ -469,3 +527,13 @@ get_proc_config() ->
         {<<"reduce_limit">>, list_to_atom(Limit)},
         {<<"timeout">>, list_to_integer(Timeout)}
     ]}.
+
+
+get_hard_limit() ->
+    LimStr = config:get("query_server_config", "os_process_limit", "100"),
+    list_to_integer(LimStr).
+
+
+get_soft_limit() ->
+    LimStr = config:get("query_server_config", "os_process_soft_limit", "100"),
+    list_to_integer(LimStr).


Mime
View raw message