couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject [1/3] couch commit: updated refs/heads/master to 8bd756e
Date Tue, 16 Aug 2016 15:21:56 GMT
Repository: couchdb-couch
Updated Branches:
  refs/heads/master 7ee0b8161 -> 8bd756e60


Simplify proc manager assignment logic

This changes how the proc manager handles proc assignment.
Instead of doing this in three different places
(the get_proc call handler, return_proc/2 with maybe_assign_proc,
and flush_waiters/2), the proc manager now just places
all incoming requests in the waiting queue
and then flushes it.

As a result, all the logic is kept in one place, which makes
it more obvious that we are treating proc management
as the processing of a single FIFO queue
with "soft" and "hard" upper limits.

Consequently, this fixes a bug in maybe_assign_proc
where it was possible to assign a client a process
that wasn't aware of it.

COUCHDB-3095


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/f0b84513
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/f0b84513
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/f0b84513

Branch: refs/heads/master
Commit: f0b84513752cad3d7f6f944ff912244be4c08e32
Parents: 7ee0b81
Author: Eric Avdey <eiri@eiri.ca>
Authored: Thu Aug 11 17:45:32 2016 -0300
Committer: Eric Avdey <eiri@eiri.ca>
Committed: Tue Aug 16 12:06:06 2016 -0300

----------------------------------------------------------------------
 src/couch_proc_manager.erl | 151 ++++++++++++++++++++--------------------
 1 file changed, 75 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/f0b84513/src/couch_proc_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl
index 33cc1e5..a6790b4 100644
--- a/src/couch_proc_manager.erl
+++ b/src/couch_proc_manager.erl
@@ -131,35 +131,17 @@ handle_call(get_stale_proc_count, _From, State) ->
     {reply, ets:select_count(?PROCS, MatchSpec), State};
 
 handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
-    {ClientPid, _} = From,
     LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
     Lang = couch_util:to_binary(LangStr),
-    IterFun = fun(Proc, Acc) ->
-        case lists:member(DDocKey, Proc#proc_int.ddoc_keys) of
-            true ->
-                {stop, assign_proc(ClientPid, Proc)};
-            false ->
-                {ok, Acc}
-        end
-    end,
-    TeachFun = fun(Proc0, Acc) ->
-        try
-            {ok, Proc1} = teach_ddoc(DDoc, DDocKey, Proc0),
-            {stop, assign_proc(ClientPid, Proc1)}
-        catch _:_ ->
-            {ok, Acc}
-        end
-    end,
     Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey},
-    find_proc(State, Client, [IterFun, TeachFun]);
+    add_waiting_client(Client),
+    {noreply, flush_waiters(State, Lang)};
 
-handle_call({get_proc, Lang}, From, State) ->
-    {ClientPid, _} = From,
-    IterFun = fun(Proc, _Acc) ->
-        {stop, assign_proc(ClientPid, Proc)}
-    end,
-    Client = #client{from=From, lang=couch_util:to_binary(Lang)},
-    find_proc(State, Client, [IterFun]);
+handle_call({get_proc, LangStr}, From, State) ->
+    Lang = couch_util:to_binary(LangStr),
+    Client = #client{from=From, lang=Lang},
+    add_waiting_client(Client),
+    {noreply, flush_waiters(State, Lang)};
 
 handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
     erlang:demonitor(Ref, [flush]),
@@ -290,54 +272,60 @@ handle_config_change(_, _, _, _, _) ->
     {ok, undefined}.
 
 
-find_proc(State, Client, [Fun | FindFuns]) ->
-    try iter_procs(Client#client.lang, Fun, undefined) of
-        {not_found, _} ->
-            find_proc(State, Client, FindFuns);
-        {ok, Proc} ->
-            {reply, {ok, Proc, State#state.config}, State}
+find_proc(#client{lang = Lang, ddoc_key = undefined}) ->
+    Pred = fun(_) ->
+        true
+    end,
+    find_proc(Lang, Pred);
+find_proc(#client{lang = Lang, ddoc = DDoc, ddoc_key = DDocKey} = Client) ->
+    Pred = fun(#proc_int{ddoc_keys = DDocKeys}) ->
+        lists:member(DDocKey, DDocKeys)
+    end,
+    case find_proc(Lang, Pred) of
+        not_found ->
+            case find_proc(Client#client{ddoc_key=undefined}) of
+                {ok, Proc} ->
+                    teach_ddoc(DDoc, DDocKey, Proc);
+                Else ->
+                    Else
+            end;
+        Else ->
+            Else
+    end.
+
+find_proc(Lang, Fun) ->
+    try iter_procs(Lang, Fun)
     catch error:Reason ->
-        couch_log:error("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
-        {reply, {error, Reason}, State}
-    end;
-find_proc(State, Client, []) ->
-    {noreply, maybe_spawn_proc(State, Client)}.
+        StackTrace = erlang:get_stacktrace(),
+        couch_log:error("~p ~p ~p", [?MODULE, Reason, StackTrace]),
+        {error, Reason}
+    end.
 
 
-iter_procs(Lang, Fun, Acc) when is_binary(Lang) ->
+iter_procs(Lang, Fun) when is_binary(Lang) ->
     Pattern = #proc_int{lang=Lang, client=undefined, _='_'},
     MSpec = [{Pattern, [], ['$_']}],
     case ets:select_reverse(?PROCS, MSpec, 25) of
         '$end_of_table' ->
-            {not_found, Acc};
+            not_found;
         Continuation ->
-            iter_procs_int(Continuation, Fun, Acc)
+            iter_procs_int(Continuation, Fun)
     end.
 
 
-iter_procs_int({[], Continuation0}, Fun, Acc) ->
+iter_procs_int({[], Continuation0}, Fun) ->
     case ets:select_reverse(Continuation0) of
         '$end_of_table' ->
-            {not_found, Acc};
+            not_found;
         Continuation1 ->
-            iter_procs_int(Continuation1, Fun, Acc)
+            iter_procs_int(Continuation1, Fun)
     end;
-iter_procs_int({[Proc | Rest], Continuation}, Fun, Acc0) ->
-    case Fun(Proc, Acc0) of
-        {ok, Acc1} ->
-            iter_procs_int({Rest, Continuation}, Fun, Acc1);
-        {stop, Acc1} ->
-            {ok, Acc1}
-    end.
-
-
-maybe_spawn_proc(State, Client) ->
-    case dict:find(Client#client.lang, State#state.counts) of
-    {ok, Count} when Count >= State#state.hard_limit ->
-        add_waiting_client(Client),
-        State;
-    _ ->
-        spawn_proc(State, Client)
+iter_procs_int({[Proc | Rest], Continuation}, Fun) ->
+    case Fun(Proc) of
+        true ->
+            {ok, Proc};
+        false ->
+            iter_procs_int({Rest, Continuation}, Fun)
     end.
 
 
@@ -447,23 +435,13 @@ return_proc(#state{} = State, #proc_int{} = ProcInt) ->
                 true = ets:update_element(?PROCS, Pid, [
                     {#proc_int.client, undefined}
                 ]),
-                maybe_assign_proc(State, ProcInt)
+                State
         end;
     false ->
         remove_proc(State, ProcInt)
     end,
     flush_waiters(NewState, Lang).
 
-maybe_assign_proc(#state{} = State, ProcInt) ->
-    #proc_int{lang = Lang} = ProcInt,
-    case get_waiting_client(Lang) of
-        #client{from = From} = Client ->
-            Proc = assign_proc(Client, ProcInt#proc_int{client=undefined}),
-            gen_server:reply(From, {ok, Proc, State#state.config}),
-            State;
-        undefined ->
-            State
-    end.
 
 remove_proc(State, #proc_int{}=Proc) ->
     ets:delete(?PROCS, Proc#proc_int.pid),
@@ -500,16 +478,27 @@ flush_waiters(State) ->
 
 
 flush_waiters(State, Lang) ->
-    case dict:fetch(Lang, State#state.counts) of
-        Count when Count < State#state.hard_limit ->
-            case get_waiting_client(Lang) of
-                #client{} = Client ->
+    CanSpawn = can_spawn(State, Lang),
+    case get_waiting_client(Lang) of
+        #client{from = From} = Client ->
+            case find_proc(Client) of
+                {ok, ProcInt} ->
+                    Proc = assign_proc(Client, ProcInt),
+                    gen_server:reply(From, {ok, Proc, State#state.config}),
+                    remove_waiting_client(Client),
+                    flush_waiters(State, Lang);
+                {error, Error} ->
+                    gen_server:reply(From, {error, Error}),
+                    remove_waiting_client(Client),
+                    flush_waiters(State, Lang);
+                not_found when CanSpawn ->
                     NewState = spawn_proc(State, Client),
+                    remove_waiting_client(Client),
                     flush_waiters(NewState, Lang);
-                undefined ->
+                not_found ->
                     State
             end;
-        _ ->
+        undefined ->
             State
     end.
 
@@ -523,11 +512,21 @@ get_waiting_client(Lang) ->
         '$end_of_table' ->
             undefined;
         {[#client{}=Client], _} ->
-            ets:delete(?WAITERS, Client#client.timestamp),
             Client
     end.
 
 
+remove_waiting_client(#client{timestamp = Timestamp}) ->
+    ets:delete(?WAITERS, Timestamp).
+
+
+can_spawn(#state{hard_limit = HardLimit, counts = Counts}, Lang) ->
+    case dict:find(Lang, Counts) of
+        {ok, Count} -> Count < HardLimit;
+        error -> true
+    end.
+
+
 get_proc_config() ->
     Limit = config:get("query_server_config", "reduce_limit", "true"),
     Timeout = config:get("couchdb", "os_process_timeout", "5000"),


Mime
View raw message