kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: MINOR: Close timing window in SimpleAclAuthorizer startup (#5318)
Date Mon, 02 Jul 2018 21:11:16 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0db8074  MINOR: Close timing window in SimpleAclAuthorizer startup (#5318)
0db8074 is described below

commit 0db8074d492ec96f3e894b484e8f0bb9fdd45544
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Mon Jul 2 22:11:05 2018 +0100

    MINOR: Close timing window in SimpleAclAuthorizer startup (#5318)
    
    ZooKeeper listener for change notifications should be created before loading the ACL cache
    to avoid a timing window if ACLs are modified while the broker is starting up.
    
    Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@confluent.io>
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  |  7 ++--
 .../security/auth/SimpleAclAuthorizerTest.scala    | 37 +++++++++++++++++++++-
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index c5bbfdf..5535258 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -100,9 +100,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
     extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1
 
-    loadCache()
-
+    // Start change listeners first and then populate the cache so that there is no timing window
+    // between loading cache and processing change notifications.
     startZkChangeListeners()
+    loadCache()
   }
 
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
@@ -275,7 +276,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     }
   }
 
-  private def startZkChangeListeners(): Unit = {
+  private[auth] def startZkChangeListeners(): Unit = {
     aclChangeListeners = ZkAclChangeStore.stores
       .map(store => store.createListener(AclChangedNotificationHandler, zkClient))
   }
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 3d1ceb6..5b65a7f 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -19,12 +19,13 @@ package kafka.security.auth
 import java.net.InetAddress
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.UUID
+import java.util.concurrent.{Executors, Semaphore, TimeUnit}
 
 import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.Acl.{WildCardHost, WildCardResource}
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.{ZkAclStore, ZooKeeperTestHarness}
 import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
 import org.apache.kafka.common.errors.UnsupportedVersionException
@@ -332,6 +333,40 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     }
   }
 
+  /**
+   * Verify that there is no timing window between loading ACL cache and setting
+   * up ZK change listener. Change listener must be created before loading the cache
+   * in the authorizer to avoid the timing window.
+   */
+  @Test
+  def testChangeListenerTiming() {
+    val configureSemaphore = new Semaphore(0)
+    val listenerSemaphore = new Semaphore(0)
+    val executor = Executors.newSingleThreadExecutor
+    val simpleAclAuthorizer3 = new SimpleAclAuthorizer {
+      override private[auth] def startZkChangeListeners(): Unit = {
+        configureSemaphore.release()
+        listenerSemaphore.acquireUninterruptibly()
+        super.startZkChangeListeners()
+      }
+    }
+    try {
+      val future = executor.submit(CoreUtils.runnable(simpleAclAuthorizer3.configure(config.originals)))
+      configureSemaphore.acquire()
+      val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+      val acls = Set(new Acl(user1, Deny, "host-1", Read))
+      simpleAclAuthorizer.addAcls(acls, resource)
+
+      listenerSemaphore.release()
+      future.get(10, TimeUnit.SECONDS)
+
+      assertEquals(acls, simpleAclAuthorizer3.getAcls(resource))
+    } finally {
+      simpleAclAuthorizer3.close()
+      executor.shutdownNow()
+    }
+  }
+
   @Test
   def testLocalConcurrentModificationOfResourceAcls() {
     val commonResource = Resource(Topic, "test", LITERAL)


Mime
View raw message