kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject git commit: KAFKA-1580; Reject producer requests to internal topics; reviewed by Joel Koshy and Neha Narkhede
Date Fri, 15 Aug 2014 22:50:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d678449b9 -> c6f08b609


KAFKA-1580; Reject producer requests to internal topics; reviewed by Joel Koshy and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6f08b60
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6f08b60
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6f08b60

Branch: refs/heads/trunk
Commit: c6f08b6094924ff3560a8e16090c85ed1a415b03
Parents: d678449
Author: Jonathan Natkins <natty@wibidata.com>
Authored: Fri Aug 15 15:49:54 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Fri Aug 15 15:49:54 2014 -0700

----------------------------------------------------------------------
 .../common/errors/InvalidTopicException.java    | 38 ++++++++++++++++++++
 .../apache/kafka/common/protocol/Errors.java    | 16 +++------
 .../main/scala/kafka/common/ErrorMapping.scala  |  4 ++-
 .../src/main/scala/kafka/server/KafkaApis.scala | 12 +++++--
 .../kafka/api/ProducerFailureHandlingTest.scala |  9 ++++-
 5 files changed, 63 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c6f08b60/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
new file mode 100644
index 0000000..1d90b59
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The client has attempted to perform an operation on an invalid topic.
+ */
+public class InvalidTopicException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidTopicException() {
+        super();
+    }
+
+    public InvalidTopicException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public InvalidTopicException(String message) {
+        super(message);
+    }
+
+    public InvalidTopicException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6f08b60/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 3374bd9..d434f42 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -19,17 +19,7 @@ package org.apache.kafka.common.protocol;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.errors.NetworkException;
-import org.apache.kafka.common.errors.NotLeaderForPartitionException;
-import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
-import org.apache.kafka.common.errors.OffsetOutOfRangeException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.*;
 
 
 /**
@@ -51,7 +41,9 @@ public enum Errors {
     // TODO: errorCode 8, 9, 11
     MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
     OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
-    NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
+    NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
+    // TODO: errorCode 14, 15, 16
+    INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic."));
 
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6f08b60/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 5559d26..3fae791 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -46,6 +46,7 @@ object ErrorMapping {
   val OffsetsLoadInProgressCode: Short = 14
   val ConsumerCoordinatorNotAvailableCode: Short = 15
   val NotCoordinatorForConsumerCode: Short = 16
+  val InvalidTopicCode : Short = 17
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -63,7 +64,8 @@ object ErrorMapping {
       classOf[OffsetMetadataTooLargeException].asInstanceOf[Class[Throwable]] -> OffsetMetadataTooLargeCode,
       classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode,
       classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
-      classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode
+      classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode,
+      classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6f08b60/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bb94673..c584b55 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -160,7 +160,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
     val sTime = SystemTime.milliseconds
-    val localProduceResults = appendToLocalLog(produceRequest)
+    val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
     val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
@@ -236,11 +236,15 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Helper method for handling a parsed producer request
    */
-  private def appendToLocalLog(producerRequest: ProducerRequest): Iterable[ProduceResult] = {
+  private def appendToLocalLog(producerRequest: ProducerRequest, isOffsetCommit: Boolean): Iterable[ProduceResult] = {
     val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data
     trace("Append [%s] to local log ".format(partitionAndData.toString))
     partitionAndData.map {case (topicAndPartition, messages) =>
       try {
+        if (Topic.InternalTopics.contains(topicAndPartition.topic) &&
+            !(isOffsetCommit && topicAndPartition.topic == OffsetManager.OffsetsTopicName)) {
+          throw new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic))
+        }
         val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
         val info = partitionOpt match {
           case Some(partition) =>
@@ -268,6 +272,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
           Runtime.getRuntime.halt(1)
           null
+        case ite: InvalidTopicException =>
+          warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
+            producerRequest.correlationId, producerRequest.clientId, topicAndPartition, ite.getMessage))
+          new ProduceResult(topicAndPartition, ite)
         case utpe: UnknownTopicOrPartitionException =>
           warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
                producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6f08b60/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 789e74c..39f777b 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.api
 
+import kafka.common.Topic
+import org.apache.kafka.common.errors.InvalidTopicException
 import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._
@@ -65,7 +67,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
     consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
     consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
 
-    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize);
+    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize)
     producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
     producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
   }
@@ -295,6 +297,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
     assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
   }
 
+  @Test(expected = classOf[InvalidTopicException])
+  def testCannotSendToInternalTopic() {
+    producer1.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
+  }
+
   private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
   {
     val numRecords = 1000


Mime
View raw message