couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbast...@apache.org
Subject [couchdb] 03/07: Use LRU for view indexes
Date Thu, 20 Apr 2017 20:49:00 GMT
This is an automated email from the ASF dual-hosted git repository.

bbastian pushed a commit to branch COUCHDB-3377
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f0a891285ca393308093a24e3bf7f5c3e45812e1
Author: Benjamin Bastian <benjamin.bastian@gmail.com>
AuthorDate: Wed Mar 29 14:44:23 2017 -0700

    Use LRU for view indexes
---
 src/couch_index/priv/stats_descriptions.cfg        |  20 ++
 src/couch_index/src/couch_index.erl                |  16 +-
 src/couch_index/src/couch_index_compactor.erl      |   3 +
 src/couch_index/src/couch_index_server.erl         | 180 ++++++++++++----
 .../test/couch_index_compaction_tests.erl          |   7 +-
 src/couch_index/test/couch_index_lru_tests.erl     | 226 +++++++++++++++++++++
 6 files changed, 408 insertions(+), 44 deletions(-)

diff --git a/src/couch_index/priv/stats_descriptions.cfg b/src/couch_index/priv/stats_descriptions.cfg
new file mode 100644
index 0000000..aac317f
--- /dev/null
+++ b/src/couch_index/priv/stats_descriptions.cfg
@@ -0,0 +1,20 @@
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
+
+% Style guide for descriptions: Start with a lowercase letter & do not add
+% a trailing full-stop / period
+% Please keep this in alphabetical order
+
+{[couchdb, couch_index_server, lru_skip], [
+    {type, counter},
+    {desc, <<"number of couch_index_server LRU operations skipped">>}
+]}.
diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl
index c86f5e1..99b5e8a 100644
--- a/src/couch_index/src/couch_index.erl
+++ b/src/couch_index/src/couch_index.erl
@@ -240,8 +240,11 @@ handle_cast({new_state, NewIdxState}, State) ->
     couch_log:debug("Updated index for db: ~s idx: ~s seq: ~B", Args),
     Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState),
     case State#st.committed of
-        true -> erlang:send_after(commit_delay(), self(), commit);
-        false -> ok
+        true ->
+            ok = couch_index_server:set_committing(self(), true),
+            erlang:send_after(commit_delay(), self(), commit);
+        false ->
+            ok
     end,
     {noreply, State#st{
         idx_state=NewIdxState,
@@ -297,6 +300,7 @@ handle_info(commit, State) ->
             % Commit the updates
             ok = Mod:commit(IdxState),
             couch_event:notify(DbName, {index_commit, IdxName}),
+            ok = couch_index_server:set_committing(self(), false),
             {noreply, State#st{committed=true}};
         _ ->
             % We can't commit the header because the database seq that's
@@ -305,6 +309,7 @@ handle_info(commit, State) ->
             % forever out of sync with the database. But a crash before we
             % commit these changes, no big deal, we only lose incremental
             % changes since last committal.
+            ok = couch_index_server:set_committing(self(), true),
             erlang:send_after(commit_delay(), self(), commit),
             {noreply, State}
     end;
@@ -385,8 +390,11 @@ commit_compacted(NewIdxState, State) ->
         false -> ok
     end,
     case State#st.committed of
-        true -> erlang:send_after(commit_delay(), self(), commit);
-        false -> ok
+        true ->
+            ok = couch_index_server:set_committing(self(), true),
+            erlang:send_after(commit_delay(), self(), commit);
+        false ->
+            ok
     end,
     State#st{
         idx_state=NewIdxState1,
diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl
index 61f406c..2271c35 100644
--- a/src/couch_index/src/couch_index_compactor.erl
+++ b/src/couch_index/src/couch_index_compactor.erl
@@ -64,12 +64,14 @@ handle_call({compact, _}, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
     {reply, {ok, Pid}, State};
 handle_call({compact, IdxState}, _From, #st{idx=Idx}=State) ->
     Pid = spawn_link(fun() -> compact(Idx, State#st.mod, IdxState) end),
+    ok = couch_index_server:set_compacting(Idx, true),
     {reply, {ok, Pid}, State#st{pid=Pid}};
 handle_call(cancel, _From, #st{pid=undefined}=State) ->
     {reply, ok, State};
 handle_call(cancel, _From, #st{pid=Pid}=State) ->
     unlink(Pid),
     exit(Pid, kill),
+    ok = couch_index_server:set_compacting(State#st.idx, false),
     {reply, ok, State#st{pid=undefined}};
 handle_call(get_compacting_pid, _From, #st{pid=Pid}=State) ->
     {reply, {ok, Pid}, State};
@@ -84,6 +86,7 @@ handle_cast(_Mesg, State) ->
 
 
 handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
+    ok = couch_index_server:set_compacting(State#st.idx, false),
     {noreply, State#st{pid=undefined}};
 handle_info({'EXIT', Pid, Reason}, #st{pid = Pid} = State) ->
     #st{idx = Idx, mod = Mod} = State,
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 4e86f5e..7ec91c3 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -16,7 +16,8 @@
 
 -vsn(2).
 
--export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]).
+-export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2, close/1]).
+-export([set_committing/2, set_compacting/2]).
 
 -export([init/1, terminate/2, code_change/3]).
 -export([handle_call/3, handle_cast/2, handle_info/2]).
@@ -34,8 +35,76 @@
 -define(BY_PID, couchdb_indexes_by_pid).
 -define(BY_DB, couchdb_indexes_by_db).
 -define(RELISTEN_DELAY, 5000).
+-define(MAX_INDICES_OPEN, 500).
+
+-record(st, {
+    lru=couch_lru:new(fun maybe_close_index/1),
+    open=0,
+    max_open=?MAX_INDICES_OPEN,
+    root_dir
+}).
+
+-record(entry, {
+    name,
+    pid,
+    locked=false,
+    committing=false,
+    compacting=false,
+    waiters=undefined
+}).
+
+close(Mon) ->
+    erlang:demonitor(Mon, [flush]),
+    ok.
+
+maybe_close_lru_view(#st{open=Open, max_open=Max}=State) when Open =< Max ->
+    {ok, State};
+maybe_close_lru_view(State) ->
+    #st{lru=Lru, open=Open} = State,
+    case couch_lru:close(Lru) of
+        false ->
+            {ok, State};
+        {true, NewLru} ->
+            maybe_close_lru_view(State#st{lru=NewLru, open=Open-1})
+    end.
+
+is_idle(Pid) ->
+    case erlang:process_info(Pid, monitored_by) of
+        undefined ->
+            true;
+        {monitored_by, Pids} ->
+            [] =:= Pids -- [whereis(couch_stats_process_tracker)]
+    end.
 
--record(st, {root_dir}).
+set_compacting(Idx, IsCompacting) ->
+    gen_server:call(?MODULE, {compacting, Idx, IsCompacting}, infinity).
+
+set_committing(Pid, IsCommitting) ->
+    gen_server:call(?MODULE, {committing, Pid, IsCommitting}, infinity).
+
+maybe_close_index({DbName, DDocId, Sig}) ->
+    case ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, true}) of
+        true ->
+            case ets:lookup(?BY_SIG, {DbName, Sig}) of
+                [#entry{pid=Pid, committing=false, compacting=false}] ->
+                    case is_idle(Pid) of
+                        true ->
+                            rem_from_ets(DbName, Sig, DDocId, Pid),
+                            couch_index:stop(Pid),
+                            {true, true};
+                        false ->
+                            ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, false}),
+                            couch_stats:increment_counter([couchdb, couch_index_server, lru_skip]),
+                            {false, false}
+                    end;
+                _ ->
+                    ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, false}),
+                    couch_stats:increment_counter([couchdb, couch_index_server, lru_skip]),
+                    {false, false}
+            end;
+        false ->
+            {false, true}
+    end.
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -94,8 +163,8 @@ get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) ->
 get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) ->
     {ok, InitState} = Module:init(Db, DDoc),
     {ok, FunResp} = Fun(InitState),
-    {ok, Pid} = get_index(Module, InitState),
-    {ok, Pid, FunResp};
+    {ok, Pid, Monitor} = get_index(Module, InitState),
+    {ok, Pid, Monitor, FunResp};
 get_index(Module, Db, DDoc, _Fun) ->
     {ok, InitState} = Module:init(Db, DDoc),
     get_index(Module, InitState).
@@ -105,24 +174,31 @@ get_index(Module, IdxState) ->
     DbName = Module:get(db_name, IdxState),
     Sig = Module:get(signature, IdxState),
     case ets:lookup(?BY_SIG, {DbName, Sig}) of
-        [{_, Pid}] when is_pid(Pid) ->
-            {ok, Pid};
+        [#entry{pid=Pid, locked=false}] when is_pid(Pid) ->
+            Monitor = erlang:monitor(process, Pid),
+            {ok, Pid, Monitor};
         _ ->
             Args = {Module, IdxState, DbName, Sig},
-            gen_server:call(?MODULE, {get_index, Args}, infinity)
+            case gen_server:call(?MODULE, {get_index, Args}, infinity) of
+                {ok, Pid} ->
+                    Monitor = erlang:monitor(process, Pid),
+                    {ok, Pid, Monitor};
+                {error, Reason} ->
+                    {error, Reason}
+            end
     end.
 
-
 init([]) ->
     process_flag(trap_exit, true),
     ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
-    ets:new(?BY_SIG, [protected, set, named_table]),
-    ets:new(?BY_PID, [private, set, named_table]),
+    ets:new(?BY_SIG, [protected, set, named_table, {keypos, #entry.name}]),
+    ets:new(?BY_PID, [protected, set, named_table]),
     ets:new(?BY_DB, [protected, bag, named_table]),
     couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
     RootDir = couch_index_util:root_dir(),
+    MaxIndicesOpen = config:get_integer("couchdb", "max_indices_open", ?MAX_INDICES_OPEN),
     couch_file:init_delete_dir(RootDir),
-    {ok, #st{root_dir=RootDir}}.
+    {ok, #st{root_dir=RootDir, max_open=MaxIndicesOpen}}.
 
 
 terminate(_Reason, _State) ->
@@ -134,47 +210,69 @@ terminate(_Reason, _State) ->
 handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) ->
     case ets:lookup(?BY_SIG, {DbName, Sig}) of
         [] ->
+            {ok, NewState} = maybe_close_lru_view(State#st{open=(State#st.open)+1}),
             spawn_link(fun() -> new_index(Args) end),
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
+            ets:insert(?BY_SIG, #entry{name={DbName, Sig}, waiters=[From]}),
+            {noreply, NewState};
+        [#entry{waiters=Waiters}=Entry] when is_list(Waiters) ->
+            ets:insert(?BY_SIG, Entry#entry{waiters=[From | Waiters]}),
             {noreply, State};
-        [{_, Waiters}] when is_list(Waiters) ->
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
-            {noreply, State};
-        [{_, Pid}] when is_pid(Pid) ->
+        [#entry{pid=Pid}] when is_pid(Pid) ->
             {reply, {ok, Pid}, State}
     end;
 handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [#entry{waiters=Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    NewLru = couch_lru:insert({DbName, DDocId, Sig}, State#st.lru),
     [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
     link(Pid),
     add_to_ets(DbName, Sig, DDocId, Pid),
-    {reply, ok, State};
+    {reply, ok, State#st{lru=NewLru}};
 handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [#entry{waiters=Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
     [gen_server:reply(From, Error) || From <- Waiters],
     ets:delete(?BY_SIG, {DbName, Sig}),
+    {reply, ok, State#st{open=(State#st.open)-1}};
+handle_call({compacting, Pid, IsCompacting}, _From, State) ->
+    case ets:lookup(?BY_PID, Pid) of
+        [{Pid, {DbName, Sig}}] ->
+            ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.compacting, IsCompacting});
+        [] ->
+            ok
+    end,
+    {reply, ok, State};
+handle_call({committing, Pid, IsCommitting}, _From, State) ->
+    case ets:lookup(?BY_PID, Pid) of
+        [{Pid, {DbName, Sig}}] ->
+            ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.committing, IsCommitting});
+        [] ->
+            ok
+    end,
     {reply, ok, State};
 handle_call({reset_indexes, DbName}, _From, State) ->
     reset_indexes(DbName, State#st.root_dir),
-    {reply, ok, State}.
+    {reply, ok, State};
+handle_call(get_open_count, _From, State) ->
+    {reply, State#st.open, State}.
 
 
 handle_cast({reset_indexes, DbName}, State) ->
     reset_indexes(DbName, State#st.root_dir),
-    {noreply, State}.
+    {noreply, State};
+handle_cast(close_indexes, State) ->
+    {ok, NewState} = maybe_close_lru_view(State),
+    {noreply, NewState}.
 
-handle_info({'EXIT', Pid, Reason}, Server) ->
-    case ets:lookup(?BY_PID, Pid) of
+handle_info({'EXIT', Pid, Reason}, State) ->
+    NewState = case ets:lookup(?BY_PID, Pid) of
         [{Pid, {DbName, Sig}}] ->
-            [{DbName, {DDocId, Sig}}] =
-                ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
-            rem_from_ets(DbName, Sig, DDocId, Pid);
+            rem_from_ets(DbName, Sig, Pid),
+            State#st{open=(State#st.open)-1};
         [] when Reason /= normal ->
             exit(Reason);
         _Else ->
-            ok
+            State
     end,
-    {noreply, Server};
+    {noreply, NewState};
 handle_info(restart_config_listener, State) ->
     ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
     {noreply, State};
@@ -187,18 +285,20 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-handle_config_change("couchdb", "index_dir", RootDir, _, RootDir) ->
-    {ok, RootDir};
-handle_config_change("couchdb", "view_index_dir", RootDir, _, RootDir) ->
-    {ok, RootDir};
+handle_config_change("couchdb", "index_dir", RootDir, _, #st{root_dir=RootDir}=State) ->
+    {ok, State};
+handle_config_change("couchdb", "view_index_dir", RootDir, _, #st{root_dir=RootDir}=State) ->
+    {ok, State};
 handle_config_change("couchdb", "index_dir", _, _, _) ->
     exit(whereis(couch_index_server), config_change),
     remove_handler;
 handle_config_change("couchdb", "view_index_dir", _, _, _) ->
     exit(whereis(couch_index_server), config_change),
     remove_handler;
-handle_config_change(_, _, _, _, RootDir) ->
-    {ok, RootDir}.
+handle_config_change("couchdb", "max_indices_open", Max, _, State) ->
+    {ok, State#st{max_open=list_to_integer(Max)}};
+handle_config_change(_, _, _, _, State) ->
+    {ok, State}.
 
 handle_config_terminate(_, stop, _) ->
     ok;
@@ -222,7 +322,7 @@ new_index({Mod, IdxState, DbName, Sig}) ->
 reset_indexes(DbName, Root) ->
     % shutdown all the updaters and clear the files, the db got changed
     Fun = fun({_, {DDocId, Sig}}) ->
-        [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+        [#entry{pid=Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
         MRef = erlang:monitor(process, Pid),
         gen_server:cast(Pid, delete),
         receive {'DOWN', MRef, _, _, _} -> ok end,
@@ -234,11 +334,17 @@ reset_indexes(DbName, Root) ->
 
 
 add_to_ets(DbName, Sig, DDocId, Pid) ->
-    ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
+    ets:insert(?BY_SIG, #entry{name={DbName, Sig}, pid=Pid}),
     ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
     ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
 
 
+rem_from_ets(DbName, Sig, Pid) ->
+    [{DbName, {DDocId, Sig}}] =
+        ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
+    rem_from_ets(DbName, Sig, DDocId, Pid).
+
+
 rem_from_ets(DbName, Sig, DDocId, Pid) ->
     ets:delete(?BY_SIG, {DbName, Sig}),
     ets:delete(?BY_PID, Pid),
@@ -254,7 +360,7 @@ handle_db_event(DbName, deleted, St) ->
 handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
     lists:foreach(fun({_DbName, {_DDocId, Sig}}) ->
         case ets:lookup(?BY_SIG, {DbName, Sig}) of
-            [{_, IndexPid}] ->
+            [#entry{pid=IndexPid}] ->
                 (catch gen_server:cast(IndexPid, ddoc_updated));
             [] ->
                 ok
diff --git a/src/couch_index/test/couch_index_compaction_tests.erl b/src/couch_index/test/couch_index_compaction_tests.erl
index 0787151..6cbffc8 100644
--- a/src/couch_index/test/couch_index_compaction_tests.erl
+++ b/src/couch_index/test/couch_index_compaction_tests.erl
@@ -19,9 +19,9 @@ setup() ->
     DbName = ?tempdb(),
     {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
     couch_db:close(Db),
-    {ok, IndexerPid} = fake_index(Db),
+    {ok, IndexerPid, Mon} = fake_index(Db),
     ?assertNot(is_opened(Db)),
-    {Db, IndexerPid}.
+    {Db, IndexerPid, Mon}.
 
 fake_index(#db{name = DbName} = Db) ->
     ok = meck:new([test_index], [non_strict]),
@@ -67,7 +67,7 @@ compaction_test_() ->
     }.
 
 
-hold_db_for_recompaction({Db, Idx}) ->
+hold_db_for_recompaction({Db, Idx, Mon}) ->
     ?_test(begin
         ?assertNot(is_opened(Db)),
         ok = meck:reset(test_index),
@@ -87,6 +87,7 @@ hold_db_for_recompaction({Db, Idx}) ->
         end,
 
         ?assertNot(is_opened(Db)),
+        couch_index_server:close(Mon),
         ok
     end).
 
diff --git a/src/couch_index/test/couch_index_lru_tests.erl b/src/couch_index/test/couch_index_lru_tests.erl
new file mode 100644
index 0000000..52f4244
--- /dev/null
+++ b/src/couch_index/test/couch_index_lru_tests.erl
@@ -0,0 +1,226 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_lru_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(MAX_INDICES_OPEN, 10).
+
+-record(test_idx, {
+    db_name,
+    idx_name,
+    signature
+}).
+
+
+setup() ->
+    test_util:start_couch([]),
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+    config:set("couchdb", "max_indices_open", integer_to_list(?MAX_INDICES_OPEN)),
+    Db.
+
+
+teardown(Db) ->
+    ok = couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+    config:delete("couchdb", "max_indices_open"),
+    (catch couch_db:close(Db)),
+    ok.
+
+
+lru_test_() ->
+    {
+        "Test the view index LRU",
+        {
+            setup,
+            fun() -> test_util:start_couch([]) end, fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun test_close_while_compacting/1,
+                    fun test_soft_max/1
+                ]
+            }
+        }
+    }.
+
+
+test_close_while_compacting(Db) ->
+    ?_test(begin
+        ok = meck:new([couch_index_server], [passthrough]),
+        Self = self(),
+        ok = meck:expect(couch_index_server, set_compacting, fun(Idx, IsCompacting) ->
+            meck:passthrough([Idx, IsCompacting]),
+            Self ! {compact, IsCompacting, self()},
+            receive finish ->
+                ok
+            end,
+            ok
+        end),
+
+        ok = meck:expect(couch_index_server, set_committing, fun(Idx, IsCommitting) ->
+            meck:passthrough([Idx, IsCommitting]),
+            Self ! {commit, IsCommitting, self()},
+            ok
+        end),
+
+        % create ddocs
+        DDocIds = lists:map(fun(I) ->
+            BI = integer_to_binary(I),
+            <<"_design/ddoc_", BI/binary>>
+        end, lists:seq(1,?MAX_INDICES_OPEN+10)),
+        ok = create_ddocs(Db, DDocIds),
+
+        % open and compact indexes
+        Openers = lists:map(fun(DDocId) ->
+            spawn_link(fun() ->
+                {ok, Pid, Mon} = couch_index_server:get_index(couch_mrview_index, Db#db.name, DDocId),
+                couch_index:compact(Pid),
+                receive close ->
+                    ok
+                end,
+                couch_index_server:close(Mon)
+            end)
+        end, DDocIds),
+
+        % check that all indexes are open
+        ToClose = wait_all_compacting(true, [], ?MAX_INDICES_OPEN+10),
+        ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+        % close compactor pids, but still block flag from being set in BY_SIG table
+        lists:foreach(fun(Opener) -> Opener ! close end, Openers),
+        % check that compaction flag block pids from closing
+        gen_server:cast(couch_index_server, close_indexes),
+        ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+        % allow compaction flag to be unset
+        lists:foreach(fun(CPid) -> CPid ! finish end, ToClose),
+        % wait until all compaction flags are unset
+        Finished = wait_all_compacting(false, [], ?MAX_INDICES_OPEN+10),
+        lists:foreach(fun(CPid) -> CPid ! finish end, Finished),
+        gen_server:cast(couch_index_server, close_indexes),
+        ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+        % wait for all commits to start
+        Indexers = wait_all_committing(dict:new(), true, ?MAX_INDICES_OPEN+10),
+        ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+        % force premature commit
+        [Indexer ! commit || Indexer <- Indexers],
+        % wait until commits happen
+        wait_all_committing(dict:new(), false, ?MAX_INDICES_OPEN+10),
+        gen_server:cast(couch_index_server, close_indexes),
+        % since all commits and all compacts are done, make sure indexes are closed
+        ?assertEqual(?MAX_INDICES_OPEN, gen_server:call(couch_index_server, get_open_count)),
+        % clean up
+        (catch meck:unload(couch_index_server)),
+        ok
+    end).
+
+
+test_soft_max(Db) ->
+    ?_test(begin
+        ok = meck:new([test_index], [non_strict]),
+        ok = meck:expect(test_index, init, fun(Db0, DDoc) ->
+            Sig = couch_crypto:hash(md5, term_to_binary({Db0#db.name, DDoc})),
+            {ok, #test_idx{db_name=Db0#db.name, idx_name=DDoc, signature=Sig}}
+        end),
+        ok = meck:expect(test_index, close, ['_'], {true, true}),
+        ok = meck:expect(test_index, open, fun(_Db, State) ->
+            {ok, State}
+        end),
+        ok = meck:expect(test_index, compact, ['_', '_', '_'],
+            meck:seq([{ok, 9}, {ok, 10}])), %% to trigger recompaction
+        ok = meck:expect(test_index, commit, ['_'], ok),
+        ok = meck:expect(test_index, get, fun
+            (db_name, State) ->
+                State#test_idx.db_name;
+            (idx_name, State) ->
+                State#test_idx.idx_name;
+            (signature, State) ->
+                State#test_idx.signature;
+            (update_seq, Seq) ->
+                Seq
+        end),
+
+        ok = meck:reset(test_index),
+
+        IdxOpens = lists:map(fun(I) ->
+            BI = integer_to_binary(I),
+            % hack: use tuple as index name so couch_index_server won't try to open
+            % it as a design document.
+            IndexName = {<<"_design/i", BI/binary>>},
+            ?assertEqual(I-1, gen_server:call(couch_index_server, get_open_count)),
+            couch_index_server:get_index(test_index, Db, IndexName)
+        end, lists:seq(1, 500)),
+
+        lists:foldl(fun(IdxOpen, Acc) ->
+            ?assertMatch({ok, _, _}, IdxOpen),
+            {ok, Pid, Mon} = IdxOpen,
+            ?assert(is_pid(Pid)),
+            ?assert(is_reference(Mon)),
+            ?assertNotEqual(undefined, process_info(Pid)),
+            gen_server:cast(couch_index_server, close_indexes),
+            OpenCount = gen_server:call(couch_index_server, get_open_count),
+            ?assertEqual(max(?MAX_INDICES_OPEN, Acc), OpenCount),
+            couch_index_server:close(Mon),
+            Acc-1
+        end, 500, IdxOpens),
+
+        config:delete("couchdb", "max_indices_open"),
+        (catch meck:unload(test_index)),
+        (catch meck:unload(couch_util)),
+        ok
+    end).
+
+
+wait_all_compacting(_IsCompacting, Acc, 0) ->
+    Acc;
+wait_all_compacting(IsCompacting, Acc, Remaining) ->
+    receive {compact, IsCompacting, From} ->
+        wait_all_compacting(IsCompacting, [From | Acc], Remaining-1)
+    end.
+
+
+wait_all_committing(Pids, ShouldBe, Count) ->
+    receive {commit, IsCommitting, From} ->
+        Pids0 = dict:store(From, IsCommitting, Pids),
+        CommitCount = dict:fold(fun(_K, V, Acc) ->
+            case V of
+                ShouldBe -> Acc+1;
+                _ -> Acc
+            end
+        end, 0, Pids0),
+        case Count =:= CommitCount of
+            true ->
+                [Pid || {Pid, _} <- dict:to_list(Pids0)];
+            false ->
+                wait_all_committing(Pids0, ShouldBe, Count)
+        end
+    end.
+
+
+create_ddocs(Db, DDocIds) ->
+    Docs = lists:map(fun(DDocId) ->
+        MapFun = <<"function(doc) {emit(\"", DDocId/binary, "\", 1);}">>,
+        Json = {[
+            {<<"_id">>, DDocId},
+            {<<"language">>, <<"javascript">>},
+            {<<"views">>, {[
+                {<<"v">>, {[
+                    {<<"map">>, MapFun}
+                ]}}
+            ]}}
+        ]},
+        couch_doc:from_json_obj(Json)
+    end, DDocIds),
+    {ok, _} = couch_db:update_docs(Db, Docs, [?ADMIN_CTX]),
+    ok.

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <commits@couchdb.apache.org>.

Mime
View raw message