kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 0.11.0 updated: KAFKA-7255: Fix timing issue with create/update in SimpleAclAuthorizer (#5478)
Date Wed, 08 Aug 2018 18:39:38 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 06b8f09  KAFKA-7255: Fix timing issue with create/update in SimpleAclAuthorizer (#5478)
06b8f09 is described below

commit 06b8f09bf37be5d8caeba87a0a03d87187bdc42f
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Wed Aug 8 17:44:57 2018 +0100

    KAFKA-7255: Fix timing issue with create/update in SimpleAclAuthorizer (#5478)
    
    ACL updates currently get `(currentAcls, currentVersion)` for the resource from ZK and
do a conditional update using `(currentAcls+newAcl, currentVersion)`. This supports concurrent
atomic updates if the resource path already exists in ZK. If the path doesn't exist, we currently
do a conditional createOrUpdate using `(newAcl, 0)`. So two brokers adding acls using `(newAcl1,
0)` and `(newAcl2, 0)` will result in one broker creating the path and setting newAcl1, while
the other broker c [...]
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 50 ++++++++++++++--------
 1 file changed, 33 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index c947a2e..eb98193 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -21,12 +21,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
 
 import kafka.network.RequestChannel.Session
-import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.security.auth.SimpleAclAuthorizer._
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
-import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import scala.collection.JavaConverters._
 import org.apache.log4j.Logger
@@ -64,7 +63,11 @@ object SimpleAclAuthorizer {
   //prefix of all the change notification sequence node.
   val AclChangedPrefix = "acl_changes_"
 
-  private case class VersionedAcls(acls: Set[Acl], zkVersion: Int)
+  private val UnknownZkVersion = -2
+  private case class VersionedAcls(acls: Set[Acl], zkVersion: Int) {
+    def exists: Boolean = zkVersion != UnknownZkVersion
+  }
+  private val NoAcls = VersionedAcls(Set.empty, UnknownZkVersion)
 }
 
 class SimpleAclAuthorizer extends Authorizer with Logging {
@@ -194,7 +197,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   override def removeAcls(resource: Resource): Boolean = {
     inWriteLock(lock) {
       val result = zkUtils.deletePath(toResourcePath(resource))
-      updateCache(resource, VersionedAcls(Set(), 0))
+      updateCache(resource, NoAcls)
       updateAclChangedFlag(resource)
       result
     }
@@ -283,7 +286,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       val data = Json.encode(Acl.toJsonCompatibleMap(newAcls))
       val (updateSucceeded, updateVersion) =
         if (newAcls.nonEmpty) {
-         updatePath(path, data, currentVersionedAcls.zkVersion)
+          if (currentVersionedAcls.exists)
+            updatePath(path, data, currentVersionedAcls.zkVersion)
+          else
+            createPath(path, data)
         } else {
           trace(s"Deleting path for $resource because it had no ACLs remaining")
           (zkUtils.conditionalDeletePath(path, currentVersionedAcls.zkVersion), 0)
@@ -316,23 +322,30 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   /**
-    * Updates a zookeeper path with an expected version. If the topic does not exist, it will create it.
+    * Updates a zookeeper path with an expected version. Fails if the path does not exist.
     * Returns if the update was successful and the new version.
     */
   private def updatePath(path: String, data: String, expectedVersion: Int): (Boolean, Int) = {
     try {
       zkUtils.conditionalUpdatePersistentPathIfExists(path, data, expectedVersion)
     } catch {
-      case _: ZkNoNodeException =>
-        try {
-          debug(s"Node $path does not exist, attempting to create it.")
-          zkUtils.createPersistentPath(path, data)
-          (true, 0)
-        } catch {
-          case _: ZkNodeExistsException =>
-            debug(s"Failed to create node for $path because it already exists.")
-            (false, 0)
-        }
+      case _: ZkNoNodeException => (false, UnknownZkVersion)
+    }
+  }
+
+  /**
+   * Creates a zookeeper path with the provided data. Fails if the path already exists.
+   * Returns if the create was successful and the new version.
+   */
+  private def createPath(path: String, data: String): (Boolean, Int) = {
+    try {
+      debug(s"Node $path does not exist, attempting to create it.")
+      zkUtils.createPersistentPath(path, data)
+      (true, 0)
+    } catch {
+      case _: ZkNodeExistsException =>
+        debug(s"Failed to create node for $path because it already exists.")
+        (false, UnknownZkVersion)
     }
   }
 
@@ -342,7 +355,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   private def getAclsFromZk(resource: Resource): VersionedAcls = {
     val (aclJson, stat) = zkUtils.readDataMaybeNull(toResourcePath(resource))
-    VersionedAcls(aclJson.map(Acl.fromJson).getOrElse(Set()), stat.getVersion)
+    aclJson match {
+      case Some(acl) => VersionedAcls(Acl.fromJson(acl), stat.getVersion)
+      case None => NoAcls
+    }
   }
 
   private def updateCache(resource: Resource, versionedAcls: VersionedAcls) {


Mime
View raw message