couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [couchdb] 02/31: fixes based on reviews
Date Tue, 23 Jul 2019 21:54:47 GMT
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/views
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 64be6bf4afbdadd857e1791ed004b1cb5c3399fe
Author: Garren Smith <garren.smith@gmail.com>
AuthorDate: Mon Jul 8 16:40:29 2019 +0200

    fixes based on reviews
---
 rel/overlay/etc/default.ini                       |  4 +-
 src/couch_views/src/couch_views.erl               | 23 ++++---
 src/couch_views/src/couch_views_jobs.erl          |  2 +-
 src/couch_views/src/couch_views_reader.erl        | 63 ++++++++---------
 src/couch_views/src/couch_views_worker_server.erl | 84 +++++++++++++----------
 src/couch_views/test/couch_views_map_test.erl     |  1 -
 test/elixir/test/map_test.exs                     | 13 +---
 7 files changed, 92 insertions(+), 98 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 59b7d57..11bd611 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -225,9 +225,7 @@ iterations = 10 ; iterations for password hashing
 
 ; Settings for view indexing
 [couch_views]
-; type_check_period_msec = 500
-; type_check_max_jitter_msec = 500
-; change_limit = 100
+; max_workers = 100
 
 ; CSP (Content Security Policy) Support for _utils
 [csp]
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 4ccf0fa..69d6765 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -34,15 +34,20 @@ map_query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
 
 
 process_args(#{} = Args) ->
-    Args1 = maps:filter(fun (_, V) -> V /= undefined end, Args),
-
-    maps:merge(#{
-        direction => fwd,
-        inclusive_end => true,
-        update => true,
-        skip => 0,
-        limit => ?MAX_VIEW_LIMIT
-    }, Args1).
+    Args1 = remove_ununsed_values(Args),
+    Defaults = #{
+            direction => fwd,
+            inclusive_end => true,
+            update => true,
+            skip => 0,
+            limit => ?MAX_VIEW_LIMIT
+        },
+
+    maps:merge(Defaults, Args1).
+
+
+remove_ununsed_values(Args) ->
+    maps:filter(fun (_, V) -> V /= undefined end, Args).
 
 
 maybe_build_index(_Db, _Mrst, #{update := false}) ->
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl
index ff99475..31ab728 100644
--- a/src/couch_views/src/couch_views_jobs.erl
+++ b/src/couch_views/src/couch_views_jobs.erl
@@ -50,7 +50,7 @@ add(TxDb, Mrst) ->
     JobData = create_job_data(TxDb, Mrst, 0),
 
     JobId = create_job_id(TxDb, Mrst),
-    JTx = couch_jobs_fdb:get_jtx(),
+    JTx = couch_jobs_fdb:get_jtx(TxDb),
     couch_jobs:add(JTx, ?INDEX_JOB_TYPE, JobId, JobData).
 
 
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 2ddb5b6..f4e768a 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -136,27 +136,12 @@ get_unpack_fun(TxDb, Opts, Callback) ->
     UnPackFwd = fun({K, V}, State) ->
         case couch_views_fdb:unpack_map_row(TxDb, K, V) of
             {key, _Id, RowKey} ->
-                maps:put(current_key, RowKey, State);
+                State#{current_key => RowKey};
             {value, Id, RowValue} ->
                 #{
-                    current_key := RowKey,
-                    acc := Acc,
-                    skip := Skip,
-                    db := Db
+                    current_key := RowKey
                 } = State,
-
-                case Skip > 0 of
-                    true ->
-                        maps:put(skip, Skip - 1, State);
-                    false ->
-                        Row = [{id, Id}, {key, RowKey}, {value, RowValue}],
-
-                        IncludeDoc = maps:get(include_docs, State, false),
-                        Row1 = maybe_include_doc(Db, Id, Row, IncludeDoc),
-
-                        {ok, AccNext} = Callback({row, Row1}, Acc),
-                        maps:put(acc, AccNext, State)
-                end
+                process_map_row(Id, RowKey, RowValue, State, Callback)
         end
     end,
 
@@ -164,26 +149,11 @@ get_unpack_fun(TxDb, Opts, Callback) ->
         case couch_views_fdb:unpack_map_row(TxDb, K, V) of
             {key, Id, RowKey} ->
                 #{
-                    current_value := RowValue,
-                    acc := Acc,
-                    skip := Skip,
-                    db := Db
+                    current_value := RowValue
                 } = State,
-
-                case Skip > 0 of
-                    true ->
-                        maps:put(skip, Skip - 1, State);
-                    false ->
-                        Row = [{id, Id}, {key, RowKey}, {value, RowValue}],
-
-                        IncludeDoc = maps:get(include_docs, State, false),
-                        Row1 = maybe_include_doc(Db, Id, Row, IncludeDoc),
-
-                        {ok, AccNext} = Callback({row, Row1}, Acc),
-                        maps:put(acc, AccNext, State)
-                end;
+                process_map_row(Id, RowKey, RowValue, State, Callback);
             {value, _Id, RowValue} ->
-                maps:put(current_value, RowValue, State)
+                State#{current_value => RowValue}
         end
     end,
 
@@ -193,6 +163,27 @@ get_unpack_fun(TxDb, Opts, Callback) ->
     end.
 
 
+process_map_row(Id, RowKey, RowValue, State, Callback) ->
+    #{
+        acc := Acc,
+        skip := Skip,
+        db := Db
+    } = State,
+
+    case Skip > 0 of
+        true ->
+            State#{skip := Skip -1};
+        false ->
+            Row = [{id, Id}, {key, RowKey}, {value, RowValue}],
+
+            IncludeDoc = maps:get(include_docs, State, false),
+            Row1 = maybe_include_doc(Db, Id, Row, IncludeDoc),
+
+            {ok, AccNext} = Callback({row, Row1}, Acc),
+            State#{acc := AccNext}
+    end.
+
+
 maybe_include_doc(_Db, _Id, Row, false) ->
     Row;
 
diff --git a/src/couch_views/src/couch_views_worker_server.erl b/src/couch_views/src/couch_views_worker_server.erl
index 1c815e5..13bd9aa 100644
--- a/src/couch_views/src/couch_views_worker_server.erl
+++ b/src/couch_views/src/couch_views_worker_server.erl
@@ -31,8 +31,7 @@
 ]).
 
 
--define(TYPE_CHECK_PERIOD_DEFAULT, 500).
--define(MAX_JITTER_DEFAULT, 100).
+-define(MAX_WORKERS, 100).
 
 
 start_link() ->
@@ -41,8 +40,12 @@ start_link() ->
 
 init(_) ->
     couch_views_jobs:set_timeout(),
-    schedule_check(),
-    {ok, #{}}.
+    State0 = #{
+        workers => #{},
+        acceptor_pid => undefined
+    },
+    State = spawn_acceptor(State0),
+    {ok, State}.
 
 
 terminate(_, _St) ->
@@ -53,19 +56,20 @@ handle_call(Msg, _From, St) ->
     {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
 
 
+handle_cast({job, Job, JobData}, State) ->
+    State1 = start_worker(State, Job, JobData),
+    State2 = spawn_acceptor(State1),
+    {noreply, State2};
+
 handle_cast(Msg, St) ->
     {stop, {bad_cast, Msg}, St}.
 
 
-handle_info(check_for_jobs, State) ->
-    accept_jobs(),
-    schedule_check(),
-    {noreply, State};
-
-handle_info({'DOWN', _Ref, process, Pid, Reason}, St) ->
+handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
     LogMsg = "~p : process ~p exited with ~p",
     couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
-    {noreply, St};
+    State1 = check_finished_process(State, Pid),
+    {noreply, State1};
 
 handle_info(Msg, St) ->
     couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
@@ -76,35 +80,43 @@ code_change(_OldVsn, St, _Extra) ->
     {ok, St}.
 
 
-accept_jobs() ->
+start_worker(State, Job, JobData) ->
+    #{workers := Workers} = State,
+    {Pid, _Ref} = spawn_monitor(fun () -> couch_views_worker:start(Job, JobData) end),
+    Workers1 = Workers#{Pid => true},
+    State#{workers := Workers1}.
+
+
+spawn_acceptor(State) ->
+    #{
+        workers := Workers,
+        acceptor_pid := Pid
+    } = State,
+    MaxWorkers = config:get_integer("couch_views", "max_workers", ?MAX_WORKERS),
+    case maps:size(Workers) >= MaxWorkers of
+        false when not is_pid(Pid) ->
+            Parent = self(),
+            {Pid1, _Ref} = spawn_monitor(fun() -> blocking_acceptor(Parent) end),
+            State#{acceptor_pid := Pid1};
+        _ ->
+            State
+    end.
+
+
+blocking_acceptor(Parent) ->
     case couch_views_jobs:accept() of
         not_found ->
-            ok;
+            blocking_acceptor(Parent);
         {ok, Job, JobData} ->
-            start_worker(Job, JobData),
-            % keep accepting jobs until not_found
-            accept_jobs()
+            gen_server:cast(Parent, {job, Job, JobData})
     end.
 
 
-start_worker(Job, JobData) ->
-    % TODO Should I monitor it, or let jobs do that?
-    spawn_monitor(fun () -> couch_views_worker:start(Job, JobData) end),
-    ok.
-
-
-schedule_check() ->
-    Timeout = get_period_msec(),
-    MaxJitter = max(Timeout div 2, get_max_jitter_msec()),
-    Wait = Timeout + rand:uniform(max(1, MaxJitter)),
-    timer:send_after(Wait, self(), check_for_jobs).
-
-
-get_period_msec() ->
-    config:get_integer("couch_views", "type_check_period_msec",
-        ?TYPE_CHECK_PERIOD_DEFAULT).
-
+check_finished_process(#{acceptor_pid := Pid} = State, Pid) ->
+    State1 = State#{acceptor_pid := undefined},
+    spawn_acceptor(State1);
 
-get_max_jitter_msec() ->
-    config:get_integer("couch_views", "type_check_max_jitter_msec",
-        ?MAX_JITTER_DEFAULT).
+check_finished_process(State, Pid) ->
+    #{workers := Workers} = State,
+    Workers1 = maps:remove(Pid, Workers),
+    State#{workers := Workers1}.
diff --git a/src/couch_views/test/couch_views_map_test.erl b/src/couch_views/test/couch_views_map_test.erl
index bbad93f..e7be521 100644
--- a/src/couch_views/test/couch_views_map_test.erl
+++ b/src/couch_views/test/couch_views_map_test.erl
@@ -300,7 +300,6 @@ should_map_duplicate_keys() ->
         {row, [{id, <<"2">>}, {key, <<"2">>}, {value, 2}]},
         {row, [{id, <<"2">>}, {key, <<"2">>}, {value, 3}]}
     ]},
-    ?debugFmt("EXPE ~p ~n", [Expect]),
     ?assertEqual(Expect, Result).
 
 
diff --git a/test/elixir/test/map_test.exs b/test/elixir/test/map_test.exs
index b7a809d..7c443ab 100644
--- a/test/elixir/test/map_test.exs
+++ b/test/elixir/test/map_test.exs
@@ -19,7 +19,7 @@ defmodule ViewMapTest do
             "two"
           end
 
-        doc = %{
+        %{
           :_id => "doc-id-#{i}",
           :value => i,
           :some => "field",
@@ -78,17 +78,6 @@ defmodule ViewMapTest do
     resp = Couch.post("/#{db_name}/_bulk_docs", body: body)
     Enum.each(resp.body, &assert(&1["ok"]))
 
-    #        ddoc = %{
-    #            :_id => "_design/map",
-    #            views: %{
-    #                some: %{map: map_fun1},
-    #                map_some: %{map: map_fun2}
-    #            }
-    #        }
-    #        resp = Couch.put("/#{db_name}/#{ddoc._id}", body: ddoc)
-    #        IO.inspect resp
-    #        assert resp.status_code == 201
-
     {:ok, [db_name: db_name]}
   end
 


Mime
View raw message