couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject svn commit: r1171180 - in /couchdb/trunk/src/couch_index: ./ Makefile.am src/ src/couch_index.app.src src/couch_index.erl src/couch_index_api.erl src/couch_index_compactor.erl src/couch_index_server.erl src/couch_index_updater.erl src/couch_index_util.erl
Date Thu, 15 Sep 2011 17:22:31 GMT
Author: davisp
Date: Thu Sep 15 17:22:30 2011
New Revision: 1171180

URL: http://svn.apache.org/viewvc?rev=1171180&view=rev
Log:
Paul is an idiot commit 1 of 3.

Forgot I was working with a retarded VCS. These are the file changes
for the previous three commits.


Added:
    couchdb/trunk/src/couch_index/
    couchdb/trunk/src/couch_index/Makefile.am
    couchdb/trunk/src/couch_index/src/
    couchdb/trunk/src/couch_index/src/couch_index.app.src
    couchdb/trunk/src/couch_index/src/couch_index.erl
    couchdb/trunk/src/couch_index/src/couch_index_api.erl
    couchdb/trunk/src/couch_index/src/couch_index_compactor.erl
    couchdb/trunk/src/couch_index/src/couch_index_server.erl
    couchdb/trunk/src/couch_index/src/couch_index_updater.erl
    couchdb/trunk/src/couch_index/src/couch_index_util.erl

Added: couchdb/trunk/src/couch_index/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/Makefile.am?rev=1171180&view=auto
==============================================================================
--- couchdb/trunk/src/couch_index/Makefile.am (added)
+++ couchdb/trunk/src/couch_index/Makefile.am Thu Sep 15 17:22:30 2011
@@ -0,0 +1,38 @@
+## Licensed under the Apache License, Version 2.0 (the "License"); you may not
+## use this file except in compliance with the License. You may obtain a copy of
+## the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+## License for the specific language governing permissions and limitations under
+## the License.
+
+## Install location of the couch_index application inside the Erlang lib dir.
+couch_indexlibdir = $(localerlanglibdir)/couch_index-0.1
+couch_indexebindir = $(couch_indexlibdir)/ebin
+
+## The compiled beams are what gets installed into the ebin directory.
+couch_indexebin_DATA = $(compiled_files)
+
+## Sources are shipped with the distribution; beams are removed on clean.
+EXTRA_DIST = $(source_files)
+CLEANFILES = $(compiled_files)
+
+## NOTE(review): src/couch_index_api.erl and src/couch_index.app.src are not
+## listed below, so they are presumably not distributed - confirm intent.
+source_files = \
+    src/couch_index.erl \
+    src/couch_index_compactor.erl \
+    src/couch_index_server.erl \
+    src/couch_index_updater.erl \
+    src/couch_index_util.erl
+
+compiled_files = \
+    ebin/couch_index.beam \
+    ebin/couch_index_compactor.beam \
+    ebin/couch_index_server.beam \
+    ebin/couch_index_updater.beam \
+    ebin/couch_index_util.beam
+
+## Pattern rule: compile every src/*.erl into the matching ebin/*.beam.
+ebin/%.beam: src/%.erl
+	@mkdir -p ebin/
+	$(ERLC) -I$(top_srcdir)/src/couchdb -o ebin/ $(ERLC_FLAGS) ${TEST} $<;
+

Added: couchdb/trunk/src/couch_index/src/couch_index.app.src
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/src/couch_index.app.src?rev=1171180&view=auto
==============================================================================
--- couchdb/trunk/src/couch_index/src/couch_index.app.src (added)
+++ couchdb/trunk/src/couch_index/src/couch_index.app.src Thu Sep 15 17:22:30 2011
@@ -0,0 +1,22 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% OTP application resource template for couch_index.
+% NOTE(review): "@version@" is presumably substituted at build time - confirm
+% against the build scripts.
+{application, couch_index, [
+    {description, "CouchDB Secondary Index Manager"},
+    {vsn, "@version@"},
+    % Every module compiled by this application's Makefile.am.
+    {modules, [
+        couch_index,
+        couch_index_compactor,
+        couch_index_server,
+        couch_index_updater,
+        couch_index_util
+    ]},
+    {registered, [couch_index_server]},
+    % No trailing comma after the last property: the file must parse as a
+    % single Erlang term.
+    {applications, [kernel, stdlib]}
+]}.

Added: couchdb/trunk/src/couch_index/src/couch_index.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/src/couch_index.erl?rev=1171180&view=auto
==============================================================================
--- couchdb/trunk/src/couch_index/src/couch_index.erl (added)
+++ couchdb/trunk/src/couch_index/src/couch_index.erl Thu Sep 15 17:22:30 2011
@@ -0,0 +1,264 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index).
+-behaviour(gen_server).
+
+
+%% API
+-export([start_link/1, stop/1, get_state/2, get_info/1]).
+-export([compact/1, get_compactor_pid/1]).
+-export([config_change/3]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+
+%% gen_server state of a single index process.
+-record(st, {
+    mod,              % callback module implementing the index (Mod:get/2, Mod:open/2, ...)
+    idx_state,        % opaque index state handed to the callback module
+    updater,          % pid returned by couch_index_updater:start_link/2
+    compactor,        % pid returned by couch_index_compactor:start_link/2
+    waiters=[],       % [{From, RequestSeq}] clients blocked in get_state/2
+    commit_delay,     % delay before a `commit` message is delivered, in ms
+    committed=true    % false while index changes still wait for Mod:commit/1
+}).
+
+
+%% Start a linked index process through proc_lib. init/1 acknowledges the
+%% start itself with proc_lib:init_ack/1, so the value returned here is
+%% whatever init/1 passes to init_ack.
+start_link({Module, IdxState}) ->
+    InitArgs = [{Module, IdxState}],
+    proc_lib:start_link(?MODULE, init, InitArgs).
+
+
+%% Ask the index process to stop. Asynchronous: the request is a cast.
+stop(Pid) ->
+    Request = stop,
+    gen_server:cast(Pid, Request).
+
+
+%% Fetch the index state once the index has reached RequestSeq. The call has
+%% no timeout; the reply is whatever the index process sends back
+%% ({ok, IdxState} on success).
+get_state(Pid, RequestSeq) ->
+    Request = {get_state, RequestSeq},
+    gen_server:call(Pid, Request, infinity).
+
+
+%% Return {ok, Info}: the callback module's info list extended with the
+%% updater/compactor/commit/waiter status of the index process.
+get_info(Pid) ->
+    Request = get_info,
+    gen_server:call(Pid, Request).
+
+
+%% Ask the index process to start a compaction run; replies `ok`.
+compact(Pid) ->
+    Request = compact,
+    gen_server:call(Pid, Request).
+
+
+%% Return {ok, CompactorPid} for the compactor server owned by this index.
+get_compactor_pid(Pid) ->
+    Request = get_compactor_pid,
+    gen_server:call(Pid, Request).
+
+
+%% couch_config change callback (registered in init/1): forwards a new
+%% "commit_freq" value so the commit delay can be refreshed.
+%%
+%% The message tag must be `config_change`, the tag handle_cast/2 matches.
+%% Any other tag falls through to the catch-all cast clause, which stops the
+%% index with `unhandled_cast`.
+%%
+%% NOTE(review): the cast targets the name ?MODULE, but start_link/1 does not
+%% register the index process under a name, so this message presumably never
+%% reaches an index process - confirm and route it to the index pid(s).
+config_change("query_server_config", "commit_freq", NewValue) ->
+    ok = gen_server:cast(?MODULE, {config_change, NewValue}).
+
+
+%% proc_lib entry point used by start_link/1.
+%% Opens the index through Mod:open/2, starts the updater and compactor
+%% helpers, acknowledges the start to the parent and then enters the
+%% gen_server loop. If the open does not return {ok, _}, that value is handed
+%% to the parent through proc_lib:init_ack/1 and this function returns.
+init({Mod, IdxState}) ->
+    ok = couch_config:register(fun ?MODULE:config_change/3),
+    DbName = Mod:get(db_name, IdxState),
+    Resp = couch_util:with_db(DbName, fun(Db) ->
+        case Mod:open(Db, IdxState) of
+            {ok, IdxSt} ->
+                % Presumably sets up the monitor whose 'DOWN' message
+                % handle_info/2 turns into a stop - confirm in couch_db.
+                couch_db:monitor(Db),
+                {ok, IdxSt};
+            Error ->
+                Error
+        end
+    end),
+    case Resp of
+        {ok, NewIdxState} ->
+            {ok, UPid} = couch_index_updater:start_link(self(), Mod),
+            {ok, CPid} = couch_index_compactor:start_link(self(), Mod),
+            % commit_freq is a string holding seconds (default "5"); the
+            % state keeps it in milliseconds for erlang:send_after/3.
+            Delay = couch_config:get("query_server_config", "commit_freq", "5"),
+            MsDelay = 1000 * list_to_integer(Delay),
+            State = #st{
+                mod=Mod,
+                idx_state=NewIdxState,
+                updater=UPid,
+                compactor=CPid,
+                commit_delay=MsDelay
+            },
+            % Unblock proc_lib:start_link/3 in the parent before taking over
+            % as a gen_server.
+            proc_lib:init_ack({ok, self()}),
+            gen_server:enter_loop(?MODULE, [], State);
+        Other ->
+            proc_lib:init_ack(Other)
+    end.
+
+
+%% gen_server callback: close the index through the callback module, answer
+%% every waiting client with the termination reason, then synchronously shut
+%% down the updater and compactor helpers.
+terminate(Reason, State) ->
+    #st{
+        mod=Mod,
+        idx_state=IdxState,
+        waiters=Waiters,
+        updater=Updater,
+        compactor=Compactor
+    } = State,
+    Mod:close(IdxState),
+    send_all(Waiters, Reason),
+    couch_util:shutdown_sync(Updater),
+    couch_util:shutdown_sync(Compactor),
+    ok.
+
+
+%% gen_server callback for synchronous requests.
+%%
+%% {get_state, ReqSeq}: reply at once when the index is already at or past
+%% ReqSeq; otherwise start the updater and park the caller in `waiters`
+%% until a later state update satisfies it.
+handle_call({get_state, ReqSeq}, From, State) ->
+    #st{
+        mod=Mod,
+        idx_state=IdxState,
+        waiters=Waiters
+    } = State,
+    IdxSeq = Mod:get(update_seq, IdxState),
+    case ReqSeq =< IdxSeq of
+        true ->
+            {reply, {ok, IdxState}, State};
+        _ -> % View update required
+            couch_index_updater:run(State#st.updater, IdxState),
+            Waiters2 = [{From, ReqSeq} | Waiters],
+            {noreply, State#st{waiters=Waiters2}, infinity}
+    end;
+%% get_info: the callback module's info list plus runtime status flags.
+handle_call(get_info, _From, State) ->
+    #st{mod=Mod} = State,
+    {ok, Info0} = Mod:get(info, State#st.idx_state),
+    IsUpdating = couch_index_updater:is_running(State#st.updater),
+    IsCompacting = couch_index_compactor:is_running(State#st.compactor),
+    Info = Info0 ++ [
+        {updater_running, IsUpdating},
+        {compact_running, IsCompacting},
+        {waiting_commit, State#st.committed == false},
+        {waiting_clients, length(State#st.waiters)}
+    ],
+    {reply, {ok, Info}, State};
+%% reset: replace the index state with the one returned by Mod:reset/1
+%% (requested by couch_index_updater when its worker exits with `reset`).
+handle_call(reset, _From, State) ->
+    #st{
+        mod=Mod,
+        idx_state=IdxState
+    } = State,
+    {ok, NewIdxState} = Mod:reset(IdxState),
+    {reply, {ok, NewIdxState}, State#st{idx_state=NewIdxState}};
+%% compact: hand the current index state to the compactor server.
+handle_call(compact, _From, State) ->
+    couch_index_compactor:run(State#st.compactor, State#st.idx_state),
+    {reply, ok, State};
+handle_call(get_compactor_pid, _From, State) ->
+    {reply, {ok, State#st.compactor}, State};
+%% {compacted, NewIdxState}: sent by the compaction worker when it is done.
+%% Replies `ok` after swapping in the compacted index, or `recompact` when
+%% the compacted index is behind the live one.
+handle_call({compacted, NewIdxState}, _From, State) ->
+    #st{
+        mod=Mod,
+        idx_state=OldIdxState,
+        updater=Updater,
+        commit_delay=Delay
+    } = State,
+    NewSeq = Mod:get(update_seq, NewIdxState),
+    OldSeq = Mod:get(update_seq, OldIdxState),
+    % For indices that require swapping files, we have to make sure we're
+    % up to date with the current index. Otherwise indexes could roll back
+    % (perhaps considerably) to previous points in history.
+    case NewSeq >= OldSeq of
+        true ->
+            {ok, NewIdxState1} = Mod:swap_compacted(OldIdxState, NewIdxState),
+            ok = couch_index_updater:restart(Updater, NewIdxState1),
+            % Only arm a commit timer when none is pending; committed=false
+            % means a `commit` message has already been scheduled.
+            case State#st.committed of
+                true -> erlang:send_after(Delay, self(), commit);
+                false -> ok
+            end,
+            {reply, ok, State#st{
+                idx_state=NewIdxState1,
+                updater=Updater,
+                committed=false
+            }};
+        _ ->
+            {reply, recompact, State}
+    end.
+
+
+%% gen_server callback for asynchronous requests.
+%%
+%% {config_change, NewDelay}: NewDelay is a string holding seconds; the
+%% state keeps the commit delay in milliseconds.
+handle_cast({config_change, NewDelay}, State) ->
+    MsDelay = 1000 * list_to_integer(NewDelay),
+    {noreply, State#st{commit_delay=MsDelay}};
+%% {updated, NewIdxState}: an update run finished. Store the state exactly
+%% like {new_state, _} and start another run if clients are still waiting.
+handle_cast({updated, NewIdxState}, State) ->
+    {noreply, NewState} = handle_cast({new_state, NewIdxState}, State),
+    maybe_restart_updater(NewState),
+    {noreply, NewState};
+%% {new_state, NewIdxState}: store the state, answer every waiter whose
+%% requested sequence is now covered and schedule a commit if none is
+%% pending.
+handle_cast({new_state, NewIdxState}, State) ->
+    #st{mod=Mod, commit_delay=Delay} = State,
+    CurrSeq = Mod:get(update_seq, NewIdxState),
+    Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState),
+    case State#st.committed of
+        true -> erlang:send_after(Delay, self(), commit);
+        false -> ok
+    end,
+    {noreply, State#st{
+        idx_state=NewIdxState,
+        waiters=Rest,
+        committed=false
+    }};
+%% {update_error, Error}: the updater failed; every waiter gets Error as its
+%% reply and the waiter list is cleared.
+handle_cast({update_error, Error}, State) ->
+    send_all(State#st.waiters, Error),
+    {noreply, State#st{waiters=[]}};
+handle_cast(stop, State) ->
+    {stop, normal, State};
+%% delete: remove the index through the callback module, then stop.
+handle_cast(delete, State) ->
+    #st{mod=Mod, idx_state=IdxState} = State,
+    ok = Mod:delete(IdxState),
+    {stop, normal, State};
+%% Any other cast is treated as a protocol error and stops the process.
+handle_cast(_Mesg, State) ->
+    {stop, unhandled_cast, State}.
+
+
+%% gen_server callback for raw messages.
+%%
+%% commit: timer message armed with erlang:send_after/3. Nothing to do when
+%% the index is already committed.
+handle_info(commit, #st{committed=true}=State) ->
+    {noreply, State};
+handle_info(commit, State) ->
+    #st{mod=Mod, idx_state=IdxState, commit_delay=Delay} = State,
+    DbName = Mod:get(db_name, IdxState),
+    GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end,
+    CommittedSeq = couch_util:with_db(DbName, GetCommSeq),
+    case CommittedSeq >= Mod:get(update_seq, IdxState) of
+        true ->
+            % Commit the updates
+            ok = Mod:commit(IdxState),
+            {noreply, State#st{committed=true}};
+        _ ->
+            % We can't commit the header because the database seq that's
+            % fully committed to disk is still behind us. If we committed
+            % now and the database lost those changes our view could be
+            % forever out of sync with the database. But a crash before we
+            % commit these changes, no big deal, we only lose incremental
+            % changes since last committal.
+            erlang:send_after(Delay, self(), commit),
+            {noreply, State}
+    end;
+%% Any 'DOWN' message (presumably from the monitor set up via
+%% couch_db:monitor/1 in init/1): answer all waiters with `shutdown` and
+%% stop. The waiter list is cleared so terminate/2 does not reply again.
+handle_info({'DOWN', _, _, _Pid, _}, State) ->
+    catch send_all(State#st.waiters, shutdown),
+    {stop, normal, State#st{waiters=[]}}.
+
+
+%% gen_server callback: the state needs no conversion across code upgrades.
+code_change(_FromVsn, St, _Extra) ->
+    {ok, St}.
+
+
+%% Called after an update run finished. With no waiting clients this is a
+%% no-op. Otherwise, for a `committed_only` index whose database has
+%% uncommitted changes, force a full commit, and then start the updater
+%% again so the remaining waiters can be served.
+maybe_restart_updater(#st{waiters=[]}) ->
+    ok;
+maybe_restart_updater(#st{mod=Mod, idx_state=IdxState, updater=Updater}) ->
+    DbName = Mod:get(db_name, IdxState),
+    EnsureCommitted = fun(Db) ->
+        UpdateSeq = couch_db:get_update_seq(Db),
+        CommittedSeq = couch_db:get_committed_update_seq(Db),
+        HasUncommitted = UpdateSeq > CommittedSeq,
+        UOpts = Mod:get(update_options, IdxState),
+        case HasUncommitted and lists:member(committed_only, UOpts) of
+            true -> couch_db:ensure_full_commit(Db);
+            false -> ok
+        end
+    end,
+    couch_util:with_db(DbName, EnsureCommitted),
+    couch_index_updater:run(Updater, IdxState).
+
+
+%% Send Reply to every waiting client. Waiters is a list of
+%% {From, RequestSeq} pairs; the requested sequence is ignored here.
+send_all(Waiters, Reply) ->
+    [gen_server:reply(Client, Reply) || {Client, _ReqSeq} <- Waiters].
+
+
+%% Reply {ok, IdxState} to every waiter whose requested sequence is covered
+%% by UpdateSeq and return the waiters that still have to wait, in their
+%% original order.
+send_replies(Waiters, UpdateSeq, IdxState) ->
+    IsSatisfied = fun({_From, ReqSeq}) -> ReqSeq =< UpdateSeq end,
+    {Ready, StillWaiting} = lists:partition(IsSatisfied, Waiters),
+    Reply = {ok, IdxState},
+    lists:foreach(fun({From, _}) -> gen_server:reply(From, Reply) end, Ready),
+    StillWaiting.

Added: couchdb/trunk/src/couch_index/src/couch_index_api.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/src/couch_index_api.erl?rev=1171180&view=auto
==============================================================================
--- couchdb/trunk/src/couch_index/src/couch_index_api.erl (added)
+++ couchdb/trunk/src/couch_index/src/couch_index_api.erl Thu Sep 15 17:22:30 2011
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+
+%% Template of the callbacks an index module is expected to implement.
+%% NOTE(review): this file has no -module attribute and the stubs only
+%% return placeholders. Several arities and return shapes differ from how
+%% the couch_index* modules in this commit call them (noted per callback),
+%% and the Module:init(Db, DDoc) callback used by couch_index_server is not
+%% listed at all - confirm which side is authoritative.
+
+%% Property accessor. Callers in this commit ask for db_name, update_seq,
+%% purge_seq, update_options, idx_name, signature and info (couch_index
+%% expects {ok, InfoList} for info).
+get(Field, State) ->
+    ok.
+
+
+%% Open the index for Db. couch_index:init/1 expects {ok, NewState}; any
+%% other value is passed back to the starter as the failure.
+open(Db, State) ->
+    ok.
+
+%% Called from couch_index:terminate/2; the return value is ignored there.
+close(State) ->
+    ok.
+
+%% Remove the index. couch_index matches the result against `ok`.
+delete(State) ->
+    ok.
+
+%% Reset the index. NOTE(review): couch_index matches {ok, NewState}, not
+%% the bare `ok` returned by this stub.
+reset(State) ->
+    ok.
+
+
+%% Begin an update run. NOTE(review): couch_index_updater calls
+%% Mod:start_update(Idx, State), i.e. with two arguments.
+start_update(State) ->
+    {ok, State}.
+
+%% Apply purges. NOTE(review): couch_index_updater calls
+%% Mod:purge(Db, PurgeSeq, PurgedIdRevs, State) and expects {ok, NewState}
+%% or `reset`.
+purge(PurgedIdRevs, State) ->
+    ok.
+
+%% Index one document. NOTE(review): couch_index_updater calls
+%% Mod:process_doc(Doc, Seq, State) and expects {ok, NewState}; Doc may be
+%% `nil`.
+process_doc(Doc, State) ->
+    ok.
+
+%% Finish an update run; couch_index_updater expects {ok, FinalState}.
+finish_update(State) ->
+    {ok, State}.
+
+%% Persist the index. couch_index matches the result against `ok`.
+commit(State) ->
+    ok.
+
+
+%% Compact the index. NOTE(review): couch_index_compactor calls
+%% Mod:compact(State, Opts) with two arguments and expects {ok, NewState}.
+compact(Parent, State, Opts) ->
+    ok.
+
+%% Swap in the compacted index. NOTE(review): couch_index matches
+%% {ok, NewState}, not the bare `ok` returned by this stub.
+swap_compacted(OldState, NewState) ->
+    ok.

Added: couchdb/trunk/src/couch_index/src/couch_index_compactor.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/src/couch_index_compactor.erl?rev=1171180&view=auto
==============================================================================
--- couchdb/trunk/src/couch_index/src/couch_index_compactor.erl (added)
+++ couchdb/trunk/src/couch_index/src/couch_index_compactor.erl Thu Sep 15 17:22:30 2011
@@ -0,0 +1,110 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_compactor).
+-behaviour(gen_server).
+
+
+%% API
+-export([start_link/2, run/2, monitor/1, cancel/1, is_running/1]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+
+%% gen_server state of the compactor server.
+-record(st, {
+    idx,    % pid of the owning couch_index process
+    mod,    % index callback module
+    pid     % pid of the running compaction worker, `undefined` when idle
+}).
+
+
+%% Start the compactor server for the index process Index, which uses the
+%% callback module Module.
+start_link(Index, Module) ->
+    InitArg = {Index, Module},
+    gen_server:start_link(?MODULE, InitArg, []).
+
+
+%% Request a compaction of IdxState. Replies `ok`; the request is ignored
+%% when a compaction worker is already running.
+run(Pid, IdxState) ->
+    Request = {compact, IdxState},
+    gen_server:call(Pid, Request).
+
+
+%% Monitor the running compaction worker from the calling process.
+%% Returns {ok, MonitorRef}, or {error, compaction_not_running} when no
+%% worker exists.
+monitor(Pid) ->
+    Answer = gen_server:call(Pid, get_pid),
+    case Answer of
+        {ok, undefined} ->
+            {error, compaction_not_running};
+        {ok, Worker} ->
+            {ok, erlang:monitor(process, Worker)}
+    end.
+
+
+%% Kill the running compaction worker, if any. Always replies `ok`.
+cancel(Pid) ->
+    Request = cancel,
+    gen_server:call(Pid, Request).
+
+
+%% Return true while a compaction worker is running, false otherwise.
+is_running(Pid) ->
+    Request = is_running,
+    gen_server:call(Pid, Request).
+
+
+%% gen_server callback. Exits are trapped so the 'EXIT' messages of the
+%% linked worker and index process arrive in handle_info/2.
+init({Index, Module}) ->
+    process_flag(trap_exit, true),
+    InitialState = #st{idx=Index, mod=Module},
+    {ok, InitialState}.
+
+
+%% gen_server callback: synchronously shut down the compaction worker.
+terminate(_Reason, State) ->
+    Worker = State#st.pid,
+    couch_util:shutdown_sync(Worker),
+    ok.
+
+
+%% gen_server callback for synchronous requests.
+%%
+%% {compact, IdxState}: start a linked compaction worker unless one is
+%% already running; replies `ok` either way.
+handle_call({compact, _}, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
+    {reply, ok, State};
+handle_call({compact, IdxState}, _From, #st{idx=Idx}=State) ->
+    Pid = spawn_link(fun() -> compact(Idx, State#st.mod, IdxState) end),
+    {reply, ok, State#st{pid=Pid}};
+%% get_pid: {ok, WorkerPid}, or {ok, undefined} when idle.
+handle_call(get_pid, _From, State) ->
+    {reply, {ok, State#st.pid}, State};
+%% cancel: kill the worker if there is one.
+handle_call(cancel, _From, #st{pid=undefined}=State) ->
+    {reply, ok, State};
+handle_call(cancel, _From, #st{pid=Pid}=State) ->
+    % Unlink first so the kill does not arrive here as an 'EXIT' message.
+    unlink(Pid),
+    exit(Pid, kill),
+    {reply, ok, State#st{pid=undefined}};
+%% is_running: true exactly when a worker pid is stored in the state.
+handle_call(is_running, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
+    {reply, true, State};
+handle_call(is_running, _From, State) ->
+    {reply, false, State}.
+
+
+%% gen_server callback: this server accepts no casts, so any cast stops it
+%% with reason `unknown_cast`.
+handle_cast(_Unexpected, St) ->
+    {stop, unknown_cast, St}.
+
+
+%% gen_server callback for raw messages (exits are trapped in init/1).
+%%
+%% The compaction worker finished normally: mark the server idle.
+handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
+    {noreply, State#st{pid=undefined}};
+%% A normal exit of any other linked process is ignored.
+handle_info({'EXIT', _Pid, normal}, State) ->
+    {noreply, State};
+%% The owning index process died with a non-normal reason: stop as well.
+handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) ->
+    {stop, normal, State};
+%% Everything else stops the server with `unknown_info`. This includes a
+%% non-normal exit of the compaction worker, which no clause above matches.
+handle_info(_Mesg, State) ->
+    {stop, unknown_info, State}.
+
+
+%% gen_server callback: the state needs no conversion across code upgrades.
+code_change(_FromVsn, St, _Extra) ->
+    {ok, St}.
+
+
+%% Worker entry point: the first compaction pass runs with no options.
+compact(Idx, Mod, IdxState) ->
+    NoOpts = [],
+    compact(Idx, Mod, IdxState, NoOpts).
+
+%% Run one compaction pass through the callback module and report the result
+%% to the index process. When the index answers `recompact`, run another
+%% pass from the freshly compacted state with the [recompact] option.
+compact(Idx, Mod, IdxState, Opts) ->
+    {ok, Compacted} = Mod:compact(IdxState, Opts),
+    Verdict = gen_server:call(Idx, {compacted, Compacted}),
+    if
+        Verdict =:= recompact ->
+            compact(Idx, Mod, Compacted, [recompact]);
+        true ->
+            ok
+    end.

Added: couchdb/trunk/src/couch_index/src/couch_index_server.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/src/couch_index_server.erl?rev=1171180&view=auto
==============================================================================
--- couchdb/trunk/src/couch_index/src/couch_index_server.erl (added)
+++ couchdb/trunk/src/couch_index/src/couch_index_server.erl Thu Sep 15 17:22:30 2011
@@ -0,0 +1,190 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_server).
+-behaviour(gen_server).
+
+-export([start_link/0, get_index/4, get_index/3, get_index/2]).
+-export([config_change/2, update_notify/1]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+
+% Named ETS tables created in init/1:
+%   BY_SIG  {{DbName, Sig}, Pid}, or {{DbName, Sig}, [From]} while the index
+%           is still being opened and callers are waiting for it
+%   BY_PID  {Pid, {DbName, Sig}}
+%   BY_DB   {DbName, Sig} (bag: several signatures per database)
+-define(BY_SIG, couchdb_indexes_by_sig).
+-define(BY_PID, couchdb_indexes_by_pid).
+-define(BY_DB, couchdb_indexes_by_db).
+
+
+% Server state: root_dir is the value of couch_index_util:root_dir/0 at
+% start-up.
+-record(st, {root_dir}).
+
+%% Start the index server, registered locally under the module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+
+%% Same as get_index/4 without a callback fun (`nil` is passed in its
+%% place).
+get_index(Module, DbName, DDoc) ->
+    NoFun = nil,
+    get_index(Module, DbName, DDoc, NoFun).
+
+
+%% Resolve the index process for a design document.
+%%
+%% The database may be given by name (binary) or as an open handle, and the
+%% design document by id (binary) or as a loaded doc. Each clause normalises
+%% one argument and recurses. With a fun of arity 1, the fun is applied to
+%% the initial index state and its result is returned next to the pid as
+%% {ok, Pid, FunResp}; otherwise the result of get_index/2 is returned.
+get_index(Module, DbName, DDoc, Fun) when is_binary(DbName) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        get_index(Module, Db, DDoc, Fun)
+    end);
+get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) ->
+    case couch_db:open_doc(Db, DDoc, [ejson_body]) of
+        {ok, Doc} -> get_index(Module, Db, Doc, Fun);
+        Error -> Error
+    end;
+get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) ->
+    {ok, InitState} = Module:init(Db, DDoc),
+    % The fun runs before the index is looked up or opened.
+    {ok, FunResp} = Fun(InitState),
+    {ok, Pid} = get_index(Module, InitState),
+    {ok, Pid, FunResp};
+get_index(Module, Db, DDoc, _Fun) ->
+    {ok, InitState} = Module:init(Db, DDoc),
+    get_index(Module, InitState).
+
+
+%% Look up the index process for IdxState. An already running index is
+%% served straight from the BY_SIG table; otherwise the server is asked to
+%% open it, with no call timeout.
+get_index(Module, IdxState) ->
+    DbName = Module:get(db_name, IdxState),
+    Sig = Module:get(signature, IdxState),
+    Key = {DbName, Sig},
+    case ets:lookup(?BY_SIG, Key) of
+        [{_, Pid}] when is_pid(Pid) ->
+            {ok, Pid};
+        _NotOpenYet ->
+            Request = {get_index, {Module, IdxState, DbName, Sig}},
+            gen_server:call(?MODULE, Request, infinity)
+    end.
+
+
+%% gen_server callback: trap exits, register for config and database
+%% notifications, create the lookup tables and prepare the index root
+%% directory.
+init([]) ->
+    process_flag(trap_exit, true),
+    couch_config:register(fun ?MODULE:config_change/2),
+    couch_db_update_notifier:start_link(fun ?MODULE:update_notify/1),
+    % BY_SIG is `protected` so that get_index/2 can read it from client
+    % processes; the other two tables are only used by this server.
+    ets:new(?BY_SIG, [protected, set, named_table]),
+    ets:new(?BY_PID, [private, set, named_table]),
+    ets:new(?BY_DB, [private, bag, named_table]),
+    RootDir = couch_index_util:root_dir(),
+    % Deprecation warning if it wasn't index_dir
+    case couch_config:get("couchdb", "index_dir") of
+        undefined ->
+            Msg = "Deprecation warning: 'view_index_dir' is now 'index_dir'",
+            ?LOG_ERROR(Msg, []);
+        _ -> ok
+    end,
+    couch_file:init_delete_dir(RootDir),
+    {ok, #st{root_dir=RootDir}}.
+
+
+%% gen_server callback: synchronously shut down every index process that is
+%% recorded in the BY_PID table.
+terminate(_Reason, _State) ->
+    Entries = ets:tab2list(?BY_PID),
+    IndexPids = [IndexPid || {IndexPid, _DbSig} <- Entries],
+    lists:foreach(fun couch_util:shutdown_sync/1, IndexPids),
+    ok.
+
+
+%% gen_server callback for synchronous requests.
+%%
+%% {get_index, Args}: return the pid of a running index. Otherwise queue the
+%% caller and, for the first caller only, spawn an opener process; all
+%% queued callers are answered later by async_open / async_error.
+handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) ->
+    case ets:lookup(?BY_SIG, {DbName, Sig}) of
+        [] ->
+            spawn_link(fun() -> new_index(Args) end),
+            ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
+            {noreply, State};
+        [{_, Waiters}] when is_list(Waiters) ->
+            ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
+            {noreply, State};
+        [{_, Pid}] when is_pid(Pid) ->
+            {reply, {ok, Pid}, State}
+    end;
+%% {async_open, Key, {ok, Pid}}: sent by new_index/1 once the index is up.
+%% Answers all queued callers, links the index to this server and replaces
+%% the waiter list in BY_SIG by the pid.
+handle_call({async_open, {DbName, Sig}, {ok, Pid}}, _From, State) ->
+    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
+    link(Pid),
+    add_to_ets(DbName, Sig, Pid),
+    {reply, ok, State};
+%% {async_error, Key, Error}: sent by new_index/1 when the open failed.
+%% Every queued caller receives Error and the placeholder entry is removed.
+handle_call({async_error, {DbName, Sig}, Error}, _From, State) ->
+    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [gen_server:reply(From, Error) || From <- Waiters],
+    ets:delete(?BY_SIG, {DbName, Sig}),
+    {reply, ok, State};
+%% {reset_indexes, DbName}: synchronous variant of the cast of the same
+%% name.
+handle_call({reset_indexes, DbName}, _From, State) ->
+    reset_indexes(DbName, State#st.root_dir),
+    {reply, ok, State}.
+
+
+%% gen_server callback: {reset_indexes, DbName} shuts down and removes all
+%% indexes of the named database (sent by update_notify/1).
+handle_cast({reset_indexes, DbName}, State) ->
+    RootDir = State#st.root_dir,
+    reset_indexes(DbName, RootDir),
+    {noreply, State}.
+
+
+%% gen_server callback for exits of linked processes (exits are trapped in
+%% init/1).
+%%
+%% - A registered index process died: drop it from all three ETS tables so
+%%   that the next get_index request opens a fresh process.
+%% - An unknown process died with a non-normal reason: exit with the same
+%%   reason.
+%% - Anything else (e.g. a normal exit of an opener process) is ignored.
+handle_info({'EXIT', Pid, Reason}, Server) ->
+    case ets:lookup(?BY_PID, Pid) of
+        % BY_PID rows are written by add_to_ets/3 as {Pid, {DbName, Sig}};
+        % the pattern has to match that nested shape or dead indexes are
+        % never cleaned up.
+        [{Pid, {DbName, Sig}}] ->
+            rem_from_ets(DbName, Sig, Pid);
+        [] when Reason /= normal ->
+            exit(Reason);
+        _Else ->
+            ok
+    end,
+    {noreply, Server}.
+
+
+%% gen_server callback: the state needs no conversion across code upgrades.
+code_change(_FromVsn, St, _Extra) ->
+    {ok, St}.
+
+
+%% Body of the opener process spawned by handle_call({get_index, _}, ...).
+%% Starts the index process and reports the outcome back to the server,
+%% which answers the queued callers.
+new_index({Mod, IdxState, DbName, Sig}) ->
+    case couch_index:start_link({Mod, IdxState}) of
+        {ok, Pid} ->
+            gen_server:call(?MODULE, {async_open, {DbName, Sig}, {ok, Pid}}),
+            % The server linked itself to the index while handling
+            % async_open, so this short-lived opener can drop its own link.
+            unlink(Pid);
+        Error ->
+            gen_server:call(?MODULE, {async_error, {DbName, Sig}, Error})
+    end.
+
+
+%% Shut down every index process of DbName, drop its table entries and then
+%% remove the database's index directory below Root. Used when the database
+%% is created or deleted.
+reset_indexes(DbName, Root) ->
+    ShutdownIndex = fun({_, Sig}) ->
+        [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+        couch_util:shutdown_sync(Pid),
+        rem_from_ets(DbName, Sig, Pid)
+    end,
+    Indexes = ets:lookup(?BY_DB, DbName),
+    lists:foreach(ShutdownIndex, Indexes),
+    DesignDir = "." ++ binary_to_list(DbName) ++ "_design",
+    couch_file:nuke_dir(Root, Root ++ "/" ++ DesignDir).
+
+
+%% Record a running index process in all three lookup tables.
+add_to_ets(DbName, Sig, Pid) ->
+    Key = {DbName, Sig},
+    ets:insert(?BY_SIG, {Key, Pid}),
+    ets:insert(?BY_PID, {Pid, Key}),
+    ets:insert(?BY_DB, Key).
+
+
+%% Remove an index process from all three lookup tables.
+rem_from_ets(DbName, Sig, Pid) ->
+    Key = {DbName, Sig},
+    ets:delete(?BY_SIG, Key),
+    ets:delete(?BY_PID, Pid),
+    ets:delete_object(?BY_DB, Key).
+
+
+%% couch_config change callback: when either index directory setting
+%% changes, send the registered server an exit signal with reason
+%% `config_change`.
+config_change("couchdb", Key) when Key =:= "view_index_dir"; Key =:= "index_dir" ->
+    Server = whereis(?MODULE),
+    exit(Server, config_change).
+
+
+%% Database update notification callback: a deleted or (re)created database
+%% invalidates all of its indexes; every other event is ignored.
+update_notify({Event, DbName}) when Event =:= deleted; Event =:= created ->
+    gen_server:cast(?MODULE, {reset_indexes, DbName});
+update_notify(_Other) ->
+    ok.
+

Added: couchdb/trunk/src/couch_index/src/couch_index_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/src/couch_index_updater.erl?rev=1171180&view=auto
==============================================================================
--- couchdb/trunk/src/couch_index/src/couch_index_updater.erl (added)
+++ couchdb/trunk/src/couch_index/src/couch_index_updater.erl Thu Sep 15 17:22:30 2011
@@ -0,0 +1,206 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_updater).
+-behaviour(gen_server).
+
+
+%% API
+-export([start_link/2, run/2, is_running/1, update/3, restart/2]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+
+%% gen_server state of the updater server.
+-record(st, {
+    idx,        % pid of the owning couch_index process
+    mod,        % index callback module
+    pid=nil     % pid of the running update worker; starts as `nil` and is
+                % set to `undefined` after a worker exits - callbacks only
+                % test it with is_pid/1
+}).
+
+
+%% Start the updater server for the index process Index, which uses the
+%% callback module Module.
+start_link(Index, Module) ->
+    InitArg = {Index, Module},
+    gen_server:start_link(?MODULE, InitArg, []).
+
+
+%% Request an update run starting from IdxState. Replies `ok`; the request
+%% is ignored when an update worker is already running.
+run(Pid, IdxState) ->
+    Request = {update, IdxState},
+    gen_server:call(Pid, Request).
+
+
+%% Return true while an update worker is running, false otherwise.
+is_running(Pid) ->
+    Request = is_running,
+    gen_server:call(Pid, Request).
+
+
+%% Stop any running update worker and start a new one from IdxState.
+restart(Pid, IdxState) ->
+    Request = {restart, IdxState},
+    gen_server:call(Pid, Request).
+
+
+%% gen_server callback. Exits are trapped because the update worker reports
+%% its result through its exit reason (see handle_info/2).
+init({Index, Module}) ->
+    process_flag(trap_exit, true),
+    InitialState = #st{idx=Index, mod=Module},
+    {ok, InitialState}.
+
+
+%% gen_server callback: synchronously shut down the update worker.
+terminate(_Reason, State) ->
+    Worker = State#st.pid,
+    couch_util:shutdown_sync(Worker),
+    ok.
+
+
+%% gen_server callback for synchronous requests.
+%%
+%% {update, IdxState}: start a linked update worker unless one is already
+%% running; replies `ok` either way.
+handle_call({update, _IdxState}, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
+    {reply, ok, State};
+handle_call({update, IdxState}, _From, #st{idx=Idx, mod=Mod}=State) ->
+    Pid = spawn_link(fun() -> update(Idx, Mod, IdxState) end),
+    {reply, ok, State#st{pid=Pid}};
+%% {restart, IdxState}: shut down the current worker (if any) and start a
+%% new one from IdxState.
+handle_call({restart, IdxState}, _From, #st{idx=Idx}=State) ->
+    case is_pid(State#st.pid) of
+        true -> couch_util:shutdown_sync(State#st.pid);
+        _ -> ok
+    end,
+    Pid = spawn_link(fun() -> update(Idx, State#st.mod, IdxState) end),
+    {reply, ok, State#st{pid=Pid}};
+%% is_running: true exactly when a worker pid is stored in the state.
+handle_call(is_running, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
+    {reply, true, State};
+handle_call(is_running, _From, State) ->
+    {reply, false, State}.
+
+
+%% gen_server callback: this server accepts no casts, so any cast stops it
+%% with reason `unknown_cast`.
+handle_cast(_Unexpected, St) ->
+    {stop, unknown_cast, St}.
+
+
+%% gen_server callback for raw messages. The update worker reports its
+%% outcome through its exit reason, so most clauses decode 'EXIT' messages.
+%% Clause order matters: the specific worker results must come before the
+%% generic error clause.
+%%
+%% The worker finished: pass the new index state on to the index process.
+handle_info({'EXIT', Pid, {updated, IdxState}}, #st{pid=Pid}=State) ->
+    ok = gen_server:cast(State#st.idx, {updated, IdxState}),
+    {noreply, State#st{pid=undefined}};
+%% The worker asked for a reset (see purge_index/3): have the index reset
+%% itself and start a new worker from the fresh state.
+handle_info({'EXIT', Pid, reset}, #st{idx=Idx, pid=Pid}=State) ->
+    {ok, NewIdxState} = gen_server:call(State#st.idx, reset),
+    Pid2 = spawn_link(fun() -> update(Idx, State#st.mod, NewIdxState) end),
+    {noreply, State#st{pid=Pid2}};
+handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
+    {noreply, State#st{pid=undefined}};
+%% An uncaught throw: unwrap the thrown term and handle it as a plain exit
+%% reason.
+handle_info({'EXIT', Pid, {{nocatch, Error}, _Trace}}, State) ->
+    handle_info({'EXIT', Pid, Error}, State);
+%% Any other worker exit is an update failure; the index forwards the error
+%% to its waiting clients.
+handle_info({'EXIT', Pid, Error}, #st{pid=Pid}=State) ->
+    ok = gen_server:cast(State#st.idx, {update_error, Error}),
+    {noreply, State#st{pid=undefined}};
+%% The owning index process died: stop as well.
+handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) ->
+    {stop, normal, State};
+handle_info({'EXIT', _Pid, normal}, State) ->
+    {noreply, State};
+%% Everything else stops the server with `unknown_info`.
+handle_info(_Mesg, State) ->
+    {stop, unknown_info, State}.
+
+
+%% gen_server callback: the state needs no conversion across code upgrades.
+code_change(_FromVsn, St, _Extra) ->
+    {ok, St}.
+
+
+%% Body of the update worker process.
+%%
+%% Feeds every database change after the index's update_seq to the callback
+%% module and finishes by exiting: with {updated, FinalIdxState} on success
+%% or with `reset` when purge_index/3 demands a rebuild. The updater server
+%% decodes these exit reasons in handle_info/2, so this function never
+%% returns normally on the success path.
+update(Idx, Mod, IdxState) ->
+    DbName = Mod:get(db_name, IdxState),
+    CurrSeq = Mod:get(update_seq, IdxState),
+    UpdateOpts = Mod:get(update_options, IdxState),
+    CommittedOnly = lists:member(committed_only, UpdateOpts),
+    IncludeDesign = lists:member(include_design, UpdateOpts),
+    DocOpts = case lists:member(local_seq, UpdateOpts) of
+        true -> [conflicts, deleted_conflicts, local_seq];
+        _ -> [conflicts, deleted_conflicts]
+    end,
+
+    TaskType = <<"Indexer">>,
+    Starting = <<"Starting index update.">>,
+    couch_task_status:add_task(TaskType, Mod:get(idx_name, IdxState), Starting),
+
+    couch_util:with_db(DbName, fun(Db) ->
+        DbUpdateSeq = couch_db:get_update_seq(Db),
+        DbCommittedSeq = couch_db:get_committed_update_seq(Db),
+
+        PurgedIdxState = case purge_index(Db, Mod, IdxState) of
+            {ok, IdxState0} -> IdxState0;
+            reset -> exit(reset)
+        end,
+
+        couch_task_status:set_update_frequency(500),
+        NumChanges = couch_db:count_changes_since(Db, CurrSeq),
+
+        % Map a #doc_info{} to {Doc | nil, Seq}: design docs are skipped
+        % (nil) unless include_design is set, deletions become a stub doc
+        % and everything else is read from the database.
+        LoadDoc = fun(DocInfo) ->
+            #doc_info{
+                id=DocId,
+                high_seq=Seq,
+                revs=[#rev_info{deleted=Deleted} | _]
+            } = DocInfo,
+
+            case {IncludeDesign, DocId} of
+                {false, <<"_design/", _/binary>>} ->
+                    {nil, Seq};
+                _ when Deleted ->
+                    {#doc{id=DocId, deleted=true}, Seq};
+                _ ->
+                    {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts),
+                    {Doc, Seq}
+            end
+        end,
+
+        % Fold fun for enum_docs_since. The accumulator is
+        % {IdxState, ProcessedCount, SendLast}; SendLast becomes false when
+        % a committed_only index stops at the first uncommitted change.
+        Proc = fun(DocInfo, _, {IdxStateAcc, Count, _}) ->
+            HighSeq = DocInfo#doc_info.high_seq,
+            case CommittedOnly and (HighSeq > DbCommittedSeq) of
+                true ->
+                    {stop, {IdxStateAcc, Count, false}};
+                false ->
+                    update_task_status(NumChanges, Count),
+                    {Doc, Seq} = LoadDoc(DocInfo),
+                    {ok, NewSt} = Mod:process_doc(Doc, Seq, IdxStateAcc),
+                    {ok, {NewSt, Count+1, true}}
+            end
+        end,
+
+        {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState),
+        Acc0 = {InitIdxState, 0, true},
+        {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []),
+        {ProcIdxSt, _, SendLast} = Acc,
+
+        % If we didn't bail due to hitting the last committed seq we need
+        % to send our last update_seq through.
+        {ok, LastIdxSt} = case SendLast of
+            true ->
+                Mod:process_doc(nil, DbUpdateSeq, ProcIdxSt);
+            _ ->
+                {ok, ProcIdxSt}
+        end,
+
+        couch_task_status:set_update_frequency(0),
+        couch_task_status:update("Waiting for index writer to finish."),
+
+        {ok, FinalIdxState} = Mod:finish_update(LastIdxSt),
+        exit({updated, FinalIdxState})
+    end).
+
+
+%% Bring the index in line with the database's purge sequence.
+%% Returns {ok, IdxState} unchanged when both sequences agree, the result of
+%% Mod:purge/4 when the index is exactly one purge behind, and `reset` in
+%% every other case.
+purge_index(Db, Mod, IdxState) ->
+    DbPurgeSeq = couch_db:get_purge_seq(Db),
+    IdxPurgeSeq = Mod:get(purge_seq, IdxState),
+    case DbPurgeSeq of
+        _ when DbPurgeSeq == IdxPurgeSeq ->
+            {ok, IdxState};
+        _ when DbPurgeSeq == IdxPurgeSeq + 1 ->
+            couch_task_status:update(<<"Purging index entries.">>),
+            {ok, PurgedIdRevs} = couch_db:get_last_purged(Db),
+            Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState);
+        _ ->
+            couch_task_status:update(<<"Resetting index due to purge state.">>),
+            reset
+    end.
+
+
+%% Publish indexing progress as "Count of Total changes" plus an integer
+%% percentage. Total must be non-zero.
+update_task_status(Total, Count) ->
+    Percent = (100 * Count) div Total,
+    couch_task_status:update(
+        "Processed ~p of ~p changes (~p%)",
+        [Count, Total, Percent]
+    ).

Added: couchdb/trunk/src/couch_index/src/couch_index_util.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/src/couch_index_util.erl?rev=1171180&view=auto
==============================================================================
--- couchdb/trunk/src/couch_index/src/couch_index_util.erl (added)
+++ couchdb/trunk/src/couch_index/src/couch_index_util.erl Thu Sep 15 17:22:30 2011
@@ -0,0 +1,79 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_util).
+
+-export([root_dir/0, index_dir/2, index_file/3]).
+-export([load_doc/4, sort_lib/1, hexsig/1]).
+
+-include("couch_db.hrl").
+
+
+%% Directory that holds all index files: the "index_dir" setting, falling
+%% back to "view_index_dir" when "index_dir" is not set.
+root_dir() ->
+    IndexDir = couch_config:get("couchdb", "index_dir"),
+    if
+        IndexDir =:= undefined ->
+            couch_config:get("couchdb", "view_index_dir");
+        true ->
+            IndexDir
+    end.
+
+
+%% Directory for the indexes of one database and one index module:
+%% <root_dir>/.<DbName>_design/<Module>. The database may be given as an
+%% open #db{} handle or by its binary name.
+index_dir(Module, #db{}=Db) ->
+    index_dir(Module, couch_db:name(Db));
+index_dir(Module, DbName) when is_binary(DbName) ->
+    DesignDir = lists:append([".", binary_to_list(DbName), "_design"]),
+    filename:join([root_dir(), DesignDir, Module]).
+
+
+%% Full path of FileName inside the index directory of Module and DbName.
+index_file(Module, DbName, FileName) ->
+    Dir = index_dir(Module, DbName),
+    filename:join(Dir, FileName).
+
+
+%% Load the document a row refers to. When the row value is a JSON object,
+%% its "_id" (defaulting to Id) and "_rev" members select the document and
+%% revision; for any other value the latest revision of Id is loaded.
+%% Returns {doc, JsonDoc | null}.
+load_doc(Db, Id, {Props}, Opts) ->
+    DocId = couch_util:get_value(<<"_id">>, Props, Id),
+    RevStr = couch_util:get_value(<<"_rev">>, Props, undefined),
+    Rev = if
+        is_binary(RevStr) -> couch_doc:parse_rev(RevStr);
+        true -> nil
+    end,
+    load_doc_int(Db, DocId, Rev, Opts);
+load_doc(Db, Id, _Value, Opts) ->
+    load_doc_int(Db, Id, nil, Opts).
+
+
+%% Open a document and convert it to its JSON form. Returns {doc, JsonDoc},
+%% or {doc, null} when the document or the requested revision cannot be
+%% opened.
+load_doc_int(Db, DocId, nil, Options) ->
+    % No revision requested: open the most recent one.
+    case couch_db:open_doc(Db, DocId, Options) of
+        {ok, Doc} -> {doc, couch_doc:to_json_obj(Doc, Options)};
+        _Error -> {doc, null}
+    end;
+load_doc_int(Db, DocId, Rev, Options) ->
+    % Open one specific revision (deletions come back as stubs).
+    case couch_db:open_doc_revs(Db, DocId, [Rev], Options) of
+        {ok, [{ok, Doc}]} -> {doc, couch_doc:to_json_obj(Doc, Options)};
+        {ok, [_Missing]} -> {doc, null}
+    end.
+
+
+%% Sort the members of an EJSON object ({[{Name, Value}]}) by name,
+%% recursively. Nested objects come back as plain sorted member lists
+%% (without the wrapping tuple); all other values are kept as they are.
+sort_lib({Lib}) ->
+    sort_lib(Lib, []).
+
+%% Worker: prepends each processed member to the accumulator and then sorts
+%% by name. The sort is stable, so members sharing a name end up in reverse
+%% input order.
+sort_lib(Members, Acc0) ->
+    Step = fun
+        ({Name, {Nested}}, Acc) ->
+            [{Name, sort_lib(Nested, [])} | Acc]; % descend into nested object
+        ({Name, Code}, Acc) ->
+            [{Name, Code} | Acc]
+    end,
+    lists:keysort(1, lists:foldl(Step, Acc0, Members)).
+
+
+%% Hex-encode a binary index signature via couch_util:to_hex/1.
+hexsig(Sig) ->
+    SigBytes = binary_to_list(Sig),
+    couch_util:to_hex(SigBytes).



Mime
View raw message