Subject: svn commit: r1152970 [16/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Date: Mon, 01 Aug 2011 23:42:17 -0000
To: kafka-commits@incubator.apache.org
From: jkreps@apache.org
Message-Id: <20110801234227.857382388C06@eris.apache.org>
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,569 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.consumer + +import java.util.concurrent._ +import java.util.concurrent.atomic._ +import scala.collection._ +import org.apache.log4j.Logger +import kafka.cluster._ +import kafka.utils._ +import org.I0Itec.zkclient.exception.ZkNodeExistsException +import java.net.InetAddress +import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} +import org.apache.zookeeper.Watcher.Event.KeeperState +import kafka.api.OffsetRequest + +/** + * This class handles the consumer's interaction with ZooKeeper + * + * Directories: + * 1. Consumer id registry: + * /consumers/[group_id]/ids/[consumer_id] -> topic1,...topicN + * A consumer has a unique consumer id within a consumer group. A consumer registers its id as an ephemeral znode + * and puts all topics that it subscribes to as the value of the znode. The znode is deleted when the client is gone. + * A consumer subscribes to event changes of the consumer id registry within its group. + * + * The consumer id is picked up from configuration, instead of the sequential id assigned by ZK. Generated sequential + * ids are hard to recover during temporary connection loss to ZK, since it's difficult for the client to figure out + * whether the creation of a sequential znode has succeeded or not. More details can be found at + * (http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling) + * + * 2. Broker node registry: + * /brokers/[0...N] --> { "host" : "host:port", + * "topics" : {"topic1": ["partition1" ... "partitionN"], ..., + * "topicN": ["partition1" ... "partitionN"] } } + * This is a list of all present broker nodes. A unique logical node id is configured on each broker node. A broker + * node registers itself on start-up and creates a znode with the logical node id under /brokers. The value of the znode + * is a JSON String that contains (1) the host name and the port the broker is listening to, (2) a list of topics that + * the broker serves, (3) a list of logical partitions assigned to each topic on the broker. + * A consumer subscribes to event changes of the broker node registry. + * + * 3. Partition owner registry: + * /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id + * This stores the mapping between broker partitions and consumers. Each partition is owned by a unique consumer + * within a consumer group. The mapping is reestablished after each rebalancing. + * + * 4. Consumer offset tracking: + * /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value + * Each consumer tracks the offset of the latest message consumed for each partition. 
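 + *
 + * A usage sketch for this high-level consumer. This is an illustrative example only: the Consumer
 + * factory object and the property names "groupid" / "zk.connect" are assumptions about the
 + * surrounding client code, not guaranteed by this file.
 + * {{{
 + *   val props = new java.util.Properties()
 + *   props.put("groupid", "group1")                                       // consumer group id
 + *   props.put("zk.connect", "localhost:2181")                            // zookeeper connect string
 + *   val connector = Consumer.create(new ConsumerConfig(props))           // a ConsumerConnector
 + *   val streams = connector.createMessageStreams(Map("topic1" -> 1))     // one KafkaMessageStream per thread
 + *   for (message <- streams("topic1").head)                              // iterates messages as they are fetched
 + *     println(message)
 + *   connector.shutdown()
 + * }}}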
+ * + */ +private[kafka] object ZookeeperConsumerConnector { + val MAX_N_RETRIES = 4 + val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L) +} + +/** + * JMX interface for monitoring consumer + */ +trait ZookeeperConsumerConnectorMBean { + def getPartOwnerStats: String + def getConsumerGroup: String + def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long + def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long + def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long +} + +private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, + val enableFetcher: Boolean) // for testing only + extends ConsumerConnector with ZookeeperConsumerConnectorMBean { + + private val logger = Logger.getLogger(getClass()) + private val isShuttingDown = new AtomicBoolean(false) + private val rebalanceLock = new Object + private var fetcher: Option[Fetcher] = None + private var zkClient: ZkClient = null + private val topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]] + // queues : (topic,consumerThreadId) -> queue + private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]] + private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false) + connectZk + createFetcher + if (config.autoCommit) { + logger.info("starting auto committer every " + config.autoCommitIntervalMs + " ms") + scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs) + } + + def this(config: ConsumerConfig) = this(config, true) + + def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]] = { + consume(topicCountMap) + } + + private def createFetcher() { + if (enableFetcher) + fetcher = Some(new Fetcher(config, zkClient)) + } + + private def connectZk() { + logger.info("Connecting to zookeeper instance at " + config.zkConnect) + zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer) + } + + def shutdown() { + val canShutdown = isShuttingDown.compareAndSet(false, true); + if (canShutdown) { + logger.info("ZKConsumerConnector shutting down") + try { + scheduler.shutdown + fetcher match { + case Some(f) => f.shutdown + case None => + } + sendShudownToAllQueues + if (zkClient != null) { + zkClient.close() + zkClient = null + } + } + catch { + case e => + logger.fatal(e) + logger.fatal(Utils.stackTrace(e)) + } + logger.info("ZKConsumerConnector shut down completed") + } + } + + def consume(topicCountMap: scala.collection.Map[String,Int]): Map[String,List[KafkaMessageStream]] = { + logger.debug("entering consume ") + if (topicCountMap == null) + throw new RuntimeException("topicCountMap is null") + + val dirs = new ZKGroupDirs(config.groupId) + var ret = new mutable.HashMap[String,List[KafkaMessageStream]] + + var consumerUuid : String = null + config.consumerId match { + case Some(consumerId) // for testing only + => consumerUuid = consumerId + case None // generate unique consumerId automatically + => consumerUuid = InetAddress.getLocalHost.getHostName + "-" + System.currentTimeMillis + } + val consumerIdString = config.groupId + "_" + consumerUuid + val topicCount = new TopicCount(consumerIdString, topicCountMap) + + // listener to consumer and partition changes + val loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString) + registerConsumerInZK(dirs, consumerIdString, topicCount) + 
zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener) + + // create a queue per topic per consumer thread + val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic + for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) { + var streamList: List[KafkaMessageStream] = Nil + for (threadId <- threadIdSet) { + val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks) + queues.put((topic, threadId), stream) + streamList ::= new KafkaMessageStream(stream, config.consumerTimeoutMs) + } + ret += (topic -> streamList) + logger.debug("adding topic " + topic + " and stream to map..") + + // register on broker partition path changes + val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topic + ZkUtils.makeSurePersistentPathExists(zkClient, partitionPath) + zkClient.subscribeChildChanges(partitionPath, loadBalancerListener) + } + + // register listener for session expired event + zkClient.subscribeStateChanges( + new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener)) + + // explicitly trigger load balancing for this consumer + loadBalancerListener.syncedRebalance + ret + } + + private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = { + logger.info("begin registering consumer " + consumerIdString + " in ZK") + ZkUtils.createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString) + logger.info("end registering consumer " + consumerIdString + " in ZK") + } + + private def sendShudownToAllQueues() = { + for (queue <- queues.values) { + logger.debug("Clearing up queue") + queue.clear + queue.put(ZookeeperConsumerConnector.shutdownCommand) + logger.debug("Cleared queue and sent shutdown command") + } + } + + def autoCommit() { + if(logger.isTraceEnabled) + logger.trace("auto committing") + try { + commitOffsets + } + catch { + case t: Throwable => + // log it and let it go + logger.error("exception during autoCommit: ", t) + } + } + + def commitOffsets() { + if (zkClient == null) + return + for ((topic, infos) <- topicRegistry) { + val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) + for (info <- infos.values) { + val newOffset = info.getConsumeOffset + try { + ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name, + newOffset.toString) + } + catch { + case t: Throwable => + // log it and let it go + logger.warn("exception during commitOffsets: " + t + Utils.stackTrace(t)) + } + if(logger.isDebugEnabled) + logger.debug("Committed offset " + newOffset + " for topic " + info) + } + } + } + + // for JMX + def getPartOwnerStats(): String = { + val builder = new StringBuilder + for ((topic, infos) <- topicRegistry) { + builder.append("\n" + topic + ": [") + val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) + for(partition <- infos.values) { + builder.append("\n {") + builder.append{partition.partition.name} + builder.append(",fetch offset:" + partition.getFetchOffset) + builder.append(",consumer offset:" + partition.getConsumeOffset) + builder.append("}") + } + builder.append("\n ]") + } + builder.toString + } + + // for JMX + def getConsumerGroup(): String = config.groupId + + def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long = + getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId) + + def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = { + val partition = new 
Partition(brokerId, partitionId) + val partitionInfos = topicRegistry.get(topic) + if (partitionInfos != null) { + val partitionInfo = partitionInfos.get(partition) + if (partitionInfo != null) + return partitionInfo.getConsumeOffset + } + + //otherwise, try to get it from zookeeper + try { + val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) + val znode = topicDirs.consumerOffsetDir + "/" + partition.name + val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode) + if (offsetString != null) + return offsetString.toLong + else + return -1 + } + catch { + case e => + logger.error("error in getConsumedOffset JMX ", e) + } + return -2 + } + + def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = { + var simpleConsumer: SimpleConsumer = null + var producedOffset: Long = -1L + try { + val cluster = ZkUtils.getCluster(zkClient) + val broker = cluster.getBroker(brokerId) + simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout, + ConsumerConfig.SocketBufferSize) + val latestOffset = simpleConsumer.getOffsetsBefore(topic, partitionId, + OffsetRequest.LatestTime, 1) + producedOffset = latestOffset(0) + } + catch { + case e => + logger.error("error in getLatestOffset jmx ", e) + } + finally { + if (simpleConsumer != null) + simpleConsumer.close + } + producedOffset + } + + class ZKSessionExpireListenner(val dirs: ZKGroupDirs, + val consumerIdString: String, + val topicCount: TopicCount, + val loadBalancerListener: ZKRebalancerListener) + extends IZkStateListener { + @throws(classOf[Exception]) + def handleStateChanged(state: KeeperState) { + // do nothing, since zkclient will do reconnect for us. + } + + /** + * Called after the zookeeper session has expired and a new session has been created. You would have to re-create + * any ephemeral nodes here. + * + * @throws Exception + * On any error. + */ + @throws(classOf[Exception]) + def handleNewSession() { + /** + * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a + * connection for us. We need to release the ownership of the current consumer and re-register this + * consumer in the consumer registry and trigger a rebalance. + */ + logger.info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString) + loadBalancerListener.resetState + registerConsumerInZK(dirs, consumerIdString, topicCount) + // explicitly trigger load balancing for this consumer + loadBalancerListener.syncedRebalance + + // There is no need to resubscribe to child and state changes. + // The child change watchers will be set inside rebalance when we read the children list. 
+ } + + } + + class ZKRebalancerListener(val group: String, val consumerIdString: String) + extends IZkChildListener { + private val dirs = new ZKGroupDirs(group) + private var oldPartitionsPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]() + private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]() + + @throws(classOf[Exception]) + def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { + syncedRebalance + } + + private def releasePartitionOwnership() { + for ((topic, infos) <- topicRegistry) { + val topicDirs = new ZKGroupTopicDirs(group, topic) + for(partition <- infos.keys) { + val znode = topicDirs.consumerOwnerDir + "/" + partition + ZkUtils.deletePath(zkClient, znode) + if(logger.isDebugEnabled) + logger.debug("Consumer " + consumerIdString + " releasing " + znode) + } + } + } + + private def getConsumersPerTopic(group: String) : mutable.Map[String, List[String]] = { + val consumers = ZkUtils.getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir) + val consumersPerTopicMap = new mutable.HashMap[String, List[String]] + for (consumer <- consumers) { + val topicCount = getTopicCount(consumer) + for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) { + for (consumerThreadId <- consumerThreadIdSet) + consumersPerTopicMap.get(topic) match { + case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers) + case _ => consumersPerTopicMap.put(topic, List(consumerThreadId)) + } + } + } + for ( (topic, consumerList) <- consumersPerTopicMap ) + consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t)) + consumersPerTopicMap + } + + private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]], + newPartMap: Map[String,List[String]], + oldPartMap: Map[String,List[String]], + newConsumerMap: Map[String,List[String]], + oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = { + var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]() + for ( (topic, consumerThreadIdSet) <- myTopicThreadIdsMap ) + if ( oldPartMap.get(topic) != newPartMap.get(topic) || oldConsumerMap.get(topic) != newConsumerMap.get(topic)) + relevantTopicThreadIdsMap += (topic -> consumerThreadIdSet) + relevantTopicThreadIdsMap + } + + private def getTopicCount(consumerId: String) : TopicCount = { + val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId) + TopicCount.constructTopicCount(consumerId, topicCountJson) + } + + def resetState() { + topicRegistry.clear + oldConsumersPerTopicMap.clear + oldPartitionsPerTopicMap.clear + } + + def syncedRebalance() { + rebalanceLock synchronized { + for (i <- 0 until ZookeeperConsumerConnector.MAX_N_RETRIES) { + logger.info("begin rebalancing consumer " + consumerIdString + " try #" + i) + var done = false + try { + done = rebalance + } + catch { + case e => + // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. + // For example, a ZK node can disappear between the time we get all children and the time we try to get + // the value of a child. Just let this go since another rebalance will be triggered. 
+ logger.info("exception during rebalance " + e) + } + logger.info("end rebalancing consumer " + consumerIdString + " try #" + i) + if (done) + return + // release all partitions, reset state and retry + releasePartitionOwnership + resetState + Thread.sleep(config.zkSyncTimeMs) + } + } + + throw new RuntimeException(consumerIdString + " can't rebalance after " + ZookeeperConsumerConnector.MAX_N_RETRIES +" retires") + } + + private def rebalance(): Boolean = { + // testing code + //if ("group1_consumer1" == consumerIdString) { + // logger.info("sleeping " + consumerIdString) + // Thread.sleep(20) + //} + + val myTopicThreadIdsMap = getTopicCount(consumerIdString).getConsumerThreadIdsPerTopic + val cluster = ZkUtils.getCluster(zkClient) + val consumersPerTopicMap = getConsumersPerTopic(group) + val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator) + val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap) + if (relevantTopicThreadIdsMap.size <= 0) { + logger.info("Consumer " + consumerIdString + " with " + consumersPerTopicMap + " doesn't need to rebalance.") + return true + } + + logger.info("Committing all offsets") + commitOffsets + + logger.info("Releasing partition ownership") + releasePartitionOwnership + + val queuesToBeCleared = new mutable.HashSet[BlockingQueue[FetchedDataChunk]] + for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) { + topicRegistry.remove(topic) + topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo]) + + val topicDirs = new ZKGroupTopicDirs(group, topic) + val curConsumers = consumersPerTopicMap.get(topic).get + var curPartitions: List[String] = partitionsPerTopicMap.get(topic).get + + val nPartsPerConsumer = curPartitions.size / curConsumers.size + val nConsumersWithExtraPart = curPartitions.size % curConsumers.size + + logger.info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions + + " for topic " + topic + " with consumers: " + curConsumers) + + for (consumerThreadId <- consumerThreadIdSet) { + val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId) + assert(myConsumerPosition >= 0) + val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) + val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) + + /** + * Range-partition the sorted partitions to consumers for better locality. + * The first few consumers pick up an extra partition, if any. 
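 + *
 + * Worked example: with 5 partitions and 2 consumer threads, nPartsPerConsumer = 2 and
 + * nConsumersWithExtraPart = 1, so the thread at position 0 claims partitions 0, 1 and 2
 + * while the thread at position 1 claims partitions 3 and 4.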
+ */ + if (nParts <= 0) + logger.warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) + else { + for (i <- startPart until startPart + nParts) { + val partition = curPartitions(i) + logger.info(consumerThreadId + " attempting to claim partition " + partition) + if (!processPartition(topicDirs, partition, topic, consumerThreadId)) + return false + } + queuesToBeCleared += queues.get((topic, consumerThreadId)) + } + } + } + updateFetcher(cluster, queuesToBeCleared) + oldPartitionsPerTopicMap = partitionsPerTopicMap + oldConsumersPerTopicMap = consumersPerTopicMap + true + } + + private def updateFetcher(cluster: Cluster, queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) { + // update partitions for fetcher + var allPartitionInfos : List[PartitionTopicInfo] = Nil + for (partitionInfos <- topicRegistry.values) + for (partition <- partitionInfos.values) + allPartitionInfos ::= partition + logger.info("Consumer " + consumerIdString + " selected partitions : " + + allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(",")) + + fetcher match { + case Some(f) => f.initConnections(allPartitionInfos, cluster, queuesTobeCleared) + case None => + } + } + + private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String, + topic: String, consumerThreadId: String) : Boolean = { + val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition + try { + ZkUtils.createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId) + } + catch { + case e: ZkNodeExistsException => + // The node hasn't been deleted by the original owner. So wait a bit and retry. + logger.info("waiting for the partition ownership to be deleted: " + partition) + return false + case e2 => throw e2 + } + addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId) + true + } + + private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partitionString: String, + topic: String, consumerThreadId: String) { + val partition = Partition.parse(partitionString) + val partTopicInfoMap = topicRegistry.get(topic) + + val znode = topicDirs.consumerOffsetDir + "/" + partition.name + val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode) + // If first time starting a consumer, use default offset. + // TODO: handle this better (if client doesn't know initial offsets) + val offset : Long = if (offsetString == null) Long.MaxValue else offsetString.toLong + val queue = queues.get((topic, consumerThreadId)) + val consumedOffset = new AtomicLong(offset) + val fetchedOffset = new AtomicLong(offset) + val partTopicInfo = new PartitionTopicInfo(topic, + partition.brokerId, + partition, + queue, + consumedOffset, + fetchedOffset, + new AtomicInteger(config.fetchSize)) + partTopicInfoMap.put(partition, partTopicInfo) + if (logger.isDebugEnabled) + logger.debug(partTopicInfo + " selected new offset " + offset) + } + } +} + Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/package.html Mon Aug 1 23:41:24 2011 @@ -0,0 +1 @@ +This is the consumer API for kafka. 
\ No newline at end of file Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/MemoryOffsetStorage.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,43 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.consumer.storage + +import java.util.concurrent._ +import java.util.concurrent.atomic._ +import java.util.concurrent.locks._ + +class MemoryOffsetStorage extends OffsetStorage { + + val offsetAndLock = new ConcurrentHashMap[(Int, String), (AtomicLong, Lock)] + + def reserve(node: Int, topic: String): Long = { + val key = (node, topic) + if(!offsetAndLock.containsKey(key)) + offsetAndLock.putIfAbsent(key, (new AtomicLong(0), new ReentrantLock)) + val (offset, lock) = offsetAndLock.get(key) + lock.lock + offset.get + } + + def commit(node: Int, topic: String, offset: Long) = { + val (highwater, lock) = offsetAndLock.get((node, topic)) + highwater.set(offset) + lock.unlock + offset + } + +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,40 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.consumer.storage + +import kafka.utils.Range + +/** + * A method for storing offsets for the consumer. + * This is used to track the progress of the consumer in the stream. + */ +trait OffsetStorage { + + /** + * Reserve a range of the length given by increment. 
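 + * (A usage sketch inferred from MemoryOffsetStorage above, not a guarantee of this trait's contract:
 + * reserve(node, topic) returns the last committed offset for that node/topic pair and takes a lock,
 + * the caller consumes from that offset, and a matching commit(node, topic, newOffset) records the new
 + * high-water mark and releases the lock.)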
+ * @param increment The size to reserver + * @return The range reserved + */ + def reserve(node: Int, topic: String): Long + + /** + * Update the offset to the new offset + * @param offset The new offset + */ + def commit(node: Int, topic: String, offset: Long) + +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,157 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.consumer.storage.sql + +import java.sql._ +import org.apache.log4j._ +import kafka.utils._ +import kafka.consumer.storage.OffsetStorage + +/** + * An offset storage implementation that uses an oracle database to save offsets + */ +@nonthreadsafe +class OracleOffsetStorage(val connection: Connection) extends OffsetStorage { + + private val logger: Logger = Logger.getLogger(classOf[OracleOffsetStorage]) + private val lock = new Object + connection.setAutoCommit(false) + + def reserve(node: Int, topic: String): Long = { + /* try to get and lock the offset, if it isn't there, make it */ + val maybeOffset = selectExistingOffset(connection, node, topic) + val offset = maybeOffset match { + case Some(offset) => offset + case None => { + maybeInsertZeroOffset(connection, node, topic) + selectExistingOffset(connection, node, topic).get + } + } + + if(logger.isDebugEnabled) + logger.debug("Reserved node " + node + " for topic '" + topic + " offset " + offset) + + offset + } + + def commit(node: Int, topic: String, offset: Long) { + var success = false + try { + updateOffset(connection, node, topic, offset) + success = true + } finally { + commitOrRollback(connection, success) + } + if(logger.isDebugEnabled) + logger.debug("Updated node " + node + " for topic '" + topic + "' to " + offset) + } + + def close() { + Utils.swallow(logger.error, connection.close()) + } + + /** + * Attempt to update an existing entry in the table if there isn't already one there + * @return true iff the row didn't already exist + */ + private def maybeInsertZeroOffset(connection: Connection, node: Int, topic: String): Boolean = { + val stmt = connection.prepareStatement( + """insert into kafka_offsets (node, topic, offset) + select ?, ?, 0 from dual where not exists + (select null from kafka_offsets where node = ? 
and topic = ?)""") + stmt.setInt(1, node) + stmt.setString(2, topic) + stmt.setInt(3, node) + stmt.setString(4, topic) + val updated = stmt.executeUpdate() + if(updated > 1) + throw new IllegalStateException("More than one key updated by primary key!") + else + updated == 1 + } + + /** + * Attempt to update an existing entry in the table + * @return true iff we updated an entry + */ + private def selectExistingOffset(connection: Connection, node: Int, topic: String): Option[Long] = { + val stmt = connection.prepareStatement( + """select offset from kafka_offsets + where node = ? and topic = ? + for update""") + var results: ResultSet = null + try { + stmt.setInt(1, node) + stmt.setString(2, topic) + results = stmt.executeQuery() + if(!results.next()) { + None + } else { + val offset = results.getLong("offset") + if(results.next()) + throw new IllegalStateException("More than one entry for primary key!") + Some(offset) + } + } finally { + close(stmt) + close(results) + } + } + + private def updateOffset(connection: Connection, + node: Int, + topic: String, + newOffset: Long): Unit = { + val stmt = connection.prepareStatement("update kafka_offsets set offset = ? where node = ? and topic = ?") + try { + stmt.setLong(1, newOffset) + stmt.setInt(2, node) + stmt.setString(3, topic) + val updated = stmt.executeUpdate() + if(updated != 1) + throw new IllegalStateException("Unexpected number of keys updated: " + updated) + } finally { + close(stmt) + } + } + + + private def commitOrRollback(connection: Connection, commit: Boolean) { + if(connection != null) { + if(commit) + Utils.swallow(logger.error, connection.commit()) + else + Utils.swallow(logger.error, connection.rollback()) + } + } + + private def close(rs: ResultSet) { + if(rs != null) + Utils.swallow(logger.error, rs.close()) + } + + private def close(stmt: PreparedStatement) { + if(stmt != null) + Utils.swallow(logger.error, stmt.close()) + } + + private def close(connection: Connection) { + if(connection != null) + Utils.swallow(logger.error, connection.close()) + } + +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,128 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package kafka.javaapi + +import java.nio.ByteBuffer +import org.apache.log4j.Logger +import kafka.serializer.Encoder +import kafka.producer.{ProducerConfig, ProducerPool} +import kafka.producer.async.{AsyncProducerConfig, QueueItem} + +private[javaapi] object Implicits { + private val logger = Logger.getLogger(getClass()) + + implicit def javaMessageSetToScalaMessageSet(messageSet: kafka.javaapi.message.ByteBufferMessageSet): + kafka.message.ByteBufferMessageSet = messageSet.underlying + + implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet): + kafka.javaapi.message.ByteBufferMessageSet = { + new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset, + messageSet.getErrorCode) + } + + implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = { + if(logger.isDebugEnabled) + logger.debug("Implicit instantiation of Java Sync Producer") + new kafka.javaapi.producer.SyncProducer(producer) + } + + implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = { + if(logger.isDebugEnabled) + logger.debug("Implicit instantiation of Sync Producer") + producer.underlying + } + + implicit def toScalaEventHandler[T](eventHandler: kafka.javaapi.producer.async.EventHandler[T]) + : kafka.producer.async.EventHandler[T] = { + new kafka.producer.async.EventHandler[T] { + override def init(props: java.util.Properties) { eventHandler.init(props) } + override def handle(events: Seq[QueueItem[T]], producer: kafka.producer.SyncProducer, encoder: Encoder[T]) { + import collection.JavaConversions._ + eventHandler.handle(asList(events), producer, encoder) + } + override def close { eventHandler.close } + } + } + + implicit def toJavaEventHandler[T](eventHandler: kafka.producer.async.EventHandler[T]) + : kafka.javaapi.producer.async.EventHandler[T] = { + new kafka.javaapi.producer.async.EventHandler[T] { + override def init(props: java.util.Properties) { eventHandler.init(props) } + override def handle(events: java.util.List[QueueItem[T]], producer: kafka.javaapi.producer.SyncProducer, + encoder: Encoder[T]) { + import collection.JavaConversions._ + eventHandler.handle(asBuffer(events), producer, encoder) + } + override def close { eventHandler.close } + } + } + + implicit def toScalaCbkHandler[T](cbkHandler: kafka.javaapi.producer.async.CallbackHandler[T]) + : kafka.producer.async.CallbackHandler[T] = { + new kafka.producer.async.CallbackHandler[T] { + import collection.JavaConversions._ + override def init(props: java.util.Properties) { cbkHandler.init(props)} + override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = { + cbkHandler.beforeEnqueue(data) + } + override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) { + cbkHandler.afterEnqueue(data, added) + } + override def afterDequeuingExistingData(data: QueueItem[T] = null): scala.collection.mutable.Seq[QueueItem[T]] = { + cbkHandler.afterDequeuingExistingData(data) + } + override def beforeSendingData(data: Seq[QueueItem[T]] = null): scala.collection.mutable.Seq[QueueItem[T]] = { + asList(cbkHandler.beforeSendingData(asList(data))) + } + override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[T]] = { + asBuffer(cbkHandler.lastBatchBeforeClose) + } + override def close { cbkHandler.close } + } + } + + implicit def toJavaCbkHandler[T](cbkHandler: kafka.producer.async.CallbackHandler[T]) + : 
kafka.javaapi.producer.async.CallbackHandler[T] = { + new kafka.javaapi.producer.async.CallbackHandler[T] { + import collection.JavaConversions._ + override def init(props: java.util.Properties) { cbkHandler.init(props)} + override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = { + cbkHandler.beforeEnqueue(data) + } + override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) { + cbkHandler.afterEnqueue(data, added) + } + override def afterDequeuingExistingData(data: QueueItem[T] = null) + : java.util.List[QueueItem[T]] = { + asList(cbkHandler.afterDequeuingExistingData(data)) + } + override def beforeSendingData(data: java.util.List[QueueItem[T]] = null) + : java.util.List[QueueItem[T]] = { + asBuffer(cbkHandler.beforeSendingData(asBuffer(data))) + } + override def lastBatchBeforeClose: java.util.List[QueueItem[T]] = { + asList(cbkHandler.lastBatchBeforeClose) + } + override def close { cbkHandler.close } + } + } + + implicit def toMultiFetchResponse(response: kafka.javaapi.MultiFetchResponse): kafka.api.MultiFetchResponse = + response.underlying + + implicit def toJavaMultiFetchResponse(response: kafka.api.MultiFetchResponse): kafka.javaapi.MultiFetchResponse = + new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets, response.offsets) +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,44 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package kafka.javaapi + +import kafka.utils.IteratorTemplate +import java.nio.ByteBuffer +import message.ByteBufferMessageSet + +class MultiFetchResponse(buffer: ByteBuffer, numSets: Int, offsets: Array[Long]) extends java.lang.Iterable[ByteBufferMessageSet] { + val underlyingBuffer = ByteBuffer.wrap(buffer.array) + // this has the side effect of setting the initial position of buffer correctly + val errorCode = underlyingBuffer.getShort + + import Implicits._ + val underlying = new kafka.api.MultiFetchResponse(underlyingBuffer, numSets, offsets) + + override def toString() = underlying.toString + + def iterator : java.util.Iterator[ByteBufferMessageSet] = { + new IteratorTemplate[ByteBufferMessageSet] { + val iter = underlying.iterator + override def makeNext(): ByteBufferMessageSet = { + if(iter.hasNext) + iter.next + else + return allDone + } + } + } +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,51 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.javaapi + +import kafka.network.Request +import kafka.api.RequestKeys +import java.nio.ByteBuffer + +class ProducerRequest(val topic: String, + val partition: Int, + val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) { + import Implicits._ + private val underlying = new kafka.api.ProducerRequest(topic, partition, messages) + + def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) } + + def sizeInBytes(): Int = underlying.sizeInBytes + + def getTranslatedPartition(randomSelector: String => Int): Int = + underlying.getTranslatedPartition(randomSelector) + + override def toString: String = + underlying.toString + + override def equals(other: Any): Boolean = { + other match { + case that: ProducerRequest => + (that canEqual this) && topic == that.topic && partition == that.partition && + messages.equals(that.messages) + case _ => false + } + } + + def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] + + override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode + +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Mon Aug 1 23:41:24 2011 @@ -0,0 +1,43 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.javaapi.consumer; + +import kafka.consumer.KafkaMessageStream; + +import java.util.List; +import java.util.Map; + +public interface ConsumerConnector { + /** + * Create a list of MessageStreams for each topic. + * + * @param topicCountMap a map of (topic, #streams) pair + * @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the + * list is #streams. Each KafkaMessageStream supports an iterator of messages. + */ + public Map> createMessageStreams(Map topicCountMap); + + /** + * Commit the offsets of all broker partitions connected by this connector. 
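 + * (A hedged note based on the Scala ZookeeperConsumerConnector earlier in this commit: offsets are
 + * committed automatically on a timer when auto-commit is enabled, and this method lets an application
 + * commit explicitly, for example after it has finished processing a batch of messages.)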
+ */ + public void commitOffsets(); + + /** + * Shut down the connector + */ + public void shutdown(); +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,70 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.javaapi.consumer + +import kafka.utils.threadsafe +import kafka.javaapi.message.ByteBufferMessageSet +import kafka.javaapi.MultiFetchResponse +import kafka.api.FetchRequest + +/** + * A consumer of kafka messages + */ +@threadsafe +class SimpleConsumer(val host: String, + val port: Int, + val soTimeout: Int, + val bufferSize: Int) { + val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize) + + /** + * Fetch a set of messages from a topic. + * + * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. + * @return a set of fetched messages + */ + def fetch(request: FetchRequest): ByteBufferMessageSet = { + import kafka.javaapi.Implicits._ + underlying.fetch(request) + } + + /** + * Combine multiple fetch requests in one call. + * + * @param fetches a sequence of fetch requests. + * @return a sequence of fetch responses + */ + def multifetch(fetches: java.util.List[FetchRequest]): MultiFetchResponse = { + import scala.collection.JavaConversions._ + import kafka.javaapi.Implicits._ + underlying.multifetch(asBuffer(fetches): _*) + } + + /** + * Get a list of valid offsets (up to maxSize) before the given time. + * The result is a list of offsets, in descending order. 
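 + * For example (an illustrative sketch; the topic name is hypothetical), getOffsetsBefore("topic1", 0, -1L, 1)
 + * asks for at most one offset and returns the latest offset available for partition 0 of "topic1".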
+ * + * @param time: time in millisecs (-1, from the latest offset available, -2 from the smallest offset available) + * @return an array of offsets + */ + def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = + underlying.getOffsetsBefore(topic, partition, time, maxNumOffsets) + + def close() { + underlying.close + } +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,88 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.javaapi.consumer + +import kafka.consumer.{KafkaMessageStream, ConsumerConfig} + +/** + * This class handles the consumers interaction with zookeeper + * + * Directories: + * 1. Consumer id registry: + * /consumers/[group_id]/ids[consumer_id] -> topic1,...topicN + * A consumer has a unique consumer id within a consumer group. A consumer registers its id as an ephemeral znode + * and puts all topics that it subscribes to as the value of the znode. The znode is deleted when the client is gone. + * A consumer subscribes to event changes of the consumer id registry within its group. + * + * The consumer id is picked up from configuration, instead of the sequential id assigned by ZK. Generated sequential + * ids are hard to recover during temporary connection loss to ZK, since it's difficult for the client to figure out + * whether the creation of a sequential znode has succeeded or not. More details can be found at + * (http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling) + * + * 2. Broker node registry: + * /brokers/[0...N] --> { "host" : "host:port", + * "topics" : {"topic1": ["partition1" ... "partitionN"], ..., + * "topicN": ["partition1" ... "partitionN"] } } + * This is a list of all present broker brokers. A unique logical node id is configured on each broker node. A broker + * node registers itself on start-up and creates a znode with the logical node id under /brokers. The value of the znode + * is a JSON String that contains (1) the host name and the port the broker is listening to, (2) a list of topics that + * the broker serves, (3) a list of logical partitions assigned to each topic on the broker. + * A consumer subscribes to event changes of the broker node registry. + * + * 3. Partition owner registry: + * /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id + * This stores the mapping before broker partitions and consumers. Each partition is owned by a unique consumer + * within a consumer group. 
The mapping is reestablished after each rebalancing. + * + * 4. Consumer offset tracking: + * /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value + * Each consumer tracks the offset of the latest message consumed for each partition. + * +*/ + +private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, + val enableFetcher: Boolean) // for testing only + extends ConsumerConnector { + + val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher) + + def this(config: ConsumerConfig) = this(config, true) + + // for java client + def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): + java.util.Map[String,java.util.List[KafkaMessageStream]] = { + import scala.collection.JavaConversions._ + + val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]]) + val scalaReturn = underlying.consume(scalaTopicCountMap) + val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream]] + for ((topic, streams) <- scalaReturn) { + var javaStreamList = new java.util.ArrayList[KafkaMessageStream] + for (stream <- streams) + javaStreamList.add(stream) + ret.put(topic, javaStreamList) + } + ret + } + + def commitOffsets() { + underlying.commitOffsets + } + + def shutdown() { + underlying.shutdown + } +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,96 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package kafka.javaapi.message + +import java.nio.ByteBuffer +import kafka.common.ErrorMapping +import org.apache.log4j.Logger +import kafka.message._ + +class ByteBufferMessageSet(private val buffer: ByteBuffer, + private val initialOffset: Long = 0L, + private val errorCode: Int = ErrorMapping.NoError) extends MessageSet { + private val logger = Logger.getLogger(getClass()) + val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, + initialOffset, + errorCode) + def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError) + + def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) { + this(compressionCodec match { + case NoCompressionCodec => + val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) + val messageIterator = messages.iterator + while(messageIterator.hasNext) { + val message = messageIterator.next + message.serializeTo(buffer) + } + buffer.rewind + buffer + case _ => + import scala.collection.JavaConversions._ + val message = CompressionUtils.compress(asBuffer(messages), compressionCodec) + val buffer = ByteBuffer.allocate(message.serializedSize) + message.serializeTo(buffer) + buffer.rewind + buffer + }, 0L, ErrorMapping.NoError) + } + + def this(messages: java.util.List[Message]) { + this(NoCompressionCodec, messages) + } + + def validBytes: Long = underlying.validBytes + + def serialized():ByteBuffer = underlying.serialized + + def getInitialOffset = initialOffset + + def getBuffer = buffer + + def getErrorCode = errorCode + + override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] { + val underlyingIterator = underlying.iterator + override def hasNext(): Boolean = { + underlyingIterator.hasNext + } + + override def next(): MessageAndOffset = { + underlyingIterator.next + } + + override def remove = throw new UnsupportedOperationException("remove API on MessageSet is not supported") + } + + override def toString: String = underlying.toString + + def sizeInBytes: Long = underlying.sizeInBytes + + override def equals(other: Any): Boolean = { + other match { + case that: ByteBufferMessageSet => + (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset + case _ => false + } + } + + def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet] + + override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + initialOffset.hashCode + +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,53 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.javaapi.message + +import java.nio.channels.WritableByteChannel +import kafka.message.{MessageAndOffset, InvalidMessageException, Message} + +/** + * A set of messages. A message set has a fixed serialized form, though the container + * for the bytes could be either in-memory or on disk. The format of each message is + * as follows: + * 4 byte size containing an integer N + * N message bytes as described in the message class + */ +abstract class MessageSet extends java.lang.Iterable[MessageAndOffset] { + + /** + * Provides an iterator over the messages in this set + */ + def iterator: java.util.Iterator[MessageAndOffset] + + /** + * Gives the total size of this message set in bytes + */ + def sizeInBytes: Long + + /** + * Validate the checksum of all the messages in the set. Throws an InvalidMessageException if the checksum doesn't + * match the payload for any message. + */ + def validate(): Unit = { + val thisIterator = this.iterator + while(thisIterator.hasNext) { + val messageAndOffset = thisIterator.next + if(!messageAndOffset.message.isValid) + throw new InvalidMessageException + } + } +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,122 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.javaapi.producer + +import kafka.utils.Utils +import kafka.producer.async.QueueItem +import java.util.Properties +import kafka.producer.{ProducerPool, ProducerConfig, Partitioner} +import kafka.serializer.Encoder + +class Producer[K,V](config: ProducerConfig, + partitioner: Partitioner[K], + producerPool: ProducerPool[V], + populateProducerPool: Boolean = true) /* for testing purposes only. Applications should ideally */ + /* use the other constructor*/ +{ + + private val underlying = new kafka.producer.Producer[K,V](config, partitioner, producerPool, populateProducerPool, null) + + /** + * This constructor can be used when all config parameters will be specified through the + * ProducerConfig object + * @param config Producer Configuration object + */ + def this(config: ProducerConfig) = this(config, Utils.getObject(config.partitionerClass), + new ProducerPool[V](config, Utils.getObject(config.serializerClass))) + + /** + * This constructor can be used to provide pre-instantiated objects for all config parameters + * that would otherwise be instantiated via reflection. i.e.
encoder, partitioner, event handler and + * callback handler + * @param config Producer Configuration object + * @param encoder Encoder used to convert an object of type V to a kafka.message.Message + * @param eventHandler the class that implements kafka.javaapi.producer.async.IEventHandler[T] used to + * dispatch a batch of produce requests, using an instance of kafka.javaapi.producer.SyncProducer + * @param cbkHandler the class that implements kafka.javaapi.producer.async.CallbackHandler[T] used to inject + * callbacks at various stages of the kafka.javaapi.producer.AsyncProducer pipeline. + * @param partitioner class that implements the kafka.javaapi.producer.Partitioner[K], used to supply a custom + * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T] + * object in the send API + */ + import kafka.javaapi.Implicits._ + def this(config: ProducerConfig, + encoder: Encoder[V], + eventHandler: kafka.javaapi.producer.async.EventHandler[V], + cbkHandler: kafka.javaapi.producer.async.CallbackHandler[V], + partitioner: Partitioner[K]) = { + this(config, partitioner, + new ProducerPool[V](config, encoder, + new kafka.producer.async.EventHandler[V] { + override def init(props: Properties) { eventHandler.init(props) } + override def handle(events: Seq[QueueItem[V]], producer: kafka.producer.SyncProducer, + encoder: Encoder[V]) { + import collection.JavaConversions._ + import kafka.javaapi.Implicits._ + eventHandler.handle(asList(events), producer, encoder) + } + override def close { eventHandler.close } + }, + new kafka.producer.async.CallbackHandler[V] { + import collection.JavaConversions._ + override def init(props: Properties) { cbkHandler.init(props)} + override def beforeEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]]): QueueItem[V] = { + cbkHandler.beforeEnqueue(data) + } + override def afterEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]], added: Boolean) { + cbkHandler.afterEnqueue(data, added) + } + override def afterDequeuingExistingData(data: QueueItem[V] = null): scala.collection.mutable.Seq[QueueItem[V]] = { + cbkHandler.afterDequeuingExistingData(data) + } + override def beforeSendingData(data: Seq[QueueItem[V]] = null): scala.collection.mutable.Seq[QueueItem[V]] = { + asList(cbkHandler.beforeSendingData(asList(data))) + } + override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[V]] = { + asBuffer(cbkHandler.lastBatchBeforeClose) + } + override def close { cbkHandler.close } + })) + } + + /** + * Sends the data to a single topic, partitioned by key, using either the + * synchronous or the asynchronous producer + * @param producerData the producer data object that encapsulates the topic, key and message data + */ + def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) { + import collection.JavaConversions._ + underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey, + asBuffer(producerData.getData))) + } + + /** + * Use this API to send data to multiple topics + * @param producerData list of producer data objects that encapsulate the topic, key and message data + */ + def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) { + import collection.JavaConversions._ + underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey, + asBuffer(pd.getData))): _*) + } + + /** + * Close API to close the producer pool connections to all Kafka brokers. 
Also closes + * the zookeeper client connection if one exists + */ + def close = underlying.close +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,33 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package kafka.javaapi.producer + +import scala.collection.JavaConversions._ + +class ProducerData[K, V](private val topic: String, + private val key: K, + private val data: java.util.List[V]) { + + def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d) + + def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d))) + + def getTopic: String = topic + + def getKey: K = key + + def getData: java.util.List[V] = data +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala Mon Aug 1 23:41:24 2011 @@ -0,0 +1,47 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.javaapi.producer + +import kafka.producer.SyncProducerConfig +import kafka.javaapi.message.ByteBufferMessageSet + +class SyncProducer(syncProducer: kafka.producer.SyncProducer) { + + def this(config: SyncProducerConfig) = this(new kafka.producer.SyncProducer(config)) + + val underlying = syncProducer + + def send(topic: String, partition: Int, messages: ByteBufferMessageSet) { + import kafka.javaapi.Implicits._ + underlying.send(topic, partition, messages) + } + + def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, + kafka.api.ProducerRequest.RandomPartition, + messages) + + def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) { + import kafka.javaapi.Implicits._ + val produceRequests = new Array[kafka.api.ProducerRequest](produces.length) + for(i <- 0 until produces.length) + produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages) + underlying.multiSend(produceRequests) + } + + def close() { + underlying.close + } +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java Mon Aug 1 23:41:24 2011 @@ -0,0 +1,76 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package kafka.javaapi.producer.async; + +import kafka.producer.async.QueueItem; + +import java.util.Properties; + +/** + * Callback handler APIs for use in the async producer. 
The purpose is to + * give the user some callback handles to insert custom functionality at + * various stages as the data flows through the pipeline of the async producer + */ +public interface CallbackHandler<T> { + /** + * Initializes the callback handler using a Properties object + * @param props the properties used to initialize the callback handler + */ + public void init(Properties props); + + /** + * Callback to process the data before it enters the batching queue + * of the asynchronous producer + * @param data the data sent to the producer + * @return the processed data that enters the queue + */ + public QueueItem<T> beforeEnqueue(QueueItem<T> data); + + /** + * Callback to process the data just after it enters the batching queue + * of the asynchronous producer + * @param data the data sent to the producer + * @param added flag that indicates if the data was successfully added to the queue + */ + public void afterEnqueue(QueueItem<T> data, boolean added); + + /** + * Callback to process the data item right after it has been dequeued by the + * background sender thread of the asynchronous producer + * @param data the data item dequeued from the async producer queue + * @return the processed list of data items that gets added to the data handled by the event handler + */ + public java.util.List<QueueItem<T>> afterDequeuingExistingData(QueueItem<T> data); + + /** + * Callback to process the batched data right before it is being processed by the + * handle API of the event handler + * @param data the batched data received by the event handler + * @return the processed batched data that gets processed by the handle() API of the event handler + */ + public java.util.List<QueueItem<T>> beforeSendingData(java.util.List<QueueItem<T>> data); + + /** + * Callback to process the last batch of data right before the producer send thread is shutdown + * @return the last batch of data that is sent to the EventHandler + */ + public java.util.List<QueueItem<T>> lastBatchBeforeClose(); + + /** + * Cleans up and shuts down the callback handler + */ + public void close(); +} Added: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java (added) +++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java Mon Aug 1 23:41:24 2011 @@ -0,0 +1,47 @@ +/* + * Copyright 2010 LinkedIn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package kafka.javaapi.producer.async; + +import kafka.javaapi.producer.SyncProducer; +import kafka.producer.async.QueueItem; +import kafka.serializer.Encoder; + +import java.util.List; +import java.util.Properties; + +/** + * Handler that dispatches the batched data from the queue of the + * asynchronous producer.
+ */ +public interface EventHandler<T> { + /** + * Initializes the event handler using a Properties object + * @param props the properties used to initialize the event handler + */ + public void init(Properties props); + + /** + * Callback to dispatch the batched data and send it to a Kafka server + * @param events the data sent to the producer + * @param producer the low-level producer used to send the data + * @param encoder the encoder used to convert the data into kafka.message.Message instances + */ + public void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder); + + /** + * Cleans up and shuts down the event handler + */ + public void close(); +}
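A minimal usage sketch of the Java-facing consumer connector added in this commit. The connector is assumed to have been obtained from the consumer factory (the wrapper class itself is private[kafka]); the trait's package path kafka.javaapi.consumer.ConsumerConnector and the topic name are assumptions for illustration, not part of this patch.

    import java.util.{HashMap => JHashMap}

    // Drain one stream of one topic through the Java-API connector.
    // `connector` is assumed to come from the consumer factory; the
    // package path of its trait is a guess based on the javaapi layout.
    def drainOneStream(connector: kafka.javaapi.consumer.ConsumerConnector) {
      val topicCountMap = new JHashMap[String, java.lang.Integer]()
      topicCountMap.put("test-topic", 1)                    // ask for one stream on this (made-up) topic
      val streams = connector.createMessageStreams(topicCountMap)
      val iter = streams.get("test-topic").get(0).iterator  // iterate kafka.message.Message instances
      while (iter.hasNext)
        println(iter.next)
      connector.commitOffsets()                             // checkpoint consumed offsets in ZooKeeper
    }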
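The ByteBufferMessageSet wrapper added above serializes a java.util.List of messages into a single backing buffer, each message preceded by its 4-byte size. A small sketch of building and inspecting one with the no-compression codec from this patch; the payload strings are arbitrary.

    import java.util.Arrays
    import kafka.javaapi.message.ByteBufferMessageSet
    import kafka.message.{Message, NoCompressionCodec}

    // Serialize two messages into one backing ByteBuffer.
    val messages = Arrays.asList(new Message("hello".getBytes), new Message("world".getBytes))
    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, messages)

    messageSet.validate()                  // throws InvalidMessageException on a checksum mismatch
    println(messageSet.sizeInBytes)        // total bytes, size prefixes included

    val iter = messageSet.iterator         // java.util.Iterator[MessageAndOffset]
    while (iter.hasNext) {
      val mo = iter.next
      println(mo.offset + " -> " + mo.message)
    }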
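A sketch of the intended application-facing path through the Java-API Producer in this commit: build a ProducerConfig, construct the producer with the single-argument constructor, and hand it ProducerData items. The property names and the String encoder class are assumptions about the surrounding code base, not guarantees from this hunk.

    import java.util.{Arrays, Properties}
    import kafka.javaapi.producer.{Producer, ProducerData}
    import kafka.producer.ProducerConfig

    val props = new Properties
    props.put("zk.connect", "localhost:2181")                       // assumed property name
    props.put("serializer.class", "kafka.serializer.StringEncoder") // assumed encoder class

    val producer = new Producer[String, String](new ProducerConfig(props))

    // Single topic, no key: the configured partitioner picks a partition.
    producer.send(new ProducerData[String, String]("test-topic", "a message"))

    // Single topic, keyed batch: all three strings are partitioned by "user-42".
    producer.send(new ProducerData[String, String]("test-topic", "user-42",
                                                   Arrays.asList("m1", "m2", "m3")))

    producer.close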
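The low-level SyncProducer wrapper sends a ByteBufferMessageSet directly to one broker, either to an explicit partition or to a broker-chosen random partition. A minimal sketch; the host/port property names are assumptions about SyncProducerConfig, and the topic and partition are made up.

    import java.util.{Arrays, Properties}
    import kafka.javaapi.message.ByteBufferMessageSet
    import kafka.javaapi.producer.SyncProducer
    import kafka.message.Message
    import kafka.producer.SyncProducerConfig

    val props = new Properties
    props.put("host", "localhost")   // assumed property names
    props.put("port", "9092")
    val producer = new SyncProducer(new SyncProducerConfig(props))

    val messages = new ByteBufferMessageSet(Arrays.asList(new Message("payload".getBytes)))
    producer.send("test-topic", 0, messages)   // explicit partition
    producer.send("test-topic", messages)      // random partition chosen for the request
    producer.close()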
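Finally, a sketch of what Scala implementations of the CallbackHandler and EventHandler interfaces above might look like. These are deliberately minimal, mostly pass-through with a little logging; a real event handler would encode each QueueItem, group by topic and partition, and call the SyncProducer.

    import java.util.{Collections, Properties}
    import kafka.javaapi.producer.SyncProducer
    import kafka.producer.async.QueueItem
    import kafka.serializer.Encoder

    class LoggingCallbackHandler[T] extends kafka.javaapi.producer.async.CallbackHandler[T] {
      override def init(props: Properties) { }
      override def beforeEnqueue(data: QueueItem[T]): QueueItem[T] = data
      override def afterEnqueue(data: QueueItem[T], added: Boolean) {
        if (!added) println("queue full, dropped: " + data)
      }
      override def afterDequeuingExistingData(data: QueueItem[T]): java.util.List[QueueItem[T]] =
        if (data == null) Collections.emptyList[QueueItem[T]]() else Collections.singletonList(data)
      override def beforeSendingData(data: java.util.List[QueueItem[T]]): java.util.List[QueueItem[T]] = data
      override def lastBatchBeforeClose(): java.util.List[QueueItem[T]] = Collections.emptyList[QueueItem[T]]()
      override def close() { }
    }

    class CountingEventHandler[T] extends kafka.javaapi.producer.async.EventHandler[T] {
      override def init(props: Properties) { }
      override def handle(events: java.util.List[QueueItem[T]], producer: SyncProducer, encoder: Encoder[T]) {
        // A real implementation would encode and send; this one only counts.
        println("would dispatch " + events.size + " events")
      }
      override def close() { }
    }

Implementations along these lines could then be passed to the five-argument kafka.javaapi.producer.Producer constructor shown earlier in this commit, together with an Encoder[V] and a Partitioner[K].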