kafka-commits mailing list archives

From j...@apache.org
Subject [3/5] kafka git commit: KAFKA-5259; TransactionalId auth implies ProducerId auth
Date Wed, 24 May 2017 22:29:01 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b0d354b..380685f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -93,7 +93,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
         format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
       ApiKeys.forId(request.requestId) match {
-        case ApiKeys.PRODUCE => handleProducerRequest(request)
+        case ApiKeys.PRODUCE => handleProduceRequest(request)
         case ApiKeys.FETCH => handleFetchRequest(request)
         case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
         case ApiKeys.METADATA => handleTopicMetadataRequest(request)
@@ -368,97 +368,107 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle a produce request
    */
-  def handleProducerRequest(request: RequestChannel.Request) {
+  def handleProduceRequest(request: RequestChannel.Request) {
     val produceRequest = request.body[ProduceRequest]
     val numBytesAppended = request.header.toStruct.sizeOf + request.bodyAndSize.size
 
-    if (produceRequest.isTransactional && !authorize(request.session, Write, new Resource(ProducerTransactionalId, produceRequest.transactionalId())))
-      sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-    else if (produceRequest.isIdempotent && !authorize(request.session, Write, Resource.ProducerIdResource))
-      sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED.exception))
-    else {
-      val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
-        produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
-          authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
-        }
+    def sendErrorResponse(error: Errors): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        produceRequest.getErrorResponse(requestThrottleMs, error.exception))
+    }
 
-      val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
-        case (tp, _) => authorize(request.session, Write, new Resource(Topic, tp.topic))
+    if (produceRequest.isTransactional) {
+      if (!authorize(request.session, Write, new Resource(TransactionalId, produceRequest.transactionalId))) {
+        sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
+        return
       }
+      // Note that authorization to a transactionalId implies ProducerId authorization
 
-      // the callback for sending a produce response
-      def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
+    } else if (produceRequest.isIdempotent && !authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
+      sendErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
+      return
+    }
+
+    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
+      produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
+        authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
+      }
 
-        val mergedResponseStatus = responseStatus ++
-            unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
-            nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+    val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
+      case (tp, _) => authorize(request.session, Write, new Resource(Topic, tp.topic))
+    }
 
-        var errorInResponse = false
+    // the callback for sending a produce response
+    def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
 
-        mergedResponseStatus.foreach { case (topicPartition, status) =>
-          if (status.error != Errors.NONE) {
-            errorInResponse = true
-            debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
-              request.header.correlationId,
-              request.header.clientId,
-              topicPartition,
-              status.error.exceptionName))
-          }
+      val mergedResponseStatus = responseStatus ++
+        unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
+        nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+
+      var errorInResponse = false
+
+      mergedResponseStatus.foreach { case (topicPartition, status) =>
+        if (status.error != Errors.NONE) {
+          errorInResponse = true
+          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
+            request.header.correlationId,
+            request.header.clientId,
+            topicPartition,
+            status.error.exceptionName))
         }
+      }
 
-        def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
-          if (produceRequest.acks == 0) {
-            // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
-            // the request, since no response is expected by the producer, the server will close socket server so that
-            // the producer client will know that some error has happened and will refresh its metadata
-            val action =
-              if (errorInResponse) {
-                val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
-                  topicPartition -> status.error.exceptionName
-                }.mkString(", ")
-                info(
-                  s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
-                    s"from client id ${request.header.clientId} with ack=0\n" +
-                    s"Topic and partition to exceptions: $exceptionsSummary"
-                )
-                RequestChannel.CloseConnectionAction
-              } else RequestChannel.NoOpAction
-            sendResponseExemptThrottle(new RequestChannel.Response(request, None, action))
-          } else {
-            sendResponseMaybeThrottle(request, requestThrottleMs =>
-              new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs)
+      def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
+        if (produceRequest.acks == 0) {
+          // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
+          // the request, since no response is expected by the producer, the server will close the connection so that
+          // the producer client will know that an error has occurred and will refresh its metadata
+          val action =
+          if (errorInResponse) {
+            val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
+              topicPartition -> status.error.exceptionName
+            }.mkString(", ")
+            info(
+              s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
+                s"from client id ${request.header.clientId} with ack=0\n" +
+                s"Topic and partition to exceptions: $exceptionsSummary"
             )
-          }
+            RequestChannel.CloseConnectionAction
+          } else RequestChannel.NoOpAction
+          sendResponseExemptThrottle(new RequestChannel.Response(request, None, action))
+        } else {
+          sendResponseMaybeThrottle(request, requestThrottleMs =>
+            new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs))
         }
+      }
 
-        // When this callback is triggered, the remote API call has completed
-        request.apiRemoteCompleteTimeNanos = time.nanoseconds
+      // When this callback is triggered, the remote API call has completed
+      request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
-        quotas.produce.recordAndMaybeThrottle(
-          request.session.sanitizedUser,
-          request.header.clientId,
-          numBytesAppended,
-          produceResponseCallback)
-      }
+      quotas.produce.recordAndMaybeThrottle(
+        request.session.sanitizedUser,
+        request.header.clientId,
+        numBytesAppended,
+        produceResponseCallback)
+    }
 
-      if (authorizedRequestInfo.isEmpty)
-        sendResponseCallback(Map.empty)
-      else {
-        val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
-
-        // call the replica manager to append messages to the replicas
-        replicaManager.appendRecords(
-          timeout = produceRequest.timeout.toLong,
-          requiredAcks = produceRequest.acks,
-          internalTopicsAllowed = internalTopicsAllowed,
-          isFromClient = true,
-          entriesPerPartition = authorizedRequestInfo,
-          responseCallback = sendResponseCallback)
-
-        // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
-        // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log
-        produceRequest.clearPartitionRecords()
-      }
+    if (authorizedRequestInfo.isEmpty)
+      sendResponseCallback(Map.empty)
+    else {
+      val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
+
+      // call the replica manager to append messages to the replicas
+      replicaManager.appendRecords(
+        timeout = produceRequest.timeout.toLong,
+        requiredAcks = produceRequest.acks,
+        internalTopicsAllowed = internalTopicsAllowed,
+        isFromClient = true,
+        entriesPerPartition = authorizedRequestInfo,
+        responseCallback = sendResponseCallback)
+
+      // if the request is put into purgatory, it will have a held reference and hence cannot be garbage collected;
+      // hence we clear its data here in order to let GC reclaim its memory since it has already been appended to the log
+      produceRequest.clearPartitionRecords()
     }
   }
 
@@ -1052,13 +1062,16 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleFindCoordinatorRequest(request: RequestChannel.Request) {
     val findCoordinatorRequest = request.body[FindCoordinatorRequest]
 
-    if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
-      !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) {
-      sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new FindCoordinatorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode))
-    } else {
-      // TODO: Authorize by transactional id if coordinator type is TRANSACTION
+    def sendErrorResponse(error: Errors): Unit =
+      sendResponseMaybeThrottle(request, requestThrottleMs => new FindCoordinatorResponse(error, Node.noNode))
 
+    if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
+        !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey)))
+      sendErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
+    else if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION &&
+        !authorize(request.session, Describe, new Resource(TransactionalId, findCoordinatorRequest.coordinatorKey)))
+      sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
+    else {
       // get metadata (and create the topic if necessary)
       val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match {
         case FindCoordinatorRequest.CoordinatorType.GROUP =>
@@ -1102,18 +1115,18 @@ class KafkaApis(val requestChannel: RequestChannel,
     val describeRequest = request.body[DescribeGroupsRequest]
 
     val groups = describeRequest.groupIds.asScala.map { groupId =>
-        if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
-          groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
-        } else {
-          val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-          val members = summary.members.map { member =>
-            val metadata = ByteBuffer.wrap(member.metadata)
-            val assignment = ByteBuffer.wrap(member.assignment)
-            new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment)
-          }
-          groupId -> new DescribeGroupsResponse.GroupMetadata(error, summary.state, summary.protocolType,
-            summary.protocol, members.asJava)
+      if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
+        groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
+      } else {
+        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+        val members = summary.members.map { member =>
+          val metadata = ByteBuffer.wrap(member.metadata)
+          val assignment = ByteBuffer.wrap(member.assignment)
+          new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment)
         }
+        groupId -> new DescribeGroupsResponse.GroupMetadata(error, summary.state, summary.protocolType,
+          summary.protocol, members.asJava)
+      }
     }.toMap
 
     sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeGroupsResponse(requestThrottleMs, groups.asJava))
@@ -1414,31 +1427,36 @@ class KafkaApis(val requestChannel: RequestChannel,
     val initProducerIdRequest = request.body[InitProducerIdRequest]
     val transactionalId = initProducerIdRequest.transactionalId
 
+    def sendErrorResponse(error: Errors): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs => new InitProducerIdResponse(requestThrottleMs, error))
+    }
 
-    if (!authorize(request.session, Write, Resource.ProducerIdResource)) {
-      sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new InitProducerIdResponse(requestThrottleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED))
-    } else if (transactionalId == null || authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) {
-      // Send response callback
-      def sendResponseCallback(result: InitProducerIdResult): Unit = {
-        def createResponse(requestThrottleMs: Int): AbstractResponse = {
-          val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch)
-          trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
-          responseBody
-        }
-        sendResponseMaybeThrottle(request, createResponse)
+    if (transactionalId != null) {
+      if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) {
+        sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
+        return
       }
-      txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
-    }else
-      sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new InitProducerIdResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
+    } else if (!authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
+      sendErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
+      return
+    }
+
+    def sendResponseCallback(result: InitProducerIdResult): Unit = {
+      def createResponse(requestThrottleMs: Int): AbstractResponse = {
+        val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch)
+        trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
+    }
+    txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
   }
 
   def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
     val endTxnRequest = request.body[EndTxnRequest]
     val transactionalId = endTxnRequest.transactionalId
 
-    if(authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) {
+    if (authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) {
       def sendResponseCallback(error: Errors) {
         def createResponse(requestThrottleMs: Int): AbstractResponse = {
           val responseBody = new EndTxnResponse(requestThrottleMs, error)
@@ -1471,6 +1489,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
+      trace(s"End transaction marker append for producer id $producerId completed with status: $responseStatus")
       errors.put(producerId, responseStatus.mapValues(_.error).asJava)
 
       val successfulOffsetsPartitions = responseStatus.filter { case (topicPartition, partitionResponse) =>
@@ -1524,9 +1543,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     val transactionalId = addPartitionsToTxnRequest.transactionalId
     val partitionsToAdd = addPartitionsToTxnRequest.partitions
 
-    if(!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId)))
+    if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception()))
+        addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
     else {
       val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())}
 
@@ -1535,24 +1554,24 @@ class KafkaApis(val requestChannel: RequestChannel,
           authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
         }
 
-      val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
-        tp => authorize(request.session, Write, new Resource(Topic, tp.topic))
+      val unauthorizedForWriteRequestInfo = existingAndAuthorizedForDescribeTopics.filterNot { tp =>
+        authorize(request.session, Write, new Resource(Topic, tp.topic))
       }
 
       if (nonExistingOrUnauthorizedForDescribeTopics.nonEmpty
         || unauthorizedForWriteRequestInfo.nonEmpty
         || internalTopics.nonEmpty) {
 
-        // Only send back error responses for the partitions that failed. If there are any partition failures
-        // then the entire request fails
-        val partitionErrors = unauthorizedForWriteRequestInfo.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) }.toMap ++
-          nonExistingOrUnauthorizedForDescribeTopics.map { tp => (tp, Errors.UNKNOWN_TOPIC_OR_PARTITION) }.toMap ++
-          internalTopics.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) }
+        // Any failed partition check causes the entire request to fail. We only send back error responses
+        // for the partitions that failed to avoid needing to send an ambiguous error code for the partitions
+        // which succeeded.
+        val partitionErrors = (unauthorizedForWriteRequestInfo.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
+          nonExistingOrUnauthorizedForDescribeTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++
+          internalTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED)).toMap
 
         sendResponseMaybeThrottle(request, requestThrottleMs =>
           new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava))
       } else {
-        // Send response callback
         def sendResponseCallback(error: Errors): Unit = {
           def createResponse(requestThrottleMs: Int): AbstractResponse = {
             val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(requestThrottleMs,
@@ -1565,14 +1584,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
 
         txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
-          addPartitionsToTxnRequest.producerId(),
-          addPartitionsToTxnRequest.producerEpoch(),
+          addPartitionsToTxnRequest.producerId,
+          addPartitionsToTxnRequest.producerEpoch,
           partitionsToAdd.asScala.toSet,
           sendResponseCallback)
       }
     }
-
-
   }
 
   def handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit = {
@@ -1581,44 +1598,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     val groupId = addOffsetsToTxnRequest.consumerGroupId
     val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
-    if (!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId)))
+    if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
     else if (!authorize(request.session, Read, new Resource(Group, groupId)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
     else {
-        // Send response callback
-        def sendResponseCallback(error: Errors): Unit = {
-          def createResponse(requestThrottleMs: Int): AbstractResponse = {
-            val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(requestThrottleMs, error)
-            trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
-            responseBody
-          }
-          sendResponseMaybeThrottle(request, createResponse)
+      def sendResponseCallback(error: Errors): Unit = {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(requestThrottleMs, error)
+          trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId on partition " +
+            s"$offsetTopicPartition: errors: $error from client ${request.header.clientId}")
+          responseBody
         }
-
-        txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
-          addOffsetsToTxnRequest.producerId,
-          addOffsetsToTxnRequest.producerEpoch,
-          Set(offsetTopicPartition),
-          sendResponseCallback)
+        sendResponseMaybeThrottle(request, createResponse)
       }
-    }
 
+      txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
+        addOffsetsToTxnRequest.producerId,
+        addOffsetsToTxnRequest.producerEpoch,
+        Set(offsetTopicPartition),
+        sendResponseCallback)
+    }
+  }
 
   def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = {
     val header = request.header
     val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest]
-    // reject the request if not authorized to the group
-    if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId))) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val results = txnOffsetCommitRequest.offsets.keySet.asScala.map { topicPartition =>
-        (topicPartition, error)
-      }.toMap
+
+    def sendErrorResponse(error: Errors): Unit = {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new TxnOffsetCommitResponse(requestThrottleMs, results.asJava))
-    } else {
+        txnOffsetCommitRequest.getErrorResponse(requestThrottleMs, error.exception))
+    }
+
+    // authorize for the transactionalId and the consumer group. Note that we skip producerId authorization
+    // since it is implied by transactionalId authorization
+    if (!authorize(request.session, Write, new Resource(TransactionalId, txnOffsetCommitRequest.transactionalId)))
+      sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
+    else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId)))
+      sendErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
+    else {
       val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = txnOffsetCommitRequest.offsets.asScala.toMap.partition {
         case (topicPartition, _) =>
           val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))

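Taken together, the KafkaApis changes above mean a transactional produce now requires Write permission on the TransactionalId resource (which, per this patch, also implies ProducerId authorization), while a purely idempotent produce requires IdempotentWrite on the Cluster resource. A minimal client-side sketch of the transactional case; the broker address, serializers, and transactional id below are assumptions for illustration, not part of this patch:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
    import org.apache.kafka.common.serialization.ByteArraySerializer

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumption
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    // Setting a transactional.id routes requests through the new TransactionalId
    // authorization path; no separate ProducerId ACL is needed.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-txn") // hypothetical id

    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    // Fails with TransactionalIdAuthorizationException unless the principal has
    // Write access on the TransactionalId resource "my-txn".
    producer.initTransactions()
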
http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 0198d38..c464834 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -42,6 +42,7 @@ import org.apache.kafka.common.KafkaException
 import kafka.admin.AdminUtils
 import kafka.log.LogConfig
 import kafka.network.SocketServer
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
 
@@ -55,6 +56,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val createTopic = "topic-new"
   val deleteTopic = "topic-delete"
   val transactionalId = "transactional.id"
+  val producerId = 83392L
   val part = 0
   val correlationId = 0
   val clientId = "client-Id"
@@ -64,22 +66,24 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val topicResource = new Resource(Topic, topic)
   val groupResource = new Resource(Group, group)
   val deleteTopicResource = new Resource(Topic, deleteTopic)
-  val producerTransactionalIdResource = new Resource(ProducerTransactionalId, transactionalId)
+  val transactionalIdResource = new Resource(TransactionalId, transactionalId)
 
   val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
+  val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
   val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
   val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
+  val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)))
   val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
   val topicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
   val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
   val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs)))
   val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs)))
-  val producerTransactionalIdWriteAcl = Map(producerTransactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
+  val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
+  val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
 
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
   val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
-  var transactionalProducer: KafkaProducer[Array[Byte], Array[Byte]] = _
 
   val producerCount = 1
   val consumerCount = 2
@@ -115,7 +119,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse],
       ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse],
       ApiKeys.DESCRIBE_CONFIGS -> classOf[DescribeConfigsResponse],
-      ApiKeys.ALTER_CONFIGS -> classOf[AlterConfigsResponse]
+      ApiKeys.ALTER_CONFIGS -> classOf[AlterConfigsResponse],
+      ApiKeys.INIT_PRODUCER_ID -> classOf[InitProducerIdResponse],
+      ApiKeys.WRITE_TXN_MARKERS -> classOf[WriteTxnMarkersResponse],
+      ApiKeys.ADD_PARTITIONS_TO_TXN -> classOf[AddPartitionsToTxnResponse],
+      ApiKeys.ADD_OFFSETS_TO_TXN -> classOf[AddOffsetsToTxnResponse],
+      ApiKeys.END_TXN -> classOf[EndTxnResponse],
+      ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse]
   )
 
   val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@@ -140,17 +150,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) =>
       resp.configs.get(new RResource(RResourceType.TOPIC, tp.topic)).error.error),
     ApiKeys.ALTER_CONFIGS -> ((resp: AlterConfigsResponse) =>
-      resp.errors.get(new RResource(RResourceType.TOPIC, tp.topic)).error)
+      resp.errors.get(new RResource(RResourceType.TOPIC, tp.topic)).error),
+    ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error),
+    ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => resp.errors(producerId).get(tp)),
+    ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)),
+    ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => resp.error),
+    ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error),
+    ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp))
   )
 
   val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
     ApiKeys.METADATA -> topicDescribeAcl,
-    ApiKeys.PRODUCE -> topicWriteAcl,
+    ApiKeys.PRODUCE -> (topicWriteAcl ++ transactionIdWriteAcl ++ clusterIdempotentWriteAcl),
     ApiKeys.FETCH -> topicReadAcl,
     ApiKeys.LIST_OFFSETS -> topicDescribeAcl,
     ApiKeys.OFFSET_COMMIT -> (topicReadAcl ++ groupReadAcl),
     ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupReadAcl),
-    ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupReadAcl),
+    ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupDescribeAcl ++ transactionalIdDescribeAcl),
     ApiKeys.UPDATE_METADATA_KEY -> clusterAcl,
     ApiKeys.JOIN_GROUP -> groupReadAcl,
     ApiKeys.SYNC_GROUP -> groupReadAcl,
@@ -163,7 +179,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DELETE_TOPICS -> topicDeleteAcl,
     ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl,
     ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl,
-    ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl
+    ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl,
+    ApiKeys.INIT_PRODUCER_ID -> (transactionIdWriteAcl ++ clusterIdempotentWriteAcl),
+    ApiKeys.WRITE_TXN_MARKERS -> clusterAcl,
+    ApiKeys.ADD_PARTITIONS_TO_TXN -> (topicWriteAcl ++ transactionIdWriteAcl),
+    ApiKeys.ADD_OFFSETS_TO_TXN -> (groupReadAcl ++ transactionIdWriteAcl),
+    ApiKeys.END_TXN -> transactionIdWriteAcl,
+    ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl)
   )
 
   @Before
@@ -177,14 +199,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
         maxBlockMs = 3000,
         acks = 1)
 
-    val transactionalProperties = new Properties()
-    transactionalProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    transactionalProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
-    transactionalProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 3,
-      props = Some(transactionalProperties)
-    )
-
     for (_ <- 0 until consumerCount)
       consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
 
@@ -204,7 +218,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     producers.foreach(_.close())
     consumers.foreach(_.wakeup())
     consumers.foreach(_.close())
-    transactionalProducer.close()
     removeAllAcls()
     super.tearDown()
   }
@@ -338,17 +351,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     )
 
     for ((key, request) <- requestKeyToRequest) {
-      removeAllAcls
+      removeAllAcls()
       val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
       sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false)
 
       val resourceToAcls = requestKeysToAcls(key)
-      resourceToAcls.get(topicResource).map { acls =>
+      resourceToAcls.get(topicResource).foreach { acls =>
         val describeAcls = topicDescribeAcl(topicResource)
         val isAuthorized =  describeAcls == acls
         addAndVerifyAcls(describeAcls, topicResource)
         sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true)
-        removeAllAcls
+        removeAllAcls()
       }
 
       for ((resource, acls) <- resourceToAcls)
@@ -377,17 +390,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     )
 
     for ((key, request) <- requestKeyToRequest) {
-      removeAllAcls
+      removeAllAcls()
       val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
       sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false)
 
       val resourceToAcls = requestKeysToAcls(key)
-      resourceToAcls.get(topicResource).map { acls =>
+      resourceToAcls.get(topicResource).foreach { acls =>
         val describeAcls = topicDescribeAcl(topicResource)
         val isAuthorized =  describeAcls == acls
         addAndVerifyAcls(describeAcls, topicResource)
         sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
-        removeAllAcls
+        removeAllAcls()
       }
 
       for ((resource, acls) <- resourceToAcls)
@@ -852,44 +865,182 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test(expected = classOf[TransactionalIdAuthorizationException])
-  def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnInitTransactions(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
-    transactionalProducer.initTransactions()
+  def testTransactionalProducerInitTransactionsNoWriteTransactionalIdAcl(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), transactionalIdResource)
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
+  }
+
+  @Test(expected = classOf[TransactionalIdAuthorizationException])
+  def testTransactionalProducerInitTransactionsNoDescribeTransactionalIdAcl(): Unit = {
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
+  }
+
+  @Test
+  def testSendOffsetsWithNoConsumerGroupDescribeAccess(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
+    producer.beginTransaction()
+    try {
+      producer.sendOffsetsToTransaction(Map(new TopicPartition(topic, 0) -> new OffsetAndMetadata(0L)).asJava, group)
+      fail("Should have raised GroupAuthorizationException")
+    } catch {
+      case e: GroupAuthorizationException =>
+    }
+  }
+
+  @Test
+  def testSendOffsetsWithNoConsumerGroupWriteAccess(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
+    producer.beginTransaction()
+    try {
+      producer.sendOffsetsToTransaction(Map(new TopicPartition(topic, 0) -> new OffsetAndMetadata(0L)).asJava, group)
+      fail("Should have raised GroupAuthorizationException")
+    } catch {
+      case e: GroupAuthorizationException =>
+    }
+  }
+
+  @Test
+  def testIdempotentProducerNoIdempotentWriteAclInInitProducerId(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    val producer = buildIdempotentProducer()
+    try {
+      // the InitProducerId is sent asynchronously, so we expect the error either in the callback
+      // or raised from send itself
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get()
+      fail("Should have raised ClusterAuthorizationException")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
+    }
+    try {
+      // the second time, the call to send itself should fail (the producer becomes unusable
+      // if no producerId can be obtained)
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))
+      fail("Should have raised ClusterAuthorizationException")
+    } catch {
+      case e: KafkaException =>
+        assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
+    }
+  }
+
+  @Test
+  def testIdempotentProducerNoIdempotentWriteAclInProduce(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
+
+    val producer = buildIdempotentProducer()
+
+    // first send should be fine since we have permission to get a ProducerId
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get()
+
+    // revoke the IdempotentWrite permission
+    removeAllAcls()
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+
+    try {
+      // the send should now fail with a cluster auth error
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get()
+      fail("Should have raised ClusterAuthorizationException")
+    } catch {
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
+    }
+    try {
+      // the second time, the call to send itself should fail (the producer becomes unusable
+      // if no producerId can be obtained)
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))
+      fail("Should have raised ClusterAuthorizationException")
+    } catch {
+      case e: KafkaException =>
+        assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
+    }
   }
 
   @Test
   def shouldInitTransactionsWhenAclSet(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
-    transactionalProducer.initTransactions()
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
   }
 
+  @Test
+  def testTransactionalProducerTopicAuthorizationExceptionInSendCallback(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    // add describe access so that we can fetch metadata
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
+    producer.beginTransaction()
+    try {
+      producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
+      Assert.fail("expected TopicAuthorizationException")
+    } catch {
+      case e: ExecutionException =>
+        e.getCause match {
+          case cause: TopicAuthorizationException =>
+            assertEquals(Set(topic), cause.unauthorizedTopics().asScala)
+          case other =>
+            fail("Unexpected failure cause in send callback")
+        }
+    }
+  }
+
+  @Test
+  def testTransactionalProducerTopicAuthorizationExceptionInCommit(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    // add describe access so that we can fetch metadata
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
+    producer.beginTransaction()
+    try {
+      producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes))
+      producer.commitTransaction()
+      Assert.fail("expected TopicAuthorizationException")
+    } catch {
+      case e: TopicAuthorizationException =>
+        assertEquals(Set(topic), e.unauthorizedTopics().asScala)
+    }
+  }
 
   @Test
   def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
-    transactionalProducer.initTransactions()
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
     removeAllAcls()
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
     try {
-      transactionalProducer.beginTransaction()
-      transactionalProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get
+      producer.beginTransaction()
+      producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
       Assert.fail("expected TransactionalIdAuthorizationException")
     } catch {
-      case e: ExecutionException => assertTrue(s"expected TransactionalIdAuthorizationException, but got ${e.getCause}", e.getCause.isInstanceOf[TransactionalIdAuthorizationException])
+      case e: ExecutionException => assertTrue(s"expected TransactionalIdAuthorizationException, but got ${e.getCause}",
+        e.getCause.isInstanceOf[TransactionalIdAuthorizationException])
     }
   }
 
   @Test
   def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnEndTransaction(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
-    transactionalProducer.initTransactions()
-    transactionalProducer.beginTransaction()
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
+    producer.flush()
     removeAllAcls()
     try {
-      transactionalProducer.commitTransaction()
+      producer.commitTransaction()
       Assert.fail("expected TransactionalIdAuthorizationException")
     } catch {
       case _: TransactionalIdAuthorizationException => // ok
@@ -898,50 +1049,27 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource)
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), groupResource)
-    transactionalProducer.initTransactions()
-    transactionalProducer.beginTransaction()
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
+    producer.beginTransaction()
     removeAllAcls()
     try {
-      val offsets: util.Map[TopicPartition, OffsetAndMetadata] = Map(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition) -> new OffsetAndMetadata(1L)).asJava
-      transactionalProducer.sendOffsetsToTransaction(offsets, group)
+      val offsets: util.Map[TopicPartition, OffsetAndMetadata] = Map(new TopicPartition(tp.topic, tp.partition) -> new OffsetAndMetadata(1L)).asJava
+      producer.sendOffsetsToTransaction(offsets, group)
       Assert.fail("expected TransactionalIdAuthorizationException")
     } catch {
       case _: TransactionalIdAuthorizationException => // ok
     }
   }
 
-
-  @Test
-  def shouldThrowProducerIdAuthorizationExceptionWhenAclNotSet(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
-    val idempotentProperties = new Properties()
-    idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    val idempotentProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 3,
-      props = Some(idempotentProperties)
-    )
-    try {
-      idempotentProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get
-      Assert.fail("expected ProducerIdAuthorizationException")
-    } catch {
-      case e: ExecutionException => assertTrue(s"expected ProducerIdAuthorizationException, but got ${e.getCause}", e.getCause.isInstanceOf[ProducerIdAuthorizationException])
-    }
-  }
-
   @Test
   def shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL(): Unit = {
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
-    val idempotentProperties = new Properties()
-    idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    val idempotentProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 3,
-      props = Some(idempotentProperties)
-    )
-    idempotentProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get
+    val producer = buildIdempotentProducer()
+    producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
   }
 
   def removeAllAcls() = {
@@ -1032,4 +1160,24 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     requests.OffsetFetchResponse.parse(response, request.version)
   }
 
+  private def buildTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
+    val transactionalProperties = new Properties()
+    transactionalProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
+    val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 3,
+      props = Some(transactionalProperties))
+    producers += producer
+    producer
+  }
+
+  private def buildIdempotentProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
+    val idempotentProperties = new Properties()
+    idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 3,
+      props = Some(idempotentProperties))
+    producers += producer
+    producer
+  }
+
 }

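The idempotent-producer tests above rely on InitProducerId being sent asynchronously on the first send, so the CLUSTER_AUTHORIZATION_FAILED error surfaces through the returned future rather than from send() itself. A self-contained sketch of that pattern; the broker address and topic name are assumptions:

    import java.util.Properties
    import java.util.concurrent.ExecutionException
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.errors.ClusterAuthorizationException
    import org.apache.kafka.common.serialization.ByteArraySerializer

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumption
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

    try {
      // The first send triggers InitProducerId; without an IdempotentWrite ACL
      // on the Cluster resource the future completes exceptionally.
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("topic", "hi".getBytes)).get()
    } catch {
      case e: ExecutionException =>
        assert(e.getCause.isInstanceOf[ClusterAuthorizationException])
    }
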
http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 5d46348..1b88f40 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -77,17 +77,17 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
   def testBrokerFailure() {
     // basic idea is to seed a topic with numInputRecords records and copy it transactionally while bouncing brokers
     // constantly through the period.
-    val consumerGroup=  "myGroup"
+    val consumerGroup = "myGroup"
     val numInputRecords = 5000
     createTopics()
 
     TestUtils.seedTopicWithNumberedRecords(inputTopic, numInputRecords, servers)
-
     val consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic))
-    val producer = TestUtils.createTransactionalProducer("my-test-producer-t.id", servers)
+    val producer = TestUtils.createTransactionalProducer("test-txn", servers)
 
-    val scheduler = new BounceScheduler
     producer.initTransactions()
+
+    val scheduler = new BounceScheduler
     scheduler.start()
 
     var numMessagesProcessed = 0
@@ -97,16 +97,17 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
         val toRead = Math.min(200, numInputRecords - numMessagesProcessed)
         trace(s"$iteration: About to read $toRead messages, processed $numMessagesProcessed so far..")
         val records = TestUtils.pollUntilAtLeastNumRecords(consumer, toRead)
-        trace(s"received ${records.size} messages. sending them transactionally to $outputTopic")
+        trace(s"Received ${records.size} messages, sending them transactionally to $outputTopic")
+
         producer.beginTransaction()
-        val shouldAbort = iteration % 2 == 0
-        records.zipWithIndex.foreach { case (record, i) =>
-          producer.send(
-            TestUtils.producerRecordWithExpectedTransactionStatus(outputTopic, record.key, record.value, !shouldAbort),
-            new ErrorLoggingCallback(outputTopic, record.key, record.value, true))
+        val shouldAbort = iteration % 3 == 0
+        records.foreach { record =>
+          producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(outputTopic, record.key, record.value,
+            !shouldAbort), new ErrorLoggingCallback(outputTopic, record.key, record.value, true))
         }
         trace(s"Sent ${records.size} messages. Committing offsets.")
         producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroup)
+
         if (shouldAbort) {
           trace(s"Committed offsets. Aborting transaction of ${records.size} messages.")
           producer.abortTransaction()
@@ -125,8 +126,8 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
 
     scheduler.shutdown()
 
-    val verifyingConsumer = createConsumerAndSubscribeToTopics("randoGroup", List(outputTopic), readCommitted = true)
-    val outputRecords = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).map { case(record) =>
+    val verifyingConsumer = createConsumerAndSubscribeToTopics("randomGroup", List(outputTopic), readCommitted = true)
+    val outputRecords = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).map { record =>
       TestUtils.assertCommittedAndGetValue(record).toInt
     }
     val recordSet = outputRecords.toSet
@@ -142,7 +143,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
     val props = new Properties()
     if (readCommitted)
       props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
-    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200")
+    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000")
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000")
     props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
@@ -157,8 +158,8 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
   private def createTopics() =  {
     val topicConfig = new Properties()
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
-    TestUtils.createTopic(zkUtils, inputTopic, numPartitions, numServers, servers, topicConfig)
-    TestUtils.createTopic(zkUtils, outputTopic, numPartitions, numServers, servers, topicConfig)
+    TestUtils.createTopic(zkUtils, inputTopic, numPartitions, 3, servers, topicConfig)
+    TestUtils.createTopic(zkUtils, outputTopic, numPartitions, 3, servers, topicConfig)
   }
 
   private class BounceScheduler extends ShutdownableThread("daemon-broker-bouncer", false) {

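The bounce test's core loop is a transactional consume-transform-produce cycle in which the consumer's offsets are committed inside the producer's transaction. A condensed sketch of one iteration, assuming `consumer`, `producer`, `records`, `outputTopic`, `consumerGroup`, and `shouldAbort` are bound as in the test:

    import org.apache.kafka.clients.producer.ProducerRecord

    producer.beginTransaction()
    records.foreach { record =>
      producer.send(new ProducerRecord(outputTopic, record.key, record.value))
    }
    // Committing offsets inside the transaction is what makes the copy
    // exactly-once across broker bounces: an abort also discards the offset
    // commit, so the same input records are re-read and re-sent.
    producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroup)
    if (shouldAbort) producer.abortTransaction() else producer.commitTransaction()
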
http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index ec6b3ea..fd9d884 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -120,15 +120,15 @@ class TransactionsTest extends KafkaServerTestHarness {
     consumer.subscribe(List(topic1))
     producer.initTransactions()
 
-    val random = new Random()
     var shouldCommit = false
     var recordsProcessed = 0
     try {
       while (recordsProcessed < numSeedMessages) {
+        val records = TestUtils.pollUntilAtLeastNumRecords(consumer, Math.min(10, numSeedMessages - recordsProcessed))
+
         producer.beginTransaction()
         shouldCommit = !shouldCommit
 
-        val records = TestUtils.pollUntilAtLeastNumRecords(consumer, Math.min(10, numSeedMessages - recordsProcessed))
         records.zipWithIndex.foreach { case (record, i) =>
           val key = new String(record.key(), "UTF-8")
           val value = new String(record.value(), "UTF-8")
@@ -153,7 +153,7 @@ class TransactionsTest extends KafkaServerTestHarness {
       consumer.close()
     }
 
-    // Inspite of random aborts, we should still have exactly 1000 messages in topic2. Ie. we should not
+    // In spite of random aborts, we should still have exactly 1000 messages in topic2. That is, we should not
     // re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally.
     val verifyingConsumer = transactionalConsumer("foobargroup")
     verifyingConsumer.subscribe(List(topic2))
@@ -334,7 +334,6 @@ class TransactionsTest extends KafkaServerTestHarness {
         fail("Should not be able to send messages from a fenced producer.")
       } catch {
         case e : ProducerFencedException =>
-          producer1.close()
         case e : ExecutionException =>
           assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
         case e : Exception =>

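The last hunk tightens the fencing test: once a second producer calls initTransactions() with the same transactional.id, the first producer is fenced and its operations fail. A small sketch of the error-handling shape the test asserts, assuming `producer1` and `record` from the surrounding test:

    import java.util.concurrent.ExecutionException
    import org.apache.kafka.common.errors.ProducerFencedException
    import org.junit.Assert.fail

    try {
      producer1.send(record).get()
      fail("Should not be able to send messages from a fenced producer.")
    } catch {
      // The fencing error may be thrown directly or wrapped in the send future.
      case _: ProducerFencedException => // expected
      case e: ExecutionException =>
        assert(e.getCause.isInstanceOf[ProducerFencedException])
    }
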
http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index f5b0a06..f379585 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -36,26 +36,33 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
   private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2"))
   private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2"))
   private val BrokerResources = Set(new Resource(Broker, "0"), new Resource(Broker, "1"))
+  private val TransactionalIdResources = Set(new Resource(TransactionalId, "t0"), new Resource(TransactionalId, "t1"))
 
   private val ResourceToCommand = Map[Set[Resource], Array[String]](
     TopicResources -> Array("--topic", "test-1", "--topic", "test-2"),
     Set(Resource.ClusterResource) -> Array("--cluster"),
     GroupResources -> Array("--group", "testGroup-1", "--group", "testGroup-2"),
-    BrokerResources -> Array("--broker", "0", "--broker", "1")
+    BrokerResources -> Array("--broker", "0", "--broker", "1"),
+    TransactionalIdResources -> Array("--transactional-id", "t0", "--transactional-id", "t1")
   )
 
   private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])](
     TopicResources -> (Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs),
       Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete",
         "--operation", "DescribeConfigs", "--operation", "AlterConfigs")),
-    Set(Resource.ClusterResource) -> (Set(Create, ClusterAction), Array("--operation", "Create", "--operation", "ClusterAction")),
+    Set(Resource.ClusterResource) -> (Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite),
+      Array("--operation", "Create", "--operation", "ClusterAction", "--operation", "DescribeConfigs",
+        "--operation", "AlterConfigs", "--operation", "IdempotentWrite")),
     GroupResources -> (Set(Read, Describe), Array("--operation", "Read", "--operation", "Describe")),
-    BrokerResources -> (Set(DescribeConfigs), Array("--operation", "DescribeConfigs"))
+    BrokerResources -> (Set(DescribeConfigs), Array("--operation", "DescribeConfigs")),
+    TransactionalIdResources -> (Set(Describe, Write), Array("--operation", "Describe", "--operation", "Write"))
   )
 
-  private val ProducerResourceToAcls = Map[Set[Resource], Set[Acl]](
+  private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = Map[Set[Resource], Set[Acl]](
     TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts),
-    Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, Set(Create), Hosts)
+    TransactionalIdResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts),
+    Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, Set(Some(Create),
+      if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts)
   )
 
   private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]](
@@ -64,10 +71,13 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
   )
 
   private val CmdToResourcesToAcl = Map[Array[String], Map[Set[Resource], Set[Acl]]](
-    Array[String]("--producer") -> ProducerResourceToAcls,
+    Array[String]("--producer") -> ProducerResourceToAcls(),
+    Array[String]("--producer", "--idempotent") -> ProducerResourceToAcls(enableIdempotence = true),
     Array[String]("--consumer") -> ConsumerResourceToAcls,
     Array[String]("--producer", "--consumer") -> ConsumerResourceToAcls.map { case (k, v) => k -> (v ++
-      ProducerResourceToAcls.getOrElse(k, Set.empty[Acl])) }
+      ProducerResourceToAcls().getOrElse(k, Set.empty[Acl])) },
+    Array[String]("--producer", "--idempotent", "--consumer") -> ConsumerResourceToAcls.map { case (k, v) => k -> (v ++
+      ProducerResourceToAcls(enableIdempotence = true).getOrElse(k, Set.empty[Acl])) }
   )
 
   @Test
@@ -108,11 +118,11 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
           }
         }
       }
-      testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand, args, brokerProps)
+      testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd, args, brokerProps)
     }
   }
 
-  @Test (expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[IllegalArgumentException])
   def testInvalidAuthorizerProperty() {
     val args = Array("--authorizer-properties", "zookeeper.connect " + zkConnect)
     AclCommand.withAuthorizer(new AclCommandOptions(args))(null)
@@ -120,10 +130,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
   private def testRemove(resources: Set[Resource], resourceCmd: Array[String], args: Array[String], brokerProps: Properties) {
     for (resource <- resources) {
-        AclCommand.main(args ++ resourceCmd :+ "--remove" :+ "--force")
-        withAuthorizer(brokerProps) { authorizer =>
-          TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
-        }
+      AclCommand.main(args ++ resourceCmd :+ "--remove" :+ "--force")
+      withAuthorizer(brokerProps) { authorizer =>
+        TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
+      }
     }
   }
 

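For completeness, the new resource type and operation are also reachable from the ACL command line that this test drives programmatically. A hypothetical invocation combining the new --transactional-id and --idempotent flags; the principal and ZooKeeper address are assumptions:

    AclCommand.main(Array(
      "--authorizer-properties", "zookeeper.connect=localhost:2181", // assumption
      "--add",
      "--producer", "--idempotent",
      "--transactional-id", "t0",
      "--topic", "test-1",
      "--allow-principal", "User:alice")) // hypothetical principal
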
