couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [12/12] couch-index commit: updated refs/heads/1994-merge-rcouch to 7c1666b
Date Thu, 13 Feb 2014 00:01:25 GMT
couch_index: add background indexing facility

This change adds the ability to trigger view indexing in the
background. Background indexing only runs while at least one process
has acquired it using the `couch_index_server:acquire_indexer/3`
function. Once all the processes that acquired it have exited or
released it using `couch_index_server:release_indexer/3`, the
background task is stopped.

By default, background indexing happens every second (if the database
changed) or as soon as 200 documents have been saved to the database.
These values can be changed with the `refresh_interval` (in
milliseconds) and `threshold` (number of updates) options in the
`couch_index` config section.

To use it with couch_mrview, a new option `{refresh, true}` has been
added to `couch_mrview_changes:handle_changes`. Also, the query
parameter `refresh=true` can be passed to the HTTP changes API.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/commit/814ed50f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/tree/814ed50f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/diff/814ed50f

Branch: refs/heads/1994-merge-rcouch
Commit: 814ed50fc48bc4811234a88d1cf9d01b0dc35c27
Parents: 7d713ef
Author: benoitc <benoitc@apache.org>
Authored: Sat Feb 8 19:55:40 2014 +0100
Committer: Paul J. Davis <paul.joseph.davis@gmail.com>
Committed: Wed Feb 12 17:58:23 2014 -0600

----------------------------------------------------------------------
 src/couch_index.erl         |  35 +++++++-
 src/couch_index_indexer.erl | 189 +++++++++++++++++++++++++++++++++++++++
 src/couch_index_server.erl  |  18 ++++
 3 files changed, 241 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/blob/814ed50f/src/couch_index.erl
----------------------------------------------------------------------
diff --git a/src/couch_index.erl b/src/couch_index.erl
index b9ae567..01483bb 100644
--- a/src/couch_index.erl
+++ b/src/couch_index.erl
@@ -17,6 +17,7 @@
 %% API
 -export([start_link/1, stop/1, get_state/2, get_info/1]).
 -export([compact/1, compact/2, get_compactor_pid/1]).
+-export([acquire_indexer/1, release_indexer/1]).
 -export([config_change/3]).
 
 %% gen_server callbacks
@@ -30,6 +31,7 @@
     idx_state,
     updater,
     compactor,
+    indexer=nil,
     waiters=[],
     commit_delay,
     committed=true,
@@ -68,6 +70,16 @@ compact(Pid, Options) ->
 get_compactor_pid(Pid) ->
     gen_server:call(Pid, get_compactor_pid).
 
+
+%% Take one background-indexing lock on the index process Pid on behalf
+%% of the calling process. The index starts its background indexer if
+%% none is running (see the get_indexer_pid call).
+%% NOTE(review): the indexer may stop (its last lock released) between
+%% the two calls below; the second call then exits with noproc.
+acquire_indexer(Pid) ->
+    {ok, IPid} = gen_server:call(Pid, get_indexer_pid),
+    gen_server:call(IPid, {acquire, self()}).
+
+%% Release one background-indexing lock held by the calling process.
+%% NOTE(review): get_indexer_pid starts an indexer when none is running,
+%% so releasing without a held lock may spawn an indexer that stops again
+%% immediately.
+release_indexer(Pid) ->
+    {ok, IPid} = gen_server:call(Pid, get_indexer_pid),
+    gen_server:call(IPid, {release, self()}).
+
+
 config_change("query_server_config", "commit_freq", NewValue) ->
     ok = gen_server:cast(?MODULE, {config_update, NewValue}).
 
@@ -88,6 +100,7 @@ init({Mod, IdxState}) ->
         {ok, NewIdxState} ->
             {ok, UPid} = couch_index_updater:start_link(self(), Mod),
             {ok, CPid} = couch_index_compactor:start_link(self(), Mod),
+
             Delay = couch_config:get("query_server_config", "commit_freq", "5"),
             MsDelay = 1000 * list_to_integer(Delay),
             State = #st{
@@ -196,7 +209,18 @@ handle_call({compacted, NewIdxState}, _From, State) ->
             }};
         _ ->
             {reply, recompact, State}
-    end.
+    end;
+%% Return {ok, IndexerPid} for this index's background indexer. If no
+%% indexer is running, start one for the index's database and monitor it.
+%% The matching 'DOWN' clause of handle_info/2 resets `indexer' to nil.
+handle_call(get_indexer_pid, _From, #st{mod=Mod, idx_state=IdxState}=State) ->
+    Pid = case State#st.indexer of
+        Pid1 when is_pid(Pid1) ->
+            Pid1;
+        _ ->
+            DbName = Mod:get(db_name, IdxState),
+            %% NOTE(review): start_link also links the indexer to this
+            %% process; confirm how abnormal indexer exits are handled here.
+            {ok, IPid} = couch_index_indexer:start_link(self(), DbName),
+            erlang:monitor(process, IPid),
+            IPid
+    end,
+    {reply, {ok, Pid}, State#st{indexer=Pid}}.
 
 
 handle_cast({config_change, NewDelay}, State) ->
@@ -318,6 +342,15 @@ handle_info(commit, State) ->
             erlang:send_after(Delay, self(), commit),
             {noreply, State}
     end;
+
+%% The background indexer exited. Forget its pid so the next
+%% get_indexer_pid call starts a fresh one. This clause must stay before
+%% the generic 'DOWN' clause, which would otherwise match the message.
+handle_info({'DOWN', _, _, Pid, _}, #st{mod=Mod, idx_state=IdxState,
+                                        indexer=Pid}=State) ->
+    Args = [Mod:get(db_name, IdxState),
+            Mod:get(idx_name, IdxState)],
+    ?LOG_INFO("Background indexer shutdown by monitor notice for db: ~s idx: ~s", Args),
+
+    {noreply, State#st{indexer=nil}};
+
 handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) ->
     DbName = Mod:get(db_name, IdxState),
     DDocId = Mod:get(idx_name, IdxState),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/blob/814ed50f/src/couch_index_indexer.erl
----------------------------------------------------------------------
diff --git a/src/couch_index_indexer.erl b/src/couch_index_indexer.erl
new file mode 100644
index 0000000..4af85cf
--- /dev/null
+++ b/src/couch_index_indexer.erl
@@ -0,0 +1,189 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_indexer).
+
+%% Background indexer attached to a single couch_index process.
+%%
+%% While at least one process holds a lock on it (see the `acquire' and
+%% `release' calls), the indexer asks its index to catch up with the
+%% database. This happens either every `refresh_interval' ms, if the
+%% database changed since the last refresh, or as soon as `threshold'
+%% database updates have been seen. Both values are read from the
+%% `couch_index' config section. The server stops once the last lock
+%% holder releases it or exits.
+
+-export([start_link/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+%% index      - pid of the couch_index process to refresh
+%% dbname     - name of the database the index is built on
+%% db_updates - database updates seen since the last refresh
+%% tref       - reference of the pending refresh timer, or nil
+%% notifier   - pid of the db update notifier, or nil
+%% locks      - dict of lock holder pid -> {MonitorRef, RefCount}
+-record(state, {index,
+                dbname,
+                db_updates=0,
+                tref=nil,
+                notifier=nil,
+                locks}).
+
+
+%% Start an indexer, linked to the caller, that refreshes the couch_index
+%% process Index when the database DbName changes.
+start_link(Index, DbName) ->
+    gen_server:start_link(?MODULE, {Index, DbName}, []).
+
+init({Index, DbName}) ->
+    process_flag(trap_exit, true),
+    %% defer the notifier and timer setup so start_link returns quickly
+    self() ! start_indexing,
+    {ok, #state{index=Index,
+                dbname=DbName,
+                locks=dict:new()}}.
+
+%% {acquire, Pid}: take one lock on behalf of Pid. Locks are reference
+%% counted per pid, and Pid is monitored so its locks are dropped if it dies.
+handle_call({acquire, Pid}, _From, #state{locks=Locks}=State) ->
+    NLocks = case dict:find(Pid, Locks) of
+        error ->
+            dict:store(Pid, {erlang:monitor(process, Pid), 1}, Locks);
+        {ok, {MRef, Refc}} ->
+            dict:store(Pid, {MRef, Refc + 1}, Locks)
+    end,
+    {reply, ok, State#state{locks=NLocks}};
+
+%% {release, Pid}: drop one lock held by Pid. Releasing a pid that holds
+%% no lock is a no-op. The indexer stops once no lock is left.
+handle_call({release, Pid}, _From, #state{locks=Locks}=State) ->
+    NLocks = case dict:find(Pid, Locks) of
+        {ok, {MRef, 1}} ->
+            erlang:demonitor(MRef, [flush]),
+            dict:erase(Pid, Locks);
+        {ok, {MRef, Refc}} ->
+            dict:store(Pid, {MRef, Refc - 1}, Locks);
+        error ->
+            Locks
+    end,
+    NState = State#state{locks=NLocks},
+    case should_close(NLocks) of
+        true -> {stop, normal, ok, NState};
+        false -> {reply, ok, NState}
+    end;
+
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State}.
+
+%% `updated' is cast by the db notifier for every update of the watched
+%% database. Once `threshold' updates have piled up, the index is refreshed
+%% right away instead of waiting for the next timer tick.
+handle_cast(updated, #state{index=Index, dbname=DbName,
+                            db_updates=Updates}=State) ->
+    NUpdates = Updates + 1,
+    %% `>=' rather than `=:=' so that a threshold lowered at runtime
+    %% (below the current count) still triggers a refresh
+    case NUpdates >= get_db_threshold() of
+        true ->
+            refresh_index(DbName, Index),
+            {noreply, State#state{db_updates=0}};
+        false ->
+            {noreply, State#state{db_updates=NUpdates}}
+    end;
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(start_indexing, #state{dbname=DbName}=State) ->
+    %% watch the update events of the database
+    {ok, NotifierPid} = start_db_notifier(DbName),
+    TRef = start_refresh_timer(),
+    {noreply, State#state{tref=TRef, notifier=NotifierPid}};
+
+handle_info({timeout, TRef, refresh_index}, #state{index=Index,
+                                                   dbname=DbName,
+                                                   tref=TRef,
+                                                   db_updates=N}=State) ->
+    %% only refresh the index if an update happened since the last refresh
+    case N > 0 of
+        true -> refresh_index(DbName, Index);
+        false -> ok
+    end,
+    %% erlang:start_timer/3 fires only once: re-arm it so the refresh
+    %% keeps happening every refresh_interval ms
+    NTRef = start_refresh_timer(),
+    {noreply, State#state{db_updates=0, tref=NTRef}};
+
+handle_info({'DOWN', MRef, _, Pid, _}, #state{locks=Locks}=State) ->
+    %% a lock holder died: drop all of its locks
+    NLocks = case dict:find(Pid, Locks) of
+        {ok, {MRef, _}} ->
+            dict:erase(Pid, Locks);
+        _ ->
+            Locks
+    end,
+    NState = State#state{locks=NLocks},
+    case should_close(NLocks) of
+        true -> {stop, normal, NState};
+        false -> {noreply, NState}
+    end;
+
+handle_info({'EXIT', Pid, _Reason}, #state{notifier=Pid}=State) ->
+    %% without the db notifier there is nothing left to watch
+    {stop, normal, State#state{notifier=nil}};
+
+%% drain stale timer messages and any other unexpected message instead of
+%% crashing with function_clause
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+terminate(_Reason, #state{tref=TRef, notifier=Pid}) ->
+    case TRef of
+        nil -> ok;
+        _ -> erlang:cancel_timer(TRef)
+    end,
+    case is_pid(Pid) of
+        true -> couch_util:shutdown_sync(Pid);
+        false -> ok
+    end,
+    ok.
+
+%% Ask the index to catch up with the current update sequence of the
+%% database. This is best effort: any failure, including failing to read
+%% the update sequence, is returned as {error, _} instead of crashing the
+%% indexer. Callers ignore the result.
+refresh_index(Db, Index) ->
+    try
+        UpdateSeq = couch_util:with_db(Db, fun(WDb) ->
+            couch_db:get_update_seq(WDb)
+        end),
+        couch_index:get_state(Index, UpdateSeq)
+    of
+        {ok, _} -> ok;
+        Other -> {error, Other}
+    catch
+        Class:Reason -> {error, {Class, Reason}}
+    end.
+
+%% The indexer has no reason to run once nobody holds a lock on it.
+should_close(Locks) ->
+    dict:size(Locks) =:= 0.
+
+%% Number of database updates after which the index is refreshed without
+%% waiting for the next timer tick (couch_index/threshold, default 200).
+%% Updates below the threshold are picked up by the next periodic refresh.
+get_db_threshold() ->
+    list_to_integer(
+        couch_config:get("couch_index", "threshold", "200")).
+
+%% Interval in ms between periodic refreshes
+%% (couch_index/refresh_interval, default 1000).
+get_refresh_interval() ->
+    list_to_integer(
+        couch_config:get("couch_index", "refresh_interval", "1000")).
+
+%% Arm a one-shot timer that delivers {timeout, TRef, refresh_index}.
+start_refresh_timer() ->
+    erlang:start_timer(get_refresh_interval(), self(), refresh_index).
+
+%% Start a db update notifier, linked to the indexer, that casts `updated'
+%% to the indexer for every update event of DbName.
+start_db_notifier(DbName) ->
+    Self = self(),
+    couch_db_update_notifier:start_link(fun
+        ({updated, Name}) when Name =:= DbName ->
+            gen_server:cast(Self, updated);
+        (_) ->
+            ok
+    end).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/blob/814ed50f/src/couch_index_server.erl
----------------------------------------------------------------------
diff --git a/src/couch_index_server.erl b/src/couch_index_server.erl
index 86791db..2c6ebc9 100644
--- a/src/couch_index_server.erl
+++ b/src/couch_index_server.erl
@@ -14,6 +14,7 @@
 -behaviour(gen_server).
 
 -export([start_link/0, get_index/4, get_index/3, get_index/2]).
+-export([acquire_indexer/3, release_indexer/3]).
 -export([config_change/2, update_notify/1]).
 
 -export([init/1, terminate/2, code_change/3]).
@@ -67,6 +68,23 @@ get_index(Module, IdxState) ->
             gen_server:call(?MODULE, {get_index, Args}, infinity)
     end.
 
+%% Take one background-indexing lock, for the calling process, on the index
+%% that get_index/3 returns for Module/DbName/DDoc. Any non-{ok, Pid}
+%% result of get_index/3 is returned unchanged.
+acquire_indexer(Module, DbName, DDoc) ->
+    case get_index(Module, DbName, DDoc) of
+        {ok, Pid} ->
+            couch_index:acquire_indexer(Pid);
+        Error ->
+            Error
+    end.
+
+%% Release one background-indexing lock held by the calling process on the
+%% index that get_index/3 returns for Module/DbName/DDoc. Any non-{ok, Pid}
+%% result of get_index/3 is returned unchanged.
+%% NOTE(review): get_index/3 may open the index just to release a lock on it.
+release_indexer(Module, DbName, DDoc) ->
+    case get_index(Module, DbName, DDoc) of
+        {ok, Pid} ->
+            couch_index:release_indexer(Pid);
+        Error ->
+            Error
+    end.
+
+
 
 init([]) ->
     process_flag(trap_exit, true),


Mime
View raw message