Subject: [couchdb] 03/07: Use LRU for view indexes
From: bbastian@apache.org
To: commits@couchdb.apache.org
Date: Wed, 19 Apr 2017 22:18:38 +0000

This is an automated email from the ASF dual-hosted git repository.

bbastian pushed a commit to branch COUCHDB-3377
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f0a891285ca393308093a24e3bf7f5c3e45812e1
Author: Benjamin Bastian
AuthorDate: Wed Mar 29 14:44:23 2017 -0700

    Use LRU for view indexes
---
 src/couch_index/priv/stats_descriptions.cfg    |  20 ++
 src/couch_index/src/couch_index.erl            |  16 +-
 src/couch_index/src/couch_index_compactor.erl  |   3 +
 src/couch_index/src/couch_index_server.erl     | 180 ++++++++++++----
 .../test/couch_index_compaction_tests.erl      |   7 +-
 src/couch_index/test/couch_index_lru_tests.erl | 226 +++++++++++++++++++++
 6 files changed, 408 insertions(+), 44 deletions(-)
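
The soft cap on concurrently open index processes comes from the new "max_indices_open" key in the "couchdb" config section, read in couch_index_server:init/1 via config:get_integer/3 with a default of ?MAX_INDICES_OPEN (500); a handle_config_change/5 clause also matches the key. A minimal sketch of setting it, mirroring the config:set/3 call in the new couch_index_lru_tests:setup/0 (the value 1000 is only an example):

    %% Sketch only: tune the soft limit on open index processes. The server
    %% reads it back with config:get_integer("couchdb", "max_indices_open", 500).
    config:set("couchdb", "max_indices_open", "1000"),
    1000 = config:get_integer("couchdb", "max_indices_open", 500).
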
diff --git a/src/couch_index/priv/stats_descriptions.cfg b/src/couch_index/priv/stats_descriptions.cfg
new file mode 100644
index 0000000..aac317f
--- /dev/null
+++ b/src/couch_index/priv/stats_descriptions.cfg
@@ -0,0 +1,20 @@
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
+
+% Style guide for descriptions: Start with a lowercase letter & do not add
+% a trailing full-stop / period
+% Please keep this in alphabetical order
+
+{[couchdb, couch_index_server, lru_skip], [
+    {type, counter},
+    {desc, <<"number of couch_index_server LRU operations skipped">>}
+]}.
diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl
index c86f5e1..99b5e8a 100644
--- a/src/couch_index/src/couch_index.erl
+++ b/src/couch_index/src/couch_index.erl
@@ -240,8 +240,11 @@ handle_cast({new_state, NewIdxState}, State) ->
     couch_log:debug("Updated index for db: ~s idx: ~s seq: ~B", Args),
     Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState),
     case State#st.committed of
-        true -> erlang:send_after(commit_delay(), self(), commit);
-        false -> ok
+        true ->
+            ok = couch_index_server:set_committing(self(), true),
+            erlang:send_after(commit_delay(), self(), commit);
+        false ->
+            ok
     end,
     {noreply, State#st{
         idx_state=NewIdxState,
@@ -297,6 +300,7 @@ handle_info(commit, State) ->
             % Commit the updates
            ok = Mod:commit(IdxState),
             couch_event:notify(DbName, {index_commit, IdxName}),
+            ok = couch_index_server:set_committing(self(), false),
             {noreply, State#st{committed=true}};
         _ ->
             % We can't commit the header because the database seq that's
@@ -305,6 +309,7 @@ handle_info(commit, State) ->
             % forever out of sync with the database. But a crash before we
             % commit these changes, no big deal, we only lose incremental
             % changes since last committal.
+            ok = couch_index_server:set_committing(self(), true),
             erlang:send_after(commit_delay(), self(), commit),
             {noreply, State}
     end;
@@ -385,8 +390,11 @@ commit_compacted(NewIdxState, State) ->
         false -> ok
     end,
     case State#st.committed of
-        true -> erlang:send_after(commit_delay(), self(), commit);
-        false -> ok
+        true ->
+            ok = couch_index_server:set_committing(self(), true),
+            erlang:send_after(commit_delay(), self(), commit);
+        false ->
+            ok
     end,
     State#st{
         idx_state=NewIdxState1,
diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl
index 61f406c..2271c35 100644
--- a/src/couch_index/src/couch_index_compactor.erl
+++ b/src/couch_index/src/couch_index_compactor.erl
@@ -64,12 +64,14 @@ handle_call({compact, _}, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
     {reply, {ok, Pid}, State};
 handle_call({compact, IdxState}, _From, #st{idx=Idx}=State) ->
     Pid = spawn_link(fun() -> compact(Idx, State#st.mod, IdxState) end),
+    ok = couch_index_server:set_compacting(Idx, true),
     {reply, {ok, Pid}, State#st{pid=Pid}};
 handle_call(cancel, _From, #st{pid=undefined}=State) ->
     {reply, ok, State};
 handle_call(cancel, _From, #st{pid=Pid}=State) ->
     unlink(Pid),
     exit(Pid, kill),
+    ok = couch_index_server:set_compacting(State#st.idx, false),
     {reply, ok, State#st{pid=undefined}};
 handle_call(get_compacting_pid, _From, #st{pid=Pid}=State) ->
     {reply, {ok, Pid}, State};
@@ -84,6 +86,7 @@ handle_cast(_Mesg, State) ->

 handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
+    ok = couch_index_server:set_compacting(State#st.idx, false),
     {noreply, State#st{pid=undefined}};
 handle_info({'EXIT', Pid, Reason}, #st{pid = Pid} = State) ->
     #st{idx = Idx, mod = Mod} = State,
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 4e86f5e..7ec91c3 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -16,7 +16,8 @@
 -vsn(2).

--export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]).
+-export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2, close/1]).
+-export([set_committing/2, set_compacting/2]).

 -export([init/1, terminate/2, code_change/3]).
 -export([handle_call/3, handle_cast/2, handle_info/2]).
@@ -34,8 +35,76 @@
 -define(BY_PID, couchdb_indexes_by_pid).
 -define(BY_DB, couchdb_indexes_by_db).
 -define(RELISTEN_DELAY, 5000).
+-define(MAX_INDICES_OPEN, 500).
+
+-record(st, {
+    lru=couch_lru:new(fun maybe_close_index/1),
+    open=0,
+    max_open=?MAX_INDICES_OPEN,
+    root_dir
+}).
+
+-record(entry, {
+    name,
+    pid,
+    locked=false,
+    committing=false,
+    compacting=false,
+    waiters=undefined
+}).
+
+close(Mon) ->
+    erlang:demonitor(Mon, [flush]),
+    ok.
+
+maybe_close_lru_view(#st{open=Open, max_open=Max}=State) when Open =< Max ->
+    {ok, State};
+maybe_close_lru_view(State) ->
+    #st{lru=Lru, open=Open} = State,
+    case couch_lru:close(Lru) of
+        false ->
+            {ok, State};
+        {true, NewLru} ->
+            maybe_close_lru_view(State#st{lru=NewLru, open=Open-1})
+    end.
+
+is_idle(Pid) ->
+    case erlang:process_info(Pid, monitored_by) of
+        undefined ->
+            true;
+        {monitored_by, Pids} ->
+            [] =:= Pids -- [whereis(couch_stats_process_tracker)]
+    end.

--record(st, {root_dir}).
+set_compacting(Idx, IsCompacting) ->
+    gen_server:call(?MODULE, {compacting, Idx, IsCompacting}, infinity).
+
+set_committing(Pid, IsCommitting) ->
+    gen_server:call(?MODULE, {committing, Pid, IsCommitting}, infinity).
+
+maybe_close_index({DbName, DDocId, Sig}) ->
+    case ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, true}) of
+        true ->
+            case ets:lookup(?BY_SIG, {DbName, Sig}) of
+                [#entry{pid=Pid, committing=false, compacting=false}] ->
+                    case is_idle(Pid) of
+                        true ->
+                            rem_from_ets(DbName, Sig, DDocId, Pid),
+                            couch_index:stop(Pid),
+                            {true, true};
+                        false ->
+                            ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, false}),
+                            couch_stats:increment_counter([couchdb, couch_index_server, lru_skip]),
+                            {false, false}
+                    end;
+                _ ->
+                    ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, false}),
+                    couch_stats:increment_counter([couchdb, couch_index_server, lru_skip]),
+                    {false, false}
+            end;
+        false ->
+            {false, true}
+    end.

 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -94,8 +163,8 @@ get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) ->
 get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) ->
     {ok, InitState} = Module:init(Db, DDoc),
     {ok, FunResp} = Fun(InitState),
-    {ok, Pid} = get_index(Module, InitState),
-    {ok, Pid, FunResp};
+    {ok, Pid, Monitor} = get_index(Module, InitState),
+    {ok, Pid, Monitor, FunResp};
 get_index(Module, Db, DDoc, _Fun) ->
     {ok, InitState} = Module:init(Db, DDoc),
     get_index(Module, InitState).
@@ -105,24 +174,31 @@ get_index(Module, IdxState) ->
     DbName = Module:get(db_name, IdxState),
     Sig = Module:get(signature, IdxState),
     case ets:lookup(?BY_SIG, {DbName, Sig}) of
-        [{_, Pid}] when is_pid(Pid) ->
-            {ok, Pid};
+        [#entry{pid=Pid, locked=false}] when is_pid(Pid) ->
+            Monitor = erlang:monitor(process, Pid),
+            {ok, Pid, Monitor};
         _ ->
             Args = {Module, IdxState, DbName, Sig},
-            gen_server:call(?MODULE, {get_index, Args}, infinity)
+            case gen_server:call(?MODULE, {get_index, Args}, infinity) of
+                {ok, Pid} ->
+                    Monitor = erlang:monitor(process, Pid),
+                    {ok, Pid, Monitor};
+                {error, Reason} ->
+                    {error, Reason}
+            end
     end.

-
 init([]) ->
     process_flag(trap_exit, true),
     ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
-    ets:new(?BY_SIG, [protected, set, named_table]),
-    ets:new(?BY_PID, [private, set, named_table]),
+    ets:new(?BY_SIG, [protected, set, named_table, {keypos, #entry.name}]),
+    ets:new(?BY_PID, [protected, set, named_table]),
     ets:new(?BY_DB, [protected, bag, named_table]),
     couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
     RootDir = couch_index_util:root_dir(),
+    MaxIndicesOpen = config:get_integer("couchdb", "max_indices_open", ?MAX_INDICES_OPEN),
     couch_file:init_delete_dir(RootDir),
-    {ok, #st{root_dir=RootDir}}.
+    {ok, #st{root_dir=RootDir, max_open=MaxIndicesOpen}}.

 terminate(_Reason, _State) ->
@@ -134,47 +210,69 @@
 handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) ->
     case ets:lookup(?BY_SIG, {DbName, Sig}) of
         [] ->
+            {ok, NewState} = maybe_close_lru_view(State#st{open=(State#st.open)+1}),
             spawn_link(fun() -> new_index(Args) end),
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
+            ets:insert(?BY_SIG, #entry{name={DbName, Sig}, waiters=[From]}),
+            {noreply, NewState};
+        [#entry{waiters=Waiters}=Entry] when is_list(Waiters) ->
+            ets:insert(?BY_SIG, Entry#entry{waiters=[From | Waiters]}),
             {noreply, State};
-        [{_, Waiters}] when is_list(Waiters) ->
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
-            {noreply, State};
-        [{_, Pid}] when is_pid(Pid) ->
+        [#entry{pid=Pid}] when is_pid(Pid) ->
             {reply, {ok, Pid}, State}
     end;
 handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [#entry{waiters=Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    NewLru = couch_lru:insert({DbName, DDocId, Sig}, State#st.lru),
     [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
     link(Pid),
     add_to_ets(DbName, Sig, DDocId, Pid),
-    {reply, ok, State};
+    {reply, ok, State#st{lru=NewLru}};
 handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [#entry{waiters=Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
     [gen_server:reply(From, Error) || From <- Waiters],
     ets:delete(?BY_SIG, {DbName, Sig}),
+    {reply, ok, State#st{open=(State#st.open)-1}};
+handle_call({compacting, Pid, IsCompacting}, _From, State) ->
+    case ets:lookup(?BY_PID, Pid) of
+        [{Pid, {DbName, Sig}}] ->
+            ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.compacting, IsCompacting});
+        [] ->
+            ok
+    end,
+    {reply, ok, State};
+handle_call({committing, Pid, IsCommitting}, _From, State) ->
+    case ets:lookup(?BY_PID, Pid) of
+        [{Pid, {DbName, Sig}}] ->
+            ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.committing, IsCommitting});
+        [] ->
+            ok
+    end,
     {reply, ok, State};
 handle_call({reset_indexes, DbName}, _From, State) ->
     reset_indexes(DbName, State#st.root_dir),
-    {reply, ok, State}.
+    {reply, ok, State};
+handle_call(get_open_count, _From, State) ->
+    {reply, State#st.open, State}.

 handle_cast({reset_indexes, DbName}, State) ->
     reset_indexes(DbName, State#st.root_dir),
-    {noreply, State}.
+    {noreply, State};
+handle_cast(close_indexes, State) ->
+    {ok, NewState} = maybe_close_lru_view(State),
+    {noreply, NewState}.

-handle_info({'EXIT', Pid, Reason}, Server) ->
-    case ets:lookup(?BY_PID, Pid) of
+handle_info({'EXIT', Pid, Reason}, State) ->
+    NewState = case ets:lookup(?BY_PID, Pid) of
         [{Pid, {DbName, Sig}}] ->
-            [{DbName, {DDocId, Sig}}] =
-                ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
-            rem_from_ets(DbName, Sig, DDocId, Pid);
+            rem_from_ets(DbName, Sig, Pid),
+            State#st{open=(State#st.open)-1};
         [] when Reason /= normal ->
             exit(Reason);
         _Else ->
-            ok
+            State
     end,
-    {noreply, Server};
+    {noreply, NewState};
 handle_info(restart_config_listener, State) ->
     ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
     {noreply, State};
@@ -187,18 +285,20 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.

-handle_config_change("couchdb", "index_dir", RootDir, _, RootDir) ->
-    {ok, RootDir};
-handle_config_change("couchdb", "view_index_dir", RootDir, _, RootDir) ->
-    {ok, RootDir};
+handle_config_change("couchdb", "index_dir", RootDir, _, #st{root_dir=RootDir}=State) ->
+    {ok, State};
+handle_config_change("couchdb", "view_index_dir", RootDir, _, #st{root_dir=RootDir}=State) ->
+    {ok, State};
 handle_config_change("couchdb", "index_dir", _, _, _) ->
     exit(whereis(couch_index_server), config_change),
     remove_handler;
 handle_config_change("couchdb", "view_index_dir", _, _, _) ->
     exit(whereis(couch_index_server), config_change),
     remove_handler;
-handle_config_change(_, _, _, _, RootDir) ->
-    {ok, RootDir}.
+handle_config_change("couchdb", "max_indices_open", Max, _, State) ->
+    {ok, State#st{max_open=list_to_integer(Max)}};
+handle_config_change(_, _, _, _, State) ->
+    {ok, State}.

 handle_config_terminate(_, stop, _) ->
     ok;
@@ -222,7 +322,7 @@ new_index({Mod, IdxState, DbName, Sig}) ->
 reset_indexes(DbName, Root) ->
     % shutdown all the updaters and clear the files, the db got changed
     Fun = fun({_, {DDocId, Sig}}) ->
-        [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+        [#entry{pid=Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
         MRef = erlang:monitor(process, Pid),
         gen_server:cast(Pid, delete),
         receive {'DOWN', MRef, _, _, _} -> ok end,
@@ -234,11 +334,17 @@
 add_to_ets(DbName, Sig, DDocId, Pid) ->
-    ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
+    ets:insert(?BY_SIG, #entry{name={DbName, Sig}, pid=Pid}),
     ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
     ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).

+rem_from_ets(DbName, Sig, Pid) ->
+    [{DbName, {DDocId, Sig}}] =
+        ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
+    rem_from_ets(DbName, Sig, DDocId, Pid).
+
+
 rem_from_ets(DbName, Sig, DDocId, Pid) ->
     ets:delete(?BY_SIG, {DbName, Sig}),
     ets:delete(?BY_PID, Pid),
@@ -254,7 +360,7 @@ handle_db_event(DbName, deleted, St) ->
 handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
     lists:foreach(fun({_DbName, {_DDocId, Sig}}) ->
         case ets:lookup(?BY_SIG, {DbName, Sig}) of
-            [{_, IndexPid}] ->
+            [#entry{pid=IndexPid}] ->
                 (catch gen_server:cast(IndexPid, ddoc_updated));
             [] ->
                 ok
diff --git a/src/couch_index/test/couch_index_compaction_tests.erl b/src/couch_index/test/couch_index_compaction_tests.erl
index 0787151..6cbffc8 100644
--- a/src/couch_index/test/couch_index_compaction_tests.erl
+++ b/src/couch_index/test/couch_index_compaction_tests.erl
@@ -19,9 +19,9 @@ setup() ->
     DbName = ?tempdb(),
     {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
     couch_db:close(Db),
-    {ok, IndexerPid} = fake_index(Db),
+    {ok, IndexerPid, Mon} = fake_index(Db),
     ?assertNot(is_opened(Db)),
-    {Db, IndexerPid}.
+    {Db, IndexerPid, Mon}.

 fake_index(#db{name = DbName} = Db) ->
     ok = meck:new([test_index], [non_strict]),
@@ -67,7 +67,7 @@ compaction_test_() ->
     }.

-hold_db_for_recompaction({Db, Idx}) ->
+hold_db_for_recompaction({Db, Idx, Mon}) ->
     ?_test(begin
         ?assertNot(is_opened(Db)),
         ok = meck:reset(test_index),
@@ -87,6 +87,7 @@ hold_db_for_recompaction({Db, Idx}) ->
         end,

         ?assertNot(is_opened(Db)),
+        couch_index_server:close(Mon),
         ok
     end).
diff --git a/src/couch_index/test/couch_index_lru_tests.erl b/src/couch_index/test/couch_index_lru_tests.erl
new file mode 100644
index 0000000..52f4244
--- /dev/null
+++ b/src/couch_index/test/couch_index_lru_tests.erl
@@ -0,0 +1,226 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_lru_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(MAX_INDICES_OPEN, 10).
+
+-record(test_idx, {
+    db_name,
+    idx_name,
+    signature
+}).
+
+
+setup() ->
+    test_util:start_couch([]),
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+    config:set("couchdb", "max_indices_open", integer_to_list(?MAX_INDICES_OPEN)),
+    Db.
+
+
+teardown(Db) ->
+    ok = couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+    config:delete("couchdb", "max_indices_open"),
+    (catch couch_db:close(Db)),
+    ok.
+
+
+lru_test_() ->
+    {
+        "Test the view index LRU",
+        {
+            setup,
+            fun() -> test_util:start_couch([]) end, fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun test_close_while_compacting/1,
+                    fun test_soft_max/1
+                ]
+            }
+        }
+    }.
+
+
+test_close_while_compacting(Db) ->
+    ?_test(begin
+        ok = meck:new([couch_index_server], [passthrough]),
+        Self = self(),
+        ok = meck:expect(couch_index_server, set_compacting, fun(Idx, IsCompacting) ->
+            meck:passthrough([Idx, IsCompacting]),
+            Self ! {compact, IsCompacting, self()},
+            receive finish ->
+                ok
+            end,
+            ok
+        end),
+
+        ok = meck:expect(couch_index_server, set_committing, fun(Idx, IsCommitting) ->
+            meck:passthrough([Idx, IsCommitting]),
+            Self ! {commit, IsCommitting, self()},
+            ok
+        end),
+
+        % create ddocs
+        DDocIds = lists:map(fun(I) ->
+            BI = integer_to_binary(I),
+            <<"_design/ddoc_", BI/binary>>
+        end, lists:seq(1,?MAX_INDICES_OPEN+10)),
+        ok = create_ddocs(Db, DDocIds),
+
+        % open and compact indexes
+        Openers = lists:map(fun(DDocId) ->
+            spawn_link(fun() ->
+                {ok, Pid, Mon} = couch_index_server:get_index(couch_mrview_index, Db#db.name, DDocId),
+                couch_index:compact(Pid),
+                receive close ->
+                    ok
+                end,
+                couch_index_server:close(Mon)
+            end)
+        end, DDocIds),
+
+        % check that all indexes are open
+        ToClose = wait_all_compacting(true, [], ?MAX_INDICES_OPEN+10),
+        ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+        % close compactor pids, but still block flag from being set in BY_SIG table
+        lists:foreach(fun(Opener) -> Opener ! close end, Openers),
+        % check that compaction flag block pids from closing
+        gen_server:cast(couch_index_server, close_indexes),
+        ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+        % allow compaction flag to be unset
+        lists:foreach(fun(CPid) -> CPid ! finish end, ToClose),
+        % wait until all compaction flags are unset
+        Finished = wait_all_compacting(false, [], ?MAX_INDICES_OPEN+10),
+        lists:foreach(fun(CPid) -> CPid ! finish end, Finished),
+        gen_server:cast(couch_index_server, close_indexes),
+        ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+        % wait for all commits to start
+        Indexers = wait_all_committing(dict:new(), true, ?MAX_INDICES_OPEN+10),
+        ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+        % force premature commit
+        [Indexer ! commit || Indexer <- Indexers],
+        % wait until commits happen
+        wait_all_committing(dict:new(), false, ?MAX_INDICES_OPEN+10),
+        gen_server:cast(couch_index_server, close_indexes),
+        % since all commits and all compacts are done, make sure indexes are closed
+        ?assertEqual(?MAX_INDICES_OPEN, gen_server:call(couch_index_server, get_open_count)),
+        % clean up
+        (catch meck:unload(couch_index_server)),
+        ok
+    end).
+
+
+test_soft_max(Db) ->
+    ?_test(begin
+        ok = meck:new([test_index], [non_strict]),
+        ok = meck:expect(test_index, init, fun(Db0, DDoc) ->
+            Sig = couch_crypto:hash(md5, term_to_binary({Db0#db.name, DDoc})),
+            {ok, #test_idx{db_name=Db0#db.name, idx_name=DDoc, signature=Sig}}
+        end),
+        ok = meck:expect(test_index, close, ['_'], {true, true}),
+        ok = meck:expect(test_index, open, fun(_Db, State) ->
+            {ok, State}
+        end),
+        ok = meck:expect(test_index, compact, ['_', '_', '_'],
+            meck:seq([{ok, 9}, {ok, 10}])), %% to trigger recompaction
+        ok = meck:expect(test_index, commit, ['_'], ok),
+        ok = meck:expect(test_index, get, fun
+            (db_name, State) ->
+                State#test_idx.db_name;
+            (idx_name, State) ->
+                State#test_idx.idx_name;
+            (signature, State) ->
+                State#test_idx.signature;
+            (update_seq, Seq) ->
+                Seq
+        end),
+
+        ok = meck:reset(test_index),
+
+        IdxOpens = lists:map(fun(I) ->
+            BI = integer_to_binary(I),
+            % hack: use tuple as index name so couch_index_server won't try to open
+            % it as a design document.
+            IndexName = {<<"_design/i", BI/binary>>},
+            ?assertEqual(I-1, gen_server:call(couch_index_server, get_open_count)),
+            couch_index_server:get_index(test_index, Db, IndexName)
+        end, lists:seq(1, 500)),
+
+        lists:foldl(fun(IdxOpen, Acc) ->
+            ?assertMatch({ok, _, _}, IdxOpen),
+            {ok, Pid, Mon} = IdxOpen,
+            ?assert(is_pid(Pid)),
+            ?assert(is_reference(Mon)),
+            ?assertNotEqual(undefined, process_info(Pid)),
+            gen_server:cast(couch_index_server, close_indexes),
+            OpenCount = gen_server:call(couch_index_server, get_open_count),
+            ?assertEqual(max(?MAX_INDICES_OPEN, Acc), OpenCount),
+            couch_index_server:close(Mon),
+            Acc-1
+        end, 500, IdxOpens),
+
+        config:delete("couchdb", "max_indices_open"),
+        (catch meck:unload(test_index)),
+        (catch meck:unload(couch_util)),
+        ok
+    end).
+
+
+wait_all_compacting(_IsCompacting, Acc, 0) ->
+    Acc;
+wait_all_compacting(IsCompacting, Acc, Remaining) ->
+    receive {compact, IsCompacting, From} ->
+        wait_all_compacting(IsCompacting, [From | Acc], Remaining-1)
+    end.
+
+
+wait_all_committing(Pids, ShouldBe, Count) ->
+    receive {commit, IsCommitting, From} ->
+        Pids0 = dict:store(From, IsCommitting, Pids),
+        CommitCount = dict:fold(fun(_K, V, Acc) ->
+            case V of
+                ShouldBe -> Acc+1;
+                _ -> Acc
+            end
+        end, 0, Pids0),
+        case Count =:= CommitCount of
+            true ->
+                [Pid || {Pid, _} <- dict:to_list(Pids0)];
+            false ->
+                wait_all_committing(Pids0, ShouldBe, Count)
+        end
+    end.
+
+
+create_ddocs(Db, DDocIds) ->
+    Docs = lists:map(fun(DDocId) ->
+        MapFun = <<"function(doc) {emit(\"", DDocId/binary, "\", 1);}">>,
+        Json = {[
+            {<<"_id">>, DDocId},
+            {<<"language">>, <<"javascript">>},
+            {<<"views">>, {[
+                {<<"v">>, {[
+                    {<<"map">>, MapFun}
+                ]}}
+            ]}}
+        ]},
+        couch_doc:from_json_obj(Json)
+    end, DDocIds),
+    {ok, _} = couch_db:update_docs(Db, Docs, [?ADMIN_CTX]),
+    ok.
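
With this change couch_index_server:get_index/2 and get_index/3 return {ok, Pid, Monitor} instead of {ok, Pid} (and the get_index/4 fun variant returns {ok, Pid, Monitor, FunResp}). The monitor is what keeps an index counted as in use by is_idle/1, and callers hand it back through the new close/1. A minimal caller-side sketch under those assumptions; DbName, the design document id, and the couch_index:get_state/2 call are illustrative only:

    %% Open a view index, use it, then release the monitor so the LRU is free
    %% to close the index once it is idle and neither committing nor compacting.
    {ok, Pid, Mon} = couch_index_server:get_index(
        couch_mrview_index, DbName, <<"_design/example">>),
    try
        %% ... work with Pid, e.g. couch_index:get_state(Pid, 0) ...
        ok
    after
        %% close/1 only demonitors; the index process itself is stopped later by
        %% maybe_close_index/1 once the open count exceeds max_indices_open.
        couch_index_server:close(Mon)
    end.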