kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-5994; Log ClusterAuthorizationException for all ClusterAction requests
Date Thu, 10 Jan 2019 13:49:28 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e8959bd  KAFKA-5994; Log ClusterAuthorizationException for all ClusterAction requests
e8959bd is described below

commit e8959bd766cc6e19f6208fe7ea0a3103cc8fe123
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
AuthorDate: Thu Jan 10 19:18:58 2019 +0530

    KAFKA-5994; Log ClusterAuthorizationException for all ClusterAction requests
    
    Author: Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
    
    Closes #5021 from omkreddy/KAFKA-5994-CLUSTER-AUTH
---
 core/src/main/scala/kafka/server/KafkaApis.scala | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6cf2403..7ad5d72 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -181,10 +181,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    if (!isAuthorizedClusterAction(request)) {
-      sendResponseMaybeThrottle(request, throttleTimeMs => leaderAndIsrRequest.getErrorResponse(throttleTimeMs,
-        Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
-    } else if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch())) {
+    authorizeClusterAction(request)
+    if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch())) {
      // When the broker restarts very quickly, it is possible for this broker to receive request intended
       // for its previous generation so the broker should skip the stale request.
       info("Received LeaderAndIsr request with broker epoch " +
@@ -201,11 +199,8 @@ class KafkaApis(val requestChannel: RequestChannel,
    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
     // stop serving data to clients for the topic being deleted
     val stopReplicaRequest = request.body[StopReplicaRequest]
-    if (!isAuthorizedClusterAction(request)) {
-      val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
-      sendResponseMaybeThrottle(request, _ =>
-        new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
-    } else if (isBrokerEpochStale(stopReplicaRequest.brokerEpoch())) {
+    authorizeClusterAction(request)
+    if (isBrokerEpochStale(stopReplicaRequest.brokerEpoch())) {
      // When the broker restarts very quickly, it is possible for this broker to receive request intended
       // for its previous generation so the broker should skip the stale request.
       info("Received stop replica request with broker epoch " +
@@ -234,9 +229,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val correlationId = request.header.correlationId
     val updateMetadataRequest = request.body[UpdateMetadataRequest]
 
-    if (!isAuthorizedClusterAction(request)) {
-      sendResponseMaybeThrottle(request, _ => new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED))
-    } else if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch())) {
+    authorizeClusterAction(request)
+    if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch())) {
      // When the broker restarts very quickly, it is possible for this broker to receive request intended
       // for its previous generation so the broker should skip the stale request.
       info("Received update metadata request with broker epoch " +


Mime
View raw message