kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1365199 [1/3] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/sca...
Date Tue, 24 Jul 2012 18:13:04 GMT
Author: nehanarkhede
Date: Tue Jul 24 18:13:01 2012
New Revision: 1365199

URL: http://svn.apache.org/viewvc?rev=1365199&view=rev
Log:
KAFKA-350 Enable message replication in the presence of failures; patched by Neha Narkhede; reviewed by Jun Rao and Jay Kreps

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoLeaderForPartitionException.scala
Modified:
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
    incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
    incubator/kafka/branches/0.8/system_test/common/util.sh
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRecordReader.java Tue Jul 24 18:13:01 2012
@@ -19,6 +19,8 @@ package kafka.etl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+
+import kafka.common.KafkaException;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
@@ -63,14 +65,12 @@ extends SequenceFileRecordReader<KafkaET
            /*get attemp id*/
            String taskId = _job.get("mapred.task.id");
            if (taskId == null) {
-               throw new IllegalArgumentException(
-                                 "Configutaion does not contain the property mapred.task.id");
+               throw new KafkaException("Configuration does not contain the property mapred.task.id");
            }
            String[] parts = taskId.split("_");
            if (    parts.length != 6 || !parts[0].equals("attempt") 
                 || (!"m".equals(parts[3]) && !"r".equals(parts[3]))) {
-                   throw new IllegalArgumentException(
-                                 "TaskAttemptId string : " + taskId + " is not properly formed");
+                   throw new KafkaException("TaskAttemptId string : " + taskId + " is not properly formed");
            }
           _attemptId = parts[4]+parts[3];
        }catch (Exception e) {
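
This hunk reroutes malformed-configuration failures through the new kafka.common.KafkaException in place of IllegalArgumentException. A minimal Scala sketch of the attempt-id check the reader performs, assuming the standard six-part Hadoop attempt id layout; the helper name parseAttemptId is illustrative:

    import kafka.common.KafkaException

    // e.g. "attempt_200707121733_0003_m_000005_0" splits into six '_'-separated parts
    def parseAttemptId(taskId: String): String = {
      val parts = taskId.split("_")
      if (parts.length != 6 || parts(0) != "attempt" || (parts(3) != "m" && parts(3) != "r"))
        throw new KafkaException("TaskAttemptId string : " + taskId + " is not properly formed")
      parts(4) + parts(3) // the _attemptId the record reader builds, e.g. "000005m"
    }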

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/Props.java Tue Jul 24 18:13:01 2012
@@ -33,6 +33,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+
+import kafka.common.KafkaException;
 import org.apache.log4j.Logger;
 
 public class Props extends Properties {
@@ -122,7 +124,7 @@ public class Props extends Properties {
 	@SuppressWarnings("unchecked")
 	public static Props of(String... args) {
 		if (args.length % 2 != 0)
-			throw new IllegalArgumentException(
+			throw new KafkaException(
 					"Must have an equal number of keys and values.");
 		Map<String, String> vals = new HashMap<String, String>(args.length / 2);
 		for (int i = 0; i < args.length; i += 2)
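
Props.of pairs its varargs into key/value entries, so an odd argument count now surfaces as the new KafkaException. A hedged usage sketch; the property names are illustrative:

    // even argument count: builds a Props with two entries
    val props = Props.of("kafka.output.topic", "events",
                         "kafka.output.queue.size", "1000")
    // odd argument count: throws KafkaException("Must have an equal number of keys and values.")
    // Props.of("kafka.output.topic")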

Modified: incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java Tue Jul 24 18:13:01 2012
@@ -20,6 +20,8 @@ package kafka.bridge.hadoop;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Properties;
+
+import kafka.common.KafkaException;
 import kafka.javaapi.producer.Producer;
 import kafka.message.Message;
 import kafka.producer.ProducerConfig;
@@ -93,7 +95,7 @@ public class KafkaOutputFormat<W extends
   {
     Path outputPath = getOutputPath(context);
     if (outputPath == null)
-      throw new IllegalArgumentException("no kafka output url specified");
+      throw new KafkaException("no kafka output url specified");
     URI uri = URI.create(outputPath.toString());
     Configuration job = context.getConfiguration();
 
@@ -135,7 +137,7 @@ public class KafkaOutputFormat<W extends
 
       topic = uri.getFragment();
       if (topic == null)
-        throw new IllegalArgumentException("no topic specified in kafka uri fragment");
+        throw new KafkaException("no topic specified in kafka uri fragment");
 
       log.info(String.format("using kafka zk.connect %s (topic %s)", zkConnect, topic));
     } else if (uri.getScheme().equals("kafka")) {
@@ -159,13 +161,13 @@ public class KafkaOutputFormat<W extends
       job.set("kafka.broker.list", brokerList);
 
       if (uri.getPath() == null || uri.getPath().length() <= 1)
-        throw new IllegalArgumentException("no topic specified in kafka uri");
+        throw new KafkaException("no topic specified in kafka uri");
 
       topic = uri.getPath().substring(1);             // ignore the initial '/' in the path
       job.set("kafka.output.topic", topic);
       log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic));
     } else
-      throw new IllegalArgumentException("missing scheme from kafka uri (must be kafka:// or kafka+zk://)");
+      throw new KafkaException("missing scheme from kafka uri (must be kafka:// or kafka+zk://)");
 
     Producer<Integer, Message> producer = new Producer<Integer, Message>(new ProducerConfig(props));
     return new KafkaRecordWriter<W>(producer, topic, queueSize);
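
For reference, a sketch of the topic-extraction rules this method applies, read off the hunks above. The helper name topicFromUri is illustrative, and the "kafka+zk" scheme literal is inferred from the error message rather than shown in this diff:

    import java.net.URI
    import kafka.common.KafkaException

    // kafka+zk://<zk.connect>#<topic> -> topic comes from the URI fragment
    // kafka://<broker.list>/<topic>   -> topic comes from the path, minus the leading '/'
    def topicFromUri(uri: URI): String = uri.getScheme match {
      case "kafka+zk" =>
        if (uri.getFragment == null)
          throw new KafkaException("no topic specified in kafka uri fragment")
        uri.getFragment
      case "kafka" =>
        if (uri.getPath == null || uri.getPath.length <= 1)
          throw new KafkaException("no topic specified in kafka uri")
        uri.getPath.substring(1) // ignore the initial '/' in the path
      case _ =>
        throw new KafkaException("missing scheme from kafka uri (must be kafka:// or kafka+zk://)")
    }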

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Tue Jul 24 18:13:01 2012
@@ -24,6 +24,7 @@ import kafka.utils.{Logging, Utils, ZkUt
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection.mutable
+import kafka.common.{LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping}
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -48,15 +49,15 @@ object AdminUtils extends Logging {
    * p7        p8        p9        p5        p6       (3nd replica)
    */
   def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
-          fixedStartIndex: Int = -1)  // for testing only
-    : Map[Int, List[String]] = {
+                              fixedStartIndex: Int = -1)  // for testing only
+  : Map[Int, List[String]] = {
     if (nPartitions <= 0)
       throw new AdministrationException("number of partitions must be larger than 0")
     if (replicationFactor <= 0)
       throw new AdministrationException("replication factor must be larger than 0")
     if (replicationFactor > brokerList.size)
       throw new AdministrationException("replication factor: " + replicationFactor +
-              " larger than available brokers: " + brokerList.size)
+        " larger than available brokers: " + brokerList.size)
     val ret = new mutable.HashMap[Int, List[String]]()
     val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
 
@@ -85,7 +86,7 @@ object AdminUtils extends Logging {
     }
   }
 
-  def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[Option[TopicMetadata]] = {
+  def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[TopicMetadata] = {
     val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
     topics.map { topic =>
       if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
@@ -99,15 +100,42 @@ object AdminUtils extends Logging {
           val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
           debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
 
-          new PartitionMetadata(partition,
-            leader.map(l => getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head),
-            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)),
-            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas),
-            None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+          var leaderInfo: Option[Broker] = None
+          var replicaInfo: Seq[Broker] = Nil
+          var isrInfo: Seq[Broker] = Nil
+          try {
+            try {
+              leaderInfo = leader match {
+                case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
+                case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
+              }
+            }catch {
+              case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d"
+                .format(topic, partition))
+            }
+
+            try {
+              replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
+              isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
+            }catch {
+              case e => throw new ReplicaNotAvailableException(e)
+            }
+
+            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError,
+              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+          }catch {
+            case e: ReplicaNotAvailableException => new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
+              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
+              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+            case le: LeaderNotAvailableException => new PartitionMetadata(partition, None, replicaInfo, isrInfo,
+              ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]),
+              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+          }
         }
-        Some(new TopicMetadata(topic, partitionMetadata))
+        new TopicMetadata(topic, partitionMetadata)
       } else {
-        None
+        // topic doesn't exist, send appropriate error code
+        new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicCode)
       }
     }
   }
@@ -120,9 +148,14 @@ object AdminUtils extends Logging {
       optionalBrokerInfo match {
         case Some(brokerInfo) => brokerInfo // return broker info from the cache
         case None => // fetch it from zookeeper
-          val brokerInfo = ZkUtils.getBrokerInfoFromIds(zkClient, List(id)).head
-          cachedBrokerInfo += (id -> brokerInfo)
-          brokerInfo
+          try {
+            val brokerInfo = ZkUtils.getBrokerInfoFromIds(zkClient, List(id)).head
+            cachedBrokerInfo += (id -> brokerInfo)
+            brokerInfo
+          }catch {
+            case e => error("Failed to fetch broker info for broker id " + id)
+            throw e
+          }
       }
     }
   }
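
getTopicMetaDataFromZK now returns a TopicMetadata for every requested topic, signalling failures through per-topic and per-partition error codes instead of Option. A hedged caller-side sketch, mirroring the ListTopicCommand change below and assuming topics: Seq[String] and zkClient: ZkClient are in scope:

    import kafka.admin.AdminUtils
    import kafka.common.ErrorMapping

    val metadata = AdminUtils.getTopicMetaDataFromZK(topics, zkClient)
    metadata.foreach { tmd =>
      if (tmd.errorCode == ErrorMapping.UnknownTopicCode)
        println("topic " + tmd.topic + " doesn't exist!") // topic-level error code
      else
        tmd.partitionsMetadata.foreach(println)           // per-partition metadata (and error codes)
    }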

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala Tue Jul 24 18:13:01 2012
@@ -20,6 +20,7 @@ package kafka.admin
 import joptsimple.OptionParser
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
+import kafka.common.ErrorMapping
 
 object ListTopicCommand {
 
@@ -77,12 +78,12 @@ object ListTopicCommand {
 
   def showTopic(topic: String, zkClient: ZkClient) {
     val topicMetaData = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
-    topicMetaData match {
-      case None =>
+    topicMetaData.errorCode match {
+      case ErrorMapping.UnknownTopicCode =>
         println("topic " + topic + " doesn't exist!")
-      case Some(tmd) =>
+      case _ =>
         println("topic: " + topic)
-        for (part <- tmd.partitionsMetadata)
+        for (part <- topicMetaData.partitionsMetadata)
           println(part.toString)
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Tue Jul 24 18:13:01 2012
@@ -20,7 +20,7 @@ package kafka.api
 import java.nio.ByteBuffer
 import kafka.utils.Utils
 import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
-import kafka.common.FetchRequestFormatException
+import kafka.common.{KafkaException, FetchRequestFormatException}
 
 object OffsetDetail {
 
@@ -53,7 +53,7 @@ case class OffsetDetail(topic: String, p
     Utils.writeShortString(buffer, topic, "UTF-8")
 
     if(partitions.size > Int.MaxValue || offsets.size > Int.MaxValue || fetchSizes.size > Int.MaxValue)
-      throw new IllegalArgumentException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".")
+      throw new KafkaException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".")
 
     buffer.putInt(partitions.length)
     partitions.foreach(buffer.putInt(_))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala Tue Jul 24 18:13:01 2012
@@ -24,8 +24,8 @@ import kafka.common.ErrorMapping
 object TopicMetaDataResponse {
 
   def readFrom(buffer: ByteBuffer): TopicMetaDataResponse = {
-    val errorCode = buffer.getShort
     val versionId = buffer.getShort
+    val errorCode = buffer.getShort
 
     val topicCount = buffer.getInt
     val topicsMetadata = new Array[TopicMetadata](topicCount)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala Tue Jul 24 18:13:01 2012
@@ -21,6 +21,7 @@ import kafka.cluster.Broker
 import java.nio.ByteBuffer
 import kafka.utils.Utils._
 import collection.mutable.ListBuffer
+import kafka.common.{KafkaException, ErrorMapping}
 
 /**
  * topic (2 bytes + topic.length)
@@ -57,24 +58,28 @@ case object LogSegmentMetadataDoesNotExi
 object TopicMetadata {
 
   def readFrom(buffer: ByteBuffer): TopicMetadata = {
+    val errorCode = getShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val topic = readShortString(buffer)
     val numPartitions = getIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
     val partitionsMetadata = new ListBuffer[PartitionMetadata]()
     for(i <- 0 until numPartitions)
       partitionsMetadata += PartitionMetadata.readFrom(buffer)
-    new TopicMetadata(topic, partitionsMetadata)
+    new TopicMetadata(topic, partitionsMetadata, errorCode)
   }
 }
 
-case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata]) {
+case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) {
   def sizeInBytes: Int = {
-    var size: Int = shortStringLength(topic)
+    var size: Int = 2   /* error code */
+    size += shortStringLength(topic)
     size += partitionsMetadata.foldLeft(4 /* number of partitions */)(_ + _.sizeInBytes)
     debug("Size of topic metadata = " + size)
     size
   }
 
   def writeTo(buffer: ByteBuffer) {
+    /* error code */
+    buffer.putShort(errorCode)
     /* topic */
     writeShortString(buffer, topic)
     /* number of partitions */
@@ -86,6 +91,7 @@ case class TopicMetadata(topic: String, 
 object PartitionMetadata {
 
   def readFrom(buffer: ByteBuffer): PartitionMetadata = {
+    val errorCode = getShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val partitionId = getIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
     val doesLeaderExist = getLeaderRequest(buffer.get)
     val leader = doesLeaderExist match {
@@ -129,18 +135,18 @@ object PartitionMetadata {
         Some(new LogMetadata(numLogSegments, totalDataSize, segmentMetadata))
       case LogSegmentMetadataDoesNotExist => None
     }
-    new PartitionMetadata(partitionId, leader, replicas, isr, logMetadata)
+    new PartitionMetadata(partitionId, leader, replicas, isr, errorCode, logMetadata)
   }
 
-  def getLeaderRequest(requestId: Byte): LeaderRequest = {
+  private def getLeaderRequest(requestId: Byte): LeaderRequest = {
     requestId match {
       case LeaderExists.requestId => LeaderExists
       case LeaderDoesNotExist.requestId => LeaderDoesNotExist
-      case _ => throw new IllegalArgumentException("Unknown leader request id " + requestId)
+      case _ => throw new KafkaException("Unknown leader request id " + requestId)
     }
   }
 
-  def getLogSegmentMetadataRequest(requestId: Byte): LogSegmentMetadataRequest = {
+  private def getLogSegmentMetadataRequest(requestId: Byte): LogSegmentMetadataRequest = {
     requestId match {
       case LogSegmentMetadataExists.requestId => LogSegmentMetadataExists
       case LogSegmentMetadataDoesNotExist.requestId => LogSegmentMetadataDoesNotExist
@@ -149,9 +155,9 @@ object PartitionMetadata {
 }
 
 case class PartitionMetadata(partitionId: Int, leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
-                             logMetadata: Option[LogMetadata] = None) {
+                             errorCode: Short = ErrorMapping.NoError, logMetadata: Option[LogMetadata] = None) {
   def sizeInBytes: Int = {
-    var size: Int = 4 /* partition id */ + 1 /* if leader exists*/
+    var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/
 
     leader match {
       case Some(l) => size += l.sizeInBytes
@@ -173,6 +179,7 @@ case class PartitionMetadata(partitionId
   }
 
   def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(errorCode)
     buffer.putInt(partitionId)
 
     /* if leader exists*/
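
Taken together, readFrom, writeTo and sizeInBytes above put a leading two-byte error code on both TopicMetadata and PartitionMetadata. A hedged round-trip sketch using only names from this diff:

    import java.nio.ByteBuffer
    import kafka.api.TopicMetadata
    import kafka.common.ErrorMapping

    // a topic-level error with no partition metadata, as AdminUtils now produces
    val md = new TopicMetadata("test-topic", Seq.empty, ErrorMapping.UnknownTopicCode)
    val buffer = ByteBuffer.allocate(md.sizeInBytes) // 2 (error code) + short string + 4 (partition count)
    md.writeTo(buffer)
    buffer.rewind()
    val decoded = TopicMetadata.readFrom(buffer)
    assert(decoded.errorCode == ErrorMapping.UnknownTopicCode)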

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala Tue Jul 24 18:13:01 2012
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 import kafka.utils.Utils._
 import collection.mutable.ListBuffer
 import kafka.utils._
+import kafka.common.KafkaException
 
 sealed trait DetailedMetadataRequest { def requestId: Short }
 case object SegmentMetadata extends DetailedMetadataRequest { val requestId = 1.asInstanceOf[Short] }
@@ -43,7 +44,7 @@ object TopicMetadataRequest {
     requestId match {
       case SegmentMetadata.requestId => SegmentMetadata
       case NoSegmentMetadata.requestId => NoSegmentMetadata
-      case _ => throw new IllegalArgumentException("Unknown detailed metadata request id " + requestId)
+      case _ => throw new KafkaException("Unknown detailed metadata request id " + requestId)
     }
   }
 
@@ -63,7 +64,7 @@ object TopicMetadataRequest {
       case SegmentMetadata =>
         timestamp = Some(buffer.getLong)
         count = Some(buffer.getInt)
-      case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request "
+      case _ => throw new KafkaException("Invalid value for the detailed metadata request "
                                                     + returnDetailedMetadata.requestId)
     }
     debug("topic = %s, detailed metadata request = %d"
@@ -96,7 +97,7 @@ def this(topics: Seq[String]) =
         buffer.putLong(timestamp.get)
         buffer.putInt(count.get)
       case NoSegmentMetadata =>
-      case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
+      case _ => throw new KafkaException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
     }
   }
 
@@ -107,7 +108,7 @@ def this(topics: Seq[String]) =
       case SegmentMetadata =>
         size += 8 /* timestamp */ + 4 /* count */
       case NoSegmentMetadata =>
-      case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
+      case _ => throw new KafkaException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
     }
     size
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Tue Jul 24 18:13:01 2012
@@ -19,6 +19,7 @@ package kafka.cluster
 
 import kafka.utils.Utils._
 import java.nio.ByteBuffer
+import kafka.common.KafkaException
 
 /**
  * A Kafka broker
@@ -27,7 +28,7 @@ private[kafka] object Broker {
 
   def createBroker(id: Int, brokerInfoString: String): Broker = {
     if(brokerInfoString == null)
-      throw new IllegalArgumentException("Broker id %s does not exist".format(id))
+      throw new KafkaException("Broker id %s does not exist".format(id))
     val brokerInfo = brokerInfoString.split(":")
     new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Tue Jul 24 18:13:01 2012
@@ -16,12 +16,11 @@
  */
 package kafka.cluster
 
-import kafka.common.NoLeaderForPartitionException
 import kafka.utils.{SystemTime, Time, Logging}
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.ZkUtils._
 import java.util.concurrent.locks.ReentrantLock
-import java.lang.IllegalStateException
+import kafka.common.{KafkaException, LeaderNotAvailableException}
 
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
@@ -77,10 +76,10 @@ class Partition(val topic: String,
     if(leaderReplicaId.isDefined) {
       val leaderReplica = assignedReplicas().find(_.brokerId == leaderReplicaId.get)
       if(leaderReplica.isDefined) leaderReplica.get
-      else throw new IllegalStateException("No replica for leader %d in the replica manager"
+      else throw new KafkaException("No replica for leader %d in the replica manager"
         .format(leaderReplicaId.get))
     }else
-      throw new NoLeaderForPartitionException("Leader for topic %s partition %d does not exist"
+      throw new LeaderNotAvailableException("Leader for topic %s partition %d does not exist"
         .format(topic, partitionId))
   }
 
@@ -131,12 +130,12 @@ class Partition(val topic: String,
       inSyncReplicas = newISR.map {r =>
         getReplica(r) match {
           case Some(replica) => replica
-          case None => throw new IllegalStateException("ISR update failed. No replica for id %d".format(r))
+          case None => throw new KafkaException("ISR update failed. No replica for id %d".format(r))
         }
       }
-      info("Updated ISR for for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(",")))
+      info("Updated ISR for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(",")))
     }catch {
-      case e => throw new IllegalStateException("Failed to update ISR for topic %s ".format(topic) +
+      case e => throw new KafkaException("Failed to update ISR for topic %s ".format(topic) +
         "partition %d to %s".format(partitionId, newISR.mkString(",")), e)
     }finally {
       leaderISRUpdateLock.unlock()
@@ -146,7 +145,7 @@ class Partition(val topic: String,
   private def updateISRInZk(newISR: Set[Int], zkClient: ZkClient) = {
     val replicaListAndEpochString = readDataMaybeNull(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString))
     if(replicaListAndEpochString == null) {
-      throw new NoLeaderForPartitionException(("Illegal partition state. ISR cannot be updated for topic " +
+      throw new LeaderNotAvailableException(("Illegal partition state. ISR cannot be updated for topic " +
         "%s partition %d since leader and ISR does not exist in ZK".format(topic, partitionId)))
     }
     else {
@@ -154,7 +153,7 @@ class Partition(val topic: String,
       val epoch = replicasAndEpochInfo.last
       updatePersistentPath(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString),
         "%s;%s".format(newISR.mkString(","), epoch))
-      info("Updating ISR for for topic %s partition %d to %s in ZK".format(topic, partitionId, newISR.mkString(",")))
+      info("Updated ISR for topic %s partition %d to %s in ZK".format(topic, partitionId, newISR.mkString(",")))
     }
   }
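
updateISRInZk above reads the existing node back to preserve the epoch before writing the new ISR. A small sketch of the payload format it uses, "<comma-separated ISR>;<epoch>"; the helper name is illustrative:

    // e.g. isrPayload(Set(0, 1, 2), "3") == "0,1,2;3"
    def isrPayload(newISR: Set[Int], epoch: String): String =
      "%s;%s".format(newISR.mkString(","), epoch)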
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Tue Jul 24 18:13:01 2012
@@ -18,8 +18,8 @@
 package kafka.cluster
 
 import kafka.log.Log
-import java.lang.IllegalStateException
 import kafka.utils.Logging
+import kafka.common.KafkaException
 
 class Replica(val brokerId: Int,
               val partition: Partition,
@@ -32,7 +32,7 @@ class Replica(val brokerId: Int,
     isLocal match {
       case true =>
         newLeo match {
-          case Some(newOffset) => throw new IllegalStateException("Trying to set the leo %d for local log".format(newOffset))
+          case Some(newOffset) => throw new KafkaException("Trying to set the leo %d for local log".format(newOffset))
           case None => log.get.logEndOffset
         }
       case false =>
@@ -71,15 +71,15 @@ class Replica(val brokerId: Int,
                                                                                    brokerId, highwaterMark))
             log.get.setHW(highwaterMark)
             highwaterMark
-          case false => throw new IllegalStateException("Unable to set highwatermark for topic %s ".format(topic) +
+          case false => throw new KafkaException("Unable to set highwatermark for topic %s ".format(topic) +
             "partition %d on broker %d, since there is no local log for this partition"
               .format(partition.partitionId, brokerId))
         }
       case None =>
         isLocal match {
           case true =>
-            log.get.highwaterMark
-          case false => throw new IllegalStateException("Unable to get highwatermark for topic %s ".format(topic) +
+            log.get.getHW()
+          case false => throw new KafkaException("Unable to get highwatermark for topic %s ".format(topic) +
             "partition %d on broker %d, since there is no local log for this partition"
               .format(partition.partitionId, brokerId))
         }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Tue Jul 24 18:13:01 2012
@@ -35,10 +35,11 @@ object ErrorMapping {
   val InvalidPartitionCode : Short = 3
   val InvalidFetchSizeCode  : Short = 4
   val InvalidFetchRequestFormatCode : Short = 5
-  val NoLeaderForPartitionCode : Short = 6
+  val LeaderNotAvailableCode : Short = 6
   val NotLeaderForPartitionCode : Short = 7
   val UnknownTopicCode : Short = 8
   val RequestTimedOutCode: Short = 9
+  val ReplicaNotAvailableCode: Short = 10
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -48,8 +49,9 @@ object ErrorMapping {
       classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
       classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
       classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
-      classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
+      classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
+      classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
       classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
     ).withDefaultValue(UnknownCode)
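
With the map above keyed by exception class and defaulting to UnknownCode, the lookup AdminUtils performs looks roughly like this, assuming codeFor simply indexes exceptionToCode with a Class[Throwable] key:

    import kafka.common._

    val code: Short = ErrorMapping.codeFor(
      classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]])
    // code == ErrorMapping.LeaderNotAvailableCode (6); an unmapped class yields UnknownCode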
   

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaException.scala?rev=1365199&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/KafkaException.scala Tue Jul 24 18:13:01 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.common
+
+/**
+ * Generic Kafka exception
+*/
+class KafkaException(message: String, t: Throwable) extends RuntimeException(message, t) {
+  def this(message: String) = this(message, null)
+  def this(t: Throwable) = this("", t)
+}
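
The auxiliary constructors cover the three shapes this commit uses; a brief usage sketch with an illustrative cause:

    val cause = new RuntimeException("zk read failed")              // illustrative cause
    val e1 = new KafkaException("no topic specified in kafka uri")  // message only
    val e2 = new KafkaException(cause)                              // cause only, empty message
    val e3 = new KafkaException("Failed to update ISR", cause)      // message and cause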

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala?rev=1365199&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala Tue Jul 24 18:13:01 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when a request is made for a partition, but no leader exists for that partition
+ */
+class LeaderNotAvailableException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala?rev=1365199&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala Tue Jul 24 18:13:01 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when a request is made for a partition, but one or more of its replicas are not available
+ */
+class ReplicaNotAvailableException(cause: Throwable) extends RuntimeException(cause) {
+  def this() = this(null)
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Tue Jul 24 18:13:01 2012
@@ -156,16 +156,15 @@ object ConsoleConsumer extends Logging {
       }
     })
 
-    val stream = connector.createMessageStreamsByFilter(filterSpec).get(0)
-    val iter = if(maxMessages >= 0)
-      stream.slice(0, maxMessages)
-    else
-      stream
-
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
-
     try {
+      val stream = connector.createMessageStreamsByFilter(filterSpec).get(0)
+      val iter = if(maxMessages >= 0)
+        stream.slice(0, maxMessages)
+      else
+        stream
+
       for(messageAndTopic <- iter) {
         try {
           formatter.writeTo(messageAndTopic.message, System.out)
@@ -176,7 +175,7 @@ object ConsoleConsumer extends Logging {
             else
               throw e
         }
-        if(System.out.checkError()) { 
+        if(System.out.checkError()) {
           // This means no one is listening to our output stream any more, time to shutdown
           System.err.println("Unable to write to standard out, closing consumer.")
           formatter.close()
@@ -187,7 +186,6 @@ object ConsoleConsumer extends Logging {
     } catch {
       case e => error("Error processing message, stopping consumer: ", e)
     }
-      
     System.out.flush()
     formatter.close()
     connector.shutdown()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Tue Jul 24 18:13:01 2012
@@ -22,6 +22,7 @@ import java.util.concurrent.{TimeUnit, B
 import kafka.serializer.Decoder
 import java.util.concurrent.atomic.AtomicReference
 import kafka.message.{MessageAndOffset, MessageAndMetadata}
+import kafka.common.KafkaException
 
 
 /**
@@ -42,7 +43,7 @@ class ConsumerIterator[T](private val ch
   override def next(): MessageAndMetadata[T] = {
     val item = super.next()
     if(consumedOffset < 0)
-      throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
+      throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     val topic = currentTopicInfo.topic
     trace("Setting %s consumed offset to %d".format(topic, consumedOffset))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Tue Jul 24 18:13:01 2012
@@ -30,9 +30,8 @@ import org.apache.zookeeper.Watcher.Even
 import kafka.api.OffsetRequest
 import java.util.UUID
 import kafka.serializer.Decoder
-import java.lang.IllegalStateException
 import kafka.utils.ZkUtils._
-import kafka.common.{NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
+import kafka.common.{KafkaException, NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
 
 
 /**
@@ -319,7 +318,7 @@ private[kafka] class ZookeeperConsumerCo
       val cluster = getCluster(zkClient)
       val broker = cluster.getBroker(brokerId) match {
         case Some(b) => b
-        case None => throw new IllegalStateException("Broker " + brokerId + " is unavailable. Cannot issue " +
+        case None => throw new KafkaException("Broker " + brokerId + " is unavailable. Cannot issue " +
           "getOffsetsBefore request")
       }
       simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala Tue Jul 24 18:13:01 2012
@@ -20,6 +20,7 @@ package kafka.consumer.storage.sql
 import java.sql._
 import kafka.utils._
 import kafka.consumer.storage.OffsetStorage
+import kafka.common.KafkaException
 
 /**
  * An offset storage implementation that uses an oracle database to save offsets
@@ -76,7 +77,7 @@ class OracleOffsetStorage(val connection
     stmt.setString(4, topic)
     val updated = stmt.executeUpdate()
     if(updated > 1)
-      throw new IllegalStateException("More than one key updated by primary key!")
+      throw new KafkaException("More than one key updated by primary key!")
     else
       updated == 1
   }
@@ -100,7 +101,7 @@ class OracleOffsetStorage(val connection
       } else {
         val offset = results.getLong("offset")
         if(results.next())
-          throw new IllegalStateException("More than one entry for primary key!")
+          throw new KafkaException("More than one entry for primary key!")
         Some(offset)
       }
     } finally {
@@ -120,7 +121,7 @@ class OracleOffsetStorage(val connection
       stmt.setString(3, topic)
       val updated = stmt.executeUpdate()
       if(updated != 1)
-        throw new IllegalStateException("Unexpected number of keys updated: " + updated)
+        throw new KafkaException("Unexpected number of keys updated: " + updated)
     } finally {
       close(stmt)
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Tue Jul 24 18:13:01 2012
@@ -24,8 +24,8 @@ import java.util.concurrent.atomic.{Atom
 import kafka.utils._
 import java.text.NumberFormat
 import kafka.server.BrokerTopicStat
-import kafka.common.{InvalidMessageSizeException, OffsetOutOfRangeException}
 import kafka.message.{ByteBufferMessageSet, MessageSet, InvalidMessageException, FileMessageSet}
+import kafka.common.{KafkaException, InvalidMessageSizeException, OffsetOutOfRangeException}
 
 object Log {
   val FileSuffix = ".kafka"
@@ -94,8 +94,22 @@ object Log {
  */
 class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
   @volatile var deleted = false
-  def size: Long = messageSet.highWaterMark
+  /* Return the size in bytes of this log segment */
+  def size: Long = messageSet.sizeInBytes()
+  /* Return the absolute end offset of this log segment */
+  def absoluteEndOffset: Long = start + messageSet.sizeInBytes()
   override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
+
+  /**
+   * Truncate this log segment upto absolute offset value. Since the offset specified is absolute, to compute the amount
+   * of data to be deleted, we have to compute the offset relative to start of the log segment
+   * @param offset Absolute offset for this partition
+   */
+  def truncateUpto(offset: Long) = {
+    assert(offset >= start, "Offset %d used for truncating this log segment cannot be smaller than the start offset %d".
+                            format(offset, start))
+    messageSet.truncateUpto(offset - start)
+  }
 }
 
 
@@ -120,11 +134,12 @@ private[kafka] class Log(val dir: File, 
   /* The actual segments of the log */
   private[log] val segments: SegmentList[LogSegment] = loadSegments()
 
-
   /* create the leader highwatermark file handle */
   private val hwFile = new RandomAccessFile(dir.getAbsolutePath + "/" + hwFileName, "rw")
+  info("Created highwatermark file %s for log %s".format(dir.getAbsolutePath + "/" + hwFileName, name))
 
-  private var hw: Long = 0
+  /* If hw file is absent, the hw defaults to 0. If it exists, hw is set to the checkpointed value */
+  private var hw: Long = if(hwFile.length() > 0) hwFile.readLong() else { hwFile.writeLong(0); 0 }
 
   private val logStats = new LogStats(this)
 
@@ -185,7 +200,7 @@ private[kafka] class Log(val dir: File, 
         val curr = segments.get(i)
         val next = segments.get(i+1)
         if(curr.start + curr.size != next.start)
-          throw new IllegalStateException("The following segments don't validate: " +
+          throw new KafkaException("The following segments don't validate: " +
                   curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
       }
     }
@@ -259,6 +274,7 @@ private[kafka] class Log(val dir: File, 
    * Read from the log file at the given offset
    */
   def read(offset: Long, length: Int): MessageSet = {
+    trace("Reading %d bytes from offset %d in log %s of length %s bytes".format(length, offset, name, size))
     val view = segments.view
     Log.findRange(view, offset, view.length) match {
       case Some(segment) => segment.messageSet.read((offset - segment.start), length)
@@ -294,26 +310,12 @@ private[kafka] class Log(val dir: File, 
   /**
    * Get the size of the log in bytes
    */
-  def size: Long =
-    segments.view.foldLeft(0L)(_ + _.size)
-
-  /**
-   * The byte offset of the message that will be appended next.
-   */
-  def nextAppendOffset: Long = {
-    val last = segments.view.last
-    last.start + last.size
-  }
-
-  /**
-   *  get the current high watermark of the log
-   */
-  def highwaterMark: Long = segments.view.last.messageSet.highWaterMark
+  def size: Long = segments.view.foldLeft(0L)(_ + _.size)
 
   /**
-   *  get the offset of the last message in the log
+   *  Get the absolute offset of the last message in the log
    */
-  def logEndOffset: Long = segments.view.last.messageSet.getEndOffset()
+  def logEndOffset: Long = segments.view.last.start + segments.view.last.size
 
   /**
    * Roll the log over if necessary
@@ -329,7 +331,7 @@ private[kafka] class Log(val dir: File, 
   def roll() {
     lock synchronized {
       flush
-      val newOffset = nextAppendOffset
+      val newOffset = logEndOffset
       val newFile = new File(dir, nameFromOffset(newOffset))
       if (newFile.exists) {
         warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first")
@@ -376,7 +378,7 @@ private[kafka] class Log(val dir: File, 
     for (i <- 0 until segsArray.length)
       offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
     if (segsArray.last.size > 0)
-      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)
+      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.sizeInBytes(), SystemTime.milliseconds)
 
     var startIndex = -1
     request.time match {
@@ -412,7 +414,7 @@ private[kafka] class Log(val dir: File, 
     lock synchronized {
       val deletedSegments = segments.trunc(segments.view.size)
       val newFile = new File(dir, Log.nameFromOffset(newOffset))
-      debug("tuncate and start log '" + name + "' to " + newFile.getName())
+      debug("Truncate and start log '" + name + "' to " + newFile.getName())
       segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
       deleteSegments(deletedSegments)
     }
@@ -438,35 +440,27 @@ private[kafka] class Log(val dir: File, 
       // read the last checkpointed hw from disk
       hwFile.seek(0)
       val lastKnownHW = hwFile.readLong()
+      info("Recovering log %s upto highwatermark %d".format(name, lastKnownHW))
       // find the log segment that has this hw
       val segmentToBeTruncated = segments.view.find(segment =>
-        lastKnownHW >= segment.start && lastKnownHW < segment.messageSet.getEndOffset())
+        lastKnownHW >= segment.start && lastKnownHW < segment.absoluteEndOffset)
 
       segmentToBeTruncated match {
         case Some(segment) =>
-          val truncatedSegmentIndex = segments.view.indexOf(segment)
-          segments.truncLast(truncatedSegmentIndex)
-        case None =>
-      }
-
-      segmentToBeTruncated match {
-        case Some(segment) =>
-          segment.messageSet.truncateUpto(lastKnownHW)
+          segment.truncateUpto(lastKnownHW)
           info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, hw))
         case None =>
-          assert(lastKnownHW <= segments.view.last.messageSet.size,
+          assert(lastKnownHW <= logEndOffset,
             "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
-              format(lastKnownHW, segments.view.last.messageSet.size, segments.view.last.file.getAbsolutePath))
+              format(lastKnownHW, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
           error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
-            .format(lastKnownHW, segments.view.head.start, segments.view.last.messageSet.size))
+            .format(lastKnownHW, segments.view.head.start, logEndOffset))
       }
 
-      val segmentsToBeDeleted = segments.view.filter(segment => segment.start >= lastKnownHW)
-      if(segmentsToBeDeleted.size < segments.view.size) {
+      val segmentsToBeDeleted = segments.view.filter(segment => segment.start > lastKnownHW)
       val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
       if(numSegmentsDeleted != segmentsToBeDeleted.size)
         error("Failed to delete some segments during log recovery")
-      }
     }else
       info("Unable to recover log upto hw. No previously checkpointed high watermark found for " + name)
   }
@@ -475,10 +469,13 @@ private[kafka] class Log(val dir: File, 
     hw = latestLeaderHW
   }
 
+  def getHW(): Long = hw
+
   def checkpointHW() {
     hwFile.seek(0)
     hwFile.writeLong(hw)
     hwFile.getChannel.force(true)
+    info("Checkpointed highwatermark %d for log %s".format(hw, name))
   }
 
   def topicName():String = {

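The Log.scala changes above replace the old per-fileset high watermark with plain segment arithmetic: the log end offset is now the last segment's start plus its size, and recovery truncates the segment containing the checkpointed HW while deleting any segment that starts past it. A minimal sketch of that arithmetic, using a simplified Segment(start, size) model rather than the real kafka.log classes:

    case class Segment(start: Long, size: Long) {
      def absoluteEndOffset: Long = start + size
    }

    object LogOffsets {
      // logEndOffset: the absolute end of the last segment
      def logEndOffset(segments: Seq[Segment]): Long =
        segments.last.start + segments.last.size

      // Recovery keeps segments entirely below the HW, truncates the one
      // containing it, and drops segments that start past it.
      def recoverUpto(segments: Seq[Segment], hw: Long): Seq[Segment] =
        segments.collect {
          case s if s.absoluteEndOffset <= hw                 => s
          case s if s.start <= hw && hw < s.absoluteEndOffset => Segment(s.start, hw - s.start)
        }

      def main(args: Array[String]) {
        val segs = Seq(Segment(0, 100), Segment(100, 100), Segment(200, 50))
        println(logEndOffset(segs))      // 250
        println(recoverUpto(segs, 150))  // List(Segment(0,100), Segment(100,50))
      }
    }
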
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Tue Jul 24 18:13:01 2012
@@ -20,11 +20,10 @@ package kafka.log
 import java.io._
 import kafka.utils._
 import scala.collection._
-import java.util.concurrent.CountDownLatch
 import kafka.server.KafkaConfig
-import kafka.common.{InvalidTopicException, InvalidPartitionException}
 import kafka.api.OffsetRequest
 import kafka.log.Log._
+import kafka.common.{KafkaException, InvalidTopicException, InvalidPartitionException}
 
 /**
  * The guy who creates and hands out logs
@@ -54,7 +53,7 @@ private[kafka] class LogManager(val conf
     logDir.mkdirs()
   }
   if(!logDir.isDirectory() || !logDir.canRead())
-    throw new IllegalArgumentException(logDir.getAbsolutePath() + " is not a readable log directory.")
+    throw new KafkaException(logDir.getAbsolutePath() + " is not a readable log directory.")
   val subDirs = logDir.listFiles()
   if(subDirs != null) {
     for(dir <- subDirs) {

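The LogManager change swaps IllegalArgumentException for the new KafkaException when the log directory is unusable. A self-contained sketch of that startup check, with a local stand-in for kafka.common.KafkaException:

    import java.io.File

    class KafkaException(msg: String) extends RuntimeException(msg)

    object LogDirCheck {
      def ensureReadableDir(logDir: File) {
        if(!logDir.exists())
          logDir.mkdirs()
        if(!logDir.isDirectory() || !logDir.canRead())
          throw new KafkaException(logDir.getAbsolutePath() + " is not a readable log directory.")
      }

      def main(args: Array[String]) {
        ensureReadableDir(new File(System.getProperty("java.io.tmpdir"), "kafka-logs"))
      }
    }
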
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala Tue Jul 24 18:13:01 2012
@@ -36,7 +36,7 @@ class LogStats(val log: Log) extends Log
   
   def getNumberOfSegments: Int = log.numberOfSegments
   
-  def getCurrentOffset: Long = log.highwaterMark
+  def getCurrentOffset: Long = log.getHW()
   
   def getNumAppendedMessages: Long = numCumulatedMessages.get
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala Tue Jul 24 18:13:01 2012
@@ -20,6 +20,7 @@ package kafka.log
 import java.util.concurrent.atomic._
 import reflect._
 import scala.math._
+import kafka.common.KafkaException
 
 private[log] object SegmentList {
   val MaxAttempts: Int = 20
@@ -56,7 +57,7 @@ private[log] class SegmentList[T](seq: S
    */
   def trunc(newStart: Int): Seq[T] = {
     if(newStart < 0)
-      throw new IllegalArgumentException("Starting index must be positive.");
+      throw new KafkaException("Starting index must be positive.");
     var deleted: Array[T] = null
     var done = false
     while(!done) {
@@ -78,7 +79,7 @@ private[log] class SegmentList[T](seq: S
    */
   def truncLast(newEnd: Int): Seq[T] = {
     if(newEnd >= contents.get().size-1)
-      throw new IllegalArgumentException("End index must be segment list size - 1");
+      throw new KafkaException("End index must be segment list size - 1");
     var deleted: Array[T] = null
     var done = false
     while(!done) {

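SegmentList's trunc and truncLast are copy-on-write operations over an AtomicReference, retried on contention; the patch only changes the exception type they throw (to KafkaException). An illustrative version of the same compare-and-set loop, not the actual class:

    import java.util.concurrent.atomic.AtomicReference

    class CowList[T](initial: Vector[T]) {
      private val contents = new AtomicReference(initial)

      // Drop everything before newStart, retrying if another thread
      // swapped the backing sequence in between.
      def trunc(newStart: Int): Seq[T] = {
        if(newStart < 0)
          throw new IllegalArgumentException("Starting index must be positive.")
        var deleted: Seq[T] = Nil
        var done = false
        while(!done) {
          val curr = contents.get()
          if(contents.compareAndSet(curr, curr.drop(newStart))) {
            deleted = curr.take(newStart)
            done = true
          }
        }
        deleted
      }
    }
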
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Tue Jul 24 18:13:01 2012
@@ -23,6 +23,7 @@ import java.nio.channels._
 import java.util.concurrent.atomic._
 
 import kafka.utils._
+import kafka.common.KafkaException
 
 /**
  * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
@@ -38,30 +39,26 @@ class FileMessageSet private[kafka](priv
                                     val needRecover: AtomicBoolean) extends MessageSet with Logging {
   
   private val setSize = new AtomicLong()
-  private val setHighWaterMark = new AtomicLong()
-  
+
   def getSerialized(): ByteBuffer = throw new java.lang.UnsupportedOperationException()
 
   if(mutable) {
     if(limit < Long.MaxValue || offset > 0)
-      throw new IllegalArgumentException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
+      throw new KafkaException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
 
     if (needRecover.get) {
       // set the file position to the end of the file for appending messages
       val startMs = System.currentTimeMillis
       val truncated = recover()
       info("Recovery succeeded in " + (System.currentTimeMillis - startMs) / 1000 +
-                " seconds. " + truncated + " bytes truncated.")
+                " seconds. " + truncated + " bytes truncated. New log size is " + sizeInBytes() + " bytes")
     }
     else {
       setSize.set(channel.size())
-      setHighWaterMark.set(sizeInBytes)
       channel.position(channel.size)
     }
   } else {
     setSize.set(scala.math.min(channel.size(), limit) - offset)
-    setHighWaterMark.set(sizeInBytes)
-    debug("initializing high water mark in immutable mode: " + highWaterMark)
   }
   
   /**
@@ -93,7 +90,7 @@ class FileMessageSet private[kafka](priv
    * Return a message set which is a view into this set starting from the given offset and with the given size limit.
    */
   def read(readOffset: Long, size: Long): FileMessageSet = {
-    new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, highWaterMark),
+    new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, sizeInBytes()),
       false, new AtomicBoolean(false))
   }
   
@@ -140,17 +137,10 @@ class FileMessageSet private[kafka](priv
    * The number of bytes taken up by this file set
    */
   def sizeInBytes(): Long = setSize.get()
-  
-  /**
-    * The high water mark
-    */
-  def highWaterMark(): Long = setHighWaterMark.get()
-
-  def getEndOffset(): Long = offset + sizeInBytes()
 
   def checkMutable(): Unit = {
     if(!mutable)
-      throw new IllegalStateException("Attempt to invoke mutation on immutable message set.")
+      throw new KafkaException("Attempt to invoke mutation on immutable message set.")
   }
   
   /**
@@ -173,9 +163,6 @@ class FileMessageSet private[kafka](priv
     channel.force(true)
     val elapsedTime = SystemTime.milliseconds - startTime
     LogFlushStats.recordFlushRequest(elapsedTime)
-    debug("flush time " + elapsedTime)
-    setHighWaterMark.set(sizeInBytes)
-    debug("flush high water mark:" + highWaterMark)
   }
   
   /**
@@ -203,8 +190,6 @@ class FileMessageSet private[kafka](priv
     } while(next >= 0)
     channel.truncate(validUpTo)
     setSize.set(validUpTo)
-    setHighWaterMark.set(validUpTo)
-    info("recover high water mark:" + highWaterMark)
     /* This should not be necessary, but fixes bug 6191269 on some OSs. */
     channel.position(validUpTo)
     needRecover.set(false)    
@@ -214,7 +199,6 @@ class FileMessageSet private[kafka](priv
   def truncateUpto(hw: Long) = {
     channel.truncate(hw)
     setSize.set(hw)
-    setHighWaterMark.set(hw)
   }
 
   /**
@@ -242,7 +226,7 @@ class FileMessageSet private[kafka](priv
     while(messageBuffer.hasRemaining) {
       read = channel.read(messageBuffer, curr)
       if(read < 0)
-        throw new IllegalStateException("File size changed during recovery!")
+        throw new KafkaException("File size changed during recovery!")
       else
         curr += read
     }

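With the setHighWaterMark field gone, FileMessageSet tracks a single size, and truncateUpto(hw) simply cuts the channel back and resets it. A simplified, standalone sketch of that operation:

    import java.io.RandomAccessFile
    import java.util.concurrent.atomic.AtomicLong

    class SimpleFileSet(path: String) {
      private val file = new RandomAccessFile(path, "rw")
      private val channel = file.getChannel
      private val setSize = new AtomicLong(channel.size())

      def sizeInBytes(): Long = setSize.get()

      // Cut the file back to hw bytes; size is now the single source of
      // truth, so there is no separate high watermark left to update.
      def truncateUpto(hw: Long) {
        channel.truncate(hw)
        setSize.set(hw)
      }

      def close() { file.close() }
    }
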
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala Tue Jul 24 18:13:01 2012
@@ -29,7 +29,7 @@ private[kafka] class BoundedByteBufferSe
 
   // Avoid possibility of overflow for 2GB-4 byte buffer
   if(buffer.remaining > Int.MaxValue - sizeBuffer.limit)
-    throw new IllegalArgumentException("Attempt to create a bounded buffer of " + buffer.remaining + " bytes, but the maximum " +
+    throw new IllegalStateException("Attempt to create a bounded buffer of " + buffer.remaining + " bytes, but the maximum " +
                                        "allowable size for a bounded buffer is " + (Int.MaxValue - sizeBuffer.limit) + ".")    
   sizeBuffer.putInt(buffer.limit)
   sizeBuffer.rewind()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Tue Jul 24 18:13:01 2012
@@ -49,7 +49,7 @@ class SocketServer(val port: Int,
   def startup() {
     for(i <- 0 until numProcessorThreads) {
       processors(i) = new Processor(i, time, maxRequestSize, requestChannel, stats)
-      Utils.newThread("kafka-processor-" + i, processors(i), false).start()
+      Utils.newThread("kafka-processor-%d-%d".format(port, i), processors(i), false).start()
     }
     // register the processor threads for notification of responses
     requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
@@ -68,9 +68,8 @@ class SocketServer(val port: Int,
     acceptor.shutdown
     for(processor <- processors)
       processor.shutdown
-    info("Shut down socket server.")
+    info("Shut down socket server complete")
   }
-
 }
 
 /**
@@ -84,7 +83,7 @@ private[kafka] abstract class AbstractSe
   private val alive = new AtomicBoolean(false)
 
   /**
-   * Initiates a graceful shutdown by signeling to stop and waiting for the shutdown to complete
+   * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
    */
   def shutdown(): Unit = {
     alive.set(false)
@@ -244,12 +243,13 @@ private[kafka] class Processor(val id: I
       try {
         key.interestOps(SelectionKey.OP_WRITE)
         key.attach(curr.response)
-        curr = requestChannel.receiveResponse(id)
       } catch {
         case e: CancelledKeyException => {
           debug("Ignoring response for closed socket.")
           close(key)
         }
+      } finally {
+        curr = requestChannel.receiveResponse(id)
       }
     }
   }

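The processNewResponses fix above moves the receiveResponse call into a finally block, so a CancelledKeyException on a closed socket can no longer leave the loop stuck on the same response. A sketch of the pattern, with a plain queue standing in for the real request channel:

    import java.util.concurrent.ConcurrentLinkedQueue
    import java.nio.channels.CancelledKeyException

    object ResponseLoop {
      def drain[R <: AnyRef](queue: ConcurrentLinkedQueue[R])(handle: R => Unit) {
        var curr = queue.poll()
        while(curr != null) {
          try {
            handle(curr)  // may throw CancelledKeyException for a closed socket
          } catch {
            case e: CancelledKeyException => println("Ignoring response for closed socket.")
          } finally {
            // Always advance; before the fix an exception skipped this
            // assignment and the loop reprocessed the same response.
            curr = queue.poll()
          }
        }
      }
    }
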
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala Tue Jul 24 18:13:01 2012
@@ -20,6 +20,7 @@ package kafka.network
 import java.nio._
 import java.nio.channels._
 import kafka.utils.Logging
+import kafka.common.KafkaException
 
 /**
  * Represents a stateful transfer of data to or from the network
@@ -30,12 +31,12 @@ private[network] trait Transmission exte
   
   protected def expectIncomplete(): Unit = {
     if(complete)
-      throw new IllegalStateException("This operation cannot be completed on a complete request.")
+      throw new KafkaException("This operation cannot be completed on a complete request.")
   }
   
   protected def expectComplete(): Unit = {
     if(!complete)
-      throw new IllegalStateException("This operation cannot be completed on an incomplete request.")
+      throw new KafkaException("This operation cannot be completed on an incomplete request.")
   }
   
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Tue Jul 24 18:13:01 2012
@@ -13,14 +13,15 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package kafka.producer
 
 import collection.mutable.HashMap
 import kafka.api.{TopicMetadataRequest, TopicMetadata}
-import java.lang.IllegalStateException
+import kafka.common.KafkaException
 import kafka.utils.Logging
 import kafka.cluster.{Replica, Partition}
+import kafka.common.{LeaderNotAvailableException, ErrorMapping, UnknownTopicException}
 
 class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging {
   val topicPartitionInfo = new HashMap[String, TopicMetadata]()
@@ -31,7 +32,7 @@ class BrokerPartitionInfo(producerPool: 
    * @param topic the topic for which this information is to be returned
    * @return a sequence of (brokerId, numPartitions). Returns a zero-length
    * sequence if no brokers are available.
-   */  
+   */
   def getBrokerPartitionInfo(topic: String): Seq[Partition] = {
     debug("Getting broker partition info for topic %s".format(topic))
     // check if the cache has metadata for this topic
@@ -41,12 +42,11 @@ class BrokerPartitionInfo(producerPool: 
         case Some(m) => m
         case None =>
           // refresh the topic metadata cache
-          info("Fetching metadata for topic %s".format(topic))
-          updateInfo(topic)
+          updateInfo(List(topic))
           val topicMetadata = topicPartitionInfo.get(topic)
           topicMetadata match {
             case Some(m) => m
-            case None => throw new IllegalStateException("Failed to fetch topic metadata for topic: " + topic)
+            case None => throw new KafkaException("Failed to fetch topic metadata for topic: " + topic)
           }
       }
     val partitionMetadata = metadata.partitionsMetadata
@@ -69,24 +69,33 @@ class BrokerPartitionInfo(producerPool: 
    * It updates the cache by issuing a get topic metadata request to a random broker.
    * @param topic the topic for which the metadata is to be fetched
    */
-  def updateInfo(topic: String = null) = {
+  def updateInfo(topics: Seq[String] = Seq.empty[String]) = {
     val producer = producerPool.getAnyProducer
-    if(topic != null) {
+    val topicList = if(topics.size > 0) topics else topicPartitionInfo.keySet.toList
+    topicList.foreach { topic =>
+      info("Fetching metadata for topic %s".format(topic))
       val topicMetadataRequest = new TopicMetadataRequest(List(topic))
-      val topicMetadataList = producer.send(topicMetadataRequest)
-      val topicMetadata:Option[TopicMetadata] = if(topicMetadataList.size > 0) Some(topicMetadataList.head) else None
+      var topicMetaDataResponse: Seq[TopicMetadata] = Nil
+      try {
+        topicMetaDataResponse = producer.send(topicMetadataRequest)
+        // throw topic specific exception
+        topicMetaDataResponse.foreach(metadata => ErrorMapping.maybeThrowException(metadata.errorCode))
+        // throw partition specific exception
+        topicMetaDataResponse.foreach(metadata =>
+          metadata.partitionsMetadata.foreach(partitionMetadata => ErrorMapping.maybeThrowException(partitionMetadata.errorCode)))
+      } catch {
+        case te: UnknownTopicException => throw te
+        case e: LeaderNotAvailableException => throw e
+        case oe => warn("Ignoring non leader related error while fetching metadata", oe)  // swallow non leader related errors
+      }
+      val topicMetadata:Option[TopicMetadata] = if(topicMetaDataResponse.size > 0) Some(topicMetaDataResponse.head) else None
       topicMetadata match {
         case Some(metadata) =>
           info("Fetched metadata for topics %s".format(topic))
+          topicMetadata.foreach(metadata => trace("Metadata for topic %s is %s".format(metadata.topic, metadata.toString)))
           topicPartitionInfo += (topic -> metadata)
         case None =>
       }
-    }else {
-      // refresh cache for all topics
-      val topics = topicPartitionInfo.keySet.toList
-      val topicMetadata = producer.send(new TopicMetadataRequest(topics))
-      topicMetadata.foreach(metadata => topicPartitionInfo += (metadata.topic -> metadata))
-
     }
   }
 }

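updateInfo now takes a list of topics, surfaces topic- and partition-level error codes, rethrows the fatal ones, and merely warns on the rest. A sketch of that filtering, with simplified stand-ins for the kafka.common exceptions and ErrorMapping (the numeric codes below are illustrative, not the real error-code table):

    class UnknownTopicException(msg: String) extends RuntimeException(msg)
    class LeaderNotAvailableException(msg: String) extends RuntimeException(msg)

    object MetadataErrors {
      case class PartitionMeta(errorCode: Int)
      case class TopicMeta(topic: String, errorCode: Int, partitions: Seq[PartitionMeta])

      // Stand-in for ErrorMapping.maybeThrowException; codes are made up.
      def maybeThrow(code: Int) {
        code match {
          case 0 => ()
          case 1 => throw new UnknownTopicException("unknown topic")
          case 2 => throw new LeaderNotAvailableException("leader not available")
          case n => throw new RuntimeException("error code " + n)
        }
      }

      def check(response: Seq[TopicMeta]) {
        try {
          response.foreach(m => maybeThrow(m.errorCode))
          response.foreach(_.partitions.foreach(p => maybeThrow(p.errorCode)))
        } catch {
          case te: UnknownTopicException => throw te        // fatal
          case le: LeaderNotAvailableException => throw le  // fatal
          case e: Exception => println("Ignoring non-leader-related error: " + e)
        }
      }
    }
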
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Tue Jul 24 18:13:01 2012
@@ -16,7 +16,7 @@
  */
 package kafka.producer
 
-import async._
+import async.{AsyncProducerStats, DefaultEventHandler, ProducerSendThread, EventHandler}
 import kafka.utils._
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
 import kafka.serializer.Encoder
@@ -34,6 +34,7 @@ extends Logging {
     throw new InvalidConfigException("Batch size can't be larger than queue size.")
 
   private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize)
+
   private var sync: Boolean = true
   private var producerSendThread: ProducerSendThread[K,V] = null
   config.producerType match {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Tue Jul 24 18:13:01 2012
@@ -87,5 +87,5 @@ class ProducerConfig(val props: Properti
    */
   val producerRetries = Utils.getInt(props, "producer.num.retries", 3)
 
-  val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 5)
+  val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 100)
 }

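producer.retry.backoff.ms now defaults to 100 ms instead of 5 ms, presumably to give a leader election more time to settle between attempts. A sketch of how the two retry settings interact; sendOnce is a placeholder for the real produce path:

    object RetryLoop {
      def sendWithRetries[T](retries: Int, backoffMs: Long)(sendOnce: () => T): T = {
        var attempt = 0
        while(true) {
          try {
            return sendOnce()
          } catch {
            case e: Exception =>
              attempt += 1
              if(attempt > retries) throw e
              println("Send failed, retrying in " + backoffMs + " ms")
              Thread.sleep(backoffMs)  // producer.retry.backoff.ms
          }
        }
        throw new IllegalStateException("unreachable")
      }

      def main(args: Array[String]) {
        println(sendWithRetries(3, 100L)(() => "ok"))
      }
    }
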
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerData.scala Tue Jul 24 18:13:01 2012
@@ -18,11 +18,11 @@
 package kafka.producer
 
 /**
- * Represents the data to be sent using the Producer send API
- * @param topic the topic under which the message is to be published
- * @param key the key used by the partitioner to pick a broker partition
- * @param data variable length data to be published as Kafka messages under topic
- */
+* Represents the data to be sent using the Producer send API
+* @param topic the topic under which the message is to be published
+* @param key the key used by the partitioner to pick a broker partition
+* @param data variable length data to be published as Kafka messages under topic
+*/
 case class ProducerData[K,V](topic: String,
                              key: K,
                              data: Seq[V]) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Tue Jul 24 18:13:01 2012
@@ -22,7 +22,7 @@ import kafka.message.MessageSet
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
 import java.util.Random
-import kafka.common.MessageSizeTooLargeException
+import kafka.common.{ErrorMapping, MessageSizeTooLargeException}
 
 object SyncProducer {
   val RequestKey: Short = 0
@@ -42,7 +42,8 @@ class SyncProducer(val config: SyncProdu
 
   private val lock = new Object()
   @volatile private var shutdown: Boolean = false
-  private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, config.bufferSize, config.socketTimeoutMs)
+  private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
+    config.bufferSize, config.requestTimeoutMs)
 
   trace("Instantiating Scala Sync Producer")
 
@@ -114,6 +115,8 @@ class SyncProducer(val config: SyncProdu
   def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
     val response = doSend(request)
     val topicMetaDataResponse = TopicMetaDataResponse.readFrom(response.buffer)
+    // try to throw exception based on global error codes
+    ErrorMapping.maybeThrowException(topicMetaDataResponse.errorCode)
     topicMetaDataResponse.topicsMetadata
   }
 

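send(TopicMetadataRequest) now checks the response-level error code before handing back any per-topic metadata. A minimal sketch of that fail-fast step, with simplified stand-ins for the response and ErrorMapping types:

    object MetadataSend {
      case class TopicMetadata(topic: String)
      case class TopicMetaDataResponse(errorCode: Int, topicsMetadata: Seq[TopicMetadata])

      // Stand-in for ErrorMapping.maybeThrowException
      def maybeThrowException(code: Int) {
        if(code != 0)
          throw new RuntimeException("metadata request failed with error code " + code)
      }

      def send(doSend: () => TopicMetaDataResponse): Seq[TopicMetadata] = {
        val response = doSend()
        // Fail fast on a request-level error rather than returning
        // partially valid metadata.
        maybeThrowException(response.errorCode)
        response.topicsMetadata
      }
    }
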
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Tue Jul 24 18:13:01 2012
@@ -35,9 +35,6 @@ trait SyncProducerConfigShared {
 
   val connectTimeoutMs = Utils.getInt(props, "connect.timeout.ms", 5000)
 
-  /** the socket timeout for network requests */
-  val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", 30000)  
-
   val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
 
   /** negative reconnect time interval means disabling this time-based reconnect feature */
@@ -59,15 +56,15 @@ trait SyncProducerConfigShared {
   val requiredAcks = Utils.getShort(props,"producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
 
   /*
-   * The ack timeout of the producer requests - negative value means wait
-   * indefinitely (or until an ack is received).
+   * The ack timeout of the producer requests. Value must be positive.
    */
-  val ackTimeoutMs = Utils.getInt(props,"producer.request.ack.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs)
+  val requestTimeoutMs = Utils.getIntInRange(props,"producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
+                                             (1, Integer.MAX_VALUE))
 }
 
 object SyncProducerConfig {
   val DefaultCorrelationId = -1
   val DefaultClientId = ""
   val DefaultRequiredAcks : Short = 0
-  val DefaultAckTimeoutMs = -1
+  val DefaultAckTimeoutMs = 500
 }
\ No newline at end of file

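The old socket.timeout.ms and the wait-forever ack timeout are folded into a single producer.request.timeout.ms that must be at least 1 ms. A sketch of range-validated property parsing in the spirit of Utils.getIntInRange (the helper below is illustrative, not the actual Utils API):

    import java.util.Properties

    object ConfigUtils {
      def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = {
        val v = Option(props.getProperty(name)).map(_.trim.toInt).getOrElse(default)
        if(v < range._1 || v > range._2)
          throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range)
        v
      }

      def main(args: Array[String]) {
        val props = new Properties()
        // falls back to the new 500 ms default; zero or negative values are rejected
        println(getIntInRange(props, "producer.request.timeout.ms", 500, (1, Integer.MAX_VALUE)))
      }
    }
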

