kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-1650; avoid data loss when mirror maker shutdown uncleanly; reviewed by Guozhang Wang
Date Fri, 05 Dec 2014 23:19:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4fc74958e -> 280162996


KAFKA-1650; avoid data loss when mirror maker shutdown uncleanly; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/28016299
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/28016299
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/28016299

Branch: refs/heads/trunk
Commit: 2801629964882015a9148e1c0ade22da46376faa
Parents: 4fc7495
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Dec 5 15:18:25 2014 -0800
Committer: Guozhang Wang <guwang@linkedin.com>
Committed: Fri Dec 5 15:18:25 2014 -0800

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   |  47 ++-
 .../consumer/ConsumerRebalanceListener.java     |  42 ++
 .../consumer/ZookeeperConsumerConnector.scala   |   4 +
 .../main/scala/kafka/tools/MirrorMaker.scala    | 414 +++++++++++++++----
 .../kafka/utils/ByteBoundedBlockingQueue.scala  |  11 +
 .../ZookeeperConsumerConnectorTest.scala        |  60 ++-
 6 files changed, 493 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/28016299/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index da29a8c..e991d21 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -28,6 +28,7 @@ import kafka.api._
 import kafka.client.ClientUtils
 import kafka.cluster._
 import kafka.common._
+import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.metrics._
 import kafka.network.BlockingChannel
 import kafka.serializer._
@@ -39,6 +40,7 @@ import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener,
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
 import scala.collection._
+import scala.collection.JavaConversions._
 
 
 /**
@@ -102,6 +104,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val offsetsChannelLock = new Object
 
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
+  private var consumerRebalanceListener: ConsumerRebalanceListener = null
 
   // useful for tracking migration of consumers to store offsets in kafka
  private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
@@ -161,6 +164,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     wildcardStreamsHandler.streams
   }
 
+  def setConsumerRebalanceListener(listener: ConsumerRebalanceListener) {
+    if (messageStreamCreated.get())
+      throw new MessageStreamsExistException(this.getClass.getSimpleName +
+        " can only set consumer rebalance listener before creating streams",null)
+    consumerRebalanceListener = listener
+  }
+
   private def createFetcher() {
     if (enableFetcher)
       fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient))
@@ -286,17 +296,27 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
+  /**
+   * KAFKA-1743: This method added for backward compatibility.
+   */
+  def commitOffsets { commitOffsets(true) }
+
   def commitOffsets(isAutoCommit: Boolean) {
+    commitOffsets(isAutoCommit, null)
+  }
+
+  def commitOffsets(isAutoCommit: Boolean,
+                    topicPartitionOffsets: immutable.Map[TopicAndPartition, OffsetAndMetadata]) {
    var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) // no retries for commits from auto-commit
     var done = false
 
     while (!done) {
      val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors
-        val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
+        val offsetsToCommit = if (topicPartitionOffsets == null) {immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
           partitionTopicInfos.map { case (partition, info) =>
             TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
           }
-        }.toSeq:_*)
+        }.toSeq:_*)} else topicPartitionOffsets
 
         if (offsetsToCommit.size > 0) {
           if (config.offsetsStorage == "zookeeper") {
@@ -374,11 +394,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
-  /**
-   * KAFKA-1743: This method added for backward compatibility.
-   */
-  def commitOffsets { commitOffsets(true) }
-
   private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = {
     val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
     val offsetString = readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
@@ -653,9 +668,18 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
          * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
          */
         closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
-
+        if (consumerRebalanceListener != null) {
+          info("Calling beforeReleasingPartitions() from rebalance listener.")
+          consumerRebalanceListener.beforeReleasingPartitions(
+            if (topicRegistry.size == 0)
+              new java.util.HashMap[String, java.util.Set[java.lang.Integer]]
+            else
+              mapAsJavaMap(topicRegistry.map(topics =>
+                topics._1 -> topics._2.keys
+              ).toMap).asInstanceOf[java.util.Map[String, java.util.Set[java.lang.Integer]]]
+          )
+        }
         releasePartitionOwnership(topicRegistry)
-
        val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
         val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
         val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
@@ -711,7 +735,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         case Some(f) =>
           f.stopConnections
           clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
-          info("Committing all offsets after clearing the fetcher queues")
           /**
          * here, we need to commit offsets before stopping the consumer from returning any more messages
          * from the current data chunk. Since partition ownership is not yet released, this commit offsets
@@ -720,8 +743,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
          * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
           * successfully and the fetchers restart to fetch more data chunks
           **/
-        if (config.autoCommitEnable)
+        if (config.autoCommitEnable) {
+          info("Committing all offsets after clearing the fetcher queues")
           commitOffsets(true)
+        }
         case None =>
       }
     }
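
For context on the new commitOffsets overload above, here is a minimal usage sketch (not part of the patch). It assumes code inside the kafka package, since ZookeeperConsumerConnector is private[kafka] (which is why MirrorMaker can construct it directly); the properties file path, topic name, and offset values are purely illustrative.

import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
import kafka.utils.Utils

import scala.collection.immutable

// Illustrative only: construct the connector the same way MirrorMaker does.
val config = new ConsumerConfig(Utils.loadProps("consumer.properties"))
val connector = new ZookeeperConsumerConnector(config)

// Offsets the caller has decided are safe to commit (values are made up).
val offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata] = immutable.Map(
  TopicAndPartition("myTopic", 0) -> OffsetAndMetadata(42L),
  TopicAndPartition("myTopic", 1) -> OffsetAndMetadata(17L))

// isAutoCommit = false marks this as an explicit commit rather than an auto-commit
// (retry behaviour differs between the two); passing null instead of a map falls back
// to committing the connector's own consumed offsets.
connector.commitOffsets(isAutoCommit = false, offsetsToCommit)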

http://git-wip-us.apache.org/repos/asf/kafka/blob/28016299/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
new file mode 100644
index 0000000..facf509
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.consumer;
+
+import kafka.common.TopicAndPartition;
+import kafka.consumer.ConsumerThreadId;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This listener is used for execution of tasks defined by user when a consumer rebalance
+ * occurs in {@link kafka.consumer.ZookeeperConsumerConnector}
+ */
+public interface ConsumerRebalanceListener {
+
+    /**
+     * This method is called after all the fetcher threads are stopped but before the
+     * ownership of partitions are released. Depending on whether auto offset commit is
+     * enabled or not, offsets may or may not have been committed.
+     * This listener is initially added to prevent duplicate messages on consumer rebalance
+     * in mirror maker, where offset auto commit is disabled to prevent data loss. It could
+     * also be used in more general cases.
+     */
+    public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);
+
+}
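
As a usage sketch (not part of the patch): the listener has to be registered before any streams are created, otherwise setConsumerRebalanceListener throws MessageStreamsExistException. The class and topic names below are illustrative, and the code is assumed to live inside the kafka package because ZookeeperConsumerConnector is private[kafka].

import kafka.consumer.{ConsumerConfig, Whitelist, ZookeeperConsumerConnector}
import kafka.javaapi.consumer.ConsumerRebalanceListener
import kafka.serializer.DefaultDecoder
import kafka.utils.Utils

// Illustrative listener that just logs the partitions about to be released.
class LoggingRebalanceListener extends ConsumerRebalanceListener {
  override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
    println("About to release partitions: " + partitionOwnership)
  }
}

val connector = new ZookeeperConsumerConnector(new ConsumerConfig(Utils.loadProps("consumer.properties")))
connector.setConsumerRebalanceListener(new LoggingRebalanceListener)
// Streams are created only after the listener has been set.
val streams = connector.createMessageStreamsByFilter(new Whitelist("myTopic"), 1, new DefaultDecoder(), new DefaultDecoder())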

http://git-wip-us.apache.org/repos/asf/kafka/blob/28016299/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 9d5a47f..9baad34 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -115,6 +115,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     underlying.commitOffsets(retryOnFailure)
   }
 
+  def setConsumerRebalanceListener(consumerRebalanceListener: ConsumerRebalanceListener) {
+    underlying.setConsumerRebalanceListener(consumerRebalanceListener)
+  }
+
   def shutdown() {
     underlying.shutdown
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/28016299/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index f399105..b06ff60 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,29 +17,66 @@
 
 package kafka.tools
 
+import com.yammer.metrics.core._
+import kafka.common.{TopicAndPartition, OffsetAndMetadata}
+import kafka.javaapi.consumer.ConsumerRebalanceListener
+import kafka.utils._
 import kafka.consumer._
-import kafka.metrics.KafkaMetricsGroup
-import kafka.producer.{BaseProducer, NewShinyProducer, OldProducer}
 import kafka.serializer._
-import kafka.utils._
-import org.apache.kafka.clients.producer.ProducerRecord
-
-import java.util.Random
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
-import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch, TimeUnit}
+import kafka.producer.{OldProducer, NewShinyProducer}
+import kafka.metrics.KafkaMetricsGroup
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord}
+import org.apache.kafka.common.KafkaException
 
 import scala.collection.JavaConversions._
 
 import joptsimple.OptionParser
+import java.util.Properties
+import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
+import java.util.concurrent._
 
-object MirrorMaker extends Logging {
+/**
+ * The mirror maker consists of three major modules:
+ *  Consumer Threads - The consumer threads consume messages from the source Kafka cluster through
+ *                     ZookeeperConsumerConnector and put them into the corresponding data channel queue based on the hash value
+ *                     of the source topic-partitionId string. This guarantees that the message order in the source partition is
+ *                     preserved.
+ *  Producer Threads - Producer threads take messages out of the data channel queues and send them to the target cluster. Each
+ *                     producer thread is bound to one data channel queue, so that the message order is preserved.
+ *  Data Channel - The data channel has multiple queues. The number of queues is the same as the number of producer threads.
+ *
+ * If the new producer is used, the offsets will be committed based on the new producer's callbacks. An offset map is
+ * maintained and updated on each send() callback. A separate offset commit thread commits the offsets periodically.
+ * @note For mirror maker, MaxInFlightRequests of the producer should be set to 1 if the order of the messages
+ *       needs to be preserved. Mirror maker also depends on in-order delivery to guarantee no data loss.
+ *       We do not force it to be 1 because in some use cases throughput might matter more, whereas out-of-order delivery or
+ *       minor data loss is acceptable.
+ */
+object MirrorMaker extends Logging with KafkaMetricsGroup {
 
-  private var connectors: Seq[ZookeeperConsumerConnector] = null
+  private var connector: ZookeeperConsumerConnector = null
   private var consumerThreads: Seq[ConsumerThread] = null
   private var producerThreads: Seq[ProducerThread] = null
   private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
-
-  private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes)
+  private var offsetCommitThread: OffsetCommitThread = null
+
+  private val valueFactory = (k: TopicAndPartition) => new Pool[Int, Long]
+  private val topicPartitionOffsetMap: Pool[TopicAndPartition, Pool[Int, Long]] =
+      new Pool[TopicAndPartition, Pool[Int,Long]](Some(valueFactory))
+  // Track the messages unacked for consumer rebalance
+  private var numMessageUnacked: AtomicInteger = new AtomicInteger(0)
+  private var consumerRebalanceListener: MirrorMakerConsumerRebalanceListener = null
+  // This is to indicate whether the rebalance is going on so the producer callback knows if
+  // the rebalance latch needs to be pulled.
+  private var inRebalance: AtomicBoolean = new AtomicBoolean(false)
+
+  private val shutdownMessage : MirrorMakerRecord = new MirrorMakerRecord("shutdown", 0, 0, null, "shutdown".getBytes)
+
+  newGauge("MirrorMaker-Unacked-Messages",
+    new Gauge[Int] {
+      def value = numMessageUnacked.get()
+    })
 
   def main(args: Array[String]) {
     
@@ -47,12 +84,12 @@ object MirrorMaker extends Logging {
     val parser = new OptionParser
 
     val consumerConfigOpt = parser.accepts("consumer.config",
-      "Consumer config to consume from a source cluster. " +
-      "You may specify multiple of these.")
+      "Embedded consumer config for consuming from the source cluster.")
       .withRequiredArg()
       .describedAs("config file")
       .ofType(classOf[String])
 
+    // Please see note about MaxInflightRequests
     val producerConfigOpt = parser.accepts("producer.config",
       "Embedded producer config.")
       .withRequiredArg()
@@ -83,6 +120,13 @@ object MirrorMaker extends Logging {
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(10000)
 
+    val bufferByteSizeOpt =  parser.accepts("queue.byte.size",
+      "Maximum bytes that can be buffered in each data channel queue")
+      .withRequiredArg()
+      .describedAs("Data channel queue size in terms of number of bytes")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(100000000)
+
     val whitelistOpt = parser.accepts("whitelist",
       "Whitelist of topics to mirror.")
       .withRequiredArg()
@@ -95,6 +139,13 @@ object MirrorMaker extends Logging {
       .describedAs("Java regex (String)")
       .ofType(classOf[String])
 
+    val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
+       "Offset commit interval in ms")
+      .withRequiredArg()
+      .describedAs("offset commit interval in millisecond")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(60000)
+
     val helpOpt = parser.accepts("help", "Print this message.")
     
     if(args.length == 0)
@@ -116,15 +167,28 @@ object MirrorMaker extends Logging {
     val numProducers = options.valueOf(numProducersOpt).intValue()
     val numStreams = options.valueOf(numStreamsOpt).intValue()
     val bufferSize = options.valueOf(bufferSizeOpt).intValue()
+    val bufferByteSize = options.valueOf(bufferByteSizeOpt).intValue()
+    val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
 
-    // create consumer streams
-    connectors = options.valuesOf(consumerConfigOpt).toList
-      .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
-      .map(new ZookeeperConsumerConnector(_))
-    val numConsumers = connectors.size * numStreams
+    // create consumer connector
+    val consumerConfigProps = Utils.loadProps(options.valuesOf(consumerConfigOpt).head)
+    val consumerConfig = new ConsumerConfig(consumerConfigProps)
+    connector = new ZookeeperConsumerConnector(consumerConfig)
 
     // create a data channel btw the consumers and the producers
-    val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
+    val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numInputs = numStreams, numOutputs = numProducers)
+
+    // set consumer rebalance listener
+    // Customized consumer rebalance listener should extend MirrorMakerConsumerRebalanceListener
+    // and take datachannel as argument.
+    val customRebalanceListenerClass = consumerConfigProps.getProperty("consumer.rebalance.listener")
+    consumerRebalanceListener = {
+      if (customRebalanceListenerClass == null) {
+        new MirrorMakerConsumerRebalanceListener(mirrorDataChannel)
+      } else
+        Utils.createObject[MirrorMakerConsumerRebalanceListener](customRebalanceListenerClass, mirrorDataChannel)
+    }
+    connector.setConsumerRebalanceListener(consumerRebalanceListener)
 
     // create producer threads
     val useNewProducer = options.has(useNewProducerOpt)
@@ -134,12 +198,25 @@ object MirrorMaker extends Logging {
       producerProps.setProperty("client.id", clientId + "-" + i)
       val producer =
       if (useNewProducer)
-        new NewShinyProducer(producerProps)
+        new MirrorMakerNewProducer(producerProps)
       else
-        new OldProducer(producerProps)
+        new MirrorMakerOldProducer(producerProps)
       new ProducerThread(mirrorDataChannel, producer, i)
     })
 
+    // create offset commit thread
+    if (useNewProducer) {
+      /**
+       * The offset commit thread periodically commits consumed offsets to the source cluster. With the new producer,
+       * the offsets are updated upon the returned future metadata of the send() call; with the old producer,
+       * the offsets are updated when the consumer's iterator advances. This guarantees no data loss
+       * even when the mirror maker is shut down uncleanly with the new producer, while with the old producer
+       * messages inside the data channel could be lost upon an unclean mirror maker shutdown.
+       */
+      offsetCommitThread = new OffsetCommitThread(offsetCommitIntervalMs)
+      offsetCommitThread.start()
+    }
+
     // create consumer threads
     val filterSpec = if (options.has(whitelistOpt))
       new Whitelist(options.valueOf(whitelistOpt))
@@ -148,14 +225,14 @@ object MirrorMaker extends Logging {
 
     var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil
     try {
-      streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder())).flatten
+      streams = connector.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder())
     } catch {
       case t: Throwable =>
         fatal("Unable to create stream - shutting down mirror maker.")
-        connectors.foreach(_.shutdown)
+        connector.shutdown()
     }
    consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2))
-    assert(consumerThreads.size == numConsumers)
+    assert(consumerThreads.size == numStreams)
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
@@ -170,28 +247,51 @@ object MirrorMaker extends Logging {
     // since the consumer threads can hit a timeout/other exception;
     // but in this case the producer should still be able to shutdown
     // based on the shutdown message in the channel
-    producerThreads.foreach(_.awaitShutdown)
+    producerThreads.foreach(_.awaitShutdown())
   }
 
   def cleanShutdown() {
     if (isShuttingdown.compareAndSet(false, true)) {
-      if (connectors != null) connectors.foreach(_.shutdown)
-      if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown)
+      info("Start clean shutdown.")
+      // Consumer threads will exit when isCleanShutdown is set.
+      info("Shutting down consumer threads.")
+      if (consumerThreads != null) {
+        consumerThreads.foreach(_.shutdown())
+        consumerThreads.foreach(_.awaitShutdown())
+      }
+      // After consumer threads exit, shutdown producer.
+      info("Shutting down producer threads.")
       if (producerThreads != null) {
-        producerThreads.foreach(_.shutdown)
-        producerThreads.foreach(_.awaitShutdown)
+        producerThreads.foreach(_.shutdown())
+        producerThreads.foreach(_.awaitShutdown())
+      }
+      // offset commit thread should only be shutdown after producer threads are shutdown, so we don't lose offsets.
+      info("Shutting down offset commit thread.")
+      if (offsetCommitThread != null) {
+        offsetCommitThread.shutdown()
+        offsetCommitThread.awaitShutdown()
       }
+      // connector can only be shutdown after offsets are committed.
+      info("Shutting down consumer connectors.")
+      if (connector != null)
+        connector.shutdown()
       info("Kafka mirror maker shutdown successfully")
     }
   }
 
-  class DataChannel(capacity: Int, numInputs: Int, numOutputs: Int) extends KafkaMetricsGroup {
-
-    val queues = new Array[BlockingQueue[ProducerRecord]](numOutputs)
-    for (i <- 0 until numOutputs)
-      queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity)
-
-    private val counter = new AtomicInteger(new Random().nextInt())
+  class DataChannel(messageCapacity: Int, byteCapacity: Int, numInputs: Int, numOutputs: Int)
+      extends KafkaMetricsGroup {
+
+    val queues = new Array[ByteBoundedBlockingQueue[MirrorMakerRecord]](numOutputs)
+    val channelSizeHists = new Array[Histogram](numOutputs)
+    val channelByteSizeHists = new Array[Histogram](numOutputs)
+    val sizeFunction = (record: MirrorMakerRecord) => record.size
+    for (i <- 0 until numOutputs) {
+      queues(i) = new ByteBoundedBlockingQueue[MirrorMakerRecord](messageCapacity, byteCapacity, Some(sizeFunction))
+      channelSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-NumMessages".format(i))
+      channelByteSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-Bytes".format(i))
+    }
+    private val channelRecordSizeHist = newHistogram("MirrorMaker-DataChannel-Record-Size")
 
     // We use a single meter for aggregated wait percentage for the data channel.
     // Since meter is calculated as total_recorded_value / time_window and
@@ -199,21 +299,16 @@ object MirrorMaker extends Logging {
     // time should be discounted by # threads.
     private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
     private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
-    private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size")
 
-    def put(record: ProducerRecord) {
-      // If the key of the message is empty, use round-robin to select the queue
-      // Otherwise use the queue based on the key value so that same key-ed messages go to the same queue
+    def put(record: MirrorMakerRecord) {
+      // Use hash of source topic-partition to decide which queue to put the message in. The benefit is that
+      // we can maintain the message order for both keyed and non-keyed messages.
       val queueId =
-        if(record.key() != null) {
-          Utils.abs(java.util.Arrays.hashCode(record.key())) % numOutputs
-        } else {
-          Utils.abs(counter.getAndIncrement()) % numOutputs
-        }
+        Utils.abs(java.util.Arrays.hashCode((record.sourceTopic + record.sourcePartition).toCharArray)) % numOutputs
       put(record, queueId)
     }
 
-    def put(record: ProducerRecord, queueId: Int) {
+    def put(record: MirrorMakerRecord, queueId: Int) {
       val queue = queues(queueId)
 
       var putSucceed = false
@@ -222,20 +317,27 @@ object MirrorMaker extends Logging {
         putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
         waitPut.mark((SystemTime.nanoseconds - startPutTime) / numInputs)
       }
-      channelSizeHist.update(queue.size)
+      channelSizeHists(queueId).update(queue.size())
+      channelByteSizeHists(queueId).update(queue.byteSize())
+      channelRecordSizeHist.update(sizeFunction(record))
     }
 
-    def take(queueId: Int): ProducerRecord = {
+    def take(queueId: Int): MirrorMakerRecord = {
       val queue = queues(queueId)
-      var data: ProducerRecord = null
+      var data: MirrorMakerRecord = null
       while (data == null) {
         val startTakeTime = SystemTime.nanoseconds
         data = queue.poll(500, TimeUnit.MILLISECONDS)
         waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numOutputs)
       }
-      channelSizeHist.update(queue.size)
+      channelSizeHists(queueId).update(queue.size())
+      channelByteSizeHists(queueId).update(queue.byteSize())
       data
     }
+
+    def clear() {
+      queues.foreach(queue => queue.clear())
+    }
   }
 
   class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
@@ -245,34 +347,44 @@ object MirrorMaker extends Logging {
 
     private val shutdownLatch = new CountDownLatch(1)
     private val threadName = "mirrormaker-consumer-" + threadId
-    private var isCleanShutdown: Boolean = true
     this.logIdent = "[%s] ".format(threadName)
+    private var shutdownFlag: Boolean = false
 
     this.setName(threadName)
 
     override def run() {
       info("Starting mirror maker consumer thread " + threadName)
       try {
-        for (msgAndMetadata <- stream) {
-          val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
+        val iter = stream.iterator()
+        while (!shutdownFlag && iter.hasNext()) {
+          val msgAndMetadata = iter.next()
+          val data = new MirrorMakerRecord(msgAndMetadata.topic,
+                                           msgAndMetadata.partition,
+                                           msgAndMetadata.offset,
+                                           msgAndMetadata.key(),
+                                           msgAndMetadata.message())
           mirrorDataChannel.put(data)
         }
       } catch {
         case e: Throwable => {
           fatal("Stream unexpectedly exited.", e)
-          isCleanShutdown = false
         }
       } finally {
         shutdownLatch.countDown()
         info("Consumer thread stopped")
+
         // If it exits accidentally, stop the entire mirror maker.
-        if (!isCleanShutdown) {
+        if (!isShuttingdown.get()) {
           fatal("Consumer thread exited abnormally, stopping the whole mirror maker.")
           System.exit(-1)
         }
       }
     }
 
+    def shutdown() {
+      shutdownFlag = true
+    }
+
     def awaitShutdown() {
       try {
         shutdownLatch.await()
@@ -284,44 +396,44 @@ object MirrorMaker extends Logging {
   }
 
   class ProducerThread (val dataChannel: DataChannel,
-                        val producer: BaseProducer,
+                        val producer: MirrorMakerBaseProducer,
                        val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
     private val threadName = "mirrormaker-producer-" + threadId
     private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
-    private var isCleanShutdown: Boolean = true
     this.logIdent = "[%s] ".format(threadName)
 
     setName(threadName)
 
-    override def run {
+    override def run() {
       info("Starting mirror maker producer thread " + threadName)
       try {
         while (true) {
-          val data: ProducerRecord = dataChannel.take(threadId)
-          trace("Sending message with value size %d".format(data.value().size))
+          val data: MirrorMakerRecord = dataChannel.take(threadId)
+          trace("Sending message with value size %d".format(data.value.size))
           if(data eq shutdownMessage) {
             info("Received shutdown message")
             return
           }
-          producer.send(data.topic(), data.key(), data.value())
+          producer.send(new TopicAndPartition(data.sourceTopic, data.sourcePartition),
+                        data.sourceOffset,
+                        data.key,
+                        data.value)
         }
       } catch {
-        case t: Throwable => {
+        case t: Throwable =>
           fatal("Producer thread failure due to ", t)
-          isCleanShutdown = false
-        }
       } finally {
-        shutdownComplete.countDown
+        shutdownComplete.countDown()
         info("Producer thread stopped")
-        // If it exits accidentally, stop the entire mirror maker.
-        if (!isCleanShutdown) {
+        // if it exits accidentally, stop the entire mirror maker
+        if (!isShuttingdown.get()) {
           fatal("Producer thread exited abnormally, stopping the whole mirror maker.")
           System.exit(-1)
         }
       }
     }
 
-    def shutdown {
+    def shutdown() {
       try {
         info("Producer thread " + threadName + " shutting down")
         dataChannel.put(shutdownMessage, threadId)
@@ -333,10 +445,10 @@ object MirrorMaker extends Logging {
       }
     }
 
-    def awaitShutdown {
+    def awaitShutdown() {
       try {
-        shutdownComplete.await
-        producer.close
+        shutdownComplete.await()
+        producer.close()
         info("Producer thread shutdown complete")
       } catch {
         case ie: InterruptedException => {
@@ -345,5 +457,163 @@ object MirrorMaker extends Logging {
       }
     }
   }
+
+  class OffsetCommitThread(commitIntervalMs: Int) extends Thread with Logging with KafkaMetricsGroup {
+    private val threadName = "mirrormaker-offset-commit-thread"
+    private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
+    this.logIdent = "[%s]".format(threadName)
+    var shutdownFlag: Boolean = false
+    var commitCounter: Int = 0
+
+    this.setName(threadName)
+
+    newGauge("MirrorMaker-Offset-Commit-Counter",
+      new Gauge[Int] {
+        def value = commitCounter
+      })
+
+    /**
+     * Use the connector to commit all the offsets.
+     */
+    override def run() {
+      info("Starting mirror maker offset commit thread")
+      try {
+        while (!shutdownFlag) {
+          Thread.sleep(commitIntervalMs)
+          commitOffset()
+        }
+      } catch {
+        case t: Throwable => fatal("Exits due to", t)
+      } finally {
+        swallow(commitOffset())
+        shutdownComplete.countDown()
+        info("Offset commit thread exited")
+        if (!isShuttingdown.get()) {
+          fatal("Offset commit thread exited abnormally, stopping the whole mirror maker.")
+          System.exit(-1)
+        }
+      }
+    }
+
+    def commitOffset() {
+      val offsetsToCommit = collection.immutable.Map(topicPartitionOffsetMap.map {
+        case (topicPartition, partitionOffsetMap) =>
+        topicPartition -> OffsetAndMetadata(getOffsetToCommit(partitionOffsetMap), null)
+      }.toSeq: _*)
+      trace("committing offset: %s".format(offsetsToCommit))
+      if (connector == null) {
+        warn("No consumer connector available to commit offset.")
+      } else {
+        connector.commitOffsets(isAutoCommit = false, offsetsToCommit)
+        commitCounter += 1
+      }
+    }
+
+    private def getOffsetToCommit(offsetsMap: Pool[Int, Long]): Long = {
+      val offsets = offsetsMap.map(_._2).toSeq.sorted
+      val iter = offsets.iterator
+      var offsetToCommit = iter.next()
+      while (iter.hasNext && offsetToCommit + 1 == iter.next())
+        offsetToCommit += 1
+      // The committed offset will be the first offset of un-consumed message, hence we need to increment by one.
+      offsetToCommit + 1
+    }
+
+    def shutdown() {
+      shutdownFlag = true
+    }
+
+    def awaitShutdown() {
+      try {
+        shutdownComplete.await()
+        info("Offset commit thread shutdown complete")
+      } catch {
+        case ie: InterruptedException => {
+          warn("Shutdown of the offset commit thread interrupted")
+        }
+      }
+    }
+  }
+
+  private[kafka] trait MirrorMakerBaseProducer {
+    def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte])
+    def close()
+  }
+
+  private class MirrorMakerNewProducer (val producerProps: Properties)
+      extends NewShinyProducer(producerProps) with MirrorMakerBaseProducer {
+
+    override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) {
+      val record = new ProducerRecord(topicPartition.topic, key, value)
+      if(sync) {
+      topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(this.producer.send(record).get().partition(), offset)
+      } else {
+        this.producer.send(record,
+          new MirrorMakerProducerCallback(topicPartition, offset, key, value))
+        numMessageUnacked.incrementAndGet()
+      }
+    }
+  }
+
+  private class MirrorMakerOldProducer (val producerProps: Properties)
+      extends OldProducer(producerProps) with MirrorMakerBaseProducer {
+
+    override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) {
+      super.send(topicPartition.topic, key, value)
+    }
+
+    override def close() {
+      super.close()
+    }
+  }
+
+  private class MirrorMakerProducerCallback (val topicPartition: TopicAndPartition,
+                                             val offset: Long,
+                                             val key: Array[Byte],
+                                             val value: Array[Byte])
+    extends ErrorLoggingCallback(topicPartition.topic, key, value, false) {
+
+    override def onCompletion(metadata: RecordMetadata, exception: Exception) {
+      if (exception != null) {
+        // Use default call back to log error
+        super.onCompletion(metadata, exception)
+      } else {
+        trace("updating offset:[%s] -> %d".format(topicPartition, offset))
+        topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(metadata.partition(), offset)
+      }
+      // Notify the rebalance callback only when all the messages handed to producer are acked.
+      // There is a very slight chance that 1 message is held by producer thread and not handed to producer.
+      // That message might have duplicate. We are not handling that here.
+      if (numMessageUnacked.decrementAndGet() == 0 && inRebalance.get()) {
+        inRebalance synchronized {inRebalance.notify()}
+      }
+    }
+  }
+
+  class MirrorMakerConsumerRebalanceListener (dataChannel: DataChannel) extends ConsumerRebalanceListener {
+
+    override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
+      info("Clearing data channel.")
+      dataChannel.clear()
+      info("Waiting until all the messages are acked.")
+      inRebalance synchronized {
+        inRebalance.set(true)
+        while (numMessageUnacked.get() > 0)
+          inRebalance.wait()
+      }
+      info("Committing offsets.")
+      offsetCommitThread.commitOffset()
+      inRebalance.set(true)
+    }
+  }
+
+  private[kafka] class MirrorMakerRecord (val sourceTopic: String,
+                                          val sourcePartition: Int,
+                                          val sourceOffset: Long,
+                                          val key: Array[Byte],
+                                          val value: Array[Byte]) {
+    def size = value.length + {if (key == null) 0 else key.length}
+  }
+
 }
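
To make the offset bookkeeping above concrete, here is a standalone sketch (names and values are illustrative, not from the patch) of the idea behind getOffsetToCommit: among the offsets the producer has acked for one source partition, the committable offset is one past the end of the contiguous run starting at the smallest tracked offset, since the committed offset must point at the next message to consume.

// Walk the sorted acked offsets while they stay contiguous, then add one.
def safeOffsetToCommit(ackedOffsets: Seq[Long]): Long = {
  val iter = ackedOffsets.sorted.iterator
  var offsetToCommit = iter.next()
  var contiguous = true
  while (contiguous && iter.hasNext) {
    val next = iter.next()
    if (next == offsetToCommit + 1) offsetToCommit = next else contiguous = false
  }
  offsetToCommit + 1
}

// Example: 5, 6 and 7 are acked, 8 is still in flight, 9 is already acked.
// Only offsets up to 7 form a contiguous run, so 8 is the safe commit point.
assert(safeOffsetToCommit(Seq(5L, 6L, 7L, 9L)) == 8L)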
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/28016299/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
index 6a85d7e..26149af 100644
--- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
+++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
@@ -216,4 +216,15 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy
    * @return the remaining bytes capacity of the queue
    */
   def remainingByteSize = math.max(0, queueByteCapacity - currentByteSize.get())
+
+  /**
+   * remove all the items in the queue
+   */
+  def clear() {
+    putLock synchronized {
+      queue.clear()
+      currentByteSize.set(0)
+      putLock.notify()
+    }
+  }
 }
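
A small usage sketch of ByteBoundedBlockingQueue with the new clear() method (capacities and payloads below are illustrative), mirroring how the MirrorMaker data channel bounds each queue both by message count and by total bytes:

import java.util.concurrent.TimeUnit
import kafka.utils.ByteBoundedBlockingQueue

// Bound by at most 1000 messages and roughly 1 MB of payload; size is measured per element.
val sizeOf = (bytes: Array[Byte]) => bytes.length
val queue = new ByteBoundedBlockingQueue[Array[Byte]](1000, 1024 * 1024, Some(sizeOf))

val accepted = queue.offer("hello".getBytes, 500, TimeUnit.MILLISECONDS) // false if the queue stays full
val taken = queue.poll(500, TimeUnit.MILLISECONDS)                       // null if nothing arrives in time
queue.clear()                                                            // added by this patch: drop all items and reset the byte count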

http://git-wip-us.apache.org/repos/asf/kafka/blob/28016299/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 8c4687b..3ccccbd 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -19,17 +19,20 @@ package kafka.consumer
 
 import junit.framework.Assert._
 import kafka.integration.KafkaServerTestHarness
+import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.server._
 import scala.collection._
+import scala.collection.JavaConversions._
 import org.scalatest.junit.JUnit3Suite
 import kafka.message._
 import kafka.serializer._
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
-import java.util.Collections
+import kafka.producer.{KeyedMessage, Producer}
+import java.util.{Collections, Properties}
 import org.apache.log4j.{Logger, Level}
 import kafka.utils.TestUtils._
-import kafka.common.MessageStreamsExistException
+import kafka.common.{TopicAndPartition, MessageStreamsExistException}
 
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
@@ -344,6 +347,49 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     zkClient.close()
   }
 
+  def testConsumerRebalanceListener() {
+    // Send messages to create topic
+    sendMessagesToPartition(configs, topic, 0, nMessages)
+    sendMessagesToPartition(configs, topic, 1, nMessages)
+
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    // Register consumer rebalance listener
+    val rebalanceListener1 = new TestConsumerRebalanceListener()
+    zkConsumerConnector1.setConsumerRebalanceListener(rebalanceListener1)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+
+    // Check if rebalance listener is fired
+    assertEquals(true, rebalanceListener1.listenerCalled)
+    assertEquals(null, rebalanceListener1.partitionOwnership.get(topic))
+    // reset the flag
+    rebalanceListener1.listenerCalled = false
+
+    val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
+    val expected_1 = List(("0", "group1_consumer1-0"),
+                          ("1", "group1_consumer1-0"))
+    assertEquals(expected_1, actual_1)
+
+    val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2))
+    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+    // Register consumer rebalance listener
+    val rebalanceListener2 = new TestConsumerRebalanceListener()
+    zkConsumerConnector2.setConsumerRebalanceListener(rebalanceListener2)
+    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+
+    val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
+    val expected_2 = List(("0", "group1_consumer1-0"),
+                          ("1", "group1_consumer2-0"))
+    assertEquals(expected_2, actual_2)
+
+    // Check if rebalance listener is fired
+    assertEquals(true, rebalanceListener1.listenerCalled)
+    assertEquals(Set[Int](0, 1), rebalanceListener1.partitionOwnership.get(topic))
+    assertEquals(true, rebalanceListener2.listenerCalled)
+    assertEquals(null, rebalanceListener2.partitionOwnership.get(topic))
+
+  }
+
   def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
     val children = zkClient.getChildren(path)
     Collections.sort(children)
@@ -355,4 +401,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
       (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
   }
 
+  private class TestConsumerRebalanceListener extends ConsumerRebalanceListener {
+    var listenerCalled: Boolean = false
+    var partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]] = null
+
+    override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
+      listenerCalled = true
+      this.partitionOwnership = partitionOwnership
+    }
+  }
+
 }

