kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1152970 [16/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Date Mon, 01 Aug 2011 23:42:17 GMT
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,569 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import scala.collection._
+import org.apache.log4j.Logger
+import kafka.cluster._
+import kafka.utils._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import java.net.InetAddress
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+import kafka.api.OffsetRequest
+
+/**
+ * This class handles the consumer's interaction with ZooKeeper
+ *
+ * Directories:
+ * 1. Consumer id registry:
+ * /consumers/[group_id]/ids/[consumer_id] -> topic1,...topicN
+ * A consumer has a unique consumer id within a consumer group. A consumer registers its id as an ephemeral znode
+ * and puts all topics that it subscribes to as the value of the znode. The znode is deleted when the client is gone.
+ * A consumer subscribes to event changes of the consumer id registry within its group.
+ *
+ * The consumer id is picked up from configuration, instead of the sequential id assigned by ZK. Generated sequential
+ * ids are hard to recover during temporary connection loss to ZK, since it's difficult for the client to figure out
+ * whether the creation of a sequential znode has succeeded or not. More details can be found at
+ * (http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling)
+ *
+ * 2. Broker node registry:
+ * /brokers/[0...N] --> { "host" : "host:port",
+ *                        "topics" : {"topic1": ["partition1" ... "partitionN"], ...,
+ *                                    "topicN": ["partition1" ... "partitionN"] } }
+ * This is a list of all present broker nodes. A unique logical node id is configured on each broker node. A broker
+ * node registers itself on start-up and creates a znode with the logical node id under /brokers. The value of the znode
+ * is a JSON String that contains (1) the host name and the port the broker is listening to, (2) a list of topics that
+ * the broker serves, (3) a list of logical partitions assigned to each topic on the broker.
+ * A consumer subscribes to event changes of the broker node registry.
+ *
+ * 3. Partition owner registry:
+ * /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id
+ * This stores the mapping between broker partitions and consumers. Each partition is owned by a unique consumer
+ * within a consumer group. The mapping is reestablished after each rebalancing.
+ *
+ * 4. Consumer offset tracking:
+ * /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
+ * Each consumer tracks the offset of the latest message consumed for each partition.
+ *
+ */
+private[kafka] object ZookeeperConsumerConnector {
+  val MAX_N_RETRIES = 4
+  val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
+}
+
+/**
+ *  JMX interface for monitoring consumer
+ */
+trait ZookeeperConsumerConnectorMBean {
+  def getPartOwnerStats: String
+  def getConsumerGroup: String
+  def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long
+  def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long
+  def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
+}
+
+private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
+                                                val enableFetcher: Boolean) // for testing only
+  extends ConsumerConnector with ZookeeperConsumerConnectorMBean {
+
+  private val logger = Logger.getLogger(getClass())
+  private val isShuttingDown = new AtomicBoolean(false)
+  private val rebalanceLock = new Object
+  private var fetcher: Option[Fetcher] = None
+  private var zkClient: ZkClient = null
+  private val topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
+  // queues : (topic,consumerThreadId) -> queue
+  private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
+  private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+  connectZk
+  createFetcher
+  if (config.autoCommit) {
+    logger.info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
+    scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs)
+  }
+
+  def this(config: ConsumerConfig) = this(config, true)
+
+  def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]] = {
+    consume(topicCountMap)
+  }
+
+  private def createFetcher() {
+    if (enableFetcher)
+      fetcher = Some(new Fetcher(config, zkClient))
+  }
+
+  private def connectZk() {
+    logger.info("Connecting to zookeeper instance at " + config.zkConnect)
+    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+  }
+
+  def shutdown() {
+    val canShutdown = isShuttingDown.compareAndSet(false, true);
+    if (canShutdown) {
+      logger.info("ZKConsumerConnector shutting down")
+      try {
+        scheduler.shutdown
+        fetcher match {
+          case Some(f) => f.shutdown
+          case None =>
+        }
+        sendShutdownToAllQueues
+        if (zkClient != null) {
+          zkClient.close()
+          zkClient = null
+        }
+      }
+      catch {
+        case e =>
+          logger.fatal(e)
+          logger.fatal(Utils.stackTrace(e))
+      }
+      logger.info("ZKConsumerConnector shutdown completed")
+    }
+  }
+
+  def consume(topicCountMap: scala.collection.Map[String,Int]): Map[String,List[KafkaMessageStream]] = {
+    logger.debug("entering consume ")
+    if (topicCountMap == null)
+      throw new RuntimeException("topicCountMap is null")
+
+    val dirs = new ZKGroupDirs(config.groupId)
+    var ret = new mutable.HashMap[String,List[KafkaMessageStream]]
+
+    var consumerUuid : String = null
+    config.consumerId match {
+      case Some(consumerId) // for testing only 
+      => consumerUuid = consumerId
+      case None // generate unique consumerId automatically
+      => consumerUuid = InetAddress.getLocalHost.getHostName + "-" + System.currentTimeMillis
+    }
+    val consumerIdString = config.groupId + "_" + consumerUuid
+    val topicCount = new TopicCount(consumerIdString, topicCountMap)
+
+    // listener to consumer and partition changes
+    val loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString)
+    registerConsumerInZK(dirs, consumerIdString, topicCount)
+    zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+
+    // create a queue per topic per consumer thread
+    val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
+    for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) {
+      var streamList: List[KafkaMessageStream] = Nil
+      for (threadId <- threadIdSet) {
+        val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+        queues.put((topic, threadId), stream)
+        streamList ::= new KafkaMessageStream(stream, config.consumerTimeoutMs)
+      }
+      ret += (topic -> streamList)
+      logger.debug("adding topic " + topic + " and stream to map..")
+
+      // register on broker partition path changes
+      val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topic
+      ZkUtils.makeSurePersistentPathExists(zkClient, partitionPath)
+      zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
+    }
+
+    // register listener for session expired event
+    zkClient.subscribeStateChanges(
+      new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener))
+
+    // explicitly trigger load balancing for this consumer
+    loadBalancerListener.syncedRebalance
+    ret
+  }
+
+  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
+    logger.info("begin registering consumer " + consumerIdString + " in ZK")
+    ZkUtils.createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
+    logger.info("end registering consumer " + consumerIdString + " in ZK")
+  }
+
+  private def sendShutdownToAllQueues() = {
+    for (queue <- queues.values) {
+      logger.debug("Clearing up queue")
+      queue.clear
+      queue.put(ZookeeperConsumerConnector.shutdownCommand)
+      logger.debug("Cleared queue and sent shutdown command")
+    }
+  }
+
+  def autoCommit() {
+    if(logger.isTraceEnabled)
+      logger.trace("auto committing")
+    try {
+      commitOffsets
+    }
+    catch {
+      case t: Throwable =>
+      // log it and let it go
+        logger.error("exception during autoCommit: ", t)
+    }
+  }
+
+  def commitOffsets() {
+    if (zkClient == null)
+      return
+    for ((topic, infos) <- topicRegistry) {
+      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+      for (info <- infos.values) {
+        val newOffset = info.getConsumeOffset
+        try {
+          ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
+            newOffset.toString)
+        }
+        catch {
+          case t: Throwable =>
+          // log it and let it go
+            logger.warn("exception during commitOffsets: " + t + Utils.stackTrace(t))
+        }
+        if(logger.isDebugEnabled)
+          logger.debug("Committed offset " + newOffset + " for topic " + info)
+      }
+    }
+  }
+
+  // for JMX
+  def getPartOwnerStats(): String = {
+    val builder = new StringBuilder
+    for ((topic, infos) <- topicRegistry) {
+      builder.append("\n" + topic + ": [")
+      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+      for(partition <- infos.values) {
+        builder.append("\n    {")
+        builder.append{partition.partition.name}
+        builder.append(",fetch offset:" + partition.getFetchOffset)
+        builder.append(",consumer offset:" + partition.getConsumeOffset)
+        builder.append("}")
+      }
+      builder.append("\n        ]")
+    }
+    builder.toString
+  }
+
+  // for JMX
+  def getConsumerGroup(): String = config.groupId
+
+  def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long =
+    getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId)
+
+  def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
+    val partition = new Partition(brokerId, partitionId)
+    val partitionInfos = topicRegistry.get(topic)
+    if (partitionInfos != null) {
+      val partitionInfo = partitionInfos.get(partition)
+      if (partitionInfo != null)
+        return partitionInfo.getConsumeOffset
+    }
+
+    //otherwise, try to get it from zookeeper
+    try {
+      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+      val znode = topicDirs.consumerOffsetDir + "/" + partition.name
+      val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
+      if (offsetString != null)
+        return offsetString.toLong
+      else
+        return -1
+    }
+    catch {
+      case e =>
+        logger.error("error in getConsumedOffset JMX ", e)
+    }
+    return -2
+  }
+
+  def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
+    var simpleConsumer: SimpleConsumer = null
+    var producedOffset: Long = -1L
+    try {
+      val cluster = ZkUtils.getCluster(zkClient)
+      val broker = cluster.getBroker(brokerId)
+      simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
+                                            ConsumerConfig.SocketBufferSize)
+      val latestOffset = simpleConsumer.getOffsetsBefore(topic, partitionId,
+                                                       OffsetRequest.LatestTime, 1)
+      producedOffset = latestOffset(0)
+    }
+    catch {
+      case e =>
+        logger.error("error in getLatestOffset jmx ", e)
+    }
+    finally {
+      if (simpleConsumer != null)
+        simpleConsumer.close
+    }
+    producedOffset
+  }
+
+  class ZKSessionExpireListener(val dirs: ZKGroupDirs,
+                                 val consumerIdString: String,
+                                 val topicCount: TopicCount,
+                                 val loadBalancerListener: ZKRebalancerListener)
+    extends IZkStateListener {
+    @throws(classOf[Exception])
+    def handleStateChanged(state: KeeperState) {
+      // do nothing, since zkclient will do reconnect for us.
+    }
+
+    /**
+     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
+     * any ephemeral nodes here.
+     *
+     * @throws Exception
+     *             On any error.
+     */
+    @throws(classOf[Exception])
+    def handleNewSession() {
+      /**
+       *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
+       *  connection for us. We need to release the ownership of the current consumer and re-register this
+       *  consumer in the consumer registry and trigger a rebalance.
+       */
+      logger.info("ZK expired; release old broker partition ownership; re-register consumer " + consumerIdString)
+      loadBalancerListener.resetState
+      registerConsumerInZK(dirs, consumerIdString, topicCount)
+      // explicitly trigger load balancing for this consumer
+      loadBalancerListener.syncedRebalance
+
+      // There is no need to resubscribe to child and state changes.
+      // The child change watchers will be set inside rebalance when we read the children list. 
+    }
+
+  }
+
+  class ZKRebalancerListener(val group: String, val consumerIdString: String)
+    extends IZkChildListener {
+    private val dirs = new ZKGroupDirs(group)
+    private var oldPartitionsPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
+    private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      syncedRebalance
+    }
+
+    private def releasePartitionOwnership() {
+      for ((topic, infos) <- topicRegistry) {
+        val topicDirs = new ZKGroupTopicDirs(group, topic)
+        for(partition <- infos.keys) {
+          val znode = topicDirs.consumerOwnerDir + "/" + partition
+          ZkUtils.deletePath(zkClient, znode)
+          if(logger.isDebugEnabled)
+            logger.debug("Consumer " + consumerIdString + " releasing " + znode)
+        }
+      }
+    }
+
+    private def getConsumersPerTopic(group: String) : mutable.Map[String, List[String]] = {
+      val consumers = ZkUtils.getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
+      val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
+      for (consumer <- consumers) {
+        val topicCount = getTopicCount(consumer)
+        for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) {
+          for (consumerThreadId <- consumerThreadIdSet)
+            consumersPerTopicMap.get(topic) match {
+              case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
+              case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
+            }
+        }
+      }
+      for ( (topic, consumerList) <- consumersPerTopicMap )
+        consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
+      consumersPerTopicMap
+    }
+
+    private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
+                                    newPartMap: Map[String,List[String]],
+                                    oldPartMap: Map[String,List[String]],
+                                    newConsumerMap: Map[String,List[String]],
+                                    oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = {
+      var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]()
+      for ( (topic, consumerThreadIdSet) <- myTopicThreadIdsMap )
+        if ( oldPartMap.get(topic) != newPartMap.get(topic) || oldConsumerMap.get(topic) != newConsumerMap.get(topic))
+          relevantTopicThreadIdsMap += (topic -> consumerThreadIdSet)
+      relevantTopicThreadIdsMap
+    }
+
+    private def getTopicCount(consumerId: String) : TopicCount = {
+      val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
+      TopicCount.constructTopicCount(consumerId, topicCountJson)
+    }
+
+    def resetState() {
+      topicRegistry.clear
+      oldConsumersPerTopicMap.clear
+      oldPartitionsPerTopicMap.clear
+    }
+
+    def syncedRebalance() {
+      rebalanceLock synchronized {
+        for (i <- 0 until ZookeeperConsumerConnector.MAX_N_RETRIES) {
+          logger.info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+          var done = false
+          try {
+            done = rebalance
+          }
+          catch {
+            case e =>
+              // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+              // For example, a ZK node can disappear between the time we get all children and the time we try to get
+              // the value of a child. Just let this go since another rebalance will be triggered.
+              logger.info("exception during rebalance " + e)
+          }
+          logger.info("end rebalancing consumer " + consumerIdString + " try #" + i)
+          if (done)
+            return
+          // release all partitions, reset state and retry
+          releasePartitionOwnership
+          resetState
+          Thread.sleep(config.zkSyncTimeMs)
+        }
+      }
+
+      throw new RuntimeException(consumerIdString + " can't rebalance after " + ZookeeperConsumerConnector.MAX_N_RETRIES + " retries")
+    }
+
+    private def rebalance(): Boolean = {
+      // testing code
+      //if ("group1_consumer1" == consumerIdString) {
+      //  logger.info("sleeping " + consumerIdString)
+      //  Thread.sleep(20)
+      //}
+
+      val myTopicThreadIdsMap = getTopicCount(consumerIdString).getConsumerThreadIdsPerTopic
+      val cluster = ZkUtils.getCluster(zkClient)
+      val consumersPerTopicMap = getConsumersPerTopic(group)
+      val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
+      val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap)
+      if (relevantTopicThreadIdsMap.size <= 0) {
+        logger.info("Consumer " + consumerIdString + " with " + consumersPerTopicMap + " doesn't need to rebalance.")
+        return true
+      }
+
+      logger.info("Committing all offsets")
+      commitOffsets
+
+      logger.info("Releasing partition ownership")
+      releasePartitionOwnership
+
+      val queuesToBeCleared = new mutable.HashSet[BlockingQueue[FetchedDataChunk]]
+      for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
+        topicRegistry.remove(topic)
+        topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
+
+        val topicDirs = new ZKGroupTopicDirs(group, topic)
+        val curConsumers = consumersPerTopicMap.get(topic).get
+        var curPartitions: List[String] = partitionsPerTopicMap.get(topic).get
+
+        val nPartsPerConsumer = curPartitions.size / curConsumers.size
+        val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
+
+        logger.info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
+          " for topic " + topic + " with consumers: " + curConsumers)
+
+        for (consumerThreadId <- consumerThreadIdSet) {
+          val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+          assert(myConsumerPosition >= 0)
+          val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
+          val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
+
+          /**
+           *   Range-partition the sorted partitions to consumers for better locality.
+           *  The first few consumers pick up an extra partition, if any.
+           */
+          if (nParts <= 0)
+            logger.warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
+          else {
+            for (i <- startPart until startPart + nParts) {
+              val partition = curPartitions(i)
+              logger.info(consumerThreadId + " attempting to claim partition " + partition)
+              if (!processPartition(topicDirs, partition, topic, consumerThreadId))
+                return false
+            }
+            queuesToBeCleared += queues.get((topic, consumerThreadId))
+          }
+        }
+      }
+      updateFetcher(cluster, queuesToBeCleared)
+      oldPartitionsPerTopicMap = partitionsPerTopicMap
+      oldConsumersPerTopicMap = consumersPerTopicMap
+      true
+    }
+
+    private def updateFetcher(cluster: Cluster, queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
+      // update partitions for fetcher
+      var allPartitionInfos : List[PartitionTopicInfo] = Nil
+      for (partitionInfos <- topicRegistry.values)
+        for (partition <- partitionInfos.values)
+          allPartitionInfos ::= partition
+      logger.info("Consumer " + consumerIdString + " selected partitions : " +
+        allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(","))
+
+      fetcher match {
+        case Some(f) => f.initConnections(allPartitionInfos, cluster, queuesTobeCleared)
+        case None =>
+      }
+    }
+
+    private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String,
+                                 topic: String, consumerThreadId: String) : Boolean = {
+      val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
+      try {
+        ZkUtils.createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
+      }
+      catch {
+        case e: ZkNodeExistsException =>
+        // The node hasn't been deleted by the original owner. So wait a bit and retry.
+          logger.info("waiting for the partition ownership to be deleted: " + partition)
+          return false
+        case e2 => throw e2
+      }
+      addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
+      true
+    }
+
+    private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partitionString: String,
+                                      topic: String, consumerThreadId: String) {
+      val partition = Partition.parse(partitionString)
+      val partTopicInfoMap = topicRegistry.get(topic)
+
+      val znode = topicDirs.consumerOffsetDir + "/" + partition.name
+      val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
+      // If first time starting a consumer, use default offset.
+      // TODO: handle this better (if client doesn't know initial offsets)
+      val offset : Long = if (offsetString == null) Long.MaxValue else offsetString.toLong
+      val queue = queues.get((topic, consumerThreadId))
+      val consumedOffset = new AtomicLong(offset)
+      val fetchedOffset = new AtomicLong(offset)
+      val partTopicInfo = new PartitionTopicInfo(topic,
+                                                 partition.brokerId,
+                                                 partition,
+                                                 queue,
+                                                 consumedOffset,
+                                                 fetchedOffset,
+                                                 new AtomicInteger(config.fetchSize))
+      partTopicInfoMap.put(partition, partTopicInfo)
+      if (logger.isDebugEnabled)
+        logger.debug(partTopicInfo + " selected new offset " + offset)
+    }
+  }
+}
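An illustrative sketch (not part of the committed code) of the znode layout documented in the class comment above; the group, consumer, topic and broker-partition values here are hypothetical:

    // Hypothetical values; the path shapes follow the directory-layout comment above.
    val group = "group1"; val consumer = "consumer1"
    val topic = "topic1"; val brokerPartition = "0-3"

    val consumerIdPath = "/consumers/" + group + "/ids/" + consumer                          // -> subscribed topics
    val brokerPath     = "/brokers/0"                                                        // -> JSON host/topics/partitions
    val ownerPath      = "/consumers/" + group + "/owner/" + topic + "/" + brokerPartition   // -> owning consumer thread id
    val offsetPath     = "/consumers/" + group + "/offsets/" + topic + "/" + brokerPartition // -> last consumed offset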
+
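A worked example (illustrative, not part of the committed code) of the range-assignment arithmetic used in ZKRebalancerListener.rebalance above, for the hypothetical case of 5 sorted partitions and 2 consumer threads; the first thread picks up the extra partition:

    // Same arithmetic as rebalance(): 5 partitions spread over 2 consumer threads.
    val curPartitions = List("0-0", "0-1", "0-2", "1-0", "1-1")  // hypothetical broker-partition names
    val curConsumers  = List("group1_c1-0", "group1_c1-1")       // hypothetical consumer thread ids
    val nPartsPerConsumer       = curPartitions.size / curConsumers.size  // = 2
    val nConsumersWithExtraPart = curPartitions.size % curConsumers.size  // = 1
    for ((threadId, pos) <- curConsumers.zipWithIndex) {
      val startPart = nPartsPerConsumer * pos + pos.min(nConsumersWithExtraPart)
      val nParts    = nPartsPerConsumer + (if (pos + 1 > nConsumersWithExtraPart) 0 else 1)
      println(threadId + " -> " + curPartitions.slice(startPart, startPart + nParts))
    }
    // group1_c1-0 -> List(0-0, 0-1, 0-2)
    // group1_c1-1 -> List(1-0, 1-1)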

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html Mon Aug  1 23:41:24 2011
@@ -0,0 +1 @@
+This is the consumer API for kafka.
\ No newline at end of file

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer.storage
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import java.util.concurrent.locks._
+
+class MemoryOffsetStorage extends OffsetStorage {
+  
+  val offsetAndLock = new ConcurrentHashMap[(Int, String), (AtomicLong, Lock)]
+
+  def reserve(node: Int, topic: String): Long = {
+    val key = (node, topic)
+    if(!offsetAndLock.containsKey(key))
+      offsetAndLock.putIfAbsent(key, (new AtomicLong(0), new ReentrantLock))
+    val (offset, lock) = offsetAndLock.get(key)
+    lock.lock
+    offset.get
+  }
+
+  def commit(node: Int, topic: String, offset: Long) = {
+    val (highwater, lock) = offsetAndLock.get((node, topic))
+    highwater.set(offset)
+    lock.unlock
+    offset
+  }
+  
+}
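A minimal usage sketch for MemoryOffsetStorage above (illustrative, not part of the committed code; the node id and topic are hypothetical). reserve() returns the last committed offset for a (node, topic) pair and holds its lock until commit() records the new high-water mark and releases it:

    import kafka.consumer.storage.MemoryOffsetStorage

    val storage = new MemoryOffsetStorage
    val start = storage.reserve(0, "topic1")     // first call creates the entry, returns 0 and takes the lock
    // ... consume messages starting at offset `start` ...
    storage.commit(0, "topic1", start + 1024L)   // record the new high-water mark and release the lock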

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer.storage
+
+import kafka.utils.Range
+
+/**
+ * A strategy for storing offsets for the consumer. 
+ * This is used to track the progress of the consumer in the stream.
+ */
+trait OffsetStorage {
+
+  /**
+   * Reserve the offset for the given node and topic, holding it until it is committed.
+   * @param node The broker node id
+   * @param topic The topic
+   * @return The offset reserved
+   */
+  def reserve(node: Int, topic: String): Long
+
+  /**
+   * Commit the new offset for the given node and topic.
+   * @param node The broker node id
+   * @param topic The topic
+   * @param offset The new offset
+   */
+  def commit(node: Int, topic: String, offset: Long)
+  
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer.storage.sql
+
+import java.sql._
+import org.apache.log4j._
+import kafka.utils._
+import kafka.consumer.storage.OffsetStorage
+
+/**
+ * An offset storage implementation that uses an Oracle database to save offsets
+ */
+@nonthreadsafe
+class OracleOffsetStorage(val connection: Connection) extends OffsetStorage {
+  
+  private val logger: Logger = Logger.getLogger(classOf[OracleOffsetStorage])
+  private val lock = new Object
+  connection.setAutoCommit(false)
+  
+  def reserve(node: Int, topic: String): Long = {
+    /* try to get and lock the offset, if it isn't there, make it */
+    val maybeOffset = selectExistingOffset(connection, node, topic)
+    val offset = maybeOffset match {
+      case Some(offset) => offset
+      case None => {
+        maybeInsertZeroOffset(connection, node, topic)
+        selectExistingOffset(connection, node, topic).get
+      }
+    }
+    
+    if(logger.isDebugEnabled)
+      logger.debug("Reserved node " + node + " for topic '" + topic + "' offset " + offset)
+    
+    offset
+  }
+  
+  def commit(node: Int, topic: String, offset: Long) {
+    var success = false
+    try {
+      updateOffset(connection, node, topic, offset)
+      success = true
+    } finally {
+      commitOrRollback(connection, success)
+    }
+    if(logger.isDebugEnabled)
+      logger.debug("Updated node " + node + " for topic '" + topic + "' to " + offset)
+  }
+  
+  def close() {
+    Utils.swallow(logger.error, connection.close())
+  }
+  
+  /**
+   * Attempt to insert a zero offset for this node and topic if there isn't already an entry
+   * @return true iff the row didn't already exist and a new one was inserted
+   */
+  private def maybeInsertZeroOffset(connection: Connection, node: Int, topic: String): Boolean = {
+    val stmt = connection.prepareStatement(
+      """insert into kafka_offsets (node, topic, offset) 
+         select ?, ?, 0 from dual where not exists 
+         (select null from kafka_offsets where node = ? and topic = ?)""")
+    stmt.setInt(1, node)
+    stmt.setString(2, topic)
+    stmt.setInt(3, node)
+    stmt.setString(4, topic)
+    val updated = stmt.executeUpdate()
+    if(updated > 1)
+      throw new IllegalStateException("More than one key updated by primary key!")
+    else
+      updated == 1
+  }
+  
+  /**
+   * Select the existing offset for this node and topic, if any, locking the row for update
+   * @return Some(offset) if a row exists, None otherwise
+   */
+  private def selectExistingOffset(connection: Connection, node: Int, topic: String): Option[Long] = {
+    val stmt = connection.prepareStatement(
+        """select offset from kafka_offsets
+           where node = ? and topic = ?
+           for update""")
+    var results: ResultSet = null
+    try {
+      stmt.setInt(1, node)
+      stmt.setString(2, topic)
+      results = stmt.executeQuery()
+      if(!results.next()) {
+        None
+      } else {
+        val offset = results.getLong("offset")
+        if(results.next())
+          throw new IllegalStateException("More than one entry for primary key!")
+        Some(offset)
+      }
+    } finally {
+      close(stmt)
+      close(results)
+    }
+  }
+  
+  private def updateOffset(connection: Connection, 
+                           node: Int, 
+                           topic: String, 
+                           newOffset: Long): Unit = {
+    val stmt = connection.prepareStatement("update kafka_offsets set offset = ? where node = ? and topic = ?")
+    try {
+      stmt.setLong(1, newOffset)
+      stmt.setInt(2, node)
+      stmt.setString(3, topic)
+      val updated = stmt.executeUpdate()
+      if(updated != 1)
+        throw new IllegalStateException("Unexpected number of keys updated: " + updated)
+    } finally {
+      close(stmt)
+    }
+  }
+  
+  
+  private def commitOrRollback(connection: Connection, commit: Boolean) {
+    if(connection != null) {
+      if(commit)
+        Utils.swallow(logger.error, connection.commit())
+      else
+        Utils.swallow(logger.error, connection.rollback())
+    }
+  }
+  
+  private def close(rs: ResultSet) {
+    if(rs != null)
+      Utils.swallow(logger.error, rs.close())
+  }
+  
+  private def close(stmt: PreparedStatement) {
+    if(stmt != null)
+      Utils.swallow(logger.error, stmt.close())
+  }
+  
+  private def close(connection: Connection) {
+    if(connection != null)
+      Utils.swallow(logger.error, connection.close())
+  }
+  
+}
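A minimal usage sketch for OracleOffsetStorage above (illustrative, not part of the committed code): the JDBC URL and credentials are placeholders, and a kafka_offsets(node, topic, offset) table matching the statements in the class is assumed to already exist.

    import java.sql.DriverManager
    import kafka.consumer.storage.sql.OracleOffsetStorage

    // Hypothetical connection details; the storage disables auto-commit itself.
    val connection = DriverManager.getConnection("jdbc:oracle:thin:@dbhost:1521:SID", "user", "password")
    val storage = new OracleOffsetStorage(connection)

    val offset = storage.reserve(0, "topic1")    // selects (or inserts) the row and locks it with SELECT ... FOR UPDATE
    // ... consume from `offset` ...
    storage.commit(0, "topic1", offset + 4096L)  // updates the row and commits the transaction
    storage.close()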

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.javaapi
+
+import java.nio.ByteBuffer
+import org.apache.log4j.Logger
+import kafka.serializer.Encoder
+import kafka.producer.{ProducerConfig, ProducerPool}
+import kafka.producer.async.{AsyncProducerConfig, QueueItem}
+
+private[javaapi] object Implicits {
+  private val logger = Logger.getLogger(getClass())
+
+  implicit def javaMessageSetToScalaMessageSet(messageSet: kafka.javaapi.message.ByteBufferMessageSet):
+     kafka.message.ByteBufferMessageSet = messageSet.underlying
+
+  implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
+     kafka.javaapi.message.ByteBufferMessageSet = {
+    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset,
+                                                   messageSet.getErrorCode)
+  }
+
+  implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = {
+    if(logger.isDebugEnabled)
+      logger.debug("Implicit instantiation of Java Sync Producer")
+    new kafka.javaapi.producer.SyncProducer(producer)
+  }
+
+  implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = {
+    if(logger.isDebugEnabled)
+      logger.debug("Implicit instantiation of Sync Producer")
+    producer.underlying
+  }
+
+  implicit def toScalaEventHandler[T](eventHandler: kafka.javaapi.producer.async.EventHandler[T])
+       : kafka.producer.async.EventHandler[T] = {
+    new kafka.producer.async.EventHandler[T] {
+      override def init(props: java.util.Properties) { eventHandler.init(props) }
+      override def handle(events: Seq[QueueItem[T]], producer: kafka.producer.SyncProducer, encoder: Encoder[T]) {
+        import collection.JavaConversions._
+        eventHandler.handle(asList(events), producer, encoder)
+      }
+      override def close { eventHandler.close }
+    }
+  }
+
+  implicit def toJavaEventHandler[T](eventHandler: kafka.producer.async.EventHandler[T])
+    : kafka.javaapi.producer.async.EventHandler[T] = {
+    new kafka.javaapi.producer.async.EventHandler[T] {
+      override def init(props: java.util.Properties) { eventHandler.init(props) }
+      override def handle(events: java.util.List[QueueItem[T]], producer: kafka.javaapi.producer.SyncProducer,
+                          encoder: Encoder[T]) {
+        import collection.JavaConversions._
+        eventHandler.handle(asBuffer(events), producer, encoder)
+      }
+      override def close { eventHandler.close }
+    }
+  }
+
+  implicit def toScalaCbkHandler[T](cbkHandler: kafka.javaapi.producer.async.CallbackHandler[T])
+      : kafka.producer.async.CallbackHandler[T] = {
+    new kafka.producer.async.CallbackHandler[T] {
+      import collection.JavaConversions._
+      override def init(props: java.util.Properties) { cbkHandler.init(props)}
+      override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
+        cbkHandler.beforeEnqueue(data)
+      }
+      override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) {
+        cbkHandler.afterEnqueue(data, added)
+      }
+      override def afterDequeuingExistingData(data: QueueItem[T] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
+        cbkHandler.afterDequeuingExistingData(data)
+      }
+      override def beforeSendingData(data: Seq[QueueItem[T]] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
+        asList(cbkHandler.beforeSendingData(asList(data)))
+      }
+      override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[T]] = {
+        asBuffer(cbkHandler.lastBatchBeforeClose)
+      }
+      override def close { cbkHandler.close }
+    }
+  }
+
+  implicit def toJavaCbkHandler[T](cbkHandler: kafka.producer.async.CallbackHandler[T])
+      : kafka.javaapi.producer.async.CallbackHandler[T] = {
+    new kafka.javaapi.producer.async.CallbackHandler[T] {
+      import collection.JavaConversions._
+      override def init(props: java.util.Properties) { cbkHandler.init(props)}
+      override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
+        cbkHandler.beforeEnqueue(data)
+      }
+      override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) {
+        cbkHandler.afterEnqueue(data, added)
+      }
+      override def afterDequeuingExistingData(data: QueueItem[T] = null)
+      : java.util.List[QueueItem[T]] = {
+        asList(cbkHandler.afterDequeuingExistingData(data))
+      }
+      override def beforeSendingData(data: java.util.List[QueueItem[T]] = null)
+      : java.util.List[QueueItem[T]] = {
+        asBuffer(cbkHandler.beforeSendingData(asBuffer(data)))
+      }
+      override def lastBatchBeforeClose: java.util.List[QueueItem[T]] = {
+        asList(cbkHandler.lastBatchBeforeClose)
+      }
+      override def close { cbkHandler.close }
+    }
+  }
+
+  implicit def toMultiFetchResponse(response: kafka.javaapi.MultiFetchResponse): kafka.api.MultiFetchResponse =
+    response.underlying
+
+  implicit def toJavaMultiFetchResponse(response: kafka.api.MultiFetchResponse): kafka.javaapi.MultiFetchResponse =
+    new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets, response.offsets)
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi
+
+import kafka.utils.IteratorTemplate
+import java.nio.ByteBuffer
+import message.ByteBufferMessageSet
+
+class MultiFetchResponse(buffer: ByteBuffer, numSets: Int, offsets: Array[Long]) extends java.lang.Iterable[ByteBufferMessageSet] {
+  val underlyingBuffer = ByteBuffer.wrap(buffer.array)
+    // this has the side effect of setting the initial position of buffer correctly
+  val errorCode = underlyingBuffer.getShort
+
+  import Implicits._
+  val underlying = new kafka.api.MultiFetchResponse(underlyingBuffer, numSets, offsets)
+
+  override def toString() = underlying.toString
+
+  def iterator : java.util.Iterator[ByteBufferMessageSet] = {
+    new IteratorTemplate[ByteBufferMessageSet] {
+      val iter = underlying.iterator
+      override def makeNext(): ByteBufferMessageSet = {
+        if(iter.hasNext)
+          iter.next
+        else
+          return allDone
+      }
+    }
+  }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+import kafka.network.Request
+import kafka.api.RequestKeys
+import java.nio.ByteBuffer
+
+class ProducerRequest(val topic: String,
+                      val partition: Int,
+                      val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+  import Implicits._
+  private val underlying = new kafka.api.ProducerRequest(topic, partition, messages)
+
+  def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
+
+  def sizeInBytes(): Int = underlying.sizeInBytes
+
+  def getTranslatedPartition(randomSelector: String => Int): Int =
+    underlying.getTranslatedPartition(randomSelector)
+
+  override def toString: String =
+    underlying.toString
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: ProducerRequest =>
+        (that canEqual this) && topic == that.topic && partition == that.partition &&
+                messages.equals(that.messages)
+      case _ => false
+    }
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
+
+  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
+
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi.consumer;
+
+import kafka.consumer.KafkaMessageStream;
+
+import java.util.List;
+import java.util.Map;
+
+public interface ConsumerConnector {
+    /**
+     *  Create a list of MessageStreams for each topic.
+     *
+     *  @param topicCountMap  a map of (topic, #streams) pairs
+     *  @return a map of (topic, list of KafkaMessageStream) pairs. The number of items in the
+     *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
+     */
+    public Map<String, List<KafkaMessageStream>> createMessageStreams(Map<String, Integer> topicCountMap);
+
+    /**
+     *  Commit the offsets of all broker partitions consumed by this connector.
+     */
+    public void commitOffsets();
+
+    /**
+     *  Shut down the connector
+     */
+    public void shutdown();
+}
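An illustrative end-to-end sketch (not part of the committed code) of the createMessageStreams contract described above, written against the Scala connector; the property names, the Consumer.create factory and the iterability of KafkaMessageStream are assumptions, since they are defined elsewhere in the tree rather than in this hunk:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    val props = new Properties
    props.put("zk.connect", "localhost:2181")   // assumed property names
    props.put("groupid", "group1")
    val connector = Consumer.create(new ConsumerConfig(props))  // factory assumed, not shown in this hunk

    // Ask for two streams for "topic1"; each stream would normally be handed to its own thread.
    val streams = connector.createMessageStreams(Map("topic1" -> 2))
    for (message <- streams("topic1").head)     // KafkaMessageStream assumed iterable over messages
      println(message)

    connector.shutdown()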

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi.consumer
+
+import kafka.utils.threadsafe
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.javaapi.MultiFetchResponse
+import kafka.api.FetchRequest
+
+/**
+ * A consumer of kafka messages
+ */
+@threadsafe
+class SimpleConsumer(val host: String,
+                     val port: Int,
+                     val soTimeout: Int,
+                     val bufferSize: Int) {
+  val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
+
+  /**
+   *  Fetch a set of messages from a topic.
+   *
+   *  @param request  specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+   *  @return a set of fetched messages
+   */
+  def fetch(request: FetchRequest): ByteBufferMessageSet = {
+    import kafka.javaapi.Implicits._
+    underlying.fetch(request)
+  }
+
+  /**
+   *  Combine multiple fetch requests in one call.
+   *
+   *  @param fetches  a sequence of fetch requests.
+   *  @return a sequence of fetch responses
+   */
+  def multifetch(fetches: java.util.List[FetchRequest]): MultiFetchResponse = {
+    import scala.collection.JavaConversions._
+    import kafka.javaapi.Implicits._
+    underlying.multifetch(asBuffer(fetches): _*)
+  }
+
+  /**
+   *  Get a list of valid offsets (up to maxSize) before the given time.
+   *  The result is a list of offsets, in descending order.
+   *
+   *  @param time time in milliseconds (-1 for the latest available offset, -2 for the smallest available offset)
+   *  @return an array of offsets
+   */
+  def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] =
+    underlying.getOffsetsBefore(topic, partition, time, maxNumOffsets)
+
+  def close() {
+    underlying.close
+  }
+}
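A minimal fetch sketch for the Java-API SimpleConsumer above (illustrative, not part of the committed code; the host, port, buffer sizes and the FetchRequest constructor arguments (topic, partition, offset, maxSize) are assumptions):

    import kafka.api.{FetchRequest, OffsetRequest}
    import kafka.javaapi.consumer.SimpleConsumer

    val consumer = new SimpleConsumer("localhost", 9092, 30000, 64 * 1024)
    val latest = consumer.getOffsetsBefore("topic1", 0, OffsetRequest.LatestTime, 1)  // array of at most one offset
    val messages = consumer.fetch(new FetchRequest("topic1", 0, 0L, 300 * 1024))
    val iter = messages.iterator                 // java.util.Iterator[MessageAndOffset]
    while (iter.hasNext)
      println(iter.next.offset)                  // offset field of MessageAndOffset assumed
    consumer.close()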

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi.consumer
+
+import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
+
+/**
+ * This class handles the consumer's interaction with ZooKeeper
+ *
+ * Directories:
+ * 1. Consumer id registry:
+ * /consumers/[group_id]/ids/[consumer_id] -> topic1,...topicN
+ * A consumer has a unique consumer id within a consumer group. A consumer registers its id as an ephemeral znode
+ * and puts all topics that it subscribes to as the value of the znode. The znode is deleted when the client is gone.
+ * A consumer subscribes to event changes of the consumer id registry within its group.
+ *
+ * The consumer id is picked up from configuration, instead of the sequential id assigned by ZK. Generated sequential
+ * ids are hard to recover during temporary connection loss to ZK, since it's difficult for the client to figure out
+ * whether the creation of a sequential znode has succeeded or not. More details can be found at
+ * (http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling)
+ *
+ * 2. Broker node registry:
+ * /brokers/[0...N] --> { "host" : "host:port",
+ *                        "topics" : {"topic1": ["partition1" ... "partitionN"], ...,
+ *                                    "topicN": ["partition1" ... "partitionN"] } }
+ * This is a list of all present broker nodes. A unique logical node id is configured on each broker node. A broker
+ * node registers itself on start-up and creates a znode with the logical node id under /brokers. The value of the znode
+ * is a JSON String that contains (1) the host name and the port the broker is listening to, (2) a list of topics that
+ * the broker serves, (3) a list of logical partitions assigned to each topic on the broker.
+ * A consumer subscribes to event changes of the broker node registry.
+ *
+ * 3. Partition owner registry:
+ * /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id
+ * This stores the mapping between broker partitions and consumers. Each partition is owned by a unique consumer
+ * within a consumer group. The mapping is reestablished after each rebalancing.
+ *
+ * 4. Consumer offset tracking:
+ * /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
+ * Each consumer tracks the offset of the latest message consumed for each partition.
+ *
+*/
+
+private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
+                                 val enableFetcher: Boolean) // for testing only
+    extends ConsumerConnector {
+
+  val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
+
+  def this(config: ConsumerConfig) = this(config, true)
+
+ // for java client
+  def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]):
+    java.util.Map[String,java.util.List[KafkaMessageStream]] = {
+    import scala.collection.JavaConversions._
+
+    val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
+    val scalaReturn = underlying.consume(scalaTopicCountMap)
+    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream]]
+    for ((topic, streams) <- scalaReturn) {
+      var javaStreamList = new java.util.ArrayList[KafkaMessageStream]
+      for (stream <- streams)
+        javaStreamList.add(stream)
+      ret.put(topic, javaStreamList)
+    }
+    ret
+  }
+
+  def commitOffsets() {
+    underlying.commitOffsets
+  }
+
+  def shutdown() {
+    underlying.shutdown
+  }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.javaapi.message
+
+import java.nio.ByteBuffer
+import kafka.common.ErrorMapping
+import org.apache.log4j.Logger
+import kafka.message._
+
+class ByteBufferMessageSet(private val buffer: ByteBuffer,
+                           private val initialOffset: Long = 0L,
+                           private val errorCode: Int = ErrorMapping.NoError) extends MessageSet {
+  private val logger = Logger.getLogger(getClass())
+  val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
+                                                                                              initialOffset,
+                                                                                              errorCode)
+  def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
+
+  def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
+    this(compressionCodec match {
+      case NoCompressionCodec =>
+        val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+        val messageIterator = messages.iterator
+        while(messageIterator.hasNext) {
+          val message = messageIterator.next
+          message.serializeTo(buffer)
+        }
+        buffer.rewind
+        buffer
+      case _ =>
+        import scala.collection.JavaConversions._
+        val message = CompressionUtils.compress(asBuffer(messages), compressionCodec)
+        val buffer = ByteBuffer.allocate(message.serializedSize)
+        message.serializeTo(buffer)
+        buffer.rewind
+        buffer
+    }, 0L, ErrorMapping.NoError)
+  }
+
+  def this(messages: java.util.List[Message]) {
+    this(NoCompressionCodec, messages)
+  }
+
+  def validBytes: Long = underlying.validBytes
+
+  def serialized():ByteBuffer = underlying.serialized
+
+  def getInitialOffset = initialOffset
+
+  def getBuffer = buffer
+
+  def getErrorCode = errorCode
+
+  override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
+    val underlyingIterator = underlying.iterator
+    override def hasNext(): Boolean = {
+      underlyingIterator.hasNext
+    }
+
+    override def next(): MessageAndOffset = {
+      underlyingIterator.next
+    }
+
+    override def remove = throw new UnsupportedOperationException("remove API on MessageSet is not supported")
+  }
+
+  override def toString: String = underlying.toString
+
+  def sizeInBytes: Long = underlying.sizeInBytes
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: ByteBufferMessageSet =>
+        (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+      case _ => false
+    }
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
+
+  override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + initialOffset.hashCode
+
+}
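
A short sketch of building the Java-facing message set above from individual messages and walking it with its iterator (the Message(bytes) constructor is assumed from the core message class, which is not part of this excerpt):

    import kafka.message.Message
    import kafka.javaapi.message.ByteBufferMessageSet

    val messages = new java.util.ArrayList[Message]
    messages.add(new Message("hello".getBytes))
    messages.add(new Message("world".getBytes))

    // Uncompressed set; the other constructor additionally takes a CompressionCodec.
    val messageSet = new ByteBufferMessageSet(messages)

    val it = messageSet.iterator
    while (it.hasNext) {
      val messageAndOffset = it.next
      // messageAndOffset.message carries the payload; the offset is tracked alongside it.
    }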

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi.message
+
+import java.nio.channels.WritableByteChannel
+import kafka.message.{MessageAndOffset, InvalidMessageException, Message}
+
+/**
+ * A set of messages. A message set has a fixed serialized form, though the container
+ * for the bytes could be either in-memory or on disk. The format of each message is
+ * as follows:
+ * 4 byte size containing an integer N
+ * N message bytes as described in the message class
+ */
+abstract class MessageSet extends java.lang.Iterable[MessageAndOffset] {
+
+  /**
+   * Provides an iterator over the messages in this set
+   */
+  def iterator: java.util.Iterator[MessageAndOffset]
+
+  /**
+   * Gives the total size of this message set in bytes
+   */
+  def sizeInBytes: Long
+
+  /**
+   * Validate the checksum of all the messages in the set. Throws an InvalidMessageException if the checksum doesn't
+   * match the payload for any message.
+   */
+  def validate(): Unit = {
+    val thisIterator = this.iterator
+    while(thisIterator.hasNext) {
+      val messageAndOffset = thisIterator.next
+      if(!messageAndOffset.message.isValid)
+        throw new InvalidMessageException
+    }
+  }
+}
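
The framing described in the class comment above ("4 byte size containing an integer N" followed by "N message bytes") can be illustrated with a few lines of plain buffer handling; this is only a sketch of the wire layout, not the decoding logic Kafka itself uses:

    import java.nio.ByteBuffer

    // Count how many complete [4-byte size N][N message bytes] frames a buffer holds.
    def frameCount(buf: ByteBuffer): Int = {
      val b = buf.duplicate            // work on a copy so the caller's position is untouched
      var count = 0
      while (b.remaining >= 4) {
        val size = b.getInt            // the 4-byte length prefix N
        if (size < 0 || size > b.remaining)
          return count                 // trailing partial message: stop counting
        b.position(b.position + size)  // skip the N message bytes
        count += 1
      }
      count
    }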

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi.producer
+
+import kafka.utils.Utils
+import kafka.producer.async.QueueItem
+import java.util.Properties
+import kafka.producer.{ProducerPool, ProducerConfig, Partitioner}
+import kafka.serializer.Encoder
+
+class Producer[K,V](config: ProducerConfig,
+                    partitioner: Partitioner[K],
+                    producerPool: ProducerPool[V],
+                    populateProducerPool: Boolean = true) /* for testing purposes only. Applications should ideally */
+                                                          /* use the other constructor */
+{
+
+  private val underlying = new kafka.producer.Producer[K,V](config, partitioner, producerPool, populateProducerPool, null)
+
+  /**
+   * This constructor can be used when all config parameters will be specified through the
+   * ProducerConfig object
+   * @param config Producer Configuration object
+   */
+  def this(config: ProducerConfig) = this(config, Utils.getObject(config.partitionerClass),
+    new ProducerPool[V](config, Utils.getObject(config.serializerClass)))
+
+  /**
+   * This constructor can be used to provide pre-instantiated objects for all config parameters
+   * that would otherwise be instantiated via reflection, i.e. the encoder, partitioner, event handler and
+   * callback handler
+   * @param config Producer Configuration object
+   * @param encoder Encoder used to convert an object of type V to a kafka.message.Message
+   * @param eventHandler the class that implements kafka.javaapi.producer.async.IEventHandler[T] used to
+   * dispatch a batch of produce requests, using an instance of kafka.javaapi.producer.SyncProducer
+   * @param cbkHandler the class that implements kafka.javaapi.producer.async.CallbackHandler[T] used to inject
+   * callbacks at various stages of the kafka.javaapi.producer.AsyncProducer pipeline.
+   * @param partitioner class that implements the kafka.javaapi.producer.Partitioner[K], used to supply a custom
+   * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
+   * object in the send API
+   */
+  import kafka.javaapi.Implicits._
+  def this(config: ProducerConfig,
+           encoder: Encoder[V],
+           eventHandler: kafka.javaapi.producer.async.EventHandler[V],
+           cbkHandler: kafka.javaapi.producer.async.CallbackHandler[V],
+           partitioner: Partitioner[K]) = {
+    this(config, partitioner,
+         new ProducerPool[V](config, encoder,
+                             new kafka.producer.async.EventHandler[V] {
+                               override def init(props: Properties) { eventHandler.init(props) }
+                               override def handle(events: Seq[QueueItem[V]], producer: kafka.producer.SyncProducer,
+                                                   encoder: Encoder[V]) {
+                                 import collection.JavaConversions._
+                                 import kafka.javaapi.Implicits._
+                                 eventHandler.handle(asList(events), producer, encoder)
+                               }
+                               override def close { eventHandler.close }
+                             },
+                             new kafka.producer.async.CallbackHandler[V] {
+                               import collection.JavaConversions._
+                               override def init(props: Properties) { cbkHandler.init(props)}
+                               override def beforeEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]]): QueueItem[V] = {
+                                 cbkHandler.beforeEnqueue(data)
+                               }
+                               override def afterEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]], added: Boolean) {
+                                 cbkHandler.afterEnqueue(data, added)
+                               }
+                               override def afterDequeuingExistingData(data: QueueItem[V] = null): scala.collection.mutable.Seq[QueueItem[V]] = {
+                                 cbkHandler.afterDequeuingExistingData(data)
+                               }
+                               override def beforeSendingData(data: Seq[QueueItem[V]] = null): scala.collection.mutable.Seq[QueueItem[V]] = {
+                                 asList(cbkHandler.beforeSendingData(asList(data)))
+                               }
+                               override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[V]] = {
+                                 asBuffer(cbkHandler.lastBatchBeforeClose)
+                               }
+                               override def close { cbkHandler.close }
+                             }))
+  }
+
+  /**
+   * Sends the data to a single topic, partitioned by key, using either the
+   * synchronous or the asynchronous producer
+   * @param producerData the producer data object that encapsulates the topic, key and message data
+   */
+  def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) {
+    import collection.JavaConversions._
+    underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey,
+                                                         asBuffer(producerData.getData)))
+  }
+
+  /**
+   * Use this API to send data to multiple topics
+   * @param producerData list of producer data objects that encapsulate the topic, key and message data
+   */
+  def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) {
+    import collection.JavaConversions._
+    underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
+                                                         asBuffer(pd.getData))): _*)
+  }
+
+  /**
+   * Closes the producer pool connections to all Kafka brokers. Also closes
+   * the zookeeper client connection if one exists
+   */
+  def close = underlying.close
+}
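
A brief usage sketch of the Java-facing producer above, wrapping a small batch in ProducerData and sending it to a single topic (the configuration keys and the String encoder named here are assumptions for illustration, not part of this diff):

    import java.util.Properties
    import kafka.producer.ProducerConfig
    import kafka.javaapi.producer.{Producer, ProducerData}

    val props = new Properties
    props.put("broker.list", "0:localhost:9092")                    // assumed key and format
    props.put("serializer.class", "kafka.serializer.StringEncoder") // assumed encoder class
    val producer = new Producer[String, String](new ProducerConfig(props))

    val batch = new java.util.ArrayList[String]
    batch.add("message-1")
    batch.add("message-2")

    // Single-topic send; the key is handed to the partitioner and may be null.
    producer.send(new ProducerData[String, String]("topic1", "key-1", batch))

    producer.close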

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.javaapi.producer
+
+import scala.collection.JavaConversions._
+
+class ProducerData[K, V](private val topic: String,
+                         private val key: K,
+                         private val data: java.util.List[V]) {
+
+  def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)
+
+  def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d)))
+
+  def getTopic: String = topic
+
+  def getKey: K = key
+
+  def getData: java.util.List[V] = data
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi.producer
+
+import kafka.producer.SyncProducerConfig
+import kafka.javaapi.message.ByteBufferMessageSet
+
+class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
+
+  def this(config: SyncProducerConfig) = this(new kafka.producer.SyncProducer(config))
+
+  val underlying = syncProducer
+
+  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
+    import kafka.javaapi.Implicits._
+    underlying.send(topic, partition, messages)
+  }
+
+  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic,
+                                                                       kafka.api.ProducerRequest.RandomPartition,
+                                                                       messages)
+
+  def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) {
+    import kafka.javaapi.Implicits._
+    val produceRequests = new Array[kafka.api.ProducerRequest](produces.length)
+    for(i <- 0 until produces.length)
+      produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages)
+    underlying.multiSend(produceRequests)
+  }
+
+  def close() {
+    underlying.close
+  }
+}
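
For the blocking Java-facing producer above, a minimal send could look like the following (the SyncProducerConfig property names are assumptions; the message set is built with the javaapi ByteBufferMessageSet added earlier in this commit):

    import java.util.Properties
    import kafka.producer.SyncProducerConfig
    import kafka.javaapi.producer.SyncProducer
    import kafka.javaapi.message.ByteBufferMessageSet
    import kafka.message.Message

    val props = new Properties
    props.put("host", "localhost")   // assumed property name
    props.put("port", "9092")        // assumed property name
    val producer = new SyncProducer(new SyncProducerConfig(props))

    val messages = new java.util.ArrayList[Message]
    messages.add(new Message("payload".getBytes))

    // Send to an explicit partition, or omit it to let the broker pick a random one.
    producer.send("topic1", 0, new ByteBufferMessageSet(messages))
    producer.close()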

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.javaapi.producer.async;
+
+import kafka.producer.async.QueueItem;
+
+import java.util.Properties;
+
+/**
+ * Callback handler APIs for use in the async producer. The purpose is to
+ * give the user some callback handles to insert custom functionality at
+ * various stages as the data flows through the pipeline of the async producer
+ */
+public interface CallbackHandler<T> {
+    /**
+     * Initializes the callback handler using a Properties object
+     * @param props the properties used to initialize the callback handler
+    */
+    public void init(Properties props);
+
+    /**
+     * Callback to process the data before it enters the batching queue
+     * of the asynchronous producer
+     * @param data the data sent to the producer
+     * @return the processed data that enters the queue
+    */
+    public QueueItem<T> beforeEnqueue(QueueItem<T> data);
+
+    /**
+     * Callback to process the data just after it enters the batching queue
+     * of the asynchronous producer
+     * @param data the data sent to the producer
+     * @param added flag that indicates if the data was successfully added to the queue
+    */
+    public void afterEnqueue(QueueItem<T> data, boolean added);
+
+    /**
+     * Callback to process the data item right after it has been dequeued by the
+     * background sender thread of the asynchronous producer
+     * @param data the data item dequeued from the async producer queue
+     * @return the processed list of data items that gets added to the data handled by the event handler
+     */
+    public java.util.List<QueueItem<T>> afterDequeuingExistingData(QueueItem<T> data);
+
+    /**
+     * Callback to process the batched data right before it is being processed by the
+     * handle API of the event handler
+     * @param data the batched data received by the event handler
+     * @return the processed batched data that gets processed by the handle() API of the event handler
+    */
+    public java.util.List<QueueItem<T>> beforeSendingData(java.util.List<QueueItem<T>> data);
+
+    /**
+     * Callback to process the last batch of data right before the producer send thread is shutdown
+     * @return the last batch of data that is sent to the EventHandler
+    */
+    public java.util.List<QueueItem<T>> lastBatchBeforeClose();
+
+    /**
+     * Cleans up and shuts down the callback handler
+    */
+    public void close();
+}
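
A pass-through Scala implementation of the callback interface above makes the hook points concrete; every stage simply returns its input (a sketch only, a real handler would add behaviour at the stages it cares about):

    import java.util.Properties
    import kafka.producer.async.QueueItem
    import kafka.javaapi.producer.async.CallbackHandler

    class PassThroughCallbackHandler[T] extends CallbackHandler[T] {
      def init(props: Properties) { }
      def beforeEnqueue(data: QueueItem[T]): QueueItem[T] = data
      def afterEnqueue(data: QueueItem[T], added: Boolean) { }
      def afterDequeuingExistingData(data: QueueItem[T]): java.util.List[QueueItem[T]] =
        java.util.Collections.singletonList(data)
      def beforeSendingData(data: java.util.List[QueueItem[T]]): java.util.List[QueueItem[T]] = data
      def lastBatchBeforeClose(): java.util.List[QueueItem[T]] =
        java.util.Collections.emptyList[QueueItem[T]]()
      def close() { }
    }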

Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.javaapi.producer.async;
+
+import kafka.javaapi.producer.SyncProducer;
+import kafka.producer.async.QueueItem;
+import kafka.serializer.Encoder;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Handler that dispatches the batched data from the queue of the
+ * asynchronous producer.
+ */
+public interface EventHandler<T> {
+    /**
+     * Initializes the event handler using a Properties object
+     * @param props the properties used to initialize the event handler
+    */
+    public void init(Properties props);
+
+    /**
+     * Callback to dispatch the batched data and send it to a Kafka server
+     * @param events the data sent to the producer
+     * @param producer the low-level producer used to send the data
+    */
+    public void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder);
+
+    /**
+     * Cleans up and shuts down the event handler
+    */
+    public void close();
+}
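
Similarly, a minimal Scala implementation of the event handler interface above could encode each queued event and hand it to the SyncProducer one item at a time. This is a sketch only: the QueueItem accessors (getData, getTopic) and the Encoder.toMessage method are assumed from the rest of the code base and do not appear in this excerpt, and a real handler would batch per broker/partition and deal with send failures:

    import java.util.Properties
    import kafka.javaapi.message.ByteBufferMessageSet
    import kafka.javaapi.producer.SyncProducer
    import kafka.javaapi.producer.async.EventHandler
    import kafka.message.Message
    import kafka.producer.async.QueueItem
    import kafka.serializer.Encoder

    class SimpleEventHandler[T] extends EventHandler[T] {
      def init(props: Properties) { }

      def handle(events: java.util.List[QueueItem[T]], producer: SyncProducer, encoder: Encoder[T]) {
        val it = events.iterator
        while (it.hasNext) {
          val item = it.next
          val messages = new java.util.ArrayList[Message]
          messages.add(encoder.toMessage(item.getData))  // assumed Encoder API
          producer.send(item.getTopic, new ByteBufferMessageSet(messages))
        }
      }

      def close() { }
    }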


