couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vatam...@apache.org
Subject [couchdb] branch master updated: Close idle dbs
Date Wed, 07 Jun 2017 20:13:36 GMT
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ba0c10bc Close idle dbs
ba0c10bc is described below

commit ba0c10bcf49be66b042d8f0deaf14bcfda1a49bf
Author: Nick Vatamaniuc <vatamane@apache.org>
AuthorDate: Wed May 24 03:37:04 2017 -0400

    Close idle dbs
    
    Previously idle dbs, especially sys dbs like _replicator, once opened
    for scanning would stay open forever. In a large cluster with many
    _replicator shards that can add up to significant overhead, mostly in
    terms of the number of active processes.
    
    Add a mechanism to close dbs which have an idle db updater. Previously,
    hibernation was used to limit memory pressure; however, that is often
    not enough.
    
    Some databases are only read periodically, so their updater would time
    out and they would be closed even though they are still in use. To
    prevent that, keep the last read timestamp in the couch_file process
    dictionary. The idle check then avoids closing dbs which have been read
    from recently.
    
    (Original idea for using timeouts in gen_server replies belongs to
    Paul Davis)
    
    COUCHDB-3323
---
 src/couch/src/couch_db_updater.erl | 108 +++++++++++++++++++++++++++----------
 src/couch/src/couch_file.erl       |  26 ++++++---
 src/couch/src/couch_server.erl     |  28 ++++++++++
 src/couch/src/couch_util.erl       |  19 +++++++
 4 files changed, 146 insertions(+), 35 deletions(-)

diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index bb8e9da..49061b2 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -21,6 +21,8 @@
 
 -include_lib("couch/include/couch_db.hrl").
 
+-define(IDLE_LIMIT_DEFAULT, 61000). % ms; overridden by couchdb/idle_check_timeout
+
 -record(comp_header, {
     db_header,
     meta_state
@@ -36,6 +38,7 @@
 
 init({DbName, Filepath, Fd, Options}) ->
     erlang:put(io_priority, {db_update, DbName}),
+    update_idle_limit_from_config(),
     case lists:member(create, Options) of
     true ->
         % create a new header and writes it to the file
@@ -70,7 +73,7 @@ init({DbName, Filepath, Fd, Options}) ->
     % we don't load validation funs here because the fabric query is liable to
     % race conditions.  Instead see couch_db:validate_doc_update, which loads
     % them lazily
-    {ok, Db#db{main_pid = self()}}.
+    {ok, Db#db{main_pid = self()}, idle_limit()}.
 
 
 terminate(_Reason, Db) ->
@@ -84,23 +87,23 @@ terminate(_Reason, Db) ->
     ok.
 
 handle_call(get_db, _From, Db) ->
-    {reply, {ok, Db}, Db};
+    {reply, {ok, Db}, Db, idle_limit()};
 handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
-    {reply, ok, Db}; % no data waiting, return ok immediately
+    {reply, ok, Db, idle_limit()}; % no data waiting, return ok immediately
 handle_call(full_commit, _From,  Db) ->
-    {reply, ok, commit_data(Db)};
+    {reply, ok, commit_data(Db), idle_limit()};
 handle_call({full_commit, RequiredSeq}, _From, Db)
         when RequiredSeq =< Db#db.committed_update_seq ->
-    {reply, ok, Db};
+    {reply, ok, Db, idle_limit()};
 handle_call({full_commit, _}, _, Db) ->
-    {reply, ok, commit_data(Db)}; % commit the data and return ok
+    {reply, ok, commit_data(Db), idle_limit()}; % commit the data and return ok
 handle_call(start_compact, _From, Db) ->
-    {noreply, NewDb} = handle_cast(start_compact, Db),
-    {reply, {ok, NewDb#db.compactor_pid}, NewDb};
+    {noreply, NewDb, _Timeout} = handle_cast(start_compact, Db),
+    {reply, {ok, NewDb#db.compactor_pid}, NewDb, idle_limit()};
 handle_call(compactor_pid, _From, #db{compactor_pid = Pid} = Db) ->
-    {reply, Pid, Db};
+    {reply, Pid, Db, idle_limit()};
 handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) ->
-    {reply, ok, Db};
+    {reply, ok, Db, idle_limit()};
 handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
     unlink(Pid),
     exit(Pid, kill),
@@ -108,12 +111,12 @@ handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
     ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"),
     Db2 = Db#db{compactor_pid = nil},
     ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
-    {reply, ok, Db2};
+    {reply, ok, Db2, idle_limit()};
 handle_call(increment_update_seq, _From, Db) ->
     Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
     ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
     couch_event:notify(Db#db.name, updated),
-    {reply, {ok, Db2#db.update_seq}, Db2};
+    {reply, {ok, Db2#db.update_seq}, Db2, idle_limit()};
 
 handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
     {ok, Ptr, _} = couch_file:append_term(
@@ -121,17 +124,17 @@ handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
     Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
             update_seq=Db#db.update_seq+1}),
     ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
-    {reply, ok, Db2};
+    {reply, ok, Db2, idle_limit()};
 
 handle_call({set_revs_limit, Limit}, _From, Db) ->
     Db2 = commit_data(Db#db{revs_limit=Limit,
             update_seq=Db#db.update_seq+1}),
     ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
-    {reply, ok, Db2};
+    {reply, ok, Db2, idle_limit()};
 
 handle_call({purge_docs, _IdRevs}, _From,
         #db{compactor_pid=Pid}=Db) when Pid /= nil ->
-    {reply, {error, purge_during_compaction}, Db};
+    {reply, {error, purge_during_compaction}, Db, idle_limit()};
 handle_call({purge_docs, IdRevs}, _From, Db) ->
     #db{
         fd = Fd,
@@ -199,13 +202,14 @@ handle_call({purge_docs, IdRevs}, _From, Db) ->
 
     ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
     couch_event:notify(Db#db.name, updated),
-    {reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2}.
+    {reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2,
+        idle_limit()}.
 
 
 handle_cast({load_validation_funs, ValidationFuns}, Db) ->
     Db2 = Db#db{validate_doc_funs = ValidationFuns},
     ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
-    {noreply, Db2};
+    {noreply, Db2, idle_limit()};
 handle_cast(start_compact, Db) ->
     case Db#db.compactor_pid of
     nil ->
@@ -213,10 +217,10 @@ handle_cast(start_compact, Db) ->
         Pid = spawn_link(fun() -> start_copy_compact(Db) end),
         Db2 = Db#db{compactor_pid=Pid},
         ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
-        {noreply, Db2};
+        {noreply, Db2, idle_limit()};
     _ ->
         % compact currently running, this is a no-op
-        {noreply, Db}
+        {noreply, Db, idle_limit()}
     end;
 handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath,fd=Fd}=Db) ->
     {ok, NewFd} = couch_file:open(CompactFilepath),
@@ -259,7 +263,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath,fd=Fd}=Db) ->
         ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity),
         couch_event:notify(NewDb3#db.name, compacted),
         couch_log:info("Compaction for db \"~s\" completed.", [Db#db.name]),
-        {noreply, NewDb3#db{compactor_pid=nil}};
+        {noreply, NewDb3#db{compactor_pid=nil}, idle_limit()};
     false ->
         couch_log:info("Compaction file still behind main file "
                        "(update seq=~p. compact update seq=~p). Retrying.",
@@ -268,9 +272,12 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath,fd=Fd}=Db) ->
         Pid = spawn_link(fun() -> start_copy_compact(Db) end),
         Db2 = Db#db{compactor_pid=Pid},
         ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
-        {noreply, Db2}
+        {noreply, Db2, idle_limit()}
     end;
 
+handle_cast(wakeup, Db) ->
+    {noreply, Db, idle_limit()};
+
 handle_cast(Msg, #db{name = Name} = Db) ->
     couch_log:error("Database `~s` updater received unexpected cast: ~p",
                     [Name, Msg]),
@@ -317,30 +324,49 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
             false ->
                 Db2
         end,
-        {noreply, Db3, hibernate}
+        {noreply, Db3, hibernate_if_no_idle_limit()}
     catch
         throw: retry ->
             [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
-            {noreply, Db, hibernate}
+            {noreply, Db, hibernate_if_no_idle_limit()}
     end;
 handle_info(delayed_commit, #db{waiting_delayed_commit=nil}=Db) ->
     %no outstanding delayed commits, ignore
-    {noreply, Db};
+    {noreply, Db, idle_limit()};
 handle_info(delayed_commit, Db) ->
     case commit_data(Db) of
         Db ->
-            {noreply, Db};
+            {noreply, Db, idle_limit()};
         Db2 ->
             ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
-            {noreply, Db2}
+            {noreply, Db2, idle_limit()}
     end;
 handle_info({'EXIT', _Pid, normal}, Db) ->
-    {noreply, Db};
+    {noreply, Db, idle_limit()};
 handle_info({'EXIT', _Pid, Reason}, Db) ->
     {stop, Reason, Db};
 handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
     couch_log:error("DB ~s shutting down - Fd ~p", [Name, Reason]),
-    {stop, normal, Db#db{fd=undefined, fd_monitor=closed}}.
+    {stop, normal, Db#db{fd=undefined, fd_monitor=closed}};
+handle_info(timeout, #db{fd=Fd, name=DbName} = Db) ->
+    IdleLimitMSec = update_idle_limit_from_config(),
+    case couch_db:is_idle(Db) of
+        true ->
+            MSecSinceLastRead = couch_file:msec_since_last_read(Fd),
+            case MSecSinceLastRead > IdleLimitMSec of
+                true ->
+                    ok = couch_server:close_db_if_idle(DbName);
+                false ->
+                    ok
+            end;
+        false ->
+            ok
+    end,
+    % Send a message to wake up and then hibernate. Hibernation here is done to
+    % force a thorough garbage collection.
+    gen_server:cast(self(), wakeup),
+    {noreply, Db, hibernate}.
+
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -1457,3 +1483,29 @@ default_security_object(_DbName) ->
         "everyone" ->
             []
     end.
+
+% These functions keep the idle limit (milliseconds, or the atom infinity)
+% in the process dictionary under the key idle_limit. That is usually frowned
+% upon, but it avoids changing the server state record. Once PSE (Pluggable
+% Storage Engine) code lands this should be moved to the #db{} record.
+update_idle_limit_from_config() -> % re-read config, cache and return limit
+    Default = integer_to_list(?IDLE_LIMIT_DEFAULT),
+    IdleLimit = case config:get("couchdb", "idle_check_timeout", Default) of
+        "infinity" ->
+            infinity;
+        Milliseconds ->
+            list_to_integer(Milliseconds) % crashes on a non-integer value
+    end,
+    put(idle_limit, IdleLimit),
+    IdleLimit.
+
+idle_limit() -> % cached value; used as the gen_server callback timeout
+    get(idle_limit). % undefined until update_idle_limit_from_config/0 runs
+
+hibernate_if_no_idle_limit() -> % timeout/hibernate element of a reply tuple
+    case idle_limit() of
+        infinity ->
+            hibernate; % idle closing disabled: hibernate to limit memory use
+        Timeout when is_integer(Timeout) ->
+            Timeout % keep the idle timeout armed instead of hibernating
+    end.
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index d40c525..8df462b 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -44,6 +44,7 @@
 -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
 -export([write_header/2, read_header/1]).
 -export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
+-export([msec_since_last_read/1]).
 
 % gen_server callbacks
 -export([init/1, terminate/2, code_change/3]).
@@ -338,12 +339,25 @@ init_status_error(ReturnPid, Ref, Error) ->
     ReturnPid ! {Ref, self(), Error},
     ignore.
 
+
+% Return milliseconds since Fd last recorded a read (its read_timestamp
+% process dictionary entry). Conservative: if no timestamp is found, or Fd
+% is no longer alive, it returns 0 so a file is never marked idle by
+% accident (e.g. while starting up). Negative differences are clamped to 0.
+msec_since_last_read(Fd) when is_pid(Fd) ->
+    Now = os:timestamp(),
+    LastRead = couch_util:process_dict_get(Fd, read_timestamp, Now),
+    DtMSec = timer:now_diff(Now, LastRead) div 1000,
+    max(0, DtMSec).
+
+
 % server functions
 
 init({Filepath, Options, ReturnPid, Ref}) ->
     OpenOptions = file_open_options(Options),
     Limit = get_pread_limit(),
     IsSys = lists:member(sys_db, Options),
+    update_read_timestamp(),
     case lists:member(create, Options) of
     true ->
         filelib:ensure_dir(Filepath),
@@ -422,6 +436,7 @@ handle_call(close, _From, #file{fd=Fd}=File) ->
     {stop, normal, file:close(Fd), File#file{fd = nil}};
 
 handle_call({pread_iolist, Pos}, _From, File) ->
+    update_read_timestamp(),
     {LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4),
     case iolist_to_binary(LenIolist) of
     <<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term
@@ -695,13 +710,10 @@ is_idle(#file{is_sys=false}) ->
     {Fd :: pid() | tuple(), FilePath :: string()} | undefined.
 
 process_info(Pid) ->
-    {dictionary, Dict} = erlang:process_info(Pid, dictionary),
-    case lists:keyfind(couch_file_fd, 1, Dict) of
-        false ->
-            undefined;
-        {couch_file_fd, {Fd, InitialName}} ->
-            {Fd, InitialName}
-    end.
+    couch_util:process_dict_get(Pid, couch_file_fd). % undefined if unset/dead
+
+update_read_timestamp() -> % called on init and on every pread_iolist
+    put(read_timestamp, os:timestamp()). % read by msec_since_last_read/1
 
 upgrade_state(#file{db_monitor=DbPid}=File) when is_pid(DbPid) ->
     unlink(DbPid),
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index ad2a5f0..26c6c77 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -21,6 +21,7 @@
 -export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
 -export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]).
 -export([close_lru/0]).
+-export([close_db_if_idle/1]).
 
 % config_listener api
 -export([handle_config_change/5, handle_config_terminate/3]).
@@ -173,6 +174,15 @@ hash_admin_passwords(Persist) ->
             config:set("admins", User, ?b2l(HashedPassword), Persist)
         end, couch_passwords:get_unhashed_admins()).
 
+close_db_if_idle(DbName) -> % always returns ok; the close itself is async
+    case ets:lookup(couch_dbs, DbName) of
+        [#db{}] -> % open db: couch_server re-checks idleness before closing
+            gen_server:cast(couch_server, {close_db_if_idle, DbName});
+         _ -> % no #db{} entry for DbName: nothing to close
+            ok
+    end.
+
+
 init([]) ->
     % read config and register for configuration changes
 
@@ -508,6 +518,24 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} =
     {noreply, Server#server{lru = couch_lru:update(DbName, Lru)}};
 handle_cast({update_lru, _DbName}, Server) ->
     {noreply, Server};
+handle_cast({close_db_if_idle, DbName}, Server) ->
+    case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of
+    true ->
+        [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName),
+        case couch_db:is_idle(Db) of
+        true ->
+            true = ets:delete(couch_dbs, DbName),
+            true = ets:delete(couch_dbs_pid_to_name, Pid),
+            exit(Pid, kill),
+            {noreply, db_closed(Server, Db#db.options)};
+        false ->
+            true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}),
+            {noreply, Server}
+        end;
+    false ->
+        {noreply, Server}
+    end;
+
 handle_cast(Msg, Server) ->
     {stop, {unknown_cast_message, Msg}, Server}.
 
diff --git a/src/couch/src/couch_util.erl b/src/couch/src/couch_util.erl
index 6001ae2..4b84861 100644
--- a/src/couch/src/couch_util.erl
+++ b/src/couch/src/couch_util.erl
@@ -33,6 +33,7 @@
 -export([find_in_binary/2]).
 -export([callback_exists/3, validate_callback_exists/3]).
 -export([with_proc/4]).
+-export([process_dict_get/2, process_dict_get/3]).
 
 -include_lib("couch/include/couch_db.hrl").
 
@@ -598,3 +599,21 @@ with_proc(M, F, A, Timeout) ->
         erlang:demonitor(Ref, [flush]),
         {error, timeout}
     end.
+
+% Same as process_dict_get/3 with a default value of undefined.
+process_dict_get(Pid, Key) ->
+    process_dict_get(Pid, Key, undefined).
+
+% Look up Key in Pid's process dictionary; DefaultValue if missing or dead.
+process_dict_get(Pid, Key, DefaultValue) ->
+    case process_info(Pid, dictionary) of
+        {dictionary, Dict} ->
+            case lists:keyfind(Key, 1, Dict) of
+                false ->
+                    DefaultValue; % Key not present in Pid's dictionary
+                {Key, Value} ->
+                    Value
+            end;
+        undefined ->
+            DefaultValue % process_info/2 returns undefined for a dead Pid
+    end.

-- 
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <commits@couchdb.apache.org>'].

Mime
View raw message