kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-5327; Console Consumer should not commit messages not printed (#4546)
Date: Mon, 26 Feb 2018 19:26:37 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2591a46  KAFKA-5327; Console Consumer should not commit messages not printed (#4546)
2591a46 is described below

commit 2591a469305532ba79497dd435f58a9051b689db
Author: huxi <huxi_2b@hotmail.com>
AuthorDate: Tue Feb 27 03:26:34 2018 +0800

    KAFKA-5327; Console Consumer should not commit messages not printed (#4546)
    
    Ensure that the consumer's offsets are reset prior to closing so that any
    buffered messages which haven't been printed are not committed.
---
 .../main/scala/kafka/consumer/BaseConsumer.scala   | 19 ++++++--
 .../main/scala/kafka/tools/ConsoleConsumer.scala   | 14 +++---
 .../unit/kafka/tools/ConsoleConsumerTest.scala     | 53 ++++++++++++++++++++--
 3 files changed, 71 insertions(+), 15 deletions(-)
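
For context on the fix: KafkaConsumer.poll() advances the consumer's position to the end of the fetched batch as soon as the batch is returned, while the console consumer buffers the batch and prints records one at a time. If the tool exits early (for example once --max-messages is reached), an auto-commit triggered by close() records the post-batch position, marking records as consumed that were never printed. A minimal sketch of the failure mode, assuming auto-commit is enabled (the helper name and timeout are illustrative, not from the patch):

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import scala.collection.JavaConverters._

    // Illustrative helper: poll() may return more records than we print, but the
    // position (and thus the next auto-commit) already points past the whole batch.
    def printAtMost(consumer: KafkaConsumer[Array[Byte], Array[Byte]], max: Int): Unit = {
      val batch = consumer.poll(1000L).asScala.toSeq
      batch.take(max).foreach(record => println(new String(record.value)))
      // batch.drop(max) was never printed, yet close() auto-commits past those
      // records unless we first seek back to the earliest unprinted offset.
      consumer.close()
    }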

diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 04ac2d9..9e49fe4 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -23,6 +23,7 @@ import java.util.regex.Pattern
 import kafka.api.OffsetRequest
 import kafka.common.StreamEndException
 import kafka.message.Message
+import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.header.Headers
@@ -55,10 +56,8 @@ case class BaseConsumerRecord(topic: String,
 
 @deprecated("This class has been deprecated and will be removed in a future release. " +
             "Please use org.apache.kafka.clients.consumer.KafkaConsumer instead.", "0.11.0.0")
-class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
-  import org.apache.kafka.clients.consumer.KafkaConsumer
-
-  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
+class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String],
+                       consumer: Consumer[Array[Byte], Array[Byte]], val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   consumerInit()
   var recordIter = consumer.poll(0).iterator
 
@@ -91,6 +90,17 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
     }
   }
 
+  def resetUnconsumedOffsets() {
+    val smallestUnconsumedOffsets = collection.mutable.Map[TopicPartition, Long]()
+    while (recordIter.hasNext) {
+      val record = recordIter.next()
+      val tp = new TopicPartition(record.topic, record.partition)
+      // avoid auto-committing offsets which haven't been consumed
+      smallestUnconsumedOffsets.getOrElseUpdate(tp, record.offset)
+    }
+    smallestUnconsumedOffsets.foreach { case (tp, offset) => consumer.seek(tp, offset) }
+  }
+
   override def receive(): BaseConsumerRecord = {
     if (!recordIter.hasNext) {
       recordIter = consumer.poll(timeoutMs).iterator
@@ -114,6 +124,7 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
   }
 
   override def cleanup() {
+    resetUnconsumedOffsets()
     this.consumer.close()
   }
 
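A note on the new resetUnconsumedOffsets() above: records within a partition come out of the buffered iterator in offset order, so getOrElseUpdate keeps only the first, i.e. smallest, unconsumed offset per partition, and the subsequent seeks rewind the positions that the auto-commit would otherwise use. A minimal sketch of that seek/position interaction, using the same MockConsumer test double the new test below relies on (topic name and record counts are arbitrary):

    import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
    import org.apache.kafka.common.TopicPartition
    import scala.collection.JavaConverters._

    val tp = new TopicPartition("test", 0)
    val startOffset: java.lang.Long = 0L
    val mock = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.EARLIEST)
    mock.assign(List(tp).asJava)
    mock.updateBeginningOffsets(Map(tp -> startOffset).asJava)
    (0 until 10).foreach { i =>
      mock.addRecord(new ConsumerRecord("test", 0, i.toLong, "key".getBytes, "value".getBytes))
    }
    mock.poll(0)                     // fetches the whole batch; position jumps to 10
    assert(mock.position(tp) == 10L)
    mock.seek(tp, 3L)                // pretend only offsets 0-2 were printed
    assert(mock.position(tp) == 3L)  // a commit now covers only printed records
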
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 7d2d371..24fa583 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -31,10 +31,10 @@ import kafka.message._
 import kafka.metrics.KafkaMetricsReporter
 import kafka.utils._
 import kafka.utils.Implicits._
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.common.errors.{AuthenticationException, WakeupException}
 import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -72,10 +72,11 @@ object ConsoleConsumer extends Logging {
         new OldConsumer(conf.filterSpec, props)
       } else {
         val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
+        val consumer = new KafkaConsumer(getNewConsumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer)
         if (conf.partitionArg.isDefined)
-          new NewShinyConsumer(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, getNewConsumerProps(conf), timeoutMs)
+          new NewShinyConsumer(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, consumer, timeoutMs)
         else
-          new NewShinyConsumer(Option(conf.topicArg), None, None, Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs)
+          new NewShinyConsumer(Option(conf.topicArg), None, None, Option(conf.whitelistArg), consumer, timeoutMs)
       }
 
     addShutdownHook(consumer, conf)
@@ -202,15 +203,12 @@ object ConsoleConsumer extends Logging {
     }
   }
 
-  def getNewConsumerProps(config: ConsumerConfig): Properties = {
+  private[tools] def getNewConsumerProps(config: ConsumerConfig): Properties = {
     val props = new Properties
-
     props ++= config.consumerProps
     props ++= config.extraConsumerProps
     setAutoOffsetResetValue(config, props)
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
     props
   }
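
Since the KafkaConsumer is now constructed directly in ConsoleConsumer, the key and value deserializers are supplied as constructor arguments rather than through the properties map, which is why getNewConsumerProps() drops the two *.deserializer entries. For the byte-array case the two wirings below are equivalent (a sketch; the bootstrap address is a placeholder):

    import java.util.Properties
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    val props = new Properties
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder

    // Before: deserializer classes named in the config map.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    def viaConfig = new KafkaConsumer[Array[Byte], Array[Byte]](props)

    // After: instances passed to the constructor; explicit instances take
    // precedence over the config keys, so the keys can be dropped entirely.
    def viaConstructor = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)

The larger payoff is that NewShinyConsumer now receives a Consumer instance instead of a Properties object, which is what lets the new test below inject a MockConsumer.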
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 364f6b3..9ae8b96 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -17,18 +17,65 @@
 
 package kafka.tools
 
-import java.io.{PrintStream, FileOutputStream}
+import java.io.{FileOutputStream, PrintStream}
 
 import kafka.common.MessageFormatter
-import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
+import kafka.consumer.{BaseConsumer, BaseConsumerRecord, NewShinyConsumer}
 import kafka.utils.{Exit, TestUtils}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.easymock.EasyMock
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
 
 class ConsoleConsumerTest {
 
+  @Before
+  def setup(): Unit = {
+    ConsoleConsumer.messageCount = 0
+  }
+
+  @Test
+  def shouldResetUnConsumedOffsetsBeforeExitForNewConsumer() {
+    val topic = "test"
+    val maxMessages: Int = 123
+    val totalMessages: Int = 700
+    val startOffset: java.lang.Long = 0L
+
+    val mockConsumer = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.EARLIEST)
+    val tp1 = new TopicPartition(topic, 0)
+    val tp2 = new TopicPartition(topic, 1)
+
+    val consumer = new NewShinyConsumer(Some(topic), None, None, None, mockConsumer)
+
+    mockConsumer.rebalance(List(tp1, tp2).asJava)
+    mockConsumer.updateBeginningOffsets(Map(tp1 -> startOffset, tp2 -> startOffset).asJava)
+
+    0 until totalMessages foreach { i =>
+      // add all records, each partition should have half of `totalMessages`
+      mockConsumer.addRecord(new ConsumerRecord[Array[Byte], Array[Byte]](topic, i % 2, i / 2, "key".getBytes, "value".getBytes))
+    }
+
+    // Mocks
+    val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
+
+    // Expectations
+    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject())).times(maxMessages)
+    EasyMock.replay(formatter)
+
+    // Test
+    ConsoleConsumer.process(maxMessages, formatter, consumer, System.out, skipMessageOnError = false)
+    assertEquals(totalMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))
+
+    consumer.resetUnconsumedOffsets()
+    assertEquals(maxMessages, mockConsumer.position(tp1) + mockConsumer.position(tp2))
+
+    EasyMock.verify(formatter)
+  }
+
   @Test
   def shouldLimitReadsToMaxMessageLimit() {
     //Mocks

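The new test above exercises the fix end to end: 700 records spread over two partitions, only 123 of them formatted; after process() the combined position has advanced to 700, and resetUnconsumedOffsets() rewinds it back to 123. In production the rewind only matters because it runs before close(), which is what triggers the final auto-commit. A condensed sketch of that ordering (the shutdown name is illustrative; the seek and close calls mirror cleanup() in the patch):

    import org.apache.kafka.clients.consumer.Consumer
    import org.apache.kafka.common.TopicPartition

    // Seek first, close second: close() is what commits the current positions,
    // so the rewind must already be in place when it runs.
    def shutdown(consumer: Consumer[Array[Byte], Array[Byte]],
                 firstUnprinted: Map[TopicPartition, Long]): Unit = {
      firstUnprinted.foreach { case (tp, offset) => consumer.seek(tp, offset) }
      consumer.close() // auto-commit now records only what was actually printed
    }
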
-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.
