couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [17/22] couch-mrview commit: updated refs/heads/import-rcouch to 7258945
Date Thu, 06 Feb 2014 16:54:36 GMT
move src/* to the root

There is no need of an src/ folder except for an aesthetic reason, we
are obviously distributing a source. This layout is alos more common in
the Erlang world, and most editor and package manager understand it.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/commit/c296d042
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/tree/c296d042
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/diff/c296d042

Branch: refs/heads/import-rcouch
Commit: c296d04294614e3172c39afa1b4174580e24ff2d
Parents: 5813a97
Author: benoitc <benoitc@apache.org>
Authored: Sun Jan 12 10:17:31 2014 +0100
Committer: Paul J. Davis <paul.joseph.davis@gmail.com>
Committed: Thu Feb 6 10:51:40 2014 -0600

----------------------------------------------------------------------
 src/couch_index.app.src       |  19 +++
 src/couch_index.erl           | 338 +++++++++++++++++++++++++++++++++++++
 src/couch_index_api.erl       |  54 ++++++
 src/couch_index_compactor.erl | 113 +++++++++++++
 src/couch_index_server.erl    | 203 ++++++++++++++++++++++
 src/couch_index_sup.erl       |  29 ++++
 src/couch_index_updater.erl   | 200 ++++++++++++++++++++++
 src/couch_index_util.erl      |  77 +++++++++
 8 files changed, 1033 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/c296d042/src/couch_index.app.src
----------------------------------------------------------------------
diff --git a/src/couch_index.app.src b/src/couch_index.app.src
new file mode 100644
index 0000000..921e5d2
--- /dev/null
+++ b/src/couch_index.app.src
@@ -0,0 +1,19 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_index, [
+    {description, "CouchDB Secondary Index Manager"},
+    {vsn, "1.3.0"},
+    {modules, []},
+    {registered, [couch_index_server]},
+    {applications, [kernel, stdlib, couch]}
+]}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/c296d042/src/couch_index.erl
----------------------------------------------------------------------
diff --git a/src/couch_index.erl b/src/couch_index.erl
new file mode 100644
index 0000000..c09a110
--- /dev/null
+++ b/src/couch_index.erl
@@ -0,0 +1,338 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index).
+-behaviour(gen_server).
+
+
+%% API
+-export([start_link/1, stop/1, get_state/2, get_info/1]).
+-export([compact/1, compact/2, get_compactor_pid/1]).
+-export([config_change/3]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-record(st, {
+    mod,
+    idx_state,
+    updater,
+    compactor,
+    waiters=[],
+    commit_delay,
+    committed=true,
+    shutdown=false
+}).
+
+
+start_link({Module, IdxState}) ->
+    proc_lib:start_link(?MODULE, init, [{Module, IdxState}]).
+
+
+stop(Pid) ->
+    gen_server:cast(Pid, stop).
+
+
+get_state(Pid, RequestSeq) ->
+    gen_server:call(Pid, {get_state, RequestSeq}, infinity).
+
+
+get_info(Pid) ->
+    gen_server:call(Pid, get_info).
+
+
+compact(Pid) ->
+    compact(Pid, []).
+
+
+compact(Pid, Options) ->
+    {ok, CPid} = gen_server:call(Pid, compact),
+    case lists:member(monitor, Options) of
+        true -> {ok, erlang:monitor(process, CPid)};
+        false -> ok
+    end.
+
+
+get_compactor_pid(Pid) ->
+    gen_server:call(Pid, get_compactor_pid).
+
+config_change("query_server_config", "commit_freq", NewValue) ->
+    ok = gen_server:cast(?MODULE, {config_update, NewValue}).
+
+
+init({Mod, IdxState}) ->
+    ok = couch_config:register(fun ?MODULE:config_change/3),
+    DbName = Mod:get(db_name, IdxState),
+    Resp = couch_util:with_db(DbName, fun(Db) ->
+        case Mod:open(Db, IdxState) of
+            {ok, IdxSt} ->
+                couch_db:monitor(Db),
+                {ok, IdxSt};
+            Error ->
+                Error
+        end
+    end),
+    case Resp of
+        {ok, NewIdxState} ->
+            {ok, UPid} = couch_index_updater:start_link(self(), Mod),
+            {ok, CPid} = couch_index_compactor:start_link(self(), Mod),
+            Delay = couch_config:get("query_server_config", "commit_freq", "5"),
+            MsDelay = 1000 * list_to_integer(Delay),
+            State = #st{
+                mod=Mod,
+                idx_state=NewIdxState,
+                updater=UPid,
+                compactor=CPid,
+                commit_delay=MsDelay
+            },
+            Args = [
+                Mod:get(db_name, IdxState),
+                Mod:get(idx_name, IdxState),
+                couch_index_util:hexsig(Mod:get(signature, IdxState))
+            ],
+            ?LOG_INFO("Opening index for db: ~s idx: ~s sig: ~p", Args),
+            proc_lib:init_ack({ok, self()}),
+            gen_server:enter_loop(?MODULE, [], State);
+        Other ->
+            proc_lib:init_ack(Other)
+    end.
+
+
+terminate(Reason, State) ->
+    #st{mod=Mod, idx_state=IdxState}=State,
+    Mod:close(IdxState),
+    send_all(State#st.waiters, Reason),
+    couch_util:shutdown_sync(State#st.updater),
+    couch_util:shutdown_sync(State#st.compactor),
+    Args = [
+        Mod:get(db_name, IdxState),
+        Mod:get(idx_name, IdxState),
+        couch_index_util:hexsig(Mod:get(signature, IdxState)),
+        Reason
+    ],
+    ?LOG_INFO("Closing index for db: ~s idx: ~s sig: ~p~nreason: ~p", Args),
+    ok.
+
+
+handle_call({get_state, ReqSeq}, From, State) ->
+    #st{
+        mod=Mod,
+        idx_state=IdxState,
+        waiters=Waiters
+    } = State,
+    IdxSeq = Mod:get(update_seq, IdxState),
+    case ReqSeq =< IdxSeq of
+        true ->
+            {reply, {ok, IdxState}, State};
+        _ -> % View update required
+            couch_index_updater:run(State#st.updater, IdxState),
+            Waiters2 = [{From, ReqSeq} | Waiters],
+            {noreply, State#st{waiters=Waiters2}, infinity}
+    end;
+handle_call(get_info, _From, State) ->
+    #st{mod=Mod} = State,
+    {ok, Info0} = Mod:get(info, State#st.idx_state),
+    IsUpdating = couch_index_updater:is_running(State#st.updater),
+    IsCompacting = couch_index_compactor:is_running(State#st.compactor),
+    Info = Info0 ++ [
+        {updater_running, IsUpdating},
+        {compact_running, IsCompacting},
+        {waiting_commit, State#st.committed == false},
+        {waiting_clients, length(State#st.waiters)}
+    ],
+    {reply, {ok, Info}, State};
+handle_call(reset, _From, State) ->
+    #st{
+        mod=Mod,
+        idx_state=IdxState
+    } = State,
+    {ok, NewIdxState} = Mod:reset(IdxState),
+    {reply, {ok, NewIdxState}, State#st{idx_state=NewIdxState}};
+handle_call(compact, _From, State) ->
+    Resp = couch_index_compactor:run(State#st.compactor, State#st.idx_state),
+    {reply, Resp, State};
+handle_call(get_compactor_pid, _From, State) ->
+    {reply, {ok, State#st.compactor}, State};
+handle_call({compacted, NewIdxState}, _From, State) ->
+    #st{
+        mod=Mod,
+        idx_state=OldIdxState,
+        updater=Updater,
+        commit_delay=Delay
+    } = State,
+    assert_signature_match(Mod, OldIdxState, NewIdxState),
+    NewSeq = Mod:get(update_seq, NewIdxState),
+    OldSeq = Mod:get(update_seq, OldIdxState),
+    % For indices that require swapping files, we have to make sure we're
+    % up to date with the current index. Otherwise indexes could roll back
+    % (perhaps considerably) to previous points in history.
+    case NewSeq >= OldSeq of
+        true ->
+            {ok, NewIdxState1} = Mod:swap_compacted(OldIdxState, NewIdxState),
+            % Restart the indexer if it's running.
+            case couch_index_updater:is_running(Updater) of
+                true -> ok = couch_index_updater:restart(Updater, NewIdxState1);
+                false -> ok
+            end,
+            case State#st.committed of
+                true -> erlang:send_after(Delay, self(), commit);
+                false -> ok
+            end,
+            {reply, ok, State#st{
+                idx_state=NewIdxState1,
+                committed=false
+            }};
+        _ ->
+            {reply, recompact, State}
+    end.
+
+
+handle_cast({config_change, NewDelay}, State) ->
+    MsDelay = 1000 * list_to_integer(NewDelay),
+    {noreply, State#st{commit_delay=MsDelay}};
+handle_cast({updated, NewIdxState}, State) ->
+    {noreply, NewState} = handle_cast({new_state, NewIdxState}, State),
+    case NewState#st.shutdown andalso (NewState#st.waiters =:= []) of
+        true ->
+            {stop, normal, NewState};
+        false ->
+            maybe_restart_updater(NewState),
+            {noreply, NewState}
+    end;
+handle_cast({new_state, NewIdxState}, State) ->
+    #st{
+        mod=Mod,
+        idx_state=OldIdxState,
+        commit_delay=Delay
+    } = State,
+    assert_signature_match(Mod, OldIdxState, NewIdxState),
+    CurrSeq = Mod:get(update_seq, NewIdxState),
+    Args = [
+        Mod:get(db_name, NewIdxState),
+        Mod:get(idx_name, NewIdxState),
+        CurrSeq
+    ],
+    ?LOG_DEBUG("Updated index for db: ~s idx: ~s seq: ~B", Args),
+    Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState),
+    case State#st.committed of
+        true -> erlang:send_after(Delay, self(), commit);
+        false -> ok
+    end,
+    {noreply, State#st{
+        idx_state=NewIdxState,
+        waiters=Rest,
+        committed=false
+    }};
+handle_cast({update_error, Error}, State) ->
+    send_all(State#st.waiters, Error),
+    {noreply, State#st{waiters=[]}};
+handle_cast(stop, State) ->
+    {stop, normal, State};
+handle_cast(delete, State) ->
+    #st{mod=Mod, idx_state=IdxState} = State,
+    ok = Mod:delete(IdxState),
+    {stop, normal, State};
+handle_cast(ddoc_updated, State) ->
+    #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
+    DbName = Mod:get(db_name, IdxState),
+    DDocId = Mod:get(idx_name, IdxState),
+    Shutdown = couch_util:with_db(DbName, fun(Db) ->
+        case couch_db:open_doc(Db, DDocId, [ejson_body]) of
+            {not_found, deleted} ->
+                true;
+            {ok, DDoc} ->
+                {ok, NewIdxState} = Mod:init(Db, DDoc),
+                Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState)
+        end
+    end),
+    case Shutdown of
+        true ->
+            case Waiters of
+                [] ->
+                    {stop, normal, State};
+                _ ->
+                    {noreply, State#st{shutdown = true}}
+            end;
+        false ->
+            {noreply, State#st{shutdown = false}}
+    end;
+handle_cast(_Mesg, State) ->
+    {stop, unhandled_cast, State}.
+
+
+handle_info(commit, #st{committed=true}=State) ->
+    {noreply, State};
+handle_info(commit, State) ->
+    #st{mod=Mod, idx_state=IdxState, commit_delay=Delay} = State,
+    DbName = Mod:get(db_name, IdxState),
+    GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end,
+    CommittedSeq = couch_util:with_db(DbName, GetCommSeq),
+    case CommittedSeq >= Mod:get(update_seq, IdxState) of
+        true ->
+            % Commit the updates
+            ok = Mod:commit(IdxState),
+            {noreply, State#st{committed=true}};
+        _ ->
+            % We can't commit the header because the database seq that's
+            % fully committed to disk is still behind us. If we committed
+            % now and the database lost those changes our view could be
+            % forever out of sync with the database. But a crash before we
+            % commit these changes, no big deal, we only lose incremental
+            % changes since last committal.
+            erlang:send_after(Delay, self(), commit),
+            {noreply, State}
+    end;
+handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) ->
+    Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
+    ?LOG_INFO("Index shutdown by monitor notice for db: ~s idx: ~s", Args),
+    catch send_all(State#st.waiters, shutdown),
+    {stop, normal, State#st{waiters=[]}}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+maybe_restart_updater(#st{waiters=[]}) ->
+    ok;
+maybe_restart_updater(#st{mod=Mod, idx_state=IdxState}=State) ->
+    couch_util:with_db(Mod:get(db_name, IdxState), fun(Db) ->
+        UpdateSeq = couch_db:get_update_seq(Db),
+        CommittedSeq = couch_db:get_committed_update_seq(Db),
+        CanUpdate = UpdateSeq > CommittedSeq,
+        UOpts = Mod:get(update_options, IdxState),
+        case CanUpdate and lists:member(committed_only, UOpts) of
+            true -> couch_db:ensure_full_commit(Db);
+            false -> ok
+        end
+    end),
+    couch_index_updater:run(State#st.updater, IdxState).
+
+
+send_all(Waiters, Reply) ->
+    [gen_server:reply(From, Reply) || {From, _} <- Waiters].
+
+
+send_replies(Waiters, UpdateSeq, IdxState) ->
+    Pred = fun({_, S}) -> S =< UpdateSeq end,
+    {ToSend, Remaining} = lists:partition(Pred, Waiters),
+    [gen_server:reply(From, {ok, IdxState}) || {From, _} <- ToSend],
+    Remaining.
+
+assert_signature_match(Mod, OldIdxState, NewIdxState) ->
+    case {Mod:get(signature, OldIdxState), Mod:get(signature, NewIdxState)} of
+        {Sig, Sig} -> ok;
+        _ -> erlang:error(signature_mismatch)
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/c296d042/src/couch_index_api.erl
----------------------------------------------------------------------
diff --git a/src/couch_index_api.erl b/src/couch_index_api.erl
new file mode 100644
index 0000000..9d3a67c
--- /dev/null
+++ b/src/couch_index_api.erl
@@ -0,0 +1,54 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_api).
+
+get(Field, State) ->
+    ok.
+
+init(Db, Ddoc) ->
+    ok.
+
+open(Db, State) ->
+    ok.
+
+close(State) ->
+    ok.
+
+delete(State) ->
+    ok.
+
+reset(State) ->
+    ok.
+
+
+start_update(State, PurgedState, NumChanges) ->
+    {ok, State}.
+
+purge(Db, PurgeSeq, PurgedIdRevs, State) ->
+    ok.
+
+process_doc(Doc, Seq, State) ->
+    ok.
+
+finish_update(State) ->
+    {ok, State}.
+
+commit(State) ->
+    ok.
+
+
+compact(Parent, State, Opts) ->
+    ok.
+
+swap_compacted(OldState, NewState) ->
+    ok.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/c296d042/src/couch_index_compactor.erl
----------------------------------------------------------------------
diff --git a/src/couch_index_compactor.erl b/src/couch_index_compactor.erl
new file mode 100644
index 0000000..6e9fb2e
--- /dev/null
+++ b/src/couch_index_compactor.erl
@@ -0,0 +1,113 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_compactor).
+-behaviour(gen_server).
+
+
+%% API
+-export([start_link/2, run/2, cancel/1, is_running/1]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+-record(st, {
+    idx,
+    mod,
+    pid
+}).
+
+
+start_link(Index, Module) ->
+    gen_server:start_link(?MODULE, {Index, Module}, []).
+
+
+run(Pid, IdxState) ->
+    gen_server:call(Pid, {compact, IdxState}).
+
+
+cancel(Pid) ->
+    gen_server:call(Pid, cancel).
+
+
+is_running(Pid) ->
+    gen_server:call(Pid, is_running).
+
+
+init({Index, Module}) ->
+    process_flag(trap_exit, true),
+    {ok, #st{idx=Index, mod=Module}}.
+
+
+terminate(_Reason, State) ->
+    couch_util:shutdown_sync(State#st.pid),
+    ok.
+
+
+handle_call({compact, _}, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
+    {reply, {ok, Pid}, State};
+handle_call({compact, IdxState}, _From, #st{idx=Idx}=State) ->
+    Pid = spawn_link(fun() -> compact(Idx, State#st.mod, IdxState) end),
+    {reply, {ok, Pid}, State#st{pid=Pid}};
+handle_call(cancel, _From, #st{pid=undefined}=State) ->
+    {reply, ok, State};
+handle_call(cancel, _From, #st{pid=Pid}=State) ->
+    unlink(Pid),
+    exit(Pid, kill),
+    {reply, ok, State#st{pid=undefined}};
+handle_call(is_running, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
+    {reply, true, State};
+handle_call(is_running, _From, State) ->
+    {reply, false, State}.
+
+
+handle_cast(_Mesg, State) ->
+    {stop, unknown_cast, State}.
+
+
+handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
+    {noreply, State#st{pid=undefined}};
+handle_info({'EXIT', _Pid, normal}, State) ->
+    {noreply, State};
+handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) ->
+    {stop, normal, State};
+handle_info(_Mesg, State) ->
+    {stop, unknown_info, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+compact(Parent, Mod, IdxState) ->
+    compact(Parent, Mod, IdxState, []).
+
+compact(Idx, Mod, IdxState, Opts) ->
+    DbName = Mod:get(db_name, IdxState),
+    Args = [DbName, Mod:get(idx_name, IdxState)],
+    ?LOG_INFO("Compaction started for db: ~s idx: ~s", Args),
+    {ok, NewIdxState} = couch_util:with_db(DbName, fun(Db) ->
+        Mod:compact(Db, IdxState, Opts)
+    end),
+    ok = Mod:commit(NewIdxState),
+    case gen_server:call(Idx, {compacted, NewIdxState}) of
+        recompact ->
+            ?LOG_INFO("Compaction restarting for db: ~s idx: ~s", Args),
+            compact(Idx, Mod, NewIdxState, [recompact]);
+        _ ->
+            ?LOG_INFO("Compaction finished for db: ~s idx: ~s", Args),
+            ok
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/c296d042/src/couch_index_server.erl
----------------------------------------------------------------------
diff --git a/src/couch_index_server.erl b/src/couch_index_server.erl
new file mode 100644
index 0000000..86791db
--- /dev/null
+++ b/src/couch_index_server.erl
@@ -0,0 +1,203 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_server).
+-behaviour(gen_server).
+
+-export([start_link/0, get_index/4, get_index/3, get_index/2]).
+-export([config_change/2, update_notify/1]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(BY_SIG, couchdb_indexes_by_sig).
+-define(BY_PID, couchdb_indexes_by_pid).
+-define(BY_DB, couchdb_indexes_by_db).
+
+
+-record(st, {root_dir,
+             notifier_pid}).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+get_index(Module, DbName, DDoc) ->
+    get_index(Module, DbName, DDoc, nil).
+
+
+get_index(Module, DbName, DDoc, Fun) when is_binary(DbName) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        get_index(Module, Db, DDoc, Fun)
+    end);
+get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) ->
+    case couch_db:open_doc(Db, DDoc, [ejson_body]) of
+        {ok, Doc} -> get_index(Module, Db, Doc, Fun);
+        Error -> Error
+    end;
+get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) ->
+    {ok, InitState} = Module:init(Db, DDoc),
+    {ok, FunResp} = Fun(InitState),
+    {ok, Pid} = get_index(Module, InitState),
+    {ok, Pid, FunResp};
+get_index(Module, Db, DDoc, _Fun) ->
+    {ok, InitState} = Module:init(Db, DDoc),
+    get_index(Module, InitState).
+
+
+get_index(Module, IdxState) ->
+    DbName = Module:get(db_name, IdxState),
+    Sig = Module:get(signature, IdxState),
+    case ets:lookup(?BY_SIG, {DbName, Sig}) of
+        [{_, Pid}] when is_pid(Pid) ->
+            {ok, Pid};
+        _ ->
+            Args = {Module, IdxState, DbName, Sig},
+            gen_server:call(?MODULE, {get_index, Args}, infinity)
+    end.
+
+
+init([]) ->
+    process_flag(trap_exit, true),
+    couch_config:register(fun ?MODULE:config_change/2),
+    ets:new(?BY_SIG, [protected, set, named_table]),
+    ets:new(?BY_PID, [private, set, named_table]),
+    ets:new(?BY_DB, [protected, bag, named_table]),
+
+    {ok, NotifierPid} = couch_db_update_notifier:start_link(
+            fun ?MODULE:update_notify/1),
+    RootDir = couch_index_util:root_dir(),
+    couch_file:init_delete_dir(RootDir),
+    {ok, #st{root_dir=RootDir,
+             notifier_pid=NotifierPid}}.
+
+
+terminate(_Reason, _State) ->
+    Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)],
+    lists:map(fun couch_util:shutdown_sync/1, Pids),
+    ok.
+
+
+handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) ->
+    case ets:lookup(?BY_SIG, {DbName, Sig}) of
+        [] ->
+            spawn_link(fun() -> new_index(Args) end),
+            ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
+            {noreply, State};
+        [{_, Waiters}] when is_list(Waiters) ->
+            ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
+            {noreply, State};
+        [{_, Pid}] when is_pid(Pid) ->
+            {reply, {ok, Pid}, State}
+    end;
+handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
+    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
+    link(Pid),
+    add_to_ets(DbName, Sig, DDocId, Pid),
+    {reply, ok, State};
+handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
+    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [gen_server:reply(From, Error) || From <- Waiters],
+    ets:delete(?BY_SIG, {DbName, Sig}),
+    {reply, ok, State};
+handle_call({reset_indexes, DbName}, _From, State) ->
+    reset_indexes(DbName, State#st.root_dir),
+    {reply, ok, State}.
+
+
+handle_cast({reset_indexes, DbName}, State) ->
+    reset_indexes(DbName, State#st.root_dir),
+    {noreply, State}.
+
+handle_info({'EXIT', Pid, Reason}, Server) ->
+    case ets:lookup(?BY_PID, Pid) of
+        [{Pid, {DbName, Sig}}] ->
+            [{DbName, {DDocId, Sig}}] =
+                ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
+            rem_from_ets(DbName, Sig, DDocId, Pid);
+        [] when Reason /= normal ->
+            exit(Reason);
+        _Else ->
+            ok
+    end,
+    {noreply, Server}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+new_index({Mod, IdxState, DbName, Sig}) ->
+    DDocId = Mod:get(idx_name, IdxState),
+    case couch_index:start_link({Mod, IdxState}) of
+        {ok, Pid} ->
+            ok = gen_server:call(
+                ?MODULE, {async_open, {DbName, DDocId, Sig}, {ok, Pid}}),
+            unlink(Pid);
+        Error ->
+            ok = gen_server:call(
+                ?MODULE, {async_error, {DbName, DDocId, Sig}, Error})
+    end.
+
+
+reset_indexes(DbName, Root) ->
+    % shutdown all the updaters and clear the files, the db got changed
+    Fun = fun({_, {DDocId, Sig}}) ->
+        [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+        MRef = erlang:monitor(process, Pid),
+        gen_server:cast(Pid, delete),
+        receive {'DOWN', MRef, _, _, _} -> ok end,
+        rem_from_ets(DbName, Sig, DDocId, Pid)
+    end,
+    lists:foreach(Fun, ets:lookup(?BY_DB, DbName)),
+    Path = couch_index_util:index_dir("", DbName),
+    couch_file:nuke_dir(Root, Path).
+
+
+add_to_ets(DbName, Sig, DDocId, Pid) ->
+    ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
+    ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
+    ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
+
+
+rem_from_ets(DbName, Sig, DDocId, Pid) ->
+    ets:delete(?BY_SIG, {DbName, Sig}),
+    ets:delete(?BY_PID, Pid),
+    ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}).
+
+
+config_change("couchdb", "view_index_dir") ->
+    exit(whereis(?MODULE), config_change);
+config_change("couchdb", "index_dir") ->
+    exit(whereis(?MODULE), config_change).
+
+
+update_notify({deleted, DbName}) ->
+    gen_server:cast(?MODULE, {reset_indexes, DbName});
+update_notify({created, DbName}) ->
+    gen_server:cast(?MODULE, {reset_indexes, DbName});
+update_notify({ddoc_updated, {DbName, DDocId}}) ->
+    lists:foreach(
+        fun({_DbName, {_DDocId, Sig}}) ->
+            case ets:lookup(?BY_SIG, {DbName, Sig}) of
+                [{_, IndexPid}] ->
+                    (catch gen_server:cast(IndexPid, ddoc_updated));
+                [] ->
+                    ok
+            end
+        end,
+        ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}}));
+update_notify(_) ->
+    ok.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/c296d042/src/couch_index_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_index_sup.erl b/src/couch_index_sup.erl
new file mode 100644
index 0000000..fd97814
--- /dev/null
+++ b/src/couch_index_sup.erl
@@ -0,0 +1,29 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_sup).
+-behaviour(supervisor).
+
+-export([start_link/0]).
+-export([init/1]).
+
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I), {I, {I, start_link, []}, permanent, 5000, worker, [I]}).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+    Server = ?CHILD(couch_index_server),
+    {ok, {{one_for_one, 10, 3600}, [Server]}}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/c296d042/src/couch_index_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_index_updater.erl b/src/couch_index_updater.erl
new file mode 100644
index 0000000..9f54a56
--- /dev/null
+++ b/src/couch_index_updater.erl
@@ -0,0 +1,200 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_updater).
+-behaviour(gen_server).
+
+
+%% API
+-export([start_link/2, run/2, is_running/1, update/2, restart/2]).
+
+%% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-record(st, {
+    idx,
+    mod,
+    pid=nil
+}).
+
+
+start_link(Index, Module) ->
+    gen_server:start_link(?MODULE, {Index, Module}, []).
+
+
+run(Pid, IdxState) ->
+    gen_server:call(Pid, {update, IdxState}).
+
+
+is_running(Pid) ->
+    gen_server:call(Pid, is_running).
+
+
+update(Mod, State) ->
+    update(nil, Mod, State).
+
+
+restart(Pid, IdxState) ->
+    gen_server:call(Pid, {restart, IdxState}).
+
+
+init({Index, Module}) ->
+    process_flag(trap_exit, true),
+    {ok, #st{idx=Index, mod=Module}}.
+
+
+terminate(_Reason, State) ->
+    couch_util:shutdown_sync(State#st.pid),
+    ok.
+
+
+handle_call({update, _IdxState}, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
+    {reply, ok, State};
+handle_call({update, IdxState}, _From, #st{idx=Idx, mod=Mod}=State) ->
+    Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
+    ?LOG_INFO("Starting index update for db: ~s idx: ~s", Args),
+    Pid = spawn_link(fun() -> update(Idx, Mod, IdxState) end),
+    {reply, ok, State#st{pid=Pid}};
+handle_call({restart, IdxState}, _From, #st{idx=Idx, mod=Mod}=State) ->
+    Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
+    ?LOG_INFO("Restarting index update for db: ~s idx: ~s", Args),
+    case is_pid(State#st.pid) of
+        true -> couch_util:shutdown_sync(State#st.pid);
+        _ -> ok
+    end,
+    Pid = spawn_link(fun() -> update(Idx, State#st.mod, IdxState) end),
+    {reply, ok, State#st{pid=Pid}};
+handle_call(is_running, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
+    {reply, true, State};
+handle_call(is_running, _From, State) ->
+    {reply, false, State}.
+
+
+handle_cast(_Mesg, State) ->
+    {stop, unknown_cast, State}.
+
+
+handle_info({'EXIT', _, {updated, Pid, IdxState}}, #st{pid=Pid}=State) ->
+    Mod = State#st.mod,
+    Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
+    ?LOG_INFO("Index update finished for db: ~s idx: ~s", Args),
+    ok = gen_server:cast(State#st.idx, {updated, IdxState}),
+    {noreply, State#st{pid=undefined}};
+handle_info({'EXIT', _, {reset, Pid}}, #st{idx=Idx, pid=Pid}=State) ->
+    {ok, NewIdxState} = gen_server:call(State#st.idx, reset),
+    Pid2 = spawn_link(fun() -> update(Idx, State#st.mod, NewIdxState) end),
+    {noreply, State#st{pid=Pid2}};
+handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
+    {noreply, State#st{pid=undefined}};
+handle_info({'EXIT', Pid, {{nocatch, Error}, _Trace}}, State) ->
+    handle_info({'EXIT', Pid, Error}, State);
+handle_info({'EXIT', Pid, Error}, #st{pid=Pid}=State) ->
+    ok = gen_server:cast(State#st.idx, {update_error, Error}),
+    {noreply, State#st{pid=undefined}};
+handle_info({'EXIT', Pid, _Reason}, #st{idx=Pid}=State) ->
+    {stop, normal, State};
+handle_info({'EXIT', _Pid, normal}, State) ->
+    {noreply, State};
+handle_info(_Mesg, State) ->
+    {stop, unknown_info, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+update(Idx, Mod, IdxState) ->
+    DbName = Mod:get(db_name, IdxState),
+    CurrSeq = Mod:get(update_seq, IdxState),
+    UpdateOpts = Mod:get(update_options, IdxState),
+    CommittedOnly = lists:member(committed_only, UpdateOpts),
+    IncludeDesign = lists:member(include_design, UpdateOpts),
+    DocOpts = case lists:member(local_seq, UpdateOpts) of
+        true -> [conflicts, deleted_conflicts, local_seq];
+        _ -> [conflicts, deleted_conflicts]
+    end,
+
+    couch_util:with_db(DbName, fun(Db) ->
+        DbUpdateSeq = couch_db:get_update_seq(Db),
+        DbCommittedSeq = couch_db:get_committed_update_seq(Db),
+
+        PurgedIdxState = case purge_index(Db, Mod, IdxState) of
+            {ok, IdxState0} -> IdxState0;
+            reset -> exit({reset, self()})
+        end,
+
+        NumChanges = couch_db:count_changes_since(Db, CurrSeq),
+
+        LoadDoc = fun(DocInfo) ->
+            #doc_info{
+                id=DocId,
+                high_seq=Seq,
+                revs=[#rev_info{deleted=Deleted} | _]
+            } = DocInfo,
+
+            case {IncludeDesign, DocId} of
+                {false, <<"_design/", _/binary>>} ->
+                    {nil, Seq};
+                _ when Deleted ->
+                    {#doc{id=DocId, deleted=true}, Seq};
+                _ ->
+                    {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts),
+                    {Doc, Seq}
+            end
+        end,
+
+        Proc = fun(DocInfo, _, {IdxStateAcc, _}) ->
+            HighSeq = DocInfo#doc_info.high_seq,
+            case CommittedOnly and (HighSeq > DbCommittedSeq) of
+                true ->
+                    {stop, {IdxStateAcc, false}};
+                false ->
+                    {Doc, Seq} = LoadDoc(DocInfo),
+                    {ok, NewSt} = Mod:process_doc(Doc, Seq, IdxStateAcc),
+                    {ok, {NewSt, true}}
+            end
+        end,
+
+        {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges),
+        Acc0 = {InitIdxState, true},
+        {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []),
+        {ProcIdxSt, SendLast} = Acc,
+
+        % If we didn't bail due to hitting the last committed seq we need
+        % to send our last update_seq through.
+        {ok, LastIdxSt} = case SendLast of
+            true ->
+                Mod:process_doc(nil, DbUpdateSeq, ProcIdxSt);
+            _ ->
+                {ok, ProcIdxSt}
+        end,
+
+        {ok, FinalIdxState} = Mod:finish_update(LastIdxSt),
+        exit({updated, self(), FinalIdxState})
+    end).
+
+
+purge_index(Db, Mod, IdxState) ->
+    DbPurgeSeq = couch_db:get_purge_seq(Db),
+    IdxPurgeSeq = Mod:get(purge_seq, IdxState),
+    if
+        DbPurgeSeq == IdxPurgeSeq ->
+            {ok, IdxState};
+        DbPurgeSeq == IdxPurgeSeq + 1 ->
+            {ok, PurgedIdRevs} = couch_db:get_last_purged(Db),
+            Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState);
+        true ->
+            reset
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/c296d042/src/couch_index_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_index_util.erl b/src/couch_index_util.erl
new file mode 100644
index 0000000..c833920
--- /dev/null
+++ b/src/couch_index_util.erl
@@ -0,0 +1,77 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_util).
+
+-export([root_dir/0, index_dir/2, index_file/3]).
+-export([load_doc/3, sort_lib/1, hexsig/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+root_dir() ->
+  couch_config:get("couchdb", "view_index_dir").
+
+
+index_dir(Module, DbName) when is_binary(DbName) ->
+    DbDir = "." ++ binary_to_list(DbName) ++ "_design",
+    filename:join([root_dir(), DbDir, Module]);
+index_dir(Module, #db{}=Db) ->
+    index_dir(Module, couch_db:name(Db)).
+
+
+index_file(Module, DbName, FileName) ->
+    filename:join(index_dir(Module, DbName), FileName).
+
+
+load_doc(Db, #doc_info{}=DI, Opts) ->
+    Deleted = lists:member(deleted, Opts),
+    case (catch couch_db:open_doc(Db, DI, Opts)) of
+        {ok, #doc{deleted=false}=Doc} -> Doc;
+        {ok, #doc{deleted=true}=Doc} when Deleted -> Doc;
+        _Else -> null
+    end;
+load_doc(Db, {DocId, Rev}, Opts) ->
+    case (catch load_doc(Db, DocId, Rev, Opts)) of
+        #doc{deleted=false} = Doc -> Doc;
+        _ -> null
+    end.
+
+
+load_doc(Db, DocId, Rev, Options) ->
+    case Rev of
+        nil -> % open most recent rev
+            case (catch couch_db:open_doc(Db, DocId, Options)) of
+                {ok, Doc} -> Doc;
+                _Error -> null
+            end;
+        _ -> % open a specific rev (deletions come back as stubs)
+            case (catch couch_db:open_doc_revs(Db, DocId, [Rev], Options)) of
+                {ok, [{ok, Doc}]} -> Doc;
+                {ok, [{{not_found, missing}, Rev}]} -> null;
+                {ok, [_Else]} -> null
+            end
+    end.
+
+
+sort_lib({Lib}) ->
+    sort_lib(Lib, []).
+sort_lib([], LAcc) ->
+    lists:keysort(1, LAcc);
+sort_lib([{LName, {LObj}}|Rest], LAcc) ->
+    LSorted = sort_lib(LObj, []), % descend into nested object
+    sort_lib(Rest, [{LName, LSorted}|LAcc]);
+sort_lib([{LName, LCode}|Rest], LAcc) ->
+    sort_lib(Rest, [{LName, LCode}|LAcc]).
+
+
+hexsig(Sig) ->
+    couch_util:to_hex(binary_to_list(Sig)).


Mime
View raw message