kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1397765 [2/2] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/client/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/con...
Date Sat, 13 Oct 2012 03:35:05 GMT
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Sat Oct 13 03:35:02 2012
@@ -33,7 +33,10 @@ object ReplicaManager {
   val UnknownLogEndOffset = -1L
 }
 
-class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler,
+class ReplicaManager(val config: KafkaConfig, 
+                     time: Time, 
+                     val zkClient: ZkClient, 
+                     kafkaScheduler: KafkaScheduler,
                      val logManager: LogManager) extends Logging with KafkaMetricsGroup {
   private val allPartitions = new Pool[(String, Int), Partition]
   private var leaderPartitions = new mutable.HashSet[Partition]()
@@ -85,7 +88,7 @@ class ReplicaManager(val config: KafkaCo
 
   def startup() {
     // start ISR expiration thread
-    kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
+    kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
   }
 
   def stopReplica(topic: String, partitionId: Int): Short  = {
@@ -221,17 +224,17 @@ class ReplicaManager(val config: KafkaCo
     }
   }
 
-  private def maybeShrinkISR(): Unit = {
+  private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
     leaderPartitionsLock synchronized {
-      leaderPartitions.foreach(partition => partition.maybeShrinkISR(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes))
+      leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes))
     }
   }
 
   def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
     val partitionOpt = getPartition(topic, partitionId)
     if(partitionOpt.isDefined){
-      partitionOpt.get.updateLeaderHWAndMaybeExpandISR(replicaId, offset)
+      partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
     } else {
       warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId))
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala Sat Oct 13 03:35:02 2012
@@ -69,8 +69,8 @@ object DumpLogSegments {
         print(" keysize: " + msg.keySize)
       if(printContents) {
         if(msg.hasKey)
-          print(" key: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
-        print(" payload: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+          print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
+        print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
       }
       println()
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala Sat Oct 13 03:35:02 2012
@@ -19,7 +19,7 @@ package kafka.tools
 
 import kafka.message.Message
 import joptsimple.OptionParser
-import kafka.utils.{Utils, Logging}
+import kafka.utils.{Utils, CommandLineUtils, Logging}
 import kafka.producer.{ProducerData, ProducerConfig, Producer}
 import scala.collection.JavaConversions._
 import java.util.concurrent.CountDownLatch
@@ -81,8 +81,7 @@ object MirrorMaker extends Logging {
       System.exit(0)
     }
 
-    Utils.checkRequiredArgs(
-      parser, options, consumerConfigOpt, producerConfigOpt)
+    CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
     if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
       println("Exactly one of whitelist or blacklist is required.")
       System.exit(1)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Sat Oct 13 03:35:02 2012
@@ -22,14 +22,14 @@ import java.util.concurrent.{Executors, 
 import java.util.Properties
 import kafka.producer.{ProducerData, ProducerConfig, Producer}
 import kafka.consumer._
-import kafka.utils.{ZKStringSerializer, Logging}
+import kafka.utils.{ZKStringSerializer, Logging, ZkUtils}
 import kafka.api.OffsetRequest
 import org.I0Itec.zkclient._
 import kafka.message.{CompressionCodec, Message}
 
 object ReplayLogProducer extends Logging {
 
-  private val GROUPID: String = "replay-log-producer"
+  private val GroupId: String = "replay-log-producer"
 
   def main(args: Array[String]) {
     val config = new Config(args)
@@ -38,12 +38,12 @@ object ReplayLogProducer extends Logging
     val allDone = new CountDownLatch(config.numThreads)
 
     // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
-    tryCleanupZookeeper(config.zkConnect, GROUPID)
+    ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId)
     Thread.sleep(500)
 
     // consumer properties
     val consumerProps = new Properties
-    consumerProps.put("groupid", GROUPID)
+    consumerProps.put("groupid", GroupId)
     consumerProps.put("zk.connect", config.zkConnect)
     consumerProps.put("consumer.timeout.ms", "10000")
     consumerProps.put("autooffset.reset", OffsetRequest.SmallestTimeString)
@@ -137,18 +137,6 @@ object ReplayLogProducer extends Logging
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
   }
 
-  def tryCleanupZookeeper(zkUrl: String, groupId: String) {
-    try {
-      val dir = "/consumers/" + groupId
-      info("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
-      zk.deleteRecursive(dir)
-      zk.close()
-    } catch {
-      case _ => // swallow
-    }
-  }
-
   class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
     val props = new Properties()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Sat Oct 13 03:35:02 2012
@@ -20,6 +20,7 @@ package kafka.tools
 import joptsimple._
 import kafka.utils._
 import kafka.consumer._
+import kafka.client.ClientUtils
 import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
 import kafka.cluster.Broker
 import scala.collection.JavaConversions._
@@ -30,7 +31,7 @@ import scala.collection.JavaConversions.
  */
 object SimpleConsumerShell extends Logging {
 
-  def USE_LEADER_REPLICA = -1
+  def UseLeaderReplica = -1
 
   def main(args: Array[String]): Unit = {
 
@@ -52,7 +53,7 @@ object SimpleConsumerShell extends Loggi
                            .withRequiredArg
                            .describedAs("replica id")
                            .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(USE_LEADER_REPLICA)
+                           .defaultsTo(UseLeaderReplica)
     val offsetOpt = parser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end")
                            .withOptionalArg()
                            .describedAs("consume offset")
@@ -115,8 +116,8 @@ object SimpleConsumerShell extends Loggi
 
     // getting topic metadata
     info("Getting topic metatdata...")
-    val metadataTargetBrokers = Utils.getAllBrokersFromBrokerList(options.valueOf(brokerListOpt))
-    val topicsMetadata = Utils.getTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata
+    val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata
     if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)
@@ -133,7 +134,7 @@ object SimpleConsumerShell extends Loggi
     // validating replica id and initializing target broker
     var fetchTargetBroker: Broker = null
     var replicaOpt: Option[Broker] = null
-    if(replicaId == USE_LEADER_REPLICA) {
+    if(replicaId == UseLeaderReplica) {
       replicaOpt = partitionMetadataOpt.get.leader
       if(!replicaOpt.isDefined) {
         System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId))

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/CommandLineUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/CommandLineUtils.scala?rev=1397765&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/CommandLineUtils.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/CommandLineUtils.scala Sat Oct 13 03:35:02 2012
@@ -0,0 +1,20 @@
+package kafka.utils
+
+import joptsimple.{OptionSpec, OptionSet, OptionParser}
+
+/**
+ * Helper functions for dealing with command line utilities
+ */
+object CommandLineUtils {
+
+    def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
+      for(arg <- required) {
+        if(!options.has(arg)) {
+          error("Missing required argument \"" + arg + "\"")
+          parser.printHelpOn(System.err)
+          System.exit(1)
+        }
+      }
+    }
+  
+}
\ No newline at end of file
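
For context, a minimal usage sketch of the new helper (the tool object and its option are hypothetical, shown only to illustrate the call that MirrorMaker now makes):

    import joptsimple.OptionParser
    import kafka.utils.CommandLineUtils

    object ExampleTool {
      def main(args: Array[String]) {
        val parser = new OptionParser
        // hypothetical required option, for illustration only
        val configOpt = parser.accepts("config", "Path to a config file.")
                              .withRequiredArg
                              .describedAs("config file")
                              .ofType(classOf[String])
        val options = parser.parse(args : _*)
        // prints an error plus the parser's help text and exits if --config was not given
        CommandLineUtils.checkRequiredArgs(parser, options, configOpt)
      }
    }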

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Json.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Json.scala?rev=1397765&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Json.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Json.scala Sat Oct 13 03:35:02 2012
@@ -0,0 +1,24 @@
+package kafka.utils
+
+import kafka.common._
+import util.parsing.json.JSON
+
+/**
+ *  A wrapper that synchronizes JSON in scala, which is not threadsafe.
+ */
+object Json extends Logging {
+  val myConversionFunc = {input : String => input.toInt}
+  JSON.globalNumberParser = myConversionFunc
+  val lock = new Object
+
+  def parseFull(input: String): Option[Any] = {
+    lock synchronized {
+      try {
+        JSON.parseFull(input)
+      } catch {
+        case t =>
+          throw new KafkaException("Can't parse json string: %s".format(input), t)
+      }
+    }
+  }
+}
\ No newline at end of file
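
A hedged usage sketch of the new Json wrapper (the JSON literal is invented; the string-valued fields mirror the casts done in the ZkUtils changes below), e.g. from the Scala REPL:

    import kafka.utils.Json

    val data = """{ "leader": "1", "leaderEpoch": "0", "ISR": "1,2" }"""
    Json.parseFull(data) match {
      case Some(m) =>
        // callers cast the parsed result and convert individual fields as needed
        val fields = m.asInstanceOf[Map[String, String]]
        println("leader = " + fields.get("leader").get.toInt)
      case None =>
        println("nothing to parse")
    }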

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Oct 13 03:35:02 2012
@@ -26,26 +26,22 @@ import java.util.zip.CRC32
 import javax.management._
 import scala.collection._
 import scala.collection.mutable
-import org.I0Itec.zkclient.ZkClient
-import java.util.{Random, Properties}
-import joptsimple.{OptionSpec, OptionSet, OptionParser}
+import java.util.Properties
 import kafka.common.KafkaException
-import kafka.cluster.Broker
-import util.parsing.json.JSON
-import kafka.api.RequestOrResponse
-import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
-import kafka.producer.{ProducerPool, SyncProducer}
 
 
 /**
- * Helper functions!
+ * General helper functions!
+ * 
+ * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in
+ * the standard library etc. 
+ * 
+ * If you are making a new helper function and want to add it to this class please ensure the following:
+ * 1. It has documentation
+ * 2. It is the most general possible utility, not just the thing you needed in one particular place
+ * 3. You have tests for it if it is nontrivial in any way
  */
 object Utils extends Logging {
-  val random = new Random
-
-  def getNextRandomInt(): Int = random.nextInt
-  
-  def getNextRandomInt(upper: Int): Int = random.nextInt(upper)
 
   /**
    * Wrap the given function in a java.lang.Runnable
@@ -151,55 +147,6 @@ object Utils extends Logging {
     }
     bytes
   }
-  
-  /**
-   * Read size prefixed string where the size is stored as a 2 byte short.
-   * @param buffer The buffer to read from
-   * @param encoding The encoding in which to read the string
-   */
-  def readShortString(buffer: ByteBuffer, encoding: String = RequestOrResponse.DefaultCharset): String = {
-    val size: Int = buffer.getShort()
-    if(size < 0)
-      return null
-    val bytes = new Array[Byte](size)
-    buffer.get(bytes)
-    new String(bytes, encoding)
-  }
-  
-  /**
-   * Write a size prefixed string where the size is stored as a 2 byte short
-   * @param buffer The buffer to write to
-   * @param string The string to write
-   * @param encoding The encoding in which to write the string
-   */
-  def writeShortString(buffer: ByteBuffer, string: String, encoding: String = RequestOrResponse.DefaultCharset) {
-    if(string == null) {
-      buffer.putShort(-1)
-    } else if(string.length > Short.MaxValue) {
-      throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
-    } else {
-      buffer.putShort(string.length.asInstanceOf[Short])
-      buffer.put(string.getBytes(encoding))
-    }
-  }
-  
-  /**
-   * Return size of a size prefixed string where the size is stored as a 2 byte short
-   * @param string The string to write
-   * @param encoding The encoding in which to write the string
-   */
-  def shortStringLength(string: String, encoding: String = RequestOrResponse.DefaultCharset): Int = {
-    if(string == null) {
-      2
-    } else {
-      val encodedString = string.getBytes(encoding)
-      if(encodedString.length > Short.MaxValue) {
-        throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
-      } else {
-        2 + encodedString.length
-      }
-    }
-  }
 
   /**
    * Read a properties file from the given path
@@ -212,27 +159,6 @@ object Utils extends Logging {
     props
   }
 
-  def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
-    val value = buffer.getInt
-    if(value < range._1 || value > range._2)
-      throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
-    else value
-  }
-
-  def getShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = {
-    val value = buffer.getShort
-    if(value < range._1 || value > range._2)
-      throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
-    else value
-  }
-
-  def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
-    val value = buffer.getLong
-    if(value < range._1 || value > range._2)
-      throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
-    else value
-  }
-
   /**
    * Open a channel for the given file
    */
@@ -278,7 +204,7 @@ object Utils extends Logging {
    * @param buffer The buffer to translate
    * @param encoding The encoding to use in translating bytes to characters
    */
-  def toString(buffer: ByteBuffer, encoding: String): String = {
+  def readString(buffer: ByteBuffer, encoding: String): String = {
     val bytes = new Array[Byte](buffer.remaining)
     buffer.get(bytes)
     new String(bytes, encoding)
@@ -365,7 +291,7 @@ object Utils extends Logging {
    * @param buffer The buffer to read from
    * @return The integer read, as a long to avoid signedness
    */
-  def getUnsignedInt(buffer: ByteBuffer): Long = 
+  def readUnsignedInt(buffer: ByteBuffer): Long = 
     buffer.getInt() & 0xffffffffL
   
   /**
@@ -375,7 +301,7 @@ object Utils extends Logging {
    * @param index the index from which to read the integer
    * @return The integer read, as a long to avoid signedness
    */
-  def getUnsignedInt(buffer: ByteBuffer, index: Int): Long = 
+  def readUnsignedInt(buffer: ByteBuffer, index: Int): Long = 
     buffer.getInt(index) & 0xffffffffL
   
   /**
@@ -383,7 +309,7 @@ object Utils extends Logging {
    * @param buffer The buffer to write to
    * @param value The value to write
    */
-  def putUnsignedInt(buffer: ByteBuffer, value: Long): Unit = 
+  def writeUnsignedInt(buffer: ByteBuffer, value: Long): Unit = 
     buffer.putInt((value & 0xffffffffL).asInstanceOf[Int])
   
   /**
@@ -392,7 +318,7 @@ object Utils extends Logging {
    * @param index The position in the buffer at which to begin writing
    * @param value The value to write
    */
-  def putUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = 
+  def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = 
     buffer.putInt(index, (value & 0xffffffffL).asInstanceOf[Int])
   
   /**
@@ -458,6 +384,10 @@ object Utils extends Logging {
     }
   } 
   
+  /**
+   * Throw an exception if the given value is null, else return it. You can use this like:
+   * val myValue = Utils.notNull(expressionThatShouldntBeNull)
+   */
   def notNull[V](v: V) = {
     if(v == null)
       throw new KafkaException("Value cannot be null.")
@@ -465,16 +395,17 @@ object Utils extends Logging {
       v
   }
 
-  def getHostPort(hostport: String) : (String, Int) = {
+  /**
+   * Parse a host and port out of a string
+   */
+  def parseHostPort(hostport: String) : (String, Int) = {
     val splits = hostport.split(":")
     (splits(0), splits(1).toInt)
   }
 
-  def getTopicPartition(topicPartition: String) : (String, Int) = {
-    val index = topicPartition.lastIndexOf('-')
-    (topicPartition.substring(0,index), topicPartition.substring(index+1).toInt)
-  }
-
+  /**
+   * Get the stack trace from an exception as a string
+   */
   def stackTrace(e: Throwable): String = {
     val sw = new StringWriter;
     val pw = new PrintWriter(sw);
@@ -486,113 +417,30 @@ object Utils extends Logging {
    * This method gets comma seperated values which contains key,value pairs and returns a map of
    * key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
    */
-  private def getCSVMap[K, V](allCSVals: String, exceptionMsg:String, successMsg:String) :Map[K, V] = {
-    val map = new mutable.HashMap[K, V]
-    if("".equals(allCSVals))
-      return map
-    val csVals = allCSVals.split(",")
-    for(i <- 0 until csVals.length)
-    {
-     try{
-      val tempSplit = csVals(i).split(":")
-      info(successMsg + tempSplit(0) + " : " + Integer.parseInt(tempSplit(1).trim))
-      map += tempSplit(0).asInstanceOf[K] -> Integer.parseInt(tempSplit(1).trim).asInstanceOf[V]
-      } catch {
-          case _ =>  error(exceptionMsg + ": " + csVals(i))
-        }
-    }
-    map
+  def parseCsvMap(str: String): Map[String, String] = {
+    val map = new mutable.HashMap[String, String]
+    if("".equals(str))
+      return map    
+    val keyVals = str.split("\\s*,\\s*").map(s => s.split("\\s*:\\s*"))
+    keyVals.map(pair => (pair(0), pair(1))).toMap
   }
-
-  def getCSVList(csvList: String): Seq[String] = {
+  
+  /**
+   * Parse a comma separated string into a sequence of strings.
+   * Whitespace surrounding the comma will be removed.
+   */
+  def parseCsvList(csvList: String): Seq[String] = {
     if(csvList == null)
       Seq.empty[String]
     else {
-      csvList.split(",").filter(v => !v.equals(""))
+      csvList.split("\\s*,\\s*").filter(v => !v.equals(""))
     }
   }
 
-  def seqToCSV(seq: Seq[String]): String = {
-    var csvString = ""
-    for (i <- 0 until seq.size) {
-      if (i > 0)
-        csvString = csvString + ','
-      csvString = csvString + seq(i)
-    }
-    csvString
-  }
-
-  def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = {
-    val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
-    val successMsg =  "The retention hours for "
-    val map: Map[String, Int] = getCSVMap(retentionHours, exceptionMsg, successMsg)
-    map.foreach{case(topic, hrs) =>
-      require(hrs > 0, "Log retention hours value for topic " + topic + " is " + hrs +
-        " which is not greater than 0.")}
-    map
-  }
-
-  def getTopicRollHours(rollHours: String) : Map[String, Int] = {
-    val exceptionMsg = "Malformed token for topic.log.roll.hours in server.properties: "
-    val successMsg =  "The roll hours for "
-    val map: Map[String, Int] = getCSVMap(rollHours, exceptionMsg, successMsg)
-    map.foreach{case(topic, hrs) =>
-      require(hrs > 0, "Log roll hours value for topic " + topic + " is " + hrs +
-        " which is not greater than 0.")}
-    map
-  }
-
-  def getTopicFileSize(fileSizes: String): Map[String, Int] = {
-    val exceptionMsg = "Malformed token for topic.log.file.size in server.properties: "
-    val successMsg =  "The log file size for "
-    val map: Map[String, Int] = getCSVMap(fileSizes, exceptionMsg, successMsg)
-    map.foreach{case(topic, size) =>
-      require(size > 0, "Log file size value for topic " + topic + " is " + size +
-        " which is not greater than 0.")}
-    map
-  }
-
-  def getTopicRetentionSize(retentionSizes: String): Map[String, Long] = {
-    val exceptionMsg = "Malformed token for topic.log.retention.size in server.properties: "
-    val successMsg =  "The log retention size for "
-    val map: Map[String, Long] = getCSVMap(retentionSizes, exceptionMsg, successMsg)
-    map.foreach{case(topic, size) =>
-      require(size > 0, "Log retention size value for topic " + topic + " is " + size +
-        " which is not greater than 0.")}
-    map
-  }
-
-  def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = {
-    val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties: "
-    val successMsg =  "The flush interval for "
-    val map: Map[String, Int] = getCSVMap(allIntervals, exceptionMsg, successMsg)
-    map.foreach{case(topic, interval) =>
-      require(interval > 0, "Flush interval value for topic " + topic + " is " + interval +
-        " ms which is not greater than 0.")}
-    map
-  }
-
-  def getTopicPartitions(allPartitions: String) : Map[String, Int] = {
-    val exceptionMsg = "Malformed token for topic.partition.counts in server.properties: "
-    val successMsg =  "The number of partitions for topic  "
-    val map: Map[String, Int] = getCSVMap(allPartitions, exceptionMsg, successMsg)
-    map.foreach{case(topic, count) =>
-      require(count > 0, "The number of partitions for topic " + topic + " is " + count +
-        " which is not greater than 0.")}
-    map
-  }
-
-  def getConsumerTopicMap(consumerTopicString: String) : Map[String, Int] = {
-    val exceptionMsg = "Malformed token for embeddedconsumer.topics in consumer.properties: "
-    val successMsg =  "The number of consumer threads for topic  "
-    val map: Map[String, Int] = getCSVMap(consumerTopicString, exceptionMsg, successMsg)
-    map.foreach{case(topic, count) =>
-      require(count > 0, "The number of consumer threads for topic " + topic + " is " + count +
-        " which is not greater than 0.")}
-    map
-  }
-
-  def getObject[T<:AnyRef](className: String): T = {
+  /**
+   * Create an instance of the class with the given class name
+   */
+  def createObject[T<:AnyRef](className: String): T = {
     className match {
       case null => null.asInstanceOf[T]
       case _ =>
@@ -604,27 +452,15 @@ object Utils extends Logging {
     }
   }
 
-  def propertyExists(prop: String): Boolean = {
-    if(prop == null)
-      false
-    else if(prop.compareTo("") == 0)
-      false
-    else true
-  }
-
-  def tryCleanupZookeeper(zkUrl: String, groupId: String) {
-    try {
-      val dir = "/consumers/" + groupId
-      info("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
-      zk.deleteRecursive(dir)
-      zk.close()
-    } catch {
-      case _ => // swallow
-    }
-  }
+  /**
+   * Is the given string null or empty ("")?
+   */
+  def nullOrEmpty(s: String): Boolean = s == null || s.equals("")
 
-  def stringMapToJsonString(jsonDataMap: Map[String, String]): String = {
+  /**
+   * Format a Map[String, String] as JSON
+   */
+  def stringMapToJson(jsonDataMap: Map[String, String]): String = {
     val builder = new StringBuilder
     builder.append("{ ")
     var numElements = 0
@@ -639,6 +475,9 @@ object Utils extends Logging {
     builder.toString
   }
 
+  /**
+   * Format an arbitrary map as JSON
+   */
   def mapToJson[T <: Any](map: Map[String, Seq[String]]): String = {
     val builder = new StringBuilder
     builder.append("{ ")
@@ -654,6 +493,9 @@ object Utils extends Logging {
     builder.toString
   }
 
+  /**
+   * Format a string array as json
+   */
   def arrayToJson[T <: Any](arr: Array[String]): String = {
     val builder = new StringBuilder
     builder.append("[ ")
@@ -668,57 +510,6 @@ object Utils extends Logging {
     builder.toString
   }
 
-  def getAllBrokersFromBrokerList(brokerListStr: String): Seq[Broker] = {
-    val brokersStr = Utils.getCSVList(brokerListStr)
-
-    brokersStr.zipWithIndex.map(b =>{
-      val brokerStr = b._1
-      val brokerId = b._2
-      val brokerInfos = brokerStr.split(":")
-      val hostName = brokerInfos(0)
-      val port = brokerInfos(1).toInt
-      val creatorId = hostName + "-" + System.currentTimeMillis()
-      new Broker(brokerId, creatorId, hostName, port)
-    })
-  }
-
-  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
-    for(arg <- required) {
-      if(!options.has(arg)) {
-        error("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-  }
-
-  def getTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = {
-    var fetchMetaDataSucceeded: Boolean = false
-    var i: Int = 0
-    val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
-    var topicMetadataResponse: TopicMetadataResponse = null
-    var t: Throwable = null
-    while(i < brokers.size && !fetchMetaDataSucceeded) {
-      val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i))
-      info("Fetching metadata for topic %s".format(topics))
-      try {
-        topicMetadataResponse = producer.send(topicMetadataRequest)
-        fetchMetaDataSucceeded = true
-      }
-      catch {
-        case e =>
-          warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)
-          t = e
-      } finally {
-        i = i + 1
-        producer.close()
-      }
-    }
-    if(!fetchMetaDataSucceeded){
-      throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
-    }
-    return topicMetadataResponse
-  }
 
   /**
    * Create a circular (looping) iterator over a collection.
@@ -731,35 +522,25 @@ object Utils extends Logging {
     stream.iterator
   }
 
-  def readFileIntoString(path: String): String = {
+  /**
+   * Attempt to read a file as a string
+   */
+  def readFileAsString(path: String, charset: Charset = Charset.defaultCharset()): String = {
     val stream = new FileInputStream(new File(path))
     try {
       val fc = stream.getChannel()
       val bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size())
-      Charset.defaultCharset().decode(bb).toString()
+      charset.decode(bb).toString()
     }
     finally {
       stream.close()
     }
   }
-}
-
-/**
- *  A wrapper that synchronizes JSON in scala, which is not threadsafe.
- */
-object SyncJSON extends Logging {
-  val myConversionFunc = {input : String => input.toInt}
-  JSON.globalNumberParser = myConversionFunc
-  val lock = new Object
-
-  def parseFull(input: String): Option[Any] = {
-    lock synchronized {
-      try {
-        JSON.parseFull(input)
-      } catch {
-        case t =>
-          throw new KafkaException("Can't parse json string: %s".format(input), t)
-      }
-    }
-  }
+  
+  /**
+   * Get the absolute value of the given number. If the number is Int.MinValue return 0.
+   * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
+   */
+  def abs(n: Int) = n & 0x7fffffff
+  
 }
\ No newline at end of file
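
A short sketch of the renamed CSV helpers and the whitespace handling they gain (inputs are illustrative), e.g. from the Scala REPL:

    import kafka.utils.Utils

    Utils.parseCsvMap("topic1: 100, topic2: 200")  // Map(topic1 -> 100, topic2 -> 200); values stay strings
    Utils.parseCsvList("a, b, ,c")                 // Seq(a, b, c); whitespace trimmed, empty entries dropped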

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala Sat Oct 13 03:35:02 2012
@@ -18,7 +18,7 @@
 package kafka.utils
 
 import java.util.Properties
-import collection.mutable
+import scala.collection._
 
 class VerifiableProperties(val props: Properties) extends Logging {
   private val referenceSet = mutable.HashSet[String]()
@@ -156,6 +156,23 @@ class VerifiableProperties(val props: Pr
     require(containsKey(name), "Missing required property '" + name + "'")
     getProperty(name)
   }
+  
+  /**
+   * Get a Map[String, String] from a property list in the form k1:v1, k2:v2, ...
+   */
+  def getMap(name: String, valid: String => Boolean): Map[String, String] = {
+    try {
+      val m = Utils.parseCsvMap(getString(name, ""))
+      m.foreach {
+        case(key, value) => 
+          if(!valid(value))
+            throw new IllegalArgumentException("Invalid entry '%s' = '%s' for property '%s'".format(key, value, name))
+      }
+      m
+    } catch {
+      case e: Exception => throw new IllegalArgumentException("Error parsing configuration property '%s': %s".format(name, e.getMessage))
+    }
+  }
 
   def verify() {
     info("Verifying properties")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Sat Oct 13 03:35:02 2012
@@ -75,8 +75,8 @@ object ZkUtils extends Logging {
   }
 
   def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
-    val leaderAndISRPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
-    val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndISRPath)
+    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
+    val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath)
     val leaderAndIsrOpt = leaderAndIsrInfo._1
     val stat = leaderAndIsrInfo._2
     leaderAndIsrOpt match {
@@ -86,12 +86,12 @@ object ZkUtils extends Logging {
   }
 
   def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = {
-    SyncJSON.parseFull(leaderAndIsrStr) match {
+    Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>
         val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
         val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
         val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
-        val isr = Utils.getCSVList(isrString).map(r => r.toInt)
+        val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
         val zkPathVersion = stat.getVersion
         debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
           isr.toString(), zkPathVersion, topic, partition))
@@ -104,7 +104,7 @@ object ZkUtils extends Logging {
     val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
     leaderAndIsrOpt match {
       case Some(leaderAndIsr) =>
-        SyncJSON.parseFull(leaderAndIsr) match {
+        Json.parseFull(leaderAndIsr) match {
           case Some(m) =>
             Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
           case None => None
@@ -122,7 +122,7 @@ object ZkUtils extends Logging {
     val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
     leaderAndIsrOpt match {
       case Some(leaderAndIsr) =>
-        SyncJSON.parseFull(leaderAndIsr) match {
+        Json.parseFull(leaderAndIsr) match {
           case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
           case Some(m) => m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
         }
@@ -138,10 +138,10 @@ object ZkUtils extends Logging {
     val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
     leaderAndIsrOpt match {
       case Some(leaderAndIsr) =>
-        SyncJSON.parseFull(leaderAndIsr) match {
+        Json.parseFull(leaderAndIsr) match {
           case Some(m) =>
-            val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
-            Utils.getCSVList(ISRString).map(r => r.toInt)
+            val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
+            Utils.parseCsvList(isrString).map(r => r.toInt)
           case None => Seq.empty[Int]
         }
       case None => Seq.empty[Int]
@@ -155,7 +155,7 @@ object ZkUtils extends Logging {
     val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
     jsonPartitionMapOpt match {
       case Some(jsonPartitionMap) =>
-        SyncJSON.parseFull(jsonPartitionMap) match {
+        Json.parseFull(jsonPartitionMap) match {
           case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
             case None => Seq.empty[Int]
             case Some(seq) => seq.map(_.toInt)
@@ -328,7 +328,7 @@ object ZkUtils extends Logging {
       case e2 => throw e2
     }
   }
-
+  
   def deletePath(client: ZkClient, path: String): Boolean = {
     try {
       client.delete(path)
@@ -351,6 +351,16 @@ object ZkUtils extends Logging {
       case e2 => throw e2
     }
   }
+  
+  def maybeDeletePath(zkUrl: String, dir: String) {
+    try {
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+      zk.deleteRecursive(dir)
+      zk.close()
+    } catch {
+      case _ => // swallow
+    }
+  }
 
   def readData(client: ZkClient, path: String): (String, Stat) = {
     val stat: Stat = new Stat()
@@ -413,7 +423,7 @@ object ZkUtils extends Logging {
       val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
       jsonPartitionMapOpt match {
         case Some(jsonPartitionMap) =>
-          SyncJSON.parseFull(jsonPartitionMap) match {
+          Json.parseFull(jsonPartitionMap) match {
             case Some(m) =>
               val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
               for((partition, replicas) <- replicaMap){
@@ -449,7 +459,7 @@ object ZkUtils extends Logging {
       val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
       jsonPartitionMapOpt match {
         case Some(jsonPartitionMap) =>
-          SyncJSON.parseFull(jsonPartitionMap) match {
+          Json.parseFull(jsonPartitionMap) match {
             case Some(m) =>
               val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
               for((partition, replicas) <- replicaMap){
@@ -471,7 +481,7 @@ object ZkUtils extends Logging {
       val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
       val partitionMap = jsonPartitionMapOpt match {
         case Some(jsonPartitionMap) =>
-          SyncJSON.parseFull(jsonPartitionMap) match {
+          Json.parseFull(jsonPartitionMap) match {
             case Some(m) =>
               val m1 = m.asInstanceOf[Map[String, Seq[String]]]
               m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
@@ -535,7 +545,7 @@ object ZkUtils extends Logging {
   }
 
   def parsePartitionReassignmentData(jsonData: String):Map[TopicAndPartition, Seq[Int]] = {
-    SyncJSON.parseFull(jsonData) match {
+    Json.parseFull(jsonData) match {
       case Some(m) =>
         val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
         replicaMap.map { reassignedPartitions =>
@@ -590,7 +600,7 @@ object ZkUtils extends Logging {
   }
 
   def parsePreferredReplicaElectionData(jsonData: String):Set[TopicAndPartition] = {
-    SyncJSON.parseFull(jsonData) match {
+    Json.parseFull(jsonData) match {
       case Some(m) =>
         val topicAndPartitions = m.asInstanceOf[Array[Map[String, String]]]
         val partitions = topicAndPartitions.map { p =>

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala Sat Oct 13 03:35:02 2012
@@ -62,7 +62,7 @@ private class ConsumerThread(stream: Kaf
   override def run() {
     println("Starting consumer thread..")
     for (messageAndMetadata <- stream) {
-      println("consumed: " + Utils.toString(messageAndMetadata.message.payload, "UTF-8"))
+      println("consumed: " + Utils.readString(messageAndMetadata.message.payload, "UTF-8"))
     }
     shutdownLatch.countDown
     println("thread shutdown !" )

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala Sat Oct 13 03:35:02 2012
@@ -82,18 +82,18 @@ object SerializationTestUtils{
   private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
   private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
 
-  def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = {
-    val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1)
-    val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2)
-    val map = Map(((topic1, 0), PartitionStateInfo(leaderAndISR1, 3)),
-                  ((topic2, 0), PartitionStateInfo(leaderAndISR2, 3)))
+  def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = {
+    val leaderAndIsr1 = new LeaderAndIsr(leader1, 1, isr1, 1)
+    val leaderAndIsr2 = new LeaderAndIsr(leader2, 1, isr2, 2)
+    val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
+                  ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
     new LeaderAndIsrRequest(map)
   }
 
-  def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {
+  def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
     val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
                           ((topic2, 0), ErrorMapping.NoError))
-    new LeaderAndISRResponse(1, responseMap)
+    new LeaderAndIsrResponse(1, responseMap)
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
@@ -145,8 +145,8 @@ object SerializationTestUtils{
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
-  private val leaderAndISRRequest = SerializationTestUtils.createTestLeaderAndISRRequest
-  private val leaderAndISRResponse = SerializationTestUtils.createTestLeaderAndISRResponse
+  private val leaderAndIsrRequest = SerializationTestUtils.createTestLeaderAndIsrRequest
+  private val leaderAndIsrResponse = SerializationTestUtils.createTestLeaderAndIsrResponse
   private val stopReplicaRequest = SerializationTestUtils.createTestStopReplicaRequest
   private val stopReplicaResponse = SerializationTestUtils.createTestStopReplicaResponse
   private val producerRequest = SerializationTestUtils.createTestProducerRequest
@@ -160,19 +160,19 @@ class RequestResponseSerializationTest e
 
   @Test
   def testSerializationAndDeserialization() {
-    var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes())
-    leaderAndISRRequest.writeTo(buffer)
+    var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndIsrRequest.sizeInBytes())
+    leaderAndIsrRequest.writeTo(buffer)
     buffer.rewind()
-    val deserializedLeaderAndISRRequest = LeaderAndIsrRequest.readFrom(buffer)
-    assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndISRRequest,
-                 deserializedLeaderAndISRRequest)
-
-    buffer = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes())
-    leaderAndISRResponse.writeTo(buffer)
-    buffer.rewind()
-    val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(buffer)
-    assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndISRResponse,
-                 deserializedLeaderAndISRResponse)
+    val deserializedLeaderAndIsrRequest = LeaderAndIsrRequest.readFrom(buffer)
+    assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndIsrRequest,
+                 deserializedLeaderAndIsrRequest)
+
+    buffer = ByteBuffer.allocate(leaderAndIsrResponse.sizeInBytes())
+    leaderAndIsrResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedLeaderAndIsrResponse = LeaderAndIsrResponse.readFrom(buffer)
+    assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndIsrResponse,
+                 deserializedLeaderAndIsrResponse)
 
     buffer = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes())
     stopReplicaRequest.writeTo(buffer)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Sat Oct 13 03:35:02 2012
@@ -286,7 +286,7 @@ class ZookeeperConsumerConnectorTest ext
     // send some messages to each broker
     val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec)
     val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
-    val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.toString(m.payload, "UTF-8")).
+    val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.readString(m.payload, "UTF-8")).
       sortWith((s, t) => s.compare(t) == -1)
 
     val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -401,7 +401,7 @@ class ZookeeperConsumerConnectorTest ext
           assertTrue(iterator.hasNext)
           val message = iterator.next.message
           messages ::= message
-          debug("received message: " + Utils.toString(message.payload, "UTF-8"))
+          debug("received message: " + Utils.readString(message.payload, "UTF-8"))
         }
       }
     }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Sat Oct 13 03:35:02 2012
@@ -105,7 +105,7 @@ class ZookeeperConsumerConnectorTest ext
           assertTrue(iterator.hasNext)
           val message = iterator.next.message
           messages ::= message
-          debug("received message: " + Utils.toString(message.payload, "UTF-8"))
+          debug("received message: " + Utils.readString(message.payload, "UTF-8"))
         }
       }
     }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Sat Oct 13 03:35:02 2012
@@ -159,7 +159,7 @@ class LogManagerTest extends JUnit3Suite
                    override val logFileSize = 1024 *1024 *1024
                    override val flushSchedulerThreadRate = 50
                    override val flushInterval = Int.MaxValue
-                   override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
+                   override val flushIntervalMap = Map("timebasedflush" -> 100)
                  }
     logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala Sat Oct 13 03:35:02 2012
@@ -66,7 +66,7 @@ class MessageTest extends JUnitSuite {
       assertTrue("Auto-computed checksum should be valid", v.message.isValid)
       // garble checksum
       val badChecksum: Int = (v.message.checksum + 1 % Int.MaxValue).toInt
-      Utils.putUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum)
+      Utils.writeUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum)
       assertFalse("Message with invalid checksum should be invalid", v.message.isValid)
     }
   }

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1397765&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Sat Oct 13 03:35:02 2012
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import collection.mutable.HashMap
+import collection.mutable.Map
+import kafka.cluster.{Partition, Replica}
+import org.easymock.EasyMock
+import kafka.log.Log
+import org.junit.Assert._
+import kafka.utils._
+
+class IsrExpirationTest extends JUnit3Suite {
+
+  var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+    override val replicaMaxLagTimeMs = 100L
+    override val replicaMaxLagBytes = 10L
+  })
+  val topic = "foo"
+
+  def testIsrExpirationForStuckFollowers() {
+    val time = new MockTime
+    val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
+
+    // create one partition and all replicas
+    val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+
+    // let the follower catch up to 10
+    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 10)
+    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+
+    // let some time pass
+    time.sleep(150)
+
+    // now follower (broker id 1) has caught up to only 10, while the leader is at 15 AND the follower hasn't
+    // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+    EasyMock.verify(log)
+  }
+
+  def testIsrExpirationForSlowFollowers() {
+    val time = new MockTime
+    // create leader replica
+    val log = getLogWithLogEndOffset(15L, 1)
+    // add one partition
+    val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+    // set remote replicas leo to something low, like 4
+    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 4L)
+
+    // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap is larger than
+    // replicaMaxLagBytes, the follower is out of sync.
+    val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+
+    EasyMock.verify(log)
+  }
+
+  private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
+                                               localLog: Log): Partition = {
+    val leaderId=config.brokerId
+    val replicaManager = new ReplicaManager(config, time, null, null, null)
+    val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1)
+    val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
+
+    val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
+    allReplicas.foreach(r => partition.addReplicaIfNotExists(r))
+    // set in sync replicas for this partition to all the assigned replicas
+    partition.inSyncReplicas = allReplicas.toSet
+    // set the leader and its hw and the hw update time
+    partition.leaderReplicaIdOpt = Some(leaderId)
+    partition
+  }
+
+  private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = {
+    val log1 = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log1.logEndOffset).andReturn(logEndOffset).times(expectedCalls)
+    EasyMock.replay(log1)
+
+    log1
+  }
+
+  private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
+    configs.filter(_.brokerId != leaderId).map { config =>
+      new Replica(config.brokerId, partition, time)
+    }
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sat Oct 13 03:35:02 2012
@@ -379,18 +379,18 @@ object TestUtils extends Logging {
         val partition = leaderForPartition._1
         val leader = leaderForPartition._2
         try{
-          val currentLeaderAndISROpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-          var newLeaderAndISR: LeaderAndIsr = null
-          if(currentLeaderAndISROpt == None)
-            newLeaderAndISR = new LeaderAndIsr(leader, List(leader))
+          val currentLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+          var newLeaderAndIsr: LeaderAndIsr = null
+          if(currentLeaderAndIsrOpt == None)
+            newLeaderAndIsr = new LeaderAndIsr(leader, List(leader))
           else{
-            newLeaderAndISR = currentLeaderAndISROpt.get
-            newLeaderAndISR.leader = leader
-            newLeaderAndISR.leaderEpoch += 1
-            newLeaderAndISR.zkVersion += 1
+            newLeaderAndIsr = currentLeaderAndIsrOpt.get
+            newLeaderAndIsr.leader = leader
+            newLeaderAndIsr.leaderEpoch += 1
+            newLeaderAndIsr.zkVersion += 1
           }
           ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            newLeaderAndISR.toString)
+            newLeaderAndIsr.toString)
         } catch {
           case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
         }

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala Sat Oct 13 03:35:02 2012
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.Atomi
 import java.nio.channels.ClosedByInterruptException
 import org.apache.log4j.Logger
 import kafka.message.Message
-import kafka.utils.Utils
+import kafka.utils.ZkUtils
 import java.util.{Random, Properties}
 import kafka.consumer._
 import java.text.SimpleDateFormat
@@ -48,7 +48,7 @@ object ConsumerPerformance {
     }
 
     // clean up zookeeper state for this group id for every perf run
-    Utils.tryCleanupZookeeper(config.consumerConfig.zkConnect, config.consumerConfig.groupId)
+    ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId)
 
     val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
 


