couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [couchdb] branch COUCHDB-3287-pluggable-storage-engines updated: Use a temporary process when caching shard maps
Date Fri, 14 Apr 2017 17:21:48 GMT
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3287-pluggable-storage-engines
in repository https://gitbox.apache.org/repos/asf/couchdb.git

The following commit(s) were added to refs/heads/COUCHDB-3287-pluggable-storage-engines by
this push:
       new  1bdf84c   Use a temporary process when caching shard maps
1bdf84c is described below

commit 1bdf84cc5b6eb6f8b8a5e78c472d9b5e66aeefd1
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
AuthorDate: Fri Apr 14 12:18:49 2017 -0500

    Use a temporary process when caching shard maps
    
    This change introduces a new shard_writer process into the mem3_shards
    caching approach. It turns out it's not terribly difficult to create
    thundering herd scenarios that back up the mem3_shards mailbox. And if
    the Q value is large this backup can happen quite quickly.
    
    This changes things so that we use a temporary process to perform the
    actual `ets:insert/2` call which keeps the shard map out of mem3_shards'
    message queue.
    
    A second optimization is that only a single client will attempt to send
    the shard map to begin with by checking the existence of the writer key
    using `ets:insert_new/2`.
---
 src/mem3/src/mem3_shards.erl | 76 ++++++++++++++++++++++++++++++++++++--------
 1 file changed, 63 insertions(+), 13 deletions(-)

diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index b26baf8..c247284 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -37,7 +37,10 @@
 -define(DBS, mem3_dbs).
 -define(SHARDS, mem3_shards).
 -define(ATIMES, mem3_atimes).
+-define(OPENERS, mem3_openers).
 -define(RELISTEN_DELAY, 5000).
+-define(WRITE_TIMEOUT, 1000).
+-define(WRITE_IDLE_TIMEOUT, 30000).
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -179,13 +182,14 @@ handle_config_terminate(_Server, _Reason, _State) ->
 init([]) ->
     ets:new(?SHARDS, [
         bag,
-        protected,
+        public,
         named_table,
         {keypos,#shard.dbname},
         {read_concurrency, true}
     ]),
     ets:new(?DBS, [set, protected, named_table]),
     ets:new(?ATIMES, [ordered_set, protected, named_table]),
+    ets:new(?OPENERS, [bag, public, named_table]),
     ok = config:listen_for_changes(?MODULE, nil),
     SizeList = config:get("mem3", "shard_cache_size", "25000"),
     UpdateSeq = get_update_seq(),
@@ -208,22 +212,22 @@ handle_cast({cache_hit, DbName}, St) ->
     couch_stats:increment_counter([mem3, shard_cache, hit]),
     cache_hit(DbName),
     {noreply, St};
-handle_cast({cache_insert, DbName, Shards, UpdateSeq}, St) ->
-    couch_stats:increment_counter([mem3, shard_cache, miss]),
+handle_cast({cache_insert, DbName, Writer, UpdateSeq}, St) ->
     NewSt = case UpdateSeq < St#st.update_seq of
         true ->
             couch_log:notice("~p ignoring add ~s :: ~p < ~p", [?MODULE, DbName, UpdateSeq,
St#st.update_seq]),
+            Writer ! cancel,
             St;
         false ->
-            cache_free(cache_insert(St, DbName, Shards))
+            cache_free(cache_insert(St, DbName, Writer))
     end,
     {noreply, NewSt};
 handle_cast({cache_remove, DbName}, St) ->
     couch_stats:increment_counter([mem3, shard_cache, eviction]),
     couch_log:notice("~p removing ~s ~p", [?MODULE, DbName, St#st.update_seq]),
     {noreply, cache_remove(St, DbName)};
-handle_cast({cache_insert_change, DbName, Shards, UpdateSeq}, St) ->
-    Msg = {cache_insert, DbName, Shards, UpdateSeq},
+handle_cast({cache_insert_change, DbName, Writer, UpdateSeq}, St) ->
+    Msg = {cache_insert, DbName, Writer, UpdateSeq},
     {noreply, NewSt} = handle_cast(Msg, St),
     {noreply, NewSt#st{update_seq = UpdateSeq}};
 handle_cast({cache_remove_change, DbName, UpdateSeq}, St) ->
@@ -331,7 +335,9 @@ changes_callback({change, {Change}, _}, _) ->
                     [DbName, Reason]);
             {Doc} ->
                 Shards = mem3_util:build_ordered_shards(DbName, Doc),
-                Msg = {cache_insert_change, DbName, Shards, Seq},
+                Writer = spawn_shard_writer(DbName, Shards),
+                ets:insert(?OPENERS, {DbName, Writer}),
+                Msg = {cache_insert_change, DbName, Writer, Seq},
                 gen_server:cast(?MODULE, Msg),
                 [create_if_missing(mem3:name(S), mem3:engine(S)) || S
                     <- Shards, mem3:node(S) =:= node()]
@@ -343,6 +349,7 @@ changes_callback(timeout, _) ->
     ok.
 
 load_shards_from_disk(DbName) when is_binary(DbName) ->
+    couch_stats:increment_counter([mem3, shard_cache, miss]),
     X = ?l2b(config:get("mem3", "shards_db", "_dbs")),
     {ok, Db} = mem3_util:ensure_exists(X),
     try
@@ -356,10 +363,16 @@ load_shards_from_db(ShardDb, DbName) ->
     {ok, #doc{body = {Props}}} ->
         Seq = couch_db:get_update_seq(ShardDb),
         Shards = mem3_util:build_ordered_shards(DbName, Props),
-        case erlang:process_info(whereis(?MODULE), message_queue_len) of
-            {_, N} when is_integer(N), N < 5000 ->
-                gen_server:cast(?MODULE, {cache_insert, DbName, Shards, Seq});
-            _ ->
+        case maybe_spawn_shard_writer(DbName, Shards) of
+            Writer when is_pid(Writer) ->
+                case ets:insert_new(?OPENERS, {DbName, Writer}) of
+                    true ->
+                        Msg = {cache_insert, DbName, Writer, Seq},
+                        gen_server:cast(?MODULE, Msg);
+                    false ->
+                        Writer ! cancel
+                end;
+            ignore ->
                 ok
         end,
         Shards;
@@ -390,10 +403,10 @@ create_if_missing(Name, Options) ->
             end
     end.
 
-cache_insert(#st{cur_size=Cur}=St, DbName, Shards) ->
+cache_insert(#st{cur_size=Cur}=St, DbName, Writer) ->
     NewATime = now(),
     true = ets:delete(?SHARDS, DbName),
-    true = ets:insert(?SHARDS, Shards),
+    flush_write(DbName, Writer),
     case ets:lookup(?DBS, DbName) of
         [{DbName, ATime}] ->
             couch_log:notice("~p cache_insert ~s :: ~p :: ~p -> ~p", [?MODULE, DbName,
St, ATime, NewATime]),
@@ -451,6 +464,43 @@ cache_clear(St) ->
     true = ets:delete_all_objects(?ATIMES),
     St#st{cur_size=0}.
 
+%% Start a shard writer for DbName unless the ?OPENERS table already holds
+%% an entry for that database. Returns the pid of the new writer, or the
+%% atom `ignore` when an entry is already present.
+maybe_spawn_shard_writer(DbName, Shards) ->
+    case ets:member(?OPENERS, DbName) of
+        false ->
+            spawn_shard_writer(DbName, Shards);
+        true ->
+            ignore
+    end.
+
+%% Spawn an unlinked process that runs shard_writer/2 for DbName and
+%% return its pid.
+spawn_shard_writer(DbName, Shards) ->
+    WriterFun = fun() -> shard_writer(DbName, Shards) end,
+    erlang:spawn(WriterFun).
+
+%% Body of the temporary writer process. It waits for one instruction:
+%%   write  - insert Shards into the ?SHARDS table
+%%   cancel - drop the shard map without writing it
+%% If neither message arrives within ?WRITE_IDLE_TIMEOUT milliseconds the
+%% process gives up. On every exit path, including a crash in the insert,
+%% the process removes its own {DbName, Pid} entry from ?OPENERS.
+shard_writer(DbName, Shards) ->
+    try
+        receive
+            write ->
+                true = ets:insert(?SHARDS, Shards);
+            cancel ->
+                ok
+        after ?WRITE_IDLE_TIMEOUT ->
+            ok
+        end
+    after
+        %% ets:delete/2 expects a *key*. ?OPENERS is keyed on DbName, so
+        %% passing the {DbName, Pid} tuple there matches nothing and the
+        %% entry would never be removed. delete_object/2 removes exactly
+        %% this writer's entry and leaves other writers for DbName alone.
+        true = ets:delete_object(?OPENERS, {DbName, self()})
+    end.
+
+%% Tell Writer to perform its insert, then block until the writer process
+%% has terminated. Returns ok when it exits normally. Exits the calling
+%% process with {mem3_shards_bad_write, Reason} on any other exit reason,
+%% or with {mem3_shards_write_timeout, DbName} if no 'DOWN' message arrives
+%% within ?WRITE_TIMEOUT milliseconds.
+flush_write(DbName, Writer) ->
+    MonRef = erlang:monitor(process, Writer),
+    Writer ! write,
+    receive
+        {'DOWN', MonRef, _, _, Reason} ->
+            case Reason of
+                normal ->
+                    ok;
+                _ ->
+                    erlang:exit({mem3_shards_bad_write, Reason})
+            end
+    after ?WRITE_TIMEOUT ->
+        erlang:exit({mem3_shards_write_timeout, DbName})
+    end.
+
 filter_shards_by_name(Name, Shards) ->
     filter_shards_by_name(Name, [], Shards).
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <commits@couchdb.apache.org>'].

Mime
View raw message