couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [07/19] couch-replicator commit: updated refs/heads/COUCHDB-3288-remove-public-db-record to a8ac02d
Date Tue, 04 Apr 2017 21:17:40 GMT
Restore adding some jittered sleep to the shard scanning code.

Otherwise a large cluster will flood replicator manager with potentially
hundreds of thousands of `{resume, Shard}` messages. For each one, it
would try to open a changes feed which can add significant load and has
been seen in production to hit various system limits.

This brings back the change from before the switch to using mem3 shards
for replicator db scans.

Also adds a few tests.

COUCHDB-3311


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/45d739af
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/45d739af
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/45d739af

Branch: refs/heads/COUCHDB-3288-remove-public-db-record
Commit: 45d739af3fcf8b4f8e3ccca152cb3c2d781dc2fc
Parents: 648e465
Author: Nick Vatamaniuc <vatamane@apache.org>
Authored: Tue Feb 28 14:00:22 2017 -0500
Committer: Nick Vatamaniuc <vatamane@apache.org>
Committed: Tue Feb 28 14:00:22 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 74 ++++++++++++++++++++++++++++-------
 1 file changed, 60 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/45d739af/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index bdc3b8f..4e5073e 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -934,22 +934,30 @@ scan_all_dbs(Server) when is_pid(Server) ->
     {ok, Db} = mem3_util:ensure_exists(
         config:get("mem3", "shards_db", "_dbs")),
     ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
-    ChangesFun(fun({change, {Change}, _}, _) ->
-        DbName = couch_util:get_value(<<"id">>, Change),
-        case DbName of <<"_design/", _/binary>> -> ok; _Else ->
-            case couch_replicator_utils:is_deleted(Change) of
-            true ->
-                ok;
-            false ->
-                [gen_server:cast(Server, {resume_scan, ShardName})
-                    || ShardName <- replicator_shards(DbName)],
-                ok
-            end
-        end;
-        (_, _) -> ok
-    end),
+    ChangesFun({fun scan_changes_cb/3, {Server, 1}}),
     couch_db:close(Db).
 
+scan_changes_cb({change, {Change}, _}, _, {Server, AccCount}) ->
+    DbName = couch_util:get_value(<<"id">>, Change),
+    case DbName of <<"_design/", _/binary>> -> {Server, AccCount}; _Else ->
+        case couch_replicator_utils:is_deleted(Change) of
+        true ->
+            {Server, AccCount};
+        false ->
+            UpdatedCount = lists:foldl(fun(ShardName, Count) ->
+                spawn_link(fun() ->
+                    timer:sleep(jitter(Count)),
+                    gen_server:cast(Server, {resume_scan, ShardName})
+                end),
+                Count + 1
+           end, AccCount, replicator_shards(DbName)),
+           {Server, UpdatedCount}
+        end
+    end;
+
+scan_changes_cb(_, _, {Server, AccCount}) ->
+    {Server, AccCount}.
+
 
 replicator_shards(DbName) ->
     case is_replicator_db(DbName) of
@@ -1027,4 +1035,42 @@ t_fail_non_replicator_shard() ->
     end).
 
 
+scan_dbs_test_() ->
+{
+      foreach,
+      fun() -> test_util:start_couch([mem3, fabric]) end,
+      fun(Ctx) -> test_util:stop_couch(Ctx) end,
+      [
+          t_resume_db_shard(),
+          t_sleep_based_on_count()
+     ]
+}.
+
+
+t_resume_db_shard() ->
+    ?_test(begin
+        DbName0 = ?tempdb(),
+        DbName = <<DbName0/binary, "/_replicator">>,
+        ok = fabric:create_db(DbName, [?CTX]),
+        Change = {[{<<"id">>, DbName}]},
+        scan_changes_cb({change, Change, req}, type, {self(), 1}),
+        ResumeMsg = receive Msg -> Msg after 1000 -> timeout end,
+        ?assertMatch({'$gen_cast', {resume_scan, <<"shards/", _/binary>>}}, ResumeMsg),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
+t_sleep_based_on_count() ->
+    ?_test(begin
+        DbName0 = ?tempdb(),
+        DbName = <<DbName0/binary, "/_replicator">>,
+        ok = fabric:create_db(DbName, [?CTX]),
+        Change = {[{<<"id">>, DbName}]},
+        scan_changes_cb({change, Change, req}, type, {self(), 1000}),
+        Timeout = receive Msg -> Msg after 100 -> timeout end,
+        ?assertEqual(timeout, Timeout),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
 -endif.


Mime
View raw message