couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [couchdb] 04/10: Ensure a user creation is handled on one node only
Date Sun, 08 Oct 2017 16:12:55 GMT
This is an automated email from the ASF dual-hosted git repository.

jan pushed a commit to branch 749-fix-couch_peruser-app-structure
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a29e1646e610ac2b210ea4a55c38c3159f0b82bb
Author: Jan Lehnardt <jan@apache.org>
AuthorDate: Sat Oct 7 17:04:54 2017 +0200

    Ensure a user creation is handled on one node only
    
    This patch makes use of the mechanism that ensures that replications
    are only run on one node.
    
    When the cluster has nodes added/removed all changes listeners are
    restarted.
---
 src/couch_peruser/src/couch_peruser.app.src |   2 +-
 src/couch_peruser/src/couch_peruser.erl     | 158 +++++++++++++++++++---------
 2 files changed, 110 insertions(+), 50 deletions(-)

diff --git a/src/couch_peruser/src/couch_peruser.app.src b/src/couch_peruser/src/couch_peruser.app.src
index 777446d..42b7b25 100644
--- a/src/couch_peruser/src/couch_peruser.app.src
+++ b/src/couch_peruser/src/couch_peruser.app.src
@@ -14,7 +14,7 @@
     {description, "couch_peruser - maintains per-user databases in CouchDB"},
     {vsn, git},
     {registered, []},
-    {applications, [kernel, stdlib, config, couch, fabric]},
+    {applications, [kernel, stdlib, config, couch, fabric, couch_replicator, mem3]},
     {mod, {couch_peruser_app, []}},
     {env, []},
     {modules, [couch_peruser, couch_peruser_app, couch_peruser_sup]}
diff --git a/src/couch_peruser/src/couch_peruser.erl b/src/couch_peruser/src/couch_peruser.erl
index 63ef084..9161f56 100644
--- a/src/couch_peruser/src/couch_peruser.erl
+++ b/src/couch_peruser/src/couch_peruser.erl
@@ -22,6 +22,9 @@
 -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
+% cluster state notification callback
+-export([notify_cluster_event/2]).
+
 -export([init_changes_handler/1, changes_handler/3]).
 
 -record(state, {parent, db_name, delete_dbs, changes_pid, changes_ref}).
@@ -34,10 +37,13 @@ start_link() ->
     gen_server:start_link(?MODULE, [], []).
 
 init() ->
+    couch_log:debug("peruser: starting on node ~p", [node()]),
     case config:get_boolean("couch_peruser", "enable", false) of
     false ->
+        couch_log:debug("peruser: disabled on node ~p", [node()]),
         #clusterState{};
     true ->
+        couch_log:debug("peruser: enabled on node ~p", [node()]),
         DbName = ?l2b(config:get(
                          "couch_httpd_auth", "authentication_db", "_users")),
         DeleteDbs = config:get_boolean("couch_peruser", "delete_dbs", false),
@@ -47,21 +53,37 @@ init() ->
             db_name = DbName,
             delete_dbs = DeleteDbs
         },
-        try
-            States = lists:map(fun (A) ->
-                S = #state{parent = ClusterState#clusterState.parent,
-                           db_name = A#shard.name,
-                           delete_dbs = DeleteDbs},
-                {Pid, Ref} = spawn_opt(
-                    ?MODULE, init_changes_handler, [S], [link, monitor]),
-                S#state{changes_pid=Pid, changes_ref=Ref}
-            end, mem3:local_shards(DbName)),
-
-            ClusterState#clusterState{states = States}
-        catch error:database_does_not_exist ->
-            couch_log:warning("couch_peruser can't proceed as underlying database (~s) is missing, disables itself.", [DbName]),
-            config:set("couch_peruser", "enable", "false", lists:concat([binary_to_list(DbName), " is missing"]))
-        end
+
+        % set up cluster-stable listener
+        couch_replicator_clustering:link_cluster_event_listener(?MODULE,
+            notify_cluster_event, [self()]),
+
+        couch_log:debug("peruser: registered for cluster event on node ~p", [node()]),
+        ClusterState
+    end.
+
+% Cluster membership change notification callback
+-spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
+notify_cluster_event(Server, {cluster, _} = Event) ->
+    couch_log:debug("peruser: received cluster event ~p on node ~p", [Event, node()]),
+    gen_server:cast(Server, Event).
+
+start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterState) ->
+    couch_log:debug("peruser: start_listening() on node ~p", [node()]),
+    try
+        States = lists:map(fun (A) ->
+            S = #state{parent = ClusterState#clusterState.parent,
+                       db_name = A#shard.name,
+                       delete_dbs = DeleteDbs},
+            {Pid, Ref} = spawn_opt(
+                ?MODULE, init_changes_handler, [S], [link, monitor]),
+            S#state{changes_pid=Pid, changes_ref=Ref}
+        end, mem3:local_shards(DbName)),
+
+        ClusterState#clusterState{states = States}
+    catch error:database_does_not_exist ->
+        couch_log:warning("couch_peruser can't proceed as underlying database (~s) is missing, disables itself.", [DbName]),
+        config:set("couch_peruser", "enable", "false", lists:concat([binary_to_list(DbName), " is missing"]))
     end.
 
 init_changes_handler(#state{db_name=DbName} = State) ->
@@ -76,24 +98,30 @@ init_changes_handler(#state{db_name=DbName} = State) ->
         ok
     end.
 
-changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{}) ->
+
+changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{db_name=DbName}) ->
     case couch_util:get_value(<<"id">>, Doc) of
-    <<"org.couchdb.user:",User/binary>> ->
-        case couch_util:get_value(<<"deleted">>, Doc, false) of
-        false ->
-            UserDb = ensure_user_db(User),
-            ok = ensure_security(User, UserDb, fun add_user/3),
-            State;
+    <<"org.couchdb.user:",User/binary>>=DocId ->
+        case should_handle_doc(DbName, DocId) of
         true ->
-            case State#state.delete_dbs of
-            true ->
-                _UserDb = delete_user_db(User),
-                State;
+            case couch_util:get_value(<<"deleted">>, Doc, false) of
             false ->
-                UserDb = user_db_name(User),
-                ok = ensure_security(User, UserDb, fun remove_user/3),
-                State
-            end
+                UserDb = ensure_user_db(User),
+                ok = ensure_security(User, UserDb, fun add_user/3),
+                State;
+            true ->
+                case State#state.delete_dbs of
+                true ->
+                    _UserDb = delete_user_db(User),
+                    State;
+                false ->
+                    UserDb = user_db_name(User),
+                    ok = ensure_security(User, UserDb, fun remove_user/3),
+                    State
+                end
+            end;
+        false ->
+            State
         end;
     _ ->
         State
@@ -101,6 +129,25 @@ changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{}) ->
 changes_handler(_Event, _ResType, State) ->
     State.
 
+should_handle_doc(DbName, DocId) ->
+  case couch_replicator_clustering:owner(DbName, DocId) of
+      unstable ->
+          % todo: when we do proper resume[1], we can return false here
+          % and rely on a module restart when the cluster is stable again
+          % in the meantime, we risk conflicts when the cluster gets unstable
+          % and users are being created.
+          % [1] https://github.com/apache/couchdb/issues/872
+          true;
+      ThisNode when ThisNode =:= node() ->
+          couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]),
+          % do the deed
+          true;
+      _OtherNode ->
+          couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]),
+          false
+  end.
+
+
 delete_user_db(User) ->
     UserDb = user_db_name(User),
     try
@@ -158,20 +205,25 @@ remove_user(User, Prop, {Modified, SecProps}) ->
     end.
 
 ensure_security(User, UserDb, TransformFun) ->
-    {ok, Shards} = fabric:get_all_security(UserDb, [?ADMIN_CTX]),
-    {_ShardInfo, {SecProps}} = hd(Shards),
-    % assert that shards have the same security object
-    true = lists:all(fun ({_, {SecProps1}}) ->
-        SecProps =:= SecProps1
-    end, Shards),
-    case lists:foldl(
-           fun (Prop, SAcc) -> TransformFun(User, Prop, SAcc) end,
-           {false, SecProps},
-           [<<"admins">>, <<"members">>]) of
-    {false, _} ->
-        ok;
-    {true, SecProps1} ->
-        ok = fabric:set_security(UserDb, {SecProps1}, [?ADMIN_CTX])
+    case fabric:get_all_security(UserDb, [?ADMIN_CTX]) of
+    {error, no_majority} ->
+      % single node, ignore
+       ok;
+    {ok, Shards} ->
+        {_ShardInfo, {SecProps}} = hd(Shards),
+        % assert that shards have the same security object
+        true = lists:all(fun ({_, {SecProps1}}) ->
+            SecProps =:= SecProps1
+        end, Shards),
+        case lists:foldl(
+               fun (Prop, SAcc) -> TransformFun(User, Prop, SAcc) end,
+               {false, SecProps},
+               [<<"admins">>, <<"members">>]) of
+        {false, _} ->
+            ok;
+        {true, SecProps1} ->
+            ok = fabric:set_security(UserDb, {SecProps1}, [?ADMIN_CTX])
+        end
     end.
 
 user_db_name(User) ->
@@ -179,6 +231,11 @@ user_db_name(User) ->
         [string:to_lower(integer_to_list(X, 16)) || <<X>> <= User]),
     <<?USERDB_PREFIX,HexUser/binary>>.
 
+exit_changes(ClusterState) ->
+    lists:foreach(fun (State) ->
+        demonitor(State#state.changes_ref, [flush]),
+        exit(State#state.changes_pid, kill)
+    end, ClusterState#clusterState.states).
 
 %% gen_server callbacks
 
@@ -191,16 +248,19 @@ handle_call(_Msg, _From, State) ->
 
 
 handle_cast(update_config, ClusterState) when ClusterState#clusterState.states =/= undefined ->
-    lists:foreach(fun (State) ->
-        demonitor(State#state.changes_ref, [flush]),
-        exit(State#state.changes_pid, kill)
-    end, ClusterState#clusterState.states),
-
+    exit_changes(ClusterState),
     {noreply, init()};
 handle_cast(update_config, _) ->
     {noreply, init()};
 handle_cast(stop, State) ->
     {stop, normal, State};
+handle_cast({cluster, unstable}, ClusterState) when ClusterState#clusterState.states =/= undefined ->
+    exit_changes(ClusterState),
+    {noreply, init()};
+handle_cast({cluster, unstable}, _) ->
+    {noreply, init()};
+handle_cast({cluster, stable}, State) ->
+    {noreply, start_listening(State)};
 handle_cast(_Msg, State) ->
     {noreply, State}.
 

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <commits@couchdb.apache.org>.

Mime
View raw message