kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 1.0 updated: MINOR: additional check to follower fetch handling (#4433)
Date Thu, 18 Jan 2018 19:39:19 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 91ee274  MINOR: additional check to follower fetch handling  (#4433)
91ee274 is described below

commit 91ee2748f366fa99a6342536698de168e1f85e8e
Author: Edoardo Comar <ecomar@uk.ibm.com>
AuthorDate: Thu Jan 18 19:14:49 2018 +0000

    MINOR: additional check to follower fetch handling  (#4433)
    
    add check to KafkaApis, add unit test specific to follower fetch
    developed with @mimaison
    
    Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 26 +++++++++++++---------
 .../kafka/api/AuthorizerIntegrationTest.scala      | 24 ++++++++++++++++++++
 .../kafka/api/EndToEndAuthorizationTest.scala      |  3 ++-
 3 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8f50ec0..7555e4a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -482,18 +482,24 @@ class KafkaApis(val requestChannel: RequestChannel,
     val nonExistingTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]()
     val authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
 
-    for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) {
-      if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
-        unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
+    if (fetchRequest.isFromFollower() && !authorize(request.session, ClusterAction, Resource.ClusterResource))
+      for (topicPartition <- fetchRequest.fetchData.asScala.keys)
+        unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.CLUSTER_AUTHORIZATION_FAILED,
           FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
           FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
-      else if (!metadataCache.contains(topicPartition.topic))
-        nonExistingTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-          FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-          FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
-      else
-        authorizedRequestInfo += (topicPartition -> partitionData)
-    }
+    else
+      for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) {
+        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
+          unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
+            FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+            FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+        else if (!metadataCache.contains(topicPartition.topic))
+          nonExistingTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+            FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+            FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+        else
+          authorizedRequestInfo += (topicPartition -> partitionData)
+      }
 
     def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = {
 
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 522fcd3..7ee393e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -274,6 +274,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   private def createFetchRequest = {
     val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
     partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))
+    requests.FetchRequest.Builder.forConsumer(100, Int.MaxValue, partitionMap).build()
+  }
+
+  private def createFetchFollowerRequest = {
+    val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
+    partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))
     val version = ApiKeys.FETCH.latestVersion
     requests.FetchRequest.Builder.forReplica(version, 5000, 100, Int.MaxValue, partitionMap).build()
   }
@@ -487,6 +493,24 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
+  def testFetchFollowerRequest() {
+    val key = ApiKeys.FETCH
+    val request = createFetchFollowerRequest
+      
+    removeAllAcls()
+    val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType)
+    sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
+
+    val readAcls = topicReadAcl.get(topicResource).get
+    addAndVerifyAcls(readAcls, topicResource)
+    sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
+ 
+    val clusterAcls = clusterAcl.get(Resource.ClusterResource).get
+    addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
+    sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
+  }
+
+  @Test
   def testProduceWithNoTopicAccess() {
     try {
       sendRecords(numRecords, tp)
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 720d8b6..32e0e85 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -61,6 +61,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
 
   override def configureSecurityBeforeServersStart() {
     AclCommand.main(clusterAclArgs)
+    AclCommand.main(topicBrokerReadAclArgs)
   }
 
   val numRecords = 1
@@ -156,8 +157,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   @Before
   override def setUp() {
     super.setUp()
-    AclCommand.main(topicBrokerReadAclArgs)
     servers.foreach { s =>
+      TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource)
      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*"))
     }
     // create the test topic with all the brokers as replicas

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <commits@kafka.apache.org>'].

Mime
View raw message