couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vatam...@apache.org
Subject [41/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae
Date Tue, 14 Mar 2017 19:26:36 GMT
Use mem3 in couch_multidb_changes to discover _replicator shards

This is a forward-port of a corresponding commit in master:

"Use mem3 to discover all _replicator shards in replicator manager"

https://github.com/apache/couchdb-couch-replicator/commit/b281d2bb320ed6e6d8226765315a40637ba91a46

This wasn't a direct merge, as replicator shard discovery and traversal are
slightly different.

`couch_multidb_changes` is more generic and takes a db suffix and a callback
module, so `<<"_replicator">>` is not hard-coded in the multidb changes module.

`couch_replicator_manager` handles the local `_replicator` db by directly
creating it and launching a changes feed for it. In the scheduling replicator,
creation is separate from monitoring. The logic is handled in the `scan_all_dbs`
function, which always first checks whether a local db matching the suffix is
present; if so, a `{resume_scan, DbName}` is sent to the main process.
Due to the supervisor start order, by the time that code runs the local
replicator db will already have been created.

COUCHDB-3277


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/fb77cbc4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/fb77cbc4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/fb77cbc4

Branch: refs/heads/63012-scheduler
Commit: fb77cbc463caa573a51f971243a5cb18ee8b2e9a
Parents: c813d0e
Author: Nick Vatamaniuc <vatamane@apache.org>
Authored: Wed Jan 25 01:28:18 2017 -0500
Committer: Nick Vatamaniuc <vatamane@apache.org>
Committed: Wed Jan 25 01:28:18 2017 -0500

----------------------------------------------------------------------
 src/couch_multidb_changes.erl | 169 ++++++++++++++++++++++++-------------
 1 file changed, 111 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/fb77cbc4/src/couch_multidb_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_multidb_changes.erl b/src/couch_multidb_changes.erl
index b9c9ad5..af85a78 100644
--- a/src/couch_multidb_changes.erl
+++ b/src/couch_multidb_changes.erl
@@ -21,6 +21,7 @@
 -export([changes_reader/3, changes_reader_cb/3]).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
 
 -define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
 
@@ -249,34 +250,44 @@ changes_reader_cb(_, _, Acc) ->
 
 
 scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
-    Root = config:get("couchdb", "database_dir", "."),
-    NormRoot = couch_util:normpath(Root),
-    Pat = io_lib:format("~s(\\.[0-9]{10,})?.couch$", [DbSuffix]),
-    filelib:fold_files(Root, lists:flatten(Pat), true,
-        fun(Filename, Acc) ->
-            % shamelessly stolen from couch_server.erl
-            NormFilename = couch_util:normpath(Filename),
-            case NormFilename -- NormRoot of
-                [$/ | RelativeFilename] -> ok;
-                RelativeFilename -> ok
-            end,
-            DbName = ?l2b(filename:rootname(RelativeFilename, ".couch")),
-            Jitter = jitter(Acc),
-            spawn_link(fun() ->
-                timer:sleep(Jitter),
-                gen_server:cast(Server, {resume_scan, DbName})
-            end),
-            Acc + 1
-        end, 1).
+    ok = scan_local_db(Server, DbSuffix),
+    {ok, Db} = mem3_util:ensure_exists(
+        config:get("mem3", "shards_db", "_dbs")),
+    ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
+    ChangesFun(fun({change, {Change}, _}, _) ->
+        DbName = couch_util:get_value(<<"id">>, Change),
+        case DbName of <<"_design/", _/binary>> -> ok; _Else ->
+            case couch_replicator_utils:is_deleted(Change) of
+            true ->
+                ok;
+            false ->
+                [gen_server:cast(Server, {resume_scan, ShardName})
+                    || ShardName <- filter_shards(DbName, DbSuffix)],
+                ok
+            end
+        end;
+        (_, _) -> ok
+    end),
+    couch_db:close(Db).
+
+
+filter_shards(DbName, DbSuffix) ->
+    case DbSuffix =:= couch_db:dbname_suffix(DbName) of
+    false ->
+        [];
+    true ->
+        [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+    end.
 
 
-% calculate random delay proportional to the number of replications
-% on current node, in order to prevent a stampede:
-%   - when a source with multiple replication targets fails
-%   - when we restart couch_replication_manager
-jitter(N) ->
-    Range = min(2 * N * ?AVG_DELAY_MSEC, ?MAX_DELAY_MSEC),
-    random:uniform(Range).
+scan_local_db(Server, DbSuffix) when is_pid(Server) ->
+    case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of
+        {ok, Db} ->
+            gen_server:cast(Server, {resume_scan, DbSuffix}),
+            ok = couch_db:close(Db);
+        _Error ->
+            ok
+    end.
 
 
 is_design_doc({Change}) ->
@@ -336,7 +347,6 @@ couch_multidb_changes_test_() ->
             t_handle_call_resume_scan_no_chfeed_ets_entry(),
             t_start_link(),
             t_start_link_no_ddocs(),
-            t_scanner_finds_shard(),
             t_misc_gen_server_callbacks()
         ]
     }.
@@ -345,7 +355,15 @@ setup() ->
     mock_logs(),
     mock_callback_mod(),
     meck:expect(couch_event, register_all, 1, ok),
-    meck:expect(config, get, fun("couchdb", "database_dir", _) -> ?TEMPDIR end),
+    meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
+    meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}),
+    ChangesFun = meck:val(fun(_) -> ok end),
+    meck:expect(couch_changes, handle_changes, 4, ChangesFun),
+    meck:expect(couch_db, open_int,
+        fun(?DBNAME, [?CTX, sys_db]) -> {ok, db};
+            (_, _) -> {not_found, no_db_file}
+        end),
+    meck:expect(couch_db, close, 1, ok),
     mock_changes_reader(),
     % create process to stand in for couch_ever_server
     % mocking erlang:monitor doesn't, so give it real process to monitor
@@ -357,8 +375,7 @@ setup() ->
 teardown(EvtPid) ->
     unlink(EvtPid),
     exit(EvtPid, kill),
-    meck:unload(),
-    delete_shard_file(?DBNAME).
+    meck:unload().
 
 
 t_handle_call_change() ->
@@ -648,15 +665,6 @@ t_start_link_no_ddocs() ->
         exit(Pid, kill)
     end).
 
-t_scanner_finds_shard() ->
-    ?_test(begin
-        ok = create_shard_file(?DBNAME),
-        {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
-        ok = meck:wait(?MOD, db_found, [?DBNAME, zig], 2000),
-        unlink(Pid),
-        exit(Pid, kill)
-    end).
-
 t_misc_gen_server_callbacks() ->
     ?_test(begin
         ?assertEqual(ok, terminate(reason, state)),
@@ -664,6 +672,70 @@ t_misc_gen_server_callbacks() ->
     end).
 
 
+scan_dbs_test_() ->
+{
+      foreach,
+      fun() -> test_util:start_couch([mem3, fabric]) end,
+      fun(Ctx) -> test_util:stop_couch(Ctx) end,
+      [
+          t_pass_shard(),
+          t_fail_shard(),
+          t_pass_local(),
+          t_fail_local()
+     ]
+}.
+
+
+t_pass_shard() ->
+    ?_test(begin
+        DbName0 = ?tempdb(),
+        DbSuffix = <<"_replicator">>,
+        DbName = <<DbName0/binary, "/", DbSuffix/binary>>,
+        ok = fabric:create_db(DbName, [?CTX]),
+        ?assertEqual(8, length(filter_shards(DbName, DbSuffix))),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
+t_fail_shard() ->
+    ?_test(begin
+        DbName = ?tempdb(),
+        ok = fabric:create_db(DbName, [?CTX]),
+        ?assertEqual([], filter_shards(DbName, <<"_replicator">>)),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
+t_pass_local() ->
+    ?_test(begin
+        LocalDb = ?tempdb(),
+        {ok, Db} = couch_db:create(LocalDb, [?CTX]),
+        ok = couch_db:close(Db),
+        scan_local_db(self(), LocalDb),
+        receive
+            {'$gen_cast', Msg} ->
+                ?assertEqual(Msg, {resume_scan, LocalDb})
+        after 0 ->
+                ?assert(false)
+        end
+    end).
+
+
+t_fail_local() ->
+ ?_test(begin
+        LocalDb = ?tempdb(),
+        {ok, Db} = couch_db:create(LocalDb, [?CTX]),
+        ok = couch_db:close(Db),
+        scan_local_db(self(), <<"some_other_db">>),
+        receive
+            {'$gen_cast', Msg} ->
+                ?assertNotEqual(Msg, {resume_scan, LocalDb})
+        after 0 ->
+                ?assert(true)
+        end
+    end).
+
+
 % Test helper functions
 
 mock_logs() ->
@@ -699,7 +771,6 @@ kill_mock_change_reader_and_get_its_args(Pid) ->
     end.
 
 mock_changes_reader() ->
-    meck:expect(couch_db, open_int, 2, {ok, db}),
     meck:expect(couch_changes, handle_db_changes,
         fun(_ChArgs, _Req, db) ->
             fun mock_changes_reader_loop/1
@@ -734,24 +805,6 @@ change_row(Id) when is_binary(Id) ->
         {doc, {[{<<"_id">>, Id}, {<<"_rev">>, <<"1-f00">>}]}}
     ]}.
 
-shard_fname(DbName) ->
-    filename:join([?TEMPDIR, binary_to_list(DbName) ++ ".couch"]).
-
-delete_shard_file(DbName) ->
-    Path = shard_fname(DbName),
-    case filelib:is_file(Path) of
-        true ->
-            ok = file:delete(Path);
-        false ->
-            ok
-    end.
-
-create_shard_file(DbName) ->
-    Path = shard_fname(DbName),
-    ok = filelib:ensure_dir(Path),
-    ok = file:write_file(Path, <<>>).
-
-
 handle_call_ok(Msg, State) ->
     ?assertMatch({reply, ok, _}, handle_call(Msg, from, State)).
 


Mime
View raw message