Return-Path: X-Original-To: apmail-couchdb-commits-archive@www.apache.org Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0493110243 for ; Sat, 15 Feb 2014 09:50:46 +0000 (UTC) Received: (qmail 42524 invoked by uid 500); 15 Feb 2014 09:49:49 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 42436 invoked by uid 500); 15 Feb 2014 09:49:46 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 41298 invoked by uid 99); 15 Feb 2014 09:49:11 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 15 Feb 2014 09:49:11 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 5E2D1927BBC; Sat, 15 Feb 2014 09:49:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: benoitc@apache.org To: commits@couchdb.apache.org Date: Sat, 15 Feb 2014 09:49:57 -0000 Message-Id: In-Reply-To: <462f90d1cb314bc68f38d5e7b324ddb6@git.apache.org> References: <462f90d1cb314bc68f38d5e7b324ddb6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [51/59] [abbrv] couchdb commit: updated refs/heads/1994-merge-rcouch to 6e59a78 remove couch_index Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/372b033a Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/372b033a Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/372b033a Branch: refs/heads/1994-merge-rcouch Commit: 372b033af8ac8e0f71e918f9e1b34abffe4c1e75 Parents: 75150a1 Author: Benoit Chesneau Authored: Thu Feb 13 16:39:43 2014 +0100 Committer: Benoit Chesneau Committed: Thu Feb 13 16:39:43 2014 +0100 ---------------------------------------------------------------------- apps/couch_index/src/couch_index.app.src | 19 - apps/couch_index/src/couch_index.erl | 402 -------------------- apps/couch_index/src/couch_index_api.erl | 54 --- apps/couch_index/src/couch_index_compactor.erl | 113 ------ apps/couch_index/src/couch_index_event.erl | 65 ---- apps/couch_index/src/couch_index_event_sup.erl | 51 --- apps/couch_index/src/couch_index_indexer.erl | 221 ----------- apps/couch_index/src/couch_index_server.erl | 221 ----------- apps/couch_index/src/couch_index_sup.erl | 34 -- apps/couch_index/src/couch_index_updater.erl | 200 ---------- apps/couch_index/src/couch_index_util.erl | 77 ---- 11 files changed, 1457 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb/blob/372b033a/apps/couch_index/src/couch_index.app.src ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index.app.src b/apps/couch_index/src/couch_index.app.src deleted file mode 100644 index 921e5d2..0000000 --- a/apps/couch_index/src/couch_index.app.src +++ /dev/null @@ -1,19 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - -{application, couch_index, [ - {description, "CouchDB Secondary Index Manager"}, - {vsn, "1.3.0"}, - {modules, []}, - {registered, [couch_index_server]}, - {applications, [kernel, stdlib, couch]} -]}. http://git-wip-us.apache.org/repos/asf/couchdb/blob/372b033a/apps/couch_index/src/couch_index.erl ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index.erl b/apps/couch_index/src/couch_index.erl deleted file mode 100644 index 01483bb..0000000 --- a/apps/couch_index/src/couch_index.erl +++ /dev/null @@ -1,402 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index). --behaviour(gen_server). - - -%% API --export([start_link/1, stop/1, get_state/2, get_info/1]). --export([compact/1, compact/2, get_compactor_pid/1]). --export([acquire_indexer/1, release_indexer/1]). --export([config_change/3]). - -%% gen_server callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_call/3, handle_cast/2, handle_info/2]). - --include_lib("couch/include/couch_db.hrl"). - --record(st, { - mod, - idx_state, - updater, - compactor, - indexer=nil, - waiters=[], - commit_delay, - committed=true, - shutdown=false -}). - - -start_link({Module, IdxState}) -> - proc_lib:start_link(?MODULE, init, [{Module, IdxState}]). - - -stop(Pid) -> - gen_server:cast(Pid, stop). - - -get_state(Pid, RequestSeq) -> - gen_server:call(Pid, {get_state, RequestSeq}, infinity). - - -get_info(Pid) -> - gen_server:call(Pid, get_info). - - -compact(Pid) -> - compact(Pid, []). - - -compact(Pid, Options) -> - {ok, CPid} = gen_server:call(Pid, compact), - case lists:member(monitor, Options) of - true -> {ok, erlang:monitor(process, CPid)}; - false -> ok - end. - - -get_compactor_pid(Pid) -> - gen_server:call(Pid, get_compactor_pid). - - -acquire_indexer(Pid) -> - {ok, IPid} = gen_server:call(Pid, get_indexer_pid), - gen_server:call(IPid, {acquire, self()}). - -release_indexer(Pid) -> - {ok, IPid} = gen_server:call(Pid, get_indexer_pid), - gen_server:call(IPid, {release, self()}). - - -config_change("query_server_config", "commit_freq", NewValue) -> - ok = gen_server:cast(?MODULE, {config_update, NewValue}). - - -init({Mod, IdxState}) -> - ok = couch_config:register(fun ?MODULE:config_change/3), - DbName = Mod:get(db_name, IdxState), - Resp = couch_util:with_db(DbName, fun(Db) -> - case Mod:open(Db, IdxState) of - {ok, IdxSt} -> - couch_db:monitor(Db), - {ok, IdxSt}; - Error -> - Error - end - end), - case Resp of - {ok, NewIdxState} -> - {ok, UPid} = couch_index_updater:start_link(self(), Mod), - {ok, CPid} = couch_index_compactor:start_link(self(), Mod), - - Delay = couch_config:get("query_server_config", "commit_freq", "5"), - MsDelay = 1000 * list_to_integer(Delay), - State = #st{ - mod=Mod, - idx_state=NewIdxState, - updater=UPid, - compactor=CPid, - commit_delay=MsDelay - }, - Args = [ - Mod:get(db_name, IdxState), - Mod:get(idx_name, IdxState), - couch_index_util:hexsig(Mod:get(signature, IdxState)) - ], - ?LOG_INFO("Opening index for db: ~s idx: ~s sig: ~p", Args), - proc_lib:init_ack({ok, self()}), - gen_server:enter_loop(?MODULE, [], State); - Other -> - proc_lib:init_ack(Other) - end. - - -terminate(Reason, State) -> - #st{mod=Mod, idx_state=IdxState}=State, - Mod:close(IdxState), - send_all(State#st.waiters, Reason), - couch_util:shutdown_sync(State#st.updater), - couch_util:shutdown_sync(State#st.compactor), - Args = [ - Mod:get(db_name, IdxState), - Mod:get(idx_name, IdxState), - couch_index_util:hexsig(Mod:get(signature, IdxState)), - Reason - ], - ?LOG_INFO("Closing index for db: ~s idx: ~s sig: ~p~nreason: ~p", Args), - ok. - - -handle_call({get_state, ReqSeq}, From, State) -> - #st{ - mod=Mod, - idx_state=IdxState, - waiters=Waiters - } = State, - IdxSeq = Mod:get(update_seq, IdxState), - case ReqSeq =< IdxSeq of - true -> - {reply, {ok, IdxState}, State}; - _ -> % View update required - couch_index_updater:run(State#st.updater, IdxState), - Waiters2 = [{From, ReqSeq} | Waiters], - {noreply, State#st{waiters=Waiters2}, infinity} - end; -handle_call(get_info, _From, State) -> - #st{mod=Mod} = State, - {ok, Info0} = Mod:get(info, State#st.idx_state), - IsUpdating = couch_index_updater:is_running(State#st.updater), - IsCompacting = couch_index_compactor:is_running(State#st.compactor), - Info = Info0 ++ [ - {updater_running, IsUpdating}, - {compact_running, IsCompacting}, - {waiting_commit, State#st.committed == false}, - {waiting_clients, length(State#st.waiters)} - ], - {reply, {ok, Info}, State}; -handle_call(reset, _From, State) -> - #st{ - mod=Mod, - idx_state=IdxState - } = State, - {ok, NewIdxState} = Mod:reset(IdxState), - {reply, {ok, NewIdxState}, State#st{idx_state=NewIdxState}}; -handle_call(compact, _From, State) -> - Resp = couch_index_compactor:run(State#st.compactor, State#st.idx_state), - {reply, Resp, State}; -handle_call(get_compactor_pid, _From, State) -> - {reply, {ok, State#st.compactor}, State}; -handle_call({compacted, NewIdxState}, _From, State) -> - #st{ - mod=Mod, - idx_state=OldIdxState, - updater=Updater, - commit_delay=Delay - } = State, - assert_signature_match(Mod, OldIdxState, NewIdxState), - NewSeq = Mod:get(update_seq, NewIdxState), - OldSeq = Mod:get(update_seq, OldIdxState), - % For indices that require swapping files, we have to make sure we're - % up to date with the current index. Otherwise indexes could roll back - % (perhaps considerably) to previous points in history. - case NewSeq >= OldSeq of - true -> - {ok, NewIdxState1} = Mod:swap_compacted(OldIdxState, NewIdxState), - % Restart the indexer if it's running. - case couch_index_updater:is_running(Updater) of - true -> ok = couch_index_updater:restart(Updater, NewIdxState1); - false -> ok - end, - case State#st.committed of - true -> erlang:send_after(Delay, self(), commit); - false -> ok - end, - {reply, ok, State#st{ - idx_state=NewIdxState1, - committed=false - }}; - _ -> - {reply, recompact, State} - end; -handle_call(get_indexer_pid, _From, #st{mod=Mod, idx_state=IdxState}=State) -> - Pid = case State#st.indexer of - Pid1 when is_pid(Pid1) -> - Pid1; - _ -> - DbName = Mod:get(db_name, IdxState), - {ok, IPid} = couch_index_indexer:start_link(self(), DbName), - erlang:monitor(process, IPid), - IPid - end, - {reply, {ok, Pid}, State#st{indexer=Pid}}. - - -handle_cast({config_change, NewDelay}, State) -> - MsDelay = 1000 * list_to_integer(NewDelay), - {noreply, State#st{commit_delay=MsDelay}}; -handle_cast({updated, NewIdxState}, State) -> - {noreply, NewState} = handle_cast({new_state, NewIdxState}, State), - case NewState#st.shutdown andalso (NewState#st.waiters =:= []) of - true -> - {stop, normal, NewState}; - false -> - maybe_restart_updater(NewState), - {noreply, NewState} - end; -handle_cast({new_state, NewIdxState}, State) -> - #st{ - mod=Mod, - idx_state=OldIdxState, - commit_delay=Delay - } = State, - assert_signature_match(Mod, OldIdxState, NewIdxState), - CurrSeq = Mod:get(update_seq, NewIdxState), - - DbName = Mod:get(db_name, NewIdxState), - DDocId = Mod:get(idx_name, NewIdxState), - - %% notify to event listeners that the index has been - %% updated - couch_index_event:notify({index_update, - {DbName, DDocId, - Mod}}), - Args = [ - DbName, - DDocId, - CurrSeq - ], - ?LOG_DEBUG("Updated index for db: ~s idx: ~s seq: ~B", Args), - Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState), - case State#st.committed of - true -> erlang:send_after(Delay, self(), commit); - false -> ok - end, - {noreply, State#st{ - idx_state=NewIdxState, - waiters=Rest, - committed=false - }}; -handle_cast({update_error, Error}, State) -> - send_all(State#st.waiters, Error), - {noreply, State#st{waiters=[]}}; -handle_cast(stop, State) -> - {stop, normal, State}; -handle_cast(delete, State) -> - #st{mod=Mod, idx_state=IdxState} = State, - DbName = Mod:get(db_name, IdxState), - DDocId = Mod:get(idx_name, IdxState), - - %% notify about the index deletion - couch_index_event:notify({index_delete, - {DbName, DDocId, Mod}}), - - ok = Mod:delete(IdxState), - - {stop, normal, State}; -handle_cast(ddoc_updated, State) -> - #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State, - DbName = Mod:get(db_name, IdxState), - DDocId = Mod:get(idx_name, IdxState), - - %% notify to event listeners that the index has been - %% updated - couch_index_event:notify({index_update, - {DbName, DDocId, - Mod}}), - - Shutdown = couch_util:with_db(DbName, fun(Db) -> - case couch_db:open_doc(Db, DDocId, [ejson_body]) of - {not_found, deleted} -> - true; - {ok, DDoc} -> - {ok, NewIdxState} = Mod:init(Db, DDoc), - Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState) - end - end), - case Shutdown of - true -> - case Waiters of - [] -> - {stop, normal, State}; - _ -> - {noreply, State#st{shutdown = true}} - end; - false -> - {noreply, State#st{shutdown = false}} - end; -handle_cast(_Mesg, State) -> - {stop, unhandled_cast, State}. - - -handle_info(commit, #st{committed=true}=State) -> - {noreply, State}; -handle_info(commit, State) -> - #st{mod=Mod, idx_state=IdxState, commit_delay=Delay} = State, - DbName = Mod:get(db_name, IdxState), - GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end, - CommittedSeq = couch_util:with_db(DbName, GetCommSeq), - case CommittedSeq >= Mod:get(update_seq, IdxState) of - true -> - % Commit the updates - ok = Mod:commit(IdxState), - {noreply, State#st{committed=true}}; - _ -> - % We can't commit the header because the database seq that's - % fully committed to disk is still behind us. If we committed - % now and the database lost those changes our view could be - % forever out of sync with the database. But a crash before we - % commit these changes, no big deal, we only lose incremental - % changes since last committal. - erlang:send_after(Delay, self(), commit), - {noreply, State} - end; - -handle_info({'DOWN', _, _, Pid, _}, #st{mod=Mod, idx_state=IdxState, - indexer=Pid}=State) -> - Args = [Mod:get(db_name, IdxState), - Mod:get(idx_name, IdxState)], - ?LOG_INFO("Background indexer shutdown by monitor notice for db: ~s idx: ~s", Args), - - {noreply, State#st{indexer=nil}}; - -handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) -> - DbName = Mod:get(db_name, IdxState), - DDocId = Mod:get(idx_name, IdxState), - - %% notify to event listeners that the index has been - %% updated - couch_index_event:notify({index_delete, {DbName, DDocId, Mod}}), - - Args = [DbName, DDocId], - ?LOG_INFO("Index shutdown by monitor notice for db: ~s idx: ~s", Args), - catch send_all(State#st.waiters, shutdown), - {stop, normal, State#st{waiters=[]}}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -maybe_restart_updater(#st{waiters=[]}) -> - ok; -maybe_restart_updater(#st{mod=Mod, idx_state=IdxState}=State) -> - couch_util:with_db(Mod:get(db_name, IdxState), fun(Db) -> - UpdateSeq = couch_db:get_update_seq(Db), - CommittedSeq = couch_db:get_committed_update_seq(Db), - CanUpdate = UpdateSeq > CommittedSeq, - UOpts = Mod:get(update_options, IdxState), - case CanUpdate and lists:member(committed_only, UOpts) of - true -> couch_db:ensure_full_commit(Db); - false -> ok - end - end), - couch_index_updater:run(State#st.updater, IdxState). - - -send_all(Waiters, Reply) -> - [gen_server:reply(From, Reply) || {From, _} <- Waiters]. - - -send_replies(Waiters, UpdateSeq, IdxState) -> - Pred = fun({_, S}) -> S =< UpdateSeq end, - {ToSend, Remaining} = lists:partition(Pred, Waiters), - [gen_server:reply(From, {ok, IdxState}) || {From, _} <- ToSend], - Remaining. - -assert_signature_match(Mod, OldIdxState, NewIdxState) -> - case {Mod:get(signature, OldIdxState), Mod:get(signature, NewIdxState)} of - {Sig, Sig} -> ok; - _ -> erlang:error(signature_mismatch) - end. http://git-wip-us.apache.org/repos/asf/couchdb/blob/372b033a/apps/couch_index/src/couch_index_api.erl ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index_api.erl b/apps/couch_index/src/couch_index_api.erl deleted file mode 100644 index 9d3a67c..0000000 --- a/apps/couch_index/src/couch_index_api.erl +++ /dev/null @@ -1,54 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_api). - -get(Field, State) -> - ok. - -init(Db, Ddoc) -> - ok. - -open(Db, State) -> - ok. - -close(State) -> - ok. - -delete(State) -> - ok. - -reset(State) -> - ok. - - -start_update(State, PurgedState, NumChanges) -> - {ok, State}. - -purge(Db, PurgeSeq, PurgedIdRevs, State) -> - ok. - -process_doc(Doc, Seq, State) -> - ok. - -finish_update(State) -> - {ok, State}. - -commit(State) -> - ok. - - -compact(Parent, State, Opts) -> - ok. - -swap_compacted(OldState, NewState) -> - ok. http://git-wip-us.apache.org/repos/asf/couchdb/blob/372b033a/apps/couch_index/src/couch_index_compactor.erl ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index_compactor.erl b/apps/couch_index/src/couch_index_compactor.erl deleted file mode 100644 index 6e9fb2e..0000000 --- a/apps/couch_index/src/couch_index_compactor.erl +++ /dev/null @@ -1,113 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_compactor). --behaviour(gen_server). - - -%% API --export([start_link/2, run/2, cancel/1, is_running/1]). - -%% gen_server callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_call/3, handle_cast/2, handle_info/2]). - - --include_lib("couch/include/couch_db.hrl"). - --record(st, { - idx, - mod, - pid -}). - - -start_link(Index, Module) -> - gen_server:start_link(?MODULE, {Index, Module}, []). - - -run(Pid, IdxState) -> - gen_server:call(Pid, {compact, IdxState}). - - -cancel(Pid) -> - gen_server:call(Pid, cancel). - - -is_running(Pid) -> - gen_server:call(Pid, is_running). - - -init({Index, Module}) -> - process_flag(trap_exit, true), - {ok, #st{idx=Index, mod=Module}}. - - -terminate(_Reason, State) -> - couch_util:shutdown_sync(State#st.pid), - ok. - - -handle_call({compact, _}, _From, #st{pid=Pid}=State) when is_pid(Pid) -> - {reply, {ok, Pid}, State}; -handle_call({compact, IdxState}, _From, #st{idx=Idx}=State) -> - Pid = spawn_link(fun() -> compact(Idx, State#st.mod, IdxState) end), - {reply, {ok, Pid}, State#st{pid=Pid}}; -handle_call(cancel, _From, #st{pid=undefined}=State) -> - {reply, ok, State}; -handle_call(cancel, _From, #st{pid=Pid}=State) -> - unlink(Pid), - exit(Pid, kill), - {reply, ok, State#st{pid=undefined}}; -handle_call(is_running, _From, #st{pid=Pid}=State) when is_pid(Pid) -> - {reply, true, State}; -handle_call(is_running, _From, State) -> - {reply, false, State}. - - -handle_cast(_Mesg, State) -> - {stop, unknown_cast, State}. - - -handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) -> - {noreply, State#st{pid=undefined}}; -handle_info({'EXIT', _Pid, normal}, State) -> - {noreply, State}; -handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) -> - {stop, normal, State}; -handle_info(_Mesg, State) -> - {stop, unknown_info, State}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -compact(Parent, Mod, IdxState) -> - compact(Parent, Mod, IdxState, []). - -compact(Idx, Mod, IdxState, Opts) -> - DbName = Mod:get(db_name, IdxState), - Args = [DbName, Mod:get(idx_name, IdxState)], - ?LOG_INFO("Compaction started for db: ~s idx: ~s", Args), - {ok, NewIdxState} = couch_util:with_db(DbName, fun(Db) -> - Mod:compact(Db, IdxState, Opts) - end), - ok = Mod:commit(NewIdxState), - case gen_server:call(Idx, {compacted, NewIdxState}) of - recompact -> - ?LOG_INFO("Compaction restarting for db: ~s idx: ~s", Args), - compact(Idx, Mod, NewIdxState, [recompact]); - _ -> - ?LOG_INFO("Compaction finished for db: ~s idx: ~s", Args), - ok - end. http://git-wip-us.apache.org/repos/asf/couchdb/blob/372b033a/apps/couch_index/src/couch_index_event.erl ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index_event.erl b/apps/couch_index/src/couch_index_event.erl deleted file mode 100644 index 0cd0a6b..0000000 --- a/apps/couch_index/src/couch_index_event.erl +++ /dev/null @@ -1,65 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_event). --behaviour(gen_event). - --export([start_link/1]). --export([notify/1]). --export([stop/1]). - -%% gen_event callbacks --export([init/1, handle_event/2, handle_call/2, handle_info/2, - terminate/2, code_change/3]). - -start_link(Consumer) -> - HandlerId = {?MODULE, make_ref()}, - couch_index_event_sup:start_link(couch_index_events, HandlerId, - Consumer). - -notify(Event) -> - gen_event:notify(couch_index_events, Event). - -stop(Pid) -> - couch_index_event_sup:stop(Pid). - - -init(Consumer) -> - process_flag(trap_exit, true), - {ok, Consumer}. - -handle_event(Event, Consumer) -> - dispatch_event(Event, Consumer). - -handle_call(_Req, Consumer) -> - {reply, ok, Consumer}. - -handle_info({'EXIT', _, _}, _Consumer) -> - remove_handler; -handle_info(_Info, Consumer)-> - {ok, Consumer}. - -code_change(_OldVsn, Consumer, _Extra) -> - {ok, Consumer}. - -terminate(_Reason, _consumer) -> - ok. - -dispatch_event(Event, Fun) when is_function(Fun) -> - Fun(Event), - {ok, Fun}; -dispatch_event(Event, {Fun, Acc}) when is_function(Fun) -> - Acc2 = Fun(Event, Acc), - {ok, {Fun, Acc2}}; -dispatch_event(Event, Pid) when is_pid(Pid) -> - Pid ! Event, - {ok, Pid}. http://git-wip-us.apache.org/repos/asf/couchdb/blob/372b033a/apps/couch_index/src/couch_index_event_sup.erl ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index_event_sup.erl b/apps/couch_index/src/couch_index_event_sup.erl deleted file mode 100644 index 68cba43..0000000 --- a/apps/couch_index/src/couch_index_event_sup.erl +++ /dev/null @@ -1,51 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_event_sup). - --export([start_link/3]). --export([stop/1]). - -%% internal gen_server callbacks --export([init/1, terminate/2, handle_call/3, handle_cast/2, - handle_info/2,code_change/3]). - -start_link(EventMgr, EventHandler, Args) -> - gen_server:start_link(?MODULE, {EventMgr, EventHandler, Args}, []). - -stop(Pid) -> - gen_server:cast(Pid, stop). - -init({EventMgr, EventHandler, Args}) -> - case gen_event:add_sup_handler(EventMgr, EventHandler, Args) of - ok -> - {ok, {EventMgr, EventHandler}}; - {stop, Error} -> - {stop, Error} - end. - -handle_call(_Whatever, _From, State) -> - {ok, State}. - -handle_cast(stop, State) -> - {stop, normal, State}. - -handle_info({gen_event_EXIT, _Handler, Reason}, State) -> - {stop, Reason, State}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -terminate(_Reason, _State) -> - ok. - - http://git-wip-us.apache.org/repos/asf/couchdb/blob/372b033a/apps/couch_index/src/couch_index_indexer.erl ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index_indexer.erl b/apps/couch_index/src/couch_index_indexer.erl deleted file mode 100644 index 727c8dd..0000000 --- a/apps/couch_index/src/couch_index_indexer.erl +++ /dev/null @@ -1,221 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_indexer). - --export([start_link/2]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {index, - dbname, - threshold, - refresh_interval, - db_updates=0, - tref=nil, - notifier=nil, - locks}). - - -start_link(Index, DbName) -> - gen_server:start_link(?MODULE, {Index, DbName}, []). - -init({Index, DbName}) -> - process_flag(trap_exit, true), - %% register to config events - Self = self(), - ok = couch_config:register(fun - ("couch_index", "threshold") -> - gen_server:cast(Self, config_threshold); - ("couch_index", "refresh_interval") -> - gen_server:cast(Self, config_refresh) - end), - - %% get defaults - Threshold = get_db_threshold(), - Refresh = get_refresh_interval(), - - %% delay background index indexing - self() ! start_indexing, - {ok, #state{index=Index, - dbname=DbName, - threshold=Threshold, - refresh_interval=Refresh, - locks=dict:new()}}. - -handle_call({acquire, Pid}, _From, #state{locks=Locks}=State) -> - NLocks = case dict:find(Pid, Locks) of - error -> - dict:store(Pid, {erlang:monitor(process, Pid), 1}, Locks); - {ok, {MRef, Refc}} -> - dict:store(Pid, {MRef, Refc+1}, Locks) - end, - {reply, ok, State#state{locks=NLocks}}; - -handle_call({release, Pid}, _From, #state{locks=Locks}=State) -> - NLocks = case dict:find(Pid, Locks) of - {ok, {MRef, 1}} -> - erlang:demonitor(MRef, [flush]), - dict:erase(Pid, Locks); - {ok, {MRef, Refc}} -> - dict:store(Pid, {MRef, Refc-1}, Locks); - error -> - Locks - end, - - NState = State#state{locks=NLocks}, - - case should_close() of - true -> {stop, normal, ok, NState}; - false -> {reply, ok, NState} - end; - -handle_call(stop, _From, State) -> - {stop, normal, ok, State}. - - -handle_cast(config_threshold, State) -> - Threshold = get_db_threshold(), - {noreply, State#state{threshold=Threshold}}; -handle_cast(config_refresh, #state{tref=TRef}=State) -> - R = get_refresh_interval(), - %% stop the old timee - if TRef /= nil -> - erlang:cancel_timer(TRef); - true -> ok - end, - %% start the new timer - NTRef = erlang:start_timer(R, self(), refresh_index), - {noreply, State#state{refresh_interval=R, tref=NTRef}}; - -handle_cast(updated, #state{index=Index, dbname=DbName, - threshold=Threshold, - db_updates=Updates}=State) -> - NUpdates = Updates + 1, - - %% we only update if the number of updates is greater than the - %% threshold. - case NUpdates =:= Threshold of - true -> - refresh_index(DbName, Index), - {noreply, State#state{db_updates=0}}; - false -> - {noreply, State#state{db_updates=NUpdates}} - - end; - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(start_indexing, #state{dbname=DbName, - refresh_interval=R}=State) -> - %% start the db notifier to watch db update events - {ok, NotifierPid} = start_db_notifier(DbName), - - %% start the timer - TRef = erlang:start_timer(R, self(), refresh_index), - - {noreply, State#state{tref=TRef, notifier=NotifierPid}}; - -handle_info({timeout, TRef, refresh_index}, #state{index=Index, - dbname=DbName, - tref=TRef, - db_updates=N}=State) -> - %% only refresh the index if an update happened - case N > 0 of - true -> - refresh_index(DbName, Index); - false -> - ok - end, - {noreply, #state{db_updates=0}=State}; - -handle_info({'DOWN', MRef, _, Pid, _}, #state{locks=Locks}=State) -> - NLocks = case dict:find(Pid, Locks) of - {ok, {MRef, _}} -> - dict:erase(Pid, Locks); - error -> - Locks - end, - - NState = State#state{locks=NLocks}, - - case should_close() of - true -> {stop, normal, NState}; - false -> {noreply, NState} - end; - -handle_info({'EXIT', Pid, _Reason}, #state{notifier=Pid}=State) -> - %% db notifier exited - {stop, normal, State#state{notifier=nil}}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -terminate(_Reason, #state{tref=TRef, notifier=Pid}) -> - if TRef /= nil -> - erlang:cancel_timer(TRef); - true -> ok - end, - - case is_pid(Pid) of - true -> couch_util:shutdown_sync(Pid); - _ -> ok - end, - ok. - -%% refresh the index to trigger updates. -refresh_index(Db, Index) -> - UpdateSeq = couch_util:with_db(Db, fun(WDb) -> - couch_db:get_update_seq(WDb) - end), - - case catch couch_index:get_state(Index, UpdateSeq) of - {ok, _} -> ok; - Error -> {error, Error} - end. - -%% if none has acquired us, we could stop the server. -should_close() -> - case process_info(self(), monitors) of - {monitors, []} -> true; - _ -> false - end. - - -%% number of max updates before refreshing the index. We don't -%% update the index on each db update. Instead we are waiting for a -%% minimum. If the minimum is not acchieved, the update will happen -%% in the next interval. -get_db_threshold() -> - list_to_integer( - couch_config:get("couch_index", "threshold", "200") - ). - -%% refresh interval in ms, the interval in which the index will be -%% updated -get_refresh_interval() -> - list_to_integer( - couch_config:get("couch_index", "refresh_interval", "1000") - ). - -%% db notifier -start_db_notifier(DbName) -> - Self = self(), - - couch_db_update_notifier:start_link(fun - ({updated, Name}) when Name =:= DbName -> - gen_server:cast(Self, updated); - (_) -> - ok - end). http://git-wip-us.apache.org/repos/asf/couchdb/blob/372b033a/apps/couch_index/src/couch_index_server.erl ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index_server.erl b/apps/couch_index/src/couch_index_server.erl deleted file mode 100644 index 2c6ebc9..0000000 --- a/apps/couch_index/src/couch_index_server.erl +++ /dev/null @@ -1,221 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_server). --behaviour(gen_server). - --export([start_link/0, get_index/4, get_index/3, get_index/2]). --export([acquire_indexer/3, release_indexer/3]). --export([config_change/2, update_notify/1]). - --export([init/1, terminate/2, code_change/3]). --export([handle_call/3, handle_cast/2, handle_info/2]). - --include_lib("couch/include/couch_db.hrl"). - --define(BY_SIG, couchdb_indexes_by_sig). --define(BY_PID, couchdb_indexes_by_pid). --define(BY_DB, couchdb_indexes_by_db). - - --record(st, {root_dir, - notifier_pid}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - - -get_index(Module, DbName, DDoc) -> - get_index(Module, DbName, DDoc, nil). - - -get_index(Module, DbName, DDoc, Fun) when is_binary(DbName) -> - couch_util:with_db(DbName, fun(Db) -> - get_index(Module, Db, DDoc, Fun) - end); -get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) -> - case couch_db:open_doc(Db, DDoc, [ejson_body]) of - {ok, Doc} -> get_index(Module, Db, Doc, Fun); - Error -> Error - end; -get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) -> - {ok, InitState} = Module:init(Db, DDoc), - {ok, FunResp} = Fun(InitState), - {ok, Pid} = get_index(Module, InitState), - {ok, Pid, FunResp}; -get_index(Module, Db, DDoc, _Fun) -> - {ok, InitState} = Module:init(Db, DDoc), - get_index(Module, InitState). - - -get_index(Module, IdxState) -> - DbName = Module:get(db_name, IdxState), - Sig = Module:get(signature, IdxState), - case ets:lookup(?BY_SIG, {DbName, Sig}) of - [{_, Pid}] when is_pid(Pid) -> - {ok, Pid}; - _ -> - Args = {Module, IdxState, DbName, Sig}, - gen_server:call(?MODULE, {get_index, Args}, infinity) - end. - -acquire_indexer(Module, DbName, DDoc) -> - case get_index(Module, DbName, DDoc) of - {ok, Pid} -> - couch_index:acquire_indexer(Pid); - Error -> - Error - end. - -release_indexer(Module, DbName, DDoc) -> - case get_index(Module, DbName, DDoc) of - {ok, Pid} -> - couch_index:release_indexer(Pid); - Error -> - Error - end. - - - -init([]) -> - process_flag(trap_exit, true), - couch_config:register(fun ?MODULE:config_change/2), - ets:new(?BY_SIG, [protected, set, named_table]), - ets:new(?BY_PID, [private, set, named_table]), - ets:new(?BY_DB, [protected, bag, named_table]), - - {ok, NotifierPid} = couch_db_update_notifier:start_link( - fun ?MODULE:update_notify/1), - RootDir = couch_index_util:root_dir(), - couch_file:init_delete_dir(RootDir), - {ok, #st{root_dir=RootDir, - notifier_pid=NotifierPid}}. - - -terminate(_Reason, _State) -> - Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)], - lists:map(fun couch_util:shutdown_sync/1, Pids), - ok. - - -handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) -> - case ets:lookup(?BY_SIG, {DbName, Sig}) of - [] -> - spawn_link(fun() -> new_index(Args) end), - ets:insert(?BY_SIG, {{DbName, Sig}, [From]}), - {noreply, State}; - [{_, Waiters}] when is_list(Waiters) -> - ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}), - {noreply, State}; - [{_, Pid}] when is_pid(Pid) -> - {reply, {ok, Pid}, State} - end; -handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) -> - [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}), - [gen_server:reply(From, {ok, Pid}) || From <- Waiters], - link(Pid), - add_to_ets(DbName, Sig, DDocId, Pid), - {reply, ok, State}; -handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) -> - [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}), - [gen_server:reply(From, Error) || From <- Waiters], - ets:delete(?BY_SIG, {DbName, Sig}), - {reply, ok, State}; -handle_call({reset_indexes, DbName}, _From, State) -> - reset_indexes(DbName, State#st.root_dir), - {reply, ok, State}. - - -handle_cast({reset_indexes, DbName}, State) -> - reset_indexes(DbName, State#st.root_dir), - {noreply, State}. - -handle_info({'EXIT', Pid, Reason}, Server) -> - case ets:lookup(?BY_PID, Pid) of - [{Pid, {DbName, Sig}}] -> - [{DbName, {DDocId, Sig}}] = - ets:match_object(?BY_DB, {DbName, {'$1', Sig}}), - rem_from_ets(DbName, Sig, DDocId, Pid); - [] when Reason /= normal -> - exit(Reason); - _Else -> - ok - end, - {noreply, Server}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -new_index({Mod, IdxState, DbName, Sig}) -> - DDocId = Mod:get(idx_name, IdxState), - case couch_index:start_link({Mod, IdxState}) of - {ok, Pid} -> - ok = gen_server:call( - ?MODULE, {async_open, {DbName, DDocId, Sig}, {ok, Pid}}), - unlink(Pid); - Error -> - ok = gen_server:call( - ?MODULE, {async_error, {DbName, DDocId, Sig}, Error}) - end. - - -reset_indexes(DbName, Root) -> - % shutdown all the updaters and clear the files, the db got changed - Fun = fun({_, {DDocId, Sig}}) -> - [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}), - MRef = erlang:monitor(process, Pid), - gen_server:cast(Pid, delete), - receive {'DOWN', MRef, _, _, _} -> ok end, - rem_from_ets(DbName, Sig, DDocId, Pid) - end, - lists:foreach(Fun, ets:lookup(?BY_DB, DbName)), - Path = couch_index_util:index_dir("", DbName), - couch_file:nuke_dir(Root, Path). - - -add_to_ets(DbName, Sig, DDocId, Pid) -> - ets:insert(?BY_SIG, {{DbName, Sig}, Pid}), - ets:insert(?BY_PID, {Pid, {DbName, Sig}}), - ets:insert(?BY_DB, {DbName, {DDocId, Sig}}). - - -rem_from_ets(DbName, Sig, DDocId, Pid) -> - ets:delete(?BY_SIG, {DbName, Sig}), - ets:delete(?BY_PID, Pid), - ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}). - - -config_change("couchdb", "view_index_dir") -> - exit(whereis(?MODULE), config_change); -config_change("couchdb", "index_dir") -> - exit(whereis(?MODULE), config_change). - - -update_notify({deleted, DbName}) -> - gen_server:cast(?MODULE, {reset_indexes, DbName}); -update_notify({created, DbName}) -> - gen_server:cast(?MODULE, {reset_indexes, DbName}); -update_notify({ddoc_updated, {DbName, DDocId}}) -> - lists:foreach( - fun({_DbName, {_DDocId, Sig}}) -> - case ets:lookup(?BY_SIG, {DbName, Sig}) of - [{_, IndexPid}] -> - (catch gen_server:cast(IndexPid, ddoc_updated)); - [] -> - ok - end - end, - ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}})); -update_notify(_) -> - ok. http://git-wip-us.apache.org/repos/asf/couchdb/blob/372b033a/apps/couch_index/src/couch_index_sup.erl ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index_sup.erl b/apps/couch_index/src/couch_index_sup.erl deleted file mode 100644 index 8e69016..0000000 --- a/apps/couch_index/src/couch_index_sup.erl +++ /dev/null @@ -1,34 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_sup). --behaviour(supervisor). - --export([start_link/0]). --export([init/1]). - - -%% Helper macro for declaring children of supervisor --define(CHILD(I), {I, {I, start_link, []}, permanent, 5000, worker, [I]}). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - - -init([]) -> - Server = ?CHILD(couch_index_server), - - EventSup = {couch_index_events, - {gen_event, start_link, [{local, couch_index_events}]}, - permanent, brutal_kill, worker, dynamic}, - - {ok, {{one_for_one, 10, 3600}, [Server, EventSup]}}. http://git-wip-us.apache.org/repos/asf/couchdb/blob/372b033a/apps/couch_index/src/couch_index_updater.erl ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index_updater.erl b/apps/couch_index/src/couch_index_updater.erl deleted file mode 100644 index 9f54a56..0000000 --- a/apps/couch_index/src/couch_index_updater.erl +++ /dev/null @@ -1,200 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_updater). --behaviour(gen_server). - - -%% API --export([start_link/2, run/2, is_running/1, update/2, restart/2]). - -%% gen_server callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_call/3, handle_cast/2, handle_info/2]). - --include_lib("couch/include/couch_db.hrl"). - --record(st, { - idx, - mod, - pid=nil -}). - - -start_link(Index, Module) -> - gen_server:start_link(?MODULE, {Index, Module}, []). - - -run(Pid, IdxState) -> - gen_server:call(Pid, {update, IdxState}). - - -is_running(Pid) -> - gen_server:call(Pid, is_running). - - -update(Mod, State) -> - update(nil, Mod, State). - - -restart(Pid, IdxState) -> - gen_server:call(Pid, {restart, IdxState}). - - -init({Index, Module}) -> - process_flag(trap_exit, true), - {ok, #st{idx=Index, mod=Module}}. - - -terminate(_Reason, State) -> - couch_util:shutdown_sync(State#st.pid), - ok. - - -handle_call({update, _IdxState}, _From, #st{pid=Pid}=State) when is_pid(Pid) -> - {reply, ok, State}; -handle_call({update, IdxState}, _From, #st{idx=Idx, mod=Mod}=State) -> - Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], - ?LOG_INFO("Starting index update for db: ~s idx: ~s", Args), - Pid = spawn_link(fun() -> update(Idx, Mod, IdxState) end), - {reply, ok, State#st{pid=Pid}}; -handle_call({restart, IdxState}, _From, #st{idx=Idx, mod=Mod}=State) -> - Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], - ?LOG_INFO("Restarting index update for db: ~s idx: ~s", Args), - case is_pid(State#st.pid) of - true -> couch_util:shutdown_sync(State#st.pid); - _ -> ok - end, - Pid = spawn_link(fun() -> update(Idx, State#st.mod, IdxState) end), - {reply, ok, State#st{pid=Pid}}; -handle_call(is_running, _From, #st{pid=Pid}=State) when is_pid(Pid) -> - {reply, true, State}; -handle_call(is_running, _From, State) -> - {reply, false, State}. - - -handle_cast(_Mesg, State) -> - {stop, unknown_cast, State}. - - -handle_info({'EXIT', _, {updated, Pid, IdxState}}, #st{pid=Pid}=State) -> - Mod = State#st.mod, - Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], - ?LOG_INFO("Index update finished for db: ~s idx: ~s", Args), - ok = gen_server:cast(State#st.idx, {updated, IdxState}), - {noreply, State#st{pid=undefined}}; -handle_info({'EXIT', _, {reset, Pid}}, #st{idx=Idx, pid=Pid}=State) -> - {ok, NewIdxState} = gen_server:call(State#st.idx, reset), - Pid2 = spawn_link(fun() -> update(Idx, State#st.mod, NewIdxState) end), - {noreply, State#st{pid=Pid2}}; -handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) -> - {noreply, State#st{pid=undefined}}; -handle_info({'EXIT', Pid, {{nocatch, Error}, _Trace}}, State) -> - handle_info({'EXIT', Pid, Error}, State); -handle_info({'EXIT', Pid, Error}, #st{pid=Pid}=State) -> - ok = gen_server:cast(State#st.idx, {update_error, Error}), - {noreply, State#st{pid=undefined}}; -handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) -> - {stop, normal, State}; -handle_info({'EXIT', _Pid, normal}, State) -> - {noreply, State}; -handle_info(_Mesg, State) -> - {stop, unknown_info, State}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -update(Idx, Mod, IdxState) -> - DbName = Mod:get(db_name, IdxState), - CurrSeq = Mod:get(update_seq, IdxState), - UpdateOpts = Mod:get(update_options, IdxState), - CommittedOnly = lists:member(committed_only, UpdateOpts), - IncludeDesign = lists:member(include_design, UpdateOpts), - DocOpts = case lists:member(local_seq, UpdateOpts) of - true -> [conflicts, deleted_conflicts, local_seq]; - _ -> [conflicts, deleted_conflicts] - end, - - couch_util:with_db(DbName, fun(Db) -> - DbUpdateSeq = couch_db:get_update_seq(Db), - DbCommittedSeq = couch_db:get_committed_update_seq(Db), - - PurgedIdxState = case purge_index(Db, Mod, IdxState) of - {ok, IdxState0} -> IdxState0; - reset -> exit({reset, self()}) - end, - - NumChanges = couch_db:count_changes_since(Db, CurrSeq), - - LoadDoc = fun(DocInfo) -> - #doc_info{ - id=DocId, - high_seq=Seq, - revs=[#rev_info{deleted=Deleted} | _] - } = DocInfo, - - case {IncludeDesign, DocId} of - {false, <<"_design/", _/binary>>} -> - {nil, Seq}; - _ when Deleted -> - {#doc{id=DocId, deleted=true}, Seq}; - _ -> - {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts), - {Doc, Seq} - end - end, - - Proc = fun(DocInfo, _, {IdxStateAcc, _}) -> - HighSeq = DocInfo#doc_info.high_seq, - case CommittedOnly and (HighSeq > DbCommittedSeq) of - true -> - {stop, {IdxStateAcc, false}}; - false -> - {Doc, Seq} = LoadDoc(DocInfo), - {ok, NewSt} = Mod:process_doc(Doc, Seq, IdxStateAcc), - {ok, {NewSt, true}} - end - end, - - {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges), - Acc0 = {InitIdxState, true}, - {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []), - {ProcIdxSt, SendLast} = Acc, - - % If we didn't bail due to hitting the last committed seq we need - % to send our last update_seq through. - {ok, LastIdxSt} = case SendLast of - true -> - Mod:process_doc(nil, DbUpdateSeq, ProcIdxSt); - _ -> - {ok, ProcIdxSt} - end, - - {ok, FinalIdxState} = Mod:finish_update(LastIdxSt), - exit({updated, self(), FinalIdxState}) - end). - - -purge_index(Db, Mod, IdxState) -> - DbPurgeSeq = couch_db:get_purge_seq(Db), - IdxPurgeSeq = Mod:get(purge_seq, IdxState), - if - DbPurgeSeq == IdxPurgeSeq -> - {ok, IdxState}; - DbPurgeSeq == IdxPurgeSeq + 1 -> - {ok, PurgedIdRevs} = couch_db:get_last_purged(Db), - Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState); - true -> - reset - end. http://git-wip-us.apache.org/repos/asf/couchdb/blob/372b033a/apps/couch_index/src/couch_index_util.erl ---------------------------------------------------------------------- diff --git a/apps/couch_index/src/couch_index_util.erl b/apps/couch_index/src/couch_index_util.erl deleted file mode 100644 index c833920..0000000 --- a/apps/couch_index/src/couch_index_util.erl +++ /dev/null @@ -1,77 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_util). - --export([root_dir/0, index_dir/2, index_file/3]). --export([load_doc/3, sort_lib/1, hexsig/1]). - --include_lib("couch/include/couch_db.hrl"). - -root_dir() -> - couch_config:get("couchdb", "view_index_dir"). - - -index_dir(Module, DbName) when is_binary(DbName) -> - DbDir = "." ++ binary_to_list(DbName) ++ "_design", - filename:join([root_dir(), DbDir, Module]); -index_dir(Module, #db{}=Db) -> - index_dir(Module, couch_db:name(Db)). - - -index_file(Module, DbName, FileName) -> - filename:join(index_dir(Module, DbName), FileName). - - -load_doc(Db, #doc_info{}=DI, Opts) -> - Deleted = lists:member(deleted, Opts), - case (catch couch_db:open_doc(Db, DI, Opts)) of - {ok, #doc{deleted=false}=Doc} -> Doc; - {ok, #doc{deleted=true}=Doc} when Deleted -> Doc; - _Else -> null - end; -load_doc(Db, {DocId, Rev}, Opts) -> - case (catch load_doc(Db, DocId, Rev, Opts)) of - #doc{deleted=false} = Doc -> Doc; - _ -> null - end. - - -load_doc(Db, DocId, Rev, Options) -> - case Rev of - nil -> % open most recent rev - case (catch couch_db:open_doc(Db, DocId, Options)) of - {ok, Doc} -> Doc; - _Error -> null - end; - _ -> % open a specific rev (deletions come back as stubs) - case (catch couch_db:open_doc_revs(Db, DocId, [Rev], Options)) of - {ok, [{ok, Doc}]} -> Doc; - {ok, [{{not_found, missing}, Rev}]} -> null; - {ok, [_Else]} -> null - end - end. - - -sort_lib({Lib}) -> - sort_lib(Lib, []). -sort_lib([], LAcc) -> - lists:keysort(1, LAcc); -sort_lib([{LName, {LObj}}|Rest], LAcc) -> - LSorted = sort_lib(LObj, []), % descend into nested object - sort_lib(Rest, [{LName, LSorted}|LAcc]); -sort_lib([{LName, LCode}|Rest], LAcc) -> - sort_lib(Rest, [{LName, LCode}|LAcc]). - - -hexsig(Sig) -> - couch_util:to_hex(binary_to_list(Sig)).