couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [couchdb] 02/03: Use a temporary process when caching shard maps
Date Fri, 21 Apr 2017 15:46:39 GMT
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3376-fix-mem3-shards
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b71677f593fb62e590f37c2cbb5bb851bd12c11c
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
AuthorDate: Fri Apr 14 12:18:49 2017 -0500

    Use a temporary process when caching shard maps
    
    This change introduces a new shard_writer process into the mem3_shards
    caching approach. It turns out it's not terribly difficult to create
    thundering herd scenarios that back up the mem3_shards mailbox, and if
    the Q value is large this backup can happen quite quickly.
    
    This changes things so that we use a temporary process to perform the
    actual `ets:insert/2` call, which keeps the shard map out of mem3_shards'
    message queue.
    
    A second optimization is that only a single client attempts to send the
    shard map in the first place: a client only spawns a writer when none is
    registered for the database, and then claims the writer key with
    `ets:insert_new/2`.
    
    COUCHDB-3376
---
 src/mem3/src/mem3_shards.erl | 96 ++++++++++++++++++++++++++++++++++++++------
 1 file changed, 83 insertions(+), 13 deletions(-)

diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index ca5deaf..bbdc3b5 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -28,7 +28,8 @@
     max_size = 25000,
     cur_size = 0,
     changes_pid,
-    update_seq
+    update_seq,
+    write_timeout
 }).
 
 -include_lib("mem3/include/mem3.hrl").
@@ -37,6 +38,7 @@
 -define(DBS, mem3_dbs).
 -define(SHARDS, mem3_shards).
 -define(ATIMES, mem3_atimes).
+-define(OPENERS, mem3_openers).
 -define(RELISTEN_DELAY, 5000).
 
 start_link() ->
@@ -172,6 +174,13 @@ handle_config_change("mem3", "shard_cache_size", SizeList, _, _) ->
     {ok, gen_server:call(?MODULE, {set_max_size, Size}, infinity)};
 handle_config_change("mem3", "shards_db", _DbName, _, _) ->
     {ok, gen_server:call(?MODULE, shard_db_changed, infinity)};
+handle_config_change("mem3", "shard_write_timeout", TimeoutStr, _, _) ->
+    % The config value is a string. Parse it into a fresh variable: matching
+    % the result against the already-bound head variable would always fail
+    % with badmatch. Unparseable values fall back to 1000, the same default
+    % that init/1 reads via config:get_integer/3.
+    Timeout = try list_to_integer(TimeoutStr) catch _:_ -> 1000 end,
+    {ok, gen_server:call(?MODULE, {set_write_timeout, Timeout}, infinity)};
 handle_config_change(_, _, _, _, _) ->
     {ok, nil}.
 
@@ -183,21 +192,24 @@ handle_config_terminate(_Server, _Reason, _State) ->
 init([]) ->
     ets:new(?SHARDS, [
         bag,
-        protected,
+        public, % shard_writer processes insert into ?SHARDS directly
         named_table,
         {keypos,#shard.dbname},
         {read_concurrency, true}
     ]),
     ets:new(?DBS, [set, protected, named_table]),
     ets:new(?ATIMES, [ordered_set, protected, named_table]),
+    ets:new(?OPENERS, [bag, public, named_table]), % {DbName, Writer} pairs
     ok = config:listen_for_changes(?MODULE, nil),
     SizeList = config:get("mem3", "shard_cache_size", "25000"),
+    WriteTimeout = config:get_integer("mem3", "shard_write_timeout", 1000),
     UpdateSeq = get_update_seq(),
     {ok, #st{
         max_size = list_to_integer(SizeList),
         cur_size = 0,
         changes_pid = start_changes_listener(UpdateSeq),
-        update_seq = UpdateSeq
+        update_seq = UpdateSeq,
+        write_timeout = WriteTimeout % replaced via {set_write_timeout, _}
     }}.
 
 handle_call({set_max_size, Size}, _From, St) ->
@@ -205,6 +217,8 @@ handle_call({set_max_size, Size}, _From, St) ->
 handle_call(shard_db_changed, _From, St) ->
     exit(St#st.changes_pid, shard_db_changed),
     {reply, ok, St};
+handle_call({set_write_timeout, Timeout}, _From, St) ->
+    {reply, ok, St#st{write_timeout = Timeout}};
 handle_call(_Call, _From, St) ->
     {noreply, St}.
 
@@ -212,23 +226,25 @@ handle_cast({cache_hit, DbName}, St) ->
     couch_stats:increment_counter([mem3, shard_cache, hit]),
     cache_hit(DbName),
     {noreply, St};
-handle_cast({cache_insert, DbName, Shards, UpdateSeq}, St) ->
-    couch_stats:increment_counter([mem3, shard_cache, miss]),
+handle_cast({cache_insert, DbName, Writer, UpdateSeq}, St) ->
     % This comparison correctly uses the `<` operator
     % and not `=<`. The easiest way to understand why is
     % to think of when a _dbs db doesn't change. If it used
     % `=<` it would be impossible to insert anything into
     % the cache.
     NewSt = case UpdateSeq < St#st.update_seq of
-        true -> St;
-        false -> cache_free(cache_insert(St, DbName, Shards))
+        true ->
+            Writer ! cancel,
+            St;
+        false ->
+            cache_free(cache_insert(St, DbName, Writer, St#st.write_timeout))
     end,
     {noreply, NewSt};
 handle_cast({cache_remove, DbName}, St) ->
     couch_stats:increment_counter([mem3, shard_cache, eviction]),
     {noreply, cache_remove(St, DbName)};
-handle_cast({cache_insert_change, DbName, Shards, UpdateSeq}, St) ->
-    Msg = {cache_insert, DbName, Shards, UpdateSeq},
+handle_cast({cache_insert_change, DbName, Writer, UpdateSeq}, St) ->
+    Msg = {cache_insert, DbName, Writer, UpdateSeq},
     {noreply, NewSt} = handle_cast(Msg, St),
     {noreply, NewSt#st{update_seq = UpdateSeq}};
 handle_cast({cache_remove_change, DbName, UpdateSeq}, St) ->
@@ -333,7 +349,11 @@ changes_callback({change, {Change}, _}, _) ->
                     [DbName, Reason]);
             {Doc} ->
                 Shards = mem3_util:build_ordered_shards(DbName, Doc),
-                Msg = {cache_insert_change, DbName, Shards, Seq},
+                IdleTimeout = config:get_integer(
+                        "mem3", "writer_idle_timeout", 30000),
+                Writer = spawn_shard_writer(DbName, Shards, IdleTimeout),
+                ets:insert(?OPENERS, {DbName, Writer}),
+                Msg = {cache_insert_change, DbName, Writer, Seq},
                 gen_server:cast(?MODULE, Msg),
                 [create_if_missing(mem3:name(S)) || S
                     <- Shards, mem3:node(S) =:= node()]
@@ -345,6 +365,7 @@ changes_callback(timeout, _) ->
     ok.
 
 load_shards_from_disk(DbName) when is_binary(DbName) ->
+    couch_stats:increment_counter([mem3, shard_cache, miss]),
     X = ?l2b(config:get("mem3", "shards_db", "_dbs")),
     {ok, Db} = mem3_util:ensure_exists(X),
     try
@@ -358,7 +379,19 @@ load_shards_from_db(#db{} = ShardDb, DbName) ->
     {ok, #doc{body = {Props}}} ->
         Seq = couch_db:get_update_seq(ShardDb),
         Shards = mem3_util:build_ordered_shards(DbName, Props),
-        gen_server:cast(?MODULE, {cache_insert, DbName, Shards, Seq}),
+        IdleTimeout = config:get_integer("mem3", "writer_idle_timeout", 30000),
+        case maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) of
+            Writer when is_pid(Writer) ->
+                case ets:insert_new(?OPENERS, {DbName, Writer}) of
+                    true ->
+                        Msg = {cache_insert, DbName, Writer, Seq},
+                        gen_server:cast(?MODULE, Msg);
+                    false ->
+                        Writer ! cancel
+                end;
+            ignore ->
+                ok
+        end,
         Shards;
     {not_found, _} ->
         erlang:error(database_does_not_exist, ?b2l(DbName))
@@ -389,10 +422,10 @@ create_if_missing(Name) ->
         end
     end.
 
-cache_insert(#st{cur_size=Cur}=St, DbName, Shards) ->
+cache_insert(#st{cur_size=Cur}=St, DbName, Writer, Timeout) ->
     NewATime = now(),
     true = ets:delete(?SHARDS, DbName),
-    true = ets:insert(?SHARDS, Shards),
+    flush_write(DbName, Writer, Timeout),
     case ets:lookup(?DBS, DbName) of
         [{DbName, ATime}] ->
             true = ets:delete(?ATIMES, ATime),
@@ -443,6 +476,43 @@ cache_clear(St) ->
     true = ets:delete_all_objects(?ATIMES),
     St#st{cur_size=0}.
 
+% Spawn a shard writer for DbName unless ?OPENERS already lists one.
+% Returns the writer pid, or `ignore` when a writer is already registered.
+maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) ->
+    case ets:member(?OPENERS, DbName) of
+        true -> ignore;
+        false -> spawn_shard_writer(DbName, Shards, IdleTimeout)
+    end.
+
+spawn_shard_writer(DbName, Shards, IdleTimeout) ->
+    erlang:spawn(fun() -> shard_writer(DbName, Shards, IdleTimeout) end).
+
+% Hold Shards until told to `write` them into ?SHARDS or to `cancel`, or
+% until IdleTimeout ms pass with neither. Either way, the try/after removes
+% this process's {DbName, self()} entry from ?OPENERS on return or error.
+shard_writer(DbName, Shards, IdleTimeout) ->
+    try
+        receive
+            write -> true = ets:insert(?SHARDS, Shards);
+            cancel -> ok
+        after IdleTimeout -> ok
+        end
+    after
+        true = ets:delete_object(?OPENERS, {DbName, self()})
+    end.
+
+% Ask Writer to insert its shard map and wait up to WriteTimeout ms for it
+% to exit. A non-normal exit (e.g. noproc when Writer has already stopped,
+% such as after its idle timeout) or a timeout exits the calling process.
+flush_write(DbName, Writer, WriteTimeout) ->
+    Ref = erlang:monitor(process, Writer),
+    Writer ! write,
+    receive
+        {'DOWN', Ref, _, _, normal} -> ok;
+        {'DOWN', Ref, _, _, Error} -> erlang:exit({mem3_shards_bad_write, Error})
+    after WriteTimeout -> erlang:exit({mem3_shards_write_timeout, DbName})
+    end.
+
 filter_shards_by_name(Name, Shards) ->
     filter_shards_by_name(Name, [], Shards).
 

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <commits@couchdb.apache.org>.

Mime
View raw message