couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jch...@apache.org
Subject svn commit: r983291 - in /couchdb/trunk: etc/couchdb/default.ini.tpl.in src/couchdb/couch_query_servers.erl
Date Sat, 07 Aug 2010 19:26:51 GMT
Author: jchris
Date: Sat Aug  7 19:26:50 2010
New Revision: 983291

URL: http://svn.apache.org/viewvc?rev=983291&view=rev
Log:
os_process_limit for query servers makes them much more robust under concurrent load

Modified:
    couchdb/trunk/etc/couchdb/default.ini.tpl.in
    couchdb/trunk/src/couchdb/couch_query_servers.erl

Modified: couchdb/trunk/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/trunk/etc/couchdb/default.ini.tpl.in?rev=983291&r1=983290&r2=983291&view=diff
==============================================================================
--- couchdb/trunk/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/trunk/etc/couchdb/default.ini.tpl.in Sat Aug  7 19:26:50 2010
@@ -46,6 +46,7 @@ javascript = %bindir%/%couchjs_command_n
 ; please let us know on the mailing list so we can fine tune the heuristic.
 [query_server_config]
 reduce_limit = true
+os_process_limit = 25
 
 ; enable external as an httpd handler, then link it with commands here.
 ; note, this api is still under consideration.

Modified: couchdb/trunk/src/couchdb/couch_query_servers.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_query_servers.erl?rev=983291&r1=983290&r2=983291&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_query_servers.erl (original)
+++ couchdb/trunk/src/couchdb/couch_query_servers.erl Sat Aug  7 19:26:50 2010
@@ -35,6 +35,15 @@
     stop_fun
 }).
 
+-record(qserver, {
+    langs, % Keyed by language name, value is {Mod,Func,Arg}
+    pid_procs, % Keyed by PID, value is a #proc record.
+    lang_procs, % Keyed by language name, value is a #proc record
+    lang_limits, % Keyed by language name, value is {Lang, Limit, Current}
+    waitlist = [],
+    config
+}).
+
 start_link() ->
     gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []).
 
@@ -242,81 +251,97 @@ init([]) ->
             supervisor:terminate_child(couch_secondary_services, query_servers),
             [supervisor:restart_child(couch_secondary_services, query_servers)]
         end),
+    ok = couch_config:register(
+        fun("query_server_config" ++ _, _) ->
+            supervisor:terminate_child(couch_secondary_services, query_servers),
+            supervisor:restart_child(couch_secondary_services, query_servers)
+        end),
 
     Langs = ets:new(couch_query_server_langs, [set, private]),
+    LangLimits = ets:new(couch_query_server_lang_limits, [set, private]),
     PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
     LangProcs = ets:new(couch_query_server_procs, [set, private]),
+
+    ProcTimeout = list_to_integer(couch_config:get(
+                        "couchdb", "os_process_timeout", "5000")),
+    ReduceLimit = list_to_atom(
+        couch_config:get("query_server_config","reduce_limit","true")),
+    OsProcLimit = list_to_integer(
+        couch_config:get("query_server_config","os_process_limit","10")),
+
     % 'query_servers' specifies an OS command-line to execute.
     lists:foreach(fun({Lang, Command}) ->
+        true = ets:insert(LangLimits, {?l2b(Lang), OsProcLimit, 0}),
         true = ets:insert(Langs, {?l2b(Lang),
                           couch_os_process, start_link, [Command]})
     end, couch_config:get("query_servers")),
     % 'native_query_servers' specifies a {Module, Func, Arg} tuple.
     lists:foreach(fun({Lang, SpecStr}) ->
         {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
+        true = ets:insert(LangLimits, {?l2b(Lang), 0, 0}), % 0 means no limit
         true = ets:insert(Langs, {?l2b(Lang),
                           Mod, Fun, SpecArg})
     end, couch_config:get("native_query_servers")),
+
+
     process_flag(trap_exit, true),
-    {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg}
-          PidProcs, % Keyed by PID, valus is a #proc record.
-          LangProcs % Keyed by language name, value is a #proc record
-          }}.
+    {ok, #qserver{
+        langs = Langs, % Keyed by language name, value is {Mod,Func,Arg}
+        pid_procs = PidProcs, % Keyed by PID, value is a #proc record.
+        lang_procs = LangProcs, % Keyed by language name, value is a #proc record
+        lang_limits = LangLimits, % Keyed by language name, value is {Lang, Limit, Current}
+        config = {[{<<"reduce_limit">>, ReduceLimit},{<<"timeout">>, ProcTimeout}]}
+    }}.
 
-terminate(_Reason, {_Langs, PidProcs, _LangProcs}) ->
+terminate(_Reason, #qserver{pid_procs=PidProcs}) ->
     [couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)],
     ok.
 
-handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs}=Server) ->
-    % Note to future self. Add max process limit.
+handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, Server) ->
     Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
-    case ets:lookup(LangProcs, Lang) of
-    [{Lang, [P|Rest]}] ->
-        % find a proc in the set that has the DDoc
-        {ok, Proc} = proc_with_ddoc(DDoc, DDocKey, [P|Rest]),
-        rem_from_list(LangProcs, Lang, Proc),
-        {reply, {ok, Proc, get_query_server_config()}, Server};
-    _ ->
-        case (catch new_process(Langs, Lang)) of
-        {ok, Proc} ->
-            add_value(PidProcs, Proc#proc.pid, Proc),
-            {ok, Proc2} = proc_with_ddoc(DDoc, DDocKey, [Proc]),
-            {reply, {ok, Proc2, get_query_server_config()}, Server};
-        Error ->
-            {reply, Error, Server}
-        end
+    case lang_proc(Lang, Server, fun(Procs) ->
+            % find a proc in the set that has the DDoc
+            proc_with_ddoc(DDoc, DDocKey, Procs)
+        end) of
+    {ok, Proc} ->
+        {reply, {ok, Proc, Server#qserver.config}, Server};
+    wait ->
+        {noreply, add_to_waitlist({DDoc, DDocKey}, From, Server)};
+    Error ->
+        {reply, Error, Server}
     end;
-handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs}=Server) ->
-    % Note to future self. Add max process limit.
-    case ets:lookup(LangProcs, Lang) of
-    [{Lang, [Proc|_]}] ->
-        rem_from_list(LangProcs, Lang, Proc),
-        {reply, {ok, Proc, get_query_server_config()}, Server};
-    _ ->
-        case (catch new_process(Langs, Lang)) of
-        {ok, Proc} ->
-            add_value(PidProcs, Proc#proc.pid, Proc),
-            {reply, {ok, Proc, get_query_server_config()}, Server};
-        Error ->
-            {reply, Error, Server}
-        end
+handle_call({get_proc, Lang}, From, Server) ->
+    case lang_proc(Lang, Server, fun([P|_Procs]) ->
+            {ok, P}
+        end) of
+    {ok, Proc} ->
+        {reply, {ok, Proc, Server#qserver.config}, Server};
+    wait ->
+        {noreply, add_to_waitlist({Lang}, From, Server)};
+    Error ->
+        {reply, Error, Server}
     end;
-handle_call({unlink_proc, Pid}, _From, {_, PidProcs, _}=Server) ->
+handle_call({unlink_proc, Pid}, _From, #qserver{pid_procs=PidProcs}=Server) ->
     rem_value(PidProcs, Pid),
     unlink(Pid),
     {reply, ok, Server};
-handle_call({ret_proc, Proc}, _From, {_, PidProcs, LangProcs}=Server) ->
+handle_call({ret_proc, Proc}, _From, #qserver{
+        pid_procs=PidProcs,
+        lang_procs=LangProcs}=Server) ->
     % Along with max process limit, here we should check
     % if we're over the limit and discard when we are.
     add_value(PidProcs, Proc#proc.pid, Proc),
     add_to_list(LangProcs, Proc#proc.lang, Proc),
     link(Proc#proc.pid),
-    {reply, true, Server}.
+    {reply, true, service_waitlist(Server)}.
 
 handle_cast(_Whatever, Server) ->
     {noreply, Server}.
 
-handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) ->
+handle_info({'EXIT', Pid, Status}, #qserver{
+        pid_procs=PidProcs,
+        lang_procs=LangProcs,
+        lang_limits=LangLimits}=Server) ->
     case ets:lookup(PidProcs, Pid) of
     [{Pid, Proc}] ->
         case Status of
@@ -325,7 +350,9 @@ handle_info({'EXIT', Pid, Status}, {_, P
         end,
         rem_value(PidProcs, Pid),
         catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
-        {noreply, Server};
+        [{Lang, Lim, Current}] = ets:lookup(LangLimits, Proc#proc.lang),
+        true = ets:insert(LangLimits, {Lang, Lim, Current-1}),
+        {noreply, service_waitlist(Server)};
     [] ->
         case Status of
         normal ->
@@ -340,23 +367,90 @@ code_change(_OldVsn, State, _Extra) ->
 
 % Private API
 
-get_query_server_config() ->
-    ReduceLimit = list_to_atom(
-        couch_config:get("query_server_config","reduce_limit","true")),
-    {[{<<"reduce_limit">>, ReduceLimit}]}.
+add_to_waitlist(Info, From, #qserver{waitlist=Waitlist}=Server) ->
+    Server#qserver{waitlist=[{Info, From}|Waitlist]}.
+
+service_waitlist(#qserver{waitlist=[]}=Server) ->
+    Server;
+service_waitlist(#qserver{waitlist=Waitlist}=Server) ->
+    [Oldest|RevWList] = lists:reverse(Waitlist),
+    case service_waiting(Oldest, Server) of
+    ok ->
+        Server#qserver{waitlist=lists:reverse(RevWList)};
+    wait ->
+        Server#qserver{waitlist=Waitlist}
+    end.
+
+% todo get rid of duplication
+service_waiting({{#doc{body={Props}}=DDoc, DDocKey}, From}, Server) ->
+    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    case lang_proc(Lang, Server, fun(Procs) ->
+            % find a proc in the set that has the DDoc
+            proc_with_ddoc(DDoc, DDocKey, Procs)
+        end) of
+    {ok, Proc} ->
+        gen_server:reply(From, {ok, Proc, Server#qserver.config}),
+        ok;
+    wait -> % this should never happen
+        wait;
+    Error ->
+        gen_server:reply(From, Error),
+        ok
+    end;
+service_waiting({{Lang}, From}, Server) ->
+    case lang_proc(Lang, Server, fun([P|Procs]) ->
+            {ok, P}
+        end) of
+    {ok, Proc} ->
+        gen_server:reply(From, {ok, Proc, Server#qserver.config}),
+        ok;
+    wait -> % this should never happen
+        wait;
+    Error ->
+        gen_server:reply(From, Error),
+        ok
+    end.
 
-new_process(Langs, Lang) ->
-    case ets:lookup(Langs, Lang) of
-    [{Lang, Mod, Func, Arg}] ->
-        {ok, Pid} = apply(Mod, Func, Arg),
-        {ok, #proc{lang=Lang,
-                   pid=Pid,
-                   % Called via proc_prompt, proc_set_timeout, and proc_stop
-                   prompt_fun={Mod, prompt},
-                   set_timeout_fun={Mod, set_timeout},
-                   stop_fun={Mod, stop}}};
+lang_proc(Lang, #qserver{
+        langs=Langs,
+        pid_procs=PidProcs,
+        lang_procs=LangProcs,
+        lang_limits=LangLimits}, PickFun) ->
+    % Note to future self. Add max process limit.
+    case ets:lookup(LangProcs, Lang) of
+    [{Lang, [P|Procs]}] ->
+        {ok, Proc} = PickFun([P|Procs]),
+        rem_from_list(LangProcs, Lang, Proc),
+        {ok, Proc};
     _ ->
-        {unknown_query_language, Lang}
+        case (catch new_process(Langs, LangLimits, Lang)) of
+        {ok, Proc} ->
+            add_value(PidProcs, Proc#proc.pid, Proc),
+            {ok, Proc2} = PickFun([Proc]);
+        ErrorOrWait ->
+            ErrorOrWait
+        end
+    end.
+
+new_process(Langs, LangLimits, Lang) ->
+    [{Lang, Lim, Current}] = ets:lookup(LangLimits, Lang),
+    if (Lim == 0) or (Current < Lim) -> % Lim == 0 means no limit
+        % we are below the limit for our language, make a new one
+        case ets:lookup(Langs, Lang) of
+        [{Lang, Mod, Func, Arg}] ->
+            {ok, Pid} = apply(Mod, Func, Arg),
+            true = ets:insert(LangLimits, {Lang, Lim, Current+1}),
+            {ok, #proc{lang=Lang,
+                       pid=Pid,
+                       % Called via proc_prompt, proc_set_timeout, and proc_stop
+                       prompt_fun={Mod, prompt},
+                       set_timeout_fun={Mod, set_timeout},
+                       stop_fun={Mod, stop}}};
+        _ ->
+            {unknown_query_language, Lang}
+        end;
+    true ->
+        wait
     end.
 
 proc_with_ddoc(DDoc, DDocKey, LangProcs) ->
@@ -402,12 +496,11 @@ teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey,
 get_ddoc_process(#doc{} = DDoc, DDocKey) ->
     % remove this case statement
     case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of
-    {ok, Proc, QueryConfig} ->
+    {ok, Proc, {QueryConfig}} ->
         % process knows the ddoc
-        case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
+        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
         true ->
-            proc_set_timeout(Proc, list_to_integer(couch_config:get(
-                                "couchdb", "os_process_timeout", "5000"))),
+            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
             link(Proc#proc.pid),
             gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
             Proc;
@@ -421,11 +514,10 @@ get_ddoc_process(#doc{} = DDoc, DDocKey)
 
 get_os_process(Lang) ->
     case gen_server:call(couch_query_servers, {get_proc, Lang}) of
-    {ok, Proc, QueryConfig} ->
-        case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
+    {ok, Proc, {QueryConfig}} ->
+        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
         true ->
-            proc_set_timeout(Proc, list_to_integer(couch_config:get(
-                                "couchdb", "os_process_timeout", "5000"))),
+            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
             link(Proc#proc.pid),
             gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
             Proc;



Mime
View raw message