kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject git commit: KAFKA-513 Add state change log to Kafka brokers; reviewed by Neha Narkhede
Date Wed, 06 Mar 2013 17:38:13 GMT
Updated Branches:
  refs/heads/0.8 2e64c6a5f -> 2457bc49e


KAFKA-513 Add state change log to Kafka brokers; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2457bc49
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2457bc49
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2457bc49

Branch: refs/heads/0.8
Commit: 2457bc49ef39c70622816250025eefc3bfcc7640
Parents: 2e64c6a
Author: Swapnil Ghike <sghike@linkedin.com>
Authored: Wed Mar 6 09:33:57 2013 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed Mar 6 09:35:35 2013 -0800

----------------------------------------------------------------------
 bin/kafka-run-class.sh                             |    2 +-
 config/log4j.properties                            |   13 +-
 core/src/main/scala/kafka/api/FetchRequest.scala   |    4 +-
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |   21 +-
 .../scala/kafka/api/LeaderAndIsrResponse.scala     |    4 +-
 core/src/main/scala/kafka/api/OffsetRequest.scala  |    4 +-
 core/src/main/scala/kafka/api/OffsetResponse.scala |    4 +-
 .../src/main/scala/kafka/api/ProducerRequest.scala |    4 +-
 .../main/scala/kafka/api/ProducerResponse.scala    |    5 +-
 .../main/scala/kafka/api/RequestOrResponse.scala   |    2 +-
 .../main/scala/kafka/api/StopReplicaRequest.scala  |    4 +-
 .../main/scala/kafka/api/StopReplicaResponse.scala |    5 +-
 .../scala/kafka/api/TopicMetadataRequest.scala     |    4 +-
 .../scala/kafka/api/TopicMetadataResponse.scala    |    3 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |   37 ++--
 core/src/main/scala/kafka/common/Topic.scala       |    2 +-
 .../controller/ControllerChannelManager.scala      |   34 ++-
 .../scala/kafka/controller/KafkaController.scala   |    4 +-
 .../kafka/controller/PartitionStateMachine.scala   |   81 ++++---
 .../kafka/controller/ReplicaStateMachine.scala     |   33 ++-
 .../scala/kafka/javaapi/TopicMetadataRequest.scala |    7 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   71 ++++--
 .../scala/kafka/tools/StateChangeLogMerger.scala   |  188 +++++++++++++++
 .../api/RequestResponseSerializationTest.scala     |    2 +-
 .../unit/kafka/server/LeaderElectionTest.scala     |   12 +-
 25 files changed, 414 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
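The common thread in the diff below is a dedicated log4j logger named "state.change.logger": the controller and every broker log each leadership and replica state transition to it, tagged with the controller id, the controller epoch, and the correlation id of the request that caused it, and the new kafka.tools.StateChangeLogMerger (added above, +188 lines) can merge the per-broker logs back into one timeline. A minimal sketch of the recurring pattern, with hypothetical ids and a class name of our own, not Kafka's actual code:

    import org.apache.log4j.Logger

    object StateChangeSketch extends App {
      // Obtained by name, not by class, so controller and broker code on
      // different machines all write to the same logical log.
      val stateChangeLogger = Logger.getLogger("state.change.logger")

      val (controllerId, epoch, correlationId, brokerId) = (1, 3, 42, 2)  // hypothetical
      stateChangeLogger.trace(
        "Controller %d epoch %d sending become-leader LeaderAndIsr request with correlationId %d to broker %d"
          .format(controllerId, epoch, correlationId, brokerId))
    }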


http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index decba0e..e055d67 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -80,4 +80,4 @@ else
   JAVA="$JAVA_HOME/bin/java"
 fi
 
-$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH $@
+$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH "$@"
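Quoting "$@" here makes the wrapper forward each command-line argument to the JVM as a single word; the unquoted $@ it replaces would re-split any argument containing whitespace.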

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index 5692da0..b76bc94 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -36,6 +36,12 @@ log4j.appender.requestAppender.File=kafka-request.log
 log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
 # Turn on all our debugging info
 #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
 #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
@@ -53,5 +59,10 @@ log4j.additivity.kafka.network.RequestChannel$=false
 log4j.logger.kafka.request.logger=TRACE, requestAppender
 log4j.additivity.kafka.request.logger=false
 
-log4j.logger.kafka.controller=TRACE, stateChangeAppender
+log4j.logger.kafka.controller=TRACE, controllerAppender
 log4j.additivity.kafka.controller=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false
+
+
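Net effect of the two log4j changes: kafka.controller output, previously routed to stateChangeAppender, now gets its own controller.log via the new controllerAppender, while stateChangeAppender (presumably defined in an unchanged part of this file) is reserved for the new state.change.logger. Additivity is disabled for both so these events do not also flood the main server log.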

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index dc4ed8e..a807c1f 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -59,13 +59,13 @@ object FetchRequest {
 }
 
 case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
-                                        correlationId: Int = FetchRequest.DefaultCorrelationId,
+                                        override val correlationId: Int = FetchRequest.DefaultCorrelationId,
                                         clientId: String = ConsumerConfig.DefaultClientId,
                                         replicaId: Int = Request.OrdinaryConsumerId,
                                         maxWait: Int = FetchRequest.DefaultMaxWait,
                                         minBytes: Int = FetchRequest.DefaultMinBytes,
                                         requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
-        extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
+        extends RequestOrResponse(Some(RequestKeys.FetchKey), correlationId) {
 
   /**
    * Partitions the request info into a map of maps (one for each topic).

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index d146b14..b40522d 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -93,6 +93,7 @@ object LeaderAndIsrRequest {
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerId = buffer.getInt
     val controllerEpoch = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
     val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo]
@@ -110,23 +111,24 @@ object LeaderAndIsrRequest {
     for (i <- 0 until leadersCount)
       leaders += Broker.readFrom(buffer)
 
-    new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
+    new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders)
   }
 }
 
 case class LeaderAndIsrRequest (versionId: Short,
-                                correlationId: Int,
+                                override val correlationId: Int,
                                 clientId: String,
                                 ackTimeoutMs: Int,
+                                controllerId: Int,
+                                controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[Broker],
-                                controllerEpoch: Int)
-        extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
+                                leaders: Set[Broker])
+    extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker],
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int,
            controllerEpoch: Int, correlationId: Int) = {
     this(LeaderAndIsrRequest.CurrentVersion, correlationId, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
-      partitionStateInfos, liveBrokers, controllerEpoch)
+      controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -134,6 +136,7 @@ case class LeaderAndIsrRequest (versionId: Short,
     buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerId)
     buffer.putInt(controllerEpoch)
     buffer.putInt(partitionStateInfos.size)
     for((key, value) <- partitionStateInfos){
@@ -151,6 +154,7 @@ case class LeaderAndIsrRequest (versionId: Short,
       4 /* correlation id */ + 
       (2 + clientId.length) /* client id */ +
       4 /* ack timeout */ +
+      4 /* controller id */ +
       4 /* controller epoch */ +
       4 /* number of partitions */
     for((key, value) <- partitionStateInfos)
@@ -165,10 +169,11 @@ case class LeaderAndIsrRequest (versionId: Short,
     val leaderAndIsrRequest = new StringBuilder
     leaderAndIsrRequest.append("Name: " + this.getClass.getSimpleName)
     leaderAndIsrRequest.append("; Version: " + versionId)
+    leaderAndIsrRequest.append("; Controller: " + controllerId)
+    leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
     leaderAndIsrRequest.append("; CorrelationId: " + correlationId)
     leaderAndIsrRequest.append("; ClientId: " + clientId)
     leaderAndIsrRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
-    leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
     leaderAndIsrRequest.append("; PartitionStateInfo: " + partitionStateInfos.mkString(","))
     leaderAndIsrRequest.append("; Leaders: " + leaders.mkString(","))
     leaderAndIsrRequest.toString()
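Serializing controllerId between ackTimeoutMs and controllerEpoch grows the fixed portion of the request by four bytes. A sketch of the arithmetic, mirroring the writeTo/sizeInBytes order above (the helper name is ours):

    // Fixed-size prefix of a LeaderAndIsrRequest after this commit.
    def fixedPrefixSize(clientId: String): Int =
      2 +                     /* version id */
      4 +                     /* correlation id */
      (2 + clientId.length) + /* short string client id */
      4 +                     /* ack timeout */
      4 +                     /* controller id, new in this commit */
      4 +                     /* controller epoch */
      4                       /* number of partitions */

    // fixedPrefixSize("") == 24, four bytes more than before the change.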

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
index dbd85d0..b4cfae8 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
@@ -41,10 +41,10 @@ object LeaderAndIsrResponse {
 }
 
 
-case class LeaderAndIsrResponse(correlationId: Int,
+case class LeaderAndIsrResponse(override val correlationId: Int,
                                 responseMap: Map[(String, Int), Short],
                                 errorCode: Short = ErrorMapping.NoError)
-        extends RequestOrResponse {
+    extends RequestOrResponse(correlationId = correlationId) {
   def sizeInBytes(): Int ={
     var size =
       4 /* correlation id */ + 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 6360a98..32ebfd4 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -57,10 +57,10 @@ case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
 
 case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
                          versionId: Short = OffsetRequest.CurrentVersion,
-                         correlationId: Int = 0,
+                         override val correlationId: Int = 0,
                          clientId: String = OffsetRequest.DefaultClientId,
                          replicaId: Int = Request.OrdinaryConsumerId)
-        extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
+    extends RequestOrResponse(Some(RequestKeys.OffsetsKey), correlationId) {
 
   def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index 264e200..08dc3cd 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -47,9 +47,9 @@ object OffsetResponse {
 case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long])
 
 
-case class OffsetResponse(correlationId: Int,
+case class OffsetResponse(override val correlationId: Int,
                           partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
-        extends RequestOrResponse {
+    extends RequestOrResponse(correlationId = correlationId) {
 
   lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 916fb59..1e05d7e 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -54,12 +54,12 @@ object ProducerRequest {
 }
 
 case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
-                           correlationId: Int,
+                           override val correlationId: Int,
                            clientId: String,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
                            data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
-    extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
+    extends RequestOrResponse(Some(RequestKeys.ProduceKey), correlationId) {
 
   /**
    * Partitions the data into a map of maps (one for each topic).

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 5bff709..d59c5bb 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -43,8 +43,9 @@ object ProducerResponse {
 
 case class ProducerResponseStatus(error: Short, offset: Long)
 
-case class ProducerResponse(correlationId: Int,
-                            status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse {
+case class ProducerResponse(override val correlationId: Int,
+                            status: Map[TopicAndPartition, ProducerResponseStatus])
+    extends RequestOrResponse(correlationId = correlationId) {
 
   /**
    * Partitions the status map into a map of maps (one for each topic).

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/RequestOrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 3175e1c..b62330b 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -27,7 +27,7 @@ object Request {
 }
 
 
-private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Logging{
+private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None, val correlationId: Int) extends Logging{
 
   def sizeInBytes: Int
   

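Hoisting correlationId into the RequestOrResponse constructor is what forces the "override val correlationId" boilerplate in every subclass above: a case-class parameter that shadows an inherited val must override it explicitly. A standalone sketch of the pattern (names are ours, not Kafka's):

    abstract class Message(val correlationId: Int)

    case class Ping(override val correlationId: Int) extends Message(correlationId)

    // Any request or response can now be logged generically through the base type:
    def logCorrelation(m: Message): Unit = println("correlationId " + m.correlationId)

The payoff shows up in RequestSendThread below, which logs response.correlationId without knowing the concrete response type.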
http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index be3c7be..5107488 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -53,13 +53,13 @@ object StopReplicaRequest extends Logging {
 }
 
 case class StopReplicaRequest(versionId: Short,
-                              correlationId: Int,
+                              override val correlationId: Int,
                               clientId: String,
                               ackTimeoutMs: Int,
                               deletePartitions: Boolean,
                               partitions: Set[(String, Int)],
                               controllerEpoch: Int)
-        extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
+        extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) {
 
   def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int, correlationId: Int) = {
     this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/StopReplicaResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
index fa66b99..c82eadd 100644
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
@@ -42,9 +42,10 @@ object StopReplicaResponse {
 }
 
 
-case class StopReplicaResponse(val correlationId: Int,
+case class StopReplicaResponse(override val correlationId: Int,
                                val responseMap: Map[(String, Int), Short],
-                               val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
+                               val errorCode: Short = ErrorMapping.NoError)
+    extends RequestOrResponse(correlationId = correlationId) {
   def sizeInBytes(): Int ={
     var size =
       4 /* correlation id */ + 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 88007b1..7477cfd 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -47,10 +47,10 @@ object TopicMetadataRequest extends Logging {
 }
 
 case class TopicMetadataRequest(val versionId: Short,
-                                val correlationId: Int,
+                                override val correlationId: Int,
                                 val clientId: String,
                                 val topics: Seq[String])
- extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
+ extends RequestOrResponse(Some(RequestKeys.MetadataKey), correlationId){
 
   def this(topics: Seq[String], correlationId: Int) =
     this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index af76776..290f263 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -34,7 +34,8 @@ object TopicMetadataResponse {
 }
 
 case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata],
-                                 correlationId: Int) extends RequestOrResponse {
+                                 override val correlationId: Int)
+    extends RequestOrResponse(correlationId = correlationId) {
   val sizeInBytes: Int = {
     val brokers = extractBrokers(topicsMetadata).values
     4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 469ac79..39266b5 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -25,6 +25,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.ErrorMapping
 import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
+import org.apache.log4j.Logger
 
 
 /**
@@ -51,7 +52,8 @@ class Partition(val topic: String,
    * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
    * each partition. */
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
-  this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
+  this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
+  private val stateChangeLogger = Logger.getLogger("state.change.logger")
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
@@ -124,15 +126,17 @@ class Partition(val topic: String,
    *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    *  4. set the new leader and ISR
    */
-  def makeLeader(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Boolean = {
+  def makeLeader(controllerId: Int, topic: String, partitionId: Int,
+                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
-          .format(leaderEpoch, leaderAndIsr.leaderEpoch))
+        stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation id %d from " +
+                                 "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d")
+                                   .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic,
+                                           partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
-      trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
@@ -159,22 +163,21 @@ class Partition(val topic: String,
    *  3. set the leader and set ISR to empty
    *  4. start a fetcher to the new leader
    */
-  def makeFollower(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                   liveBrokers: Set[Broker]): Boolean = {
+  def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                   liveBrokers: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-      if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
-          .format(leaderEpoch, leaderAndIsr.leaderEpoch))
+      if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
+        stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation id %d from " +
+                                 "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d")
+                                   .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic,
+                                           partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
-      trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
       val newLeaderBrokerId: Int = leaderAndIsr.leader
-      info("Starting the follower state transition to follow leader %d for topic %s partition %d"
-        .format(newLeaderBrokerId, topic, partitionId))
       liveBrokers.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
           // stop fetcher thread to previous leader
@@ -189,8 +192,10 @@ class Partition(val topic: String,
           // start fetcher thread to current leader
           replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
         case None => // leader went down
-          warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
-          " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId))
+          stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " +
+            " controller %d epoch %d since leader %d for partition [%s,%d] became unavailable during the state change operation"
+                                     .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
+                                              newLeaderBrokerId, topic, partitionId))
       }
       true
     }
@@ -198,7 +203,7 @@ class Partition(val topic: String,
 
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
     leaderIsrUpdateLock synchronized {
-      debug("Recording follower %d position %d for topic %s partition %d.".format(replicaId, offset, topic, partitionId))
+      debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
       val replica = getOrCreateReplica(replicaId)
       replica.logEndOffset = offset
 

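With hypothetical values plugged into the format string above, a discarded become-leader request renders as one greppable line, roughly:

    Broker 2 discarded the become-leader request with correlation id 42 from controller 1 epoch 3 for partition [my-topic,0] since current leader epoch 7 is >= the request's leader epoch 5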
http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 5bd8f6b..c1b9f65 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -20,7 +20,7 @@ package kafka.common
 import util.matching.Regex
 
 object Topic {
-  private val legalChars = "[a-zA-Z0-9\\._\\-]"
+  val legalChars = "[a-zA-Z0-9\\._\\-]"
   private val maxNameLength = 255
   private val rgx = new Regex(legalChars + "+")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e2ca1d6..2e50b8d 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -24,15 +24,18 @@ import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
 import kafka.server.KafkaConfig
 import collection.mutable
 import kafka.api._
+import org.apache.log4j.Logger
 
 class ControllerChannelManager private (config: KafkaConfig) extends Logging {
+  private var controllerContext: ControllerContext = null
   private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
-  this.logIdent = "[Channel manager on controller " + config.brokerId + "], "
+  this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
 
-  def this(allBrokers: Set[Broker], config : KafkaConfig) {
+  def this(controllerContext: ControllerContext, config : KafkaConfig) {
     this(config)
-    allBrokers.foreach(addNewBroker(_))
+    this.controllerContext = controllerContext
+    controllerContext.liveBrokers.foreach(addNewBroker(_))
   }
 
   def startup() = {
@@ -82,7 +85,7 @@ class ControllerChannelManager private (config: KafkaConfig) extends Logging {
       BlockingChannel.UseDefaultBufferSize,
       config.controllerSocketTimeoutMs)
     channel.connect()
-    val requestThread = new RequestSendThread(config.brokerId, broker.id, messageQueue, channel)
+    val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker.id, messageQueue, channel)
     requestThread.setDaemon(false)
     brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread))
   }
@@ -105,11 +108,13 @@ class ControllerChannelManager private (config: KafkaConfig) extends Logging {
 }
 
 class RequestSendThread(val controllerId: Int,
+                        val controllerContext: ControllerContext,
                         val toBrokerId: Int,
                         val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
                         val channel: BlockingChannel)
   extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
   private val lock = new Object()
+  private val stateChangeLogger = Logger.getLogger("state.change.logger")
 
   override def doWork(): Unit = {
     val queueItem = queue.take()
@@ -129,7 +134,8 @@ class RequestSendThread(val controllerId: Int,
           case RequestKeys.StopReplicaKey =>
             response = StopReplicaResponse.readFrom(receive.buffer)
         }
-        trace("Controller %d request to broker %d got a response %s".format(controllerId, toBrokerId, response))
+        stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %d"
+                                  .format(controllerId, controllerContext.epoch, response.correlationId, toBrokerId))
 
         if(callback != null){
           callback(response)
@@ -143,11 +149,12 @@ class RequestSendThread(val controllerId: Int,
   }
 }
 
-class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit)
+class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit, controllerId: Int)
   extends  Logging {
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
   val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
+  private val stateChangeLogger = Logger.getLogger("state.change.logger")
 
   def newBatch() {
     // raise error if the previous batch is not empty
@@ -162,10 +169,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicationFactor: Int) {
     brokerIds.foreach { brokerId =>
-      leaderAndIsrRequestMap.getOrElseUpdate(brokerId,
-                                             new mutable.HashMap[(String, Int), PartitionStateInfo])
-      leaderAndIsrRequestMap(brokerId).put((topic, partition),
-                                           PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor))
+      leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
+      leaderAndIsrRequestMap(brokerId).put((topic, partition), PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor))
     }
   }
 
@@ -190,8 +195,13 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
       val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch, correlationId)
-      debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId)
+      for (p <- partitionStateInfos) {
+        val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
+        stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d " +
+                                 "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, correlationId, broker,
+                                                                 p._1._1, p._1._2))
+      }
       sendRequest(broker, leaderAndIsrRequest, null)
     }
     leaderAndIsrRequestMap.clear()
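The controller logs a correlation id when it sends each request (above), and brokers log the same id when they act on it (see Partition.scala earlier), so one id now ties a state change together across machines; the StateChangeLogMerger added by this commit builds on exactly that. A toy filter over merged logs, assuming a file name of our choosing (note the commit uses both "correlationId" and "correlation id" spellings):

    object GrepStateChange extends App {
      val id = 42  // hypothetical correlation id
      scala.io.Source.fromFile("state-change.log").getLines()
        .filter(l => l.contains("correlationId " + id) || l.contains("correlation id " + id))
        .foreach(println)
    }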

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 48eae7e..e18ab07 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -89,7 +89,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, config.brokerId)
   registerControllerChangedListener()
 
   newGauge(
@@ -491,7 +491,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   private def startChannelManager() {
-    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext.liveBrokers, config)
+    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config)
     controllerContext.controllerChannelManager.startup()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 4078604..ce4b9e8 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -24,6 +24,7 @@ import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOff
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.apache.log4j.Logger
 
 /**
  * This class represents the state machine for partitions. It defines the states that a partition can be in, and
@@ -38,13 +39,15 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
  *                          moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
  */
 class PartitionStateMachine(controller: KafkaController) extends Logging {
-  this.logIdent = "[Partition state machine on Controller " + controller.config.brokerId + "]: "
   private val controllerContext = controller.controllerContext
+  private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val isShuttingDown = new AtomicBoolean(false)
+  this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
+  private val stateChangeLogger = Logger.getLogger("state.change.logger")
 
   /**
    * Invoked on successful controller election. First registers a topic change listener since that triggers all
@@ -126,12 +129,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       targetState match {
         case NewPartition =>
           // pre: partition did not exist before this
-          // post: partition has been assigned replicas
           assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
           assignReplicasToPartitions(topic, partition)
           partitionState.put(topicAndPartition, NewPartition)
-          info("Partition %s state changed from NotExists to New with assigned replicas ".format(topicAndPartition) +
-            "%s".format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")))
+          val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from NotExists to New with assigned replicas %s"
+                                    .format(controllerId, controller.epoch, topicAndPartition, assignedReplicas))
+          // post: partition has been assigned replicas
         case OnlinePartition =>
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
           partitionState(topicAndPartition) match {
@@ -144,27 +148,31 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
               electLeaderForPartition(topic, partition, leaderSelector)
             case _ => // should never come here since illegal previous states are checked above
           }
-          info("Partition %s state changed from %s to OnlinePartition with leader %d".format(topicAndPartition,
-            partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
           partitionState.put(topicAndPartition, OnlinePartition)
+          val leader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to OnlinePartition with leader %d"
+                                    .format(controllerId, controller.epoch, topicAndPartition, partitionState(topicAndPartition), leader))
            // post: partition has a leader
         case OfflinePartition =>
-          // pre: partition should be in Online state
+          // pre: partition should be in New or Online state
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
           // should be called when the leader for a partition is no longer alive
-          info("Partition %s state changed from Online to Offline".format(topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Online to Offline"
+                                    .format(controllerId, controller.epoch, topicAndPartition))
           partitionState.put(topicAndPartition, OfflinePartition)
           // post: partition has no alive leader
         case NonExistentPartition =>
-          // pre: partition could be in either of the above states
+          // pre: partition should be in Offline state
           assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
-          info("Partition %s state changed from Offline to NotExists".format(topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Offline to NotExists"
+                                    .format(controllerId, controller.epoch, topicAndPartition))
           partitionState.put(topicAndPartition, NonExistentPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
     } catch {
-      case t: Throwable => error("State change for partition %s ".format(topicAndPartition) +
-        "from %s to %s failed".format(currState, targetState), t)
+      case t: Throwable =>
+        stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
+                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
     }
   }
 
@@ -225,9 +233,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     liveAssignedReplicas.size match {
       case 0 =>
         ControllerStats.offlinePartitionRate.mark()
-        throw new StateChangeFailedException(("During state change of partition %s from NEW to ONLINE, assigned replicas are " +
-          "[%s], live brokers are [%s]. No assigned replica is alive").format(topicAndPartition,
-          replicaAssignment.mkString(","), controllerContext.liveBrokerIds))
+        val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " +
+                       "live brokers are [%s]. No assigned replica is alive.")
+                         .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
+        stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
+        throw new StateChangeFailedException(failMsg)
       case _ =>
         debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
         // make the first replica in the list of assigned replicas, the leader
@@ -251,9 +261,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
               topicAndPartition.partition).get
             ControllerStats.offlinePartitionRate.mark()
-            throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
-              .format(topicAndPartition) + " since Leader and isr path already exists with value " +
-              "%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch))
+            val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
+                           "exists with value %s and controller epoch %d")
+                             .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)
+            stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
+            throw new StateChangeFailedException(failMsg)
         }
     }
   }
@@ -268,7 +280,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
     val topicAndPartition = TopicAndPartition(topic, partition)
     // handle leader election for the partitions whose leader is no longer alive
-    info("Electing leader for partition %s".format(topicAndPartition))
+    stateChangeLogger.trace("Controller %d epoch %d started leader election for partition %s"
+                              .format(controllerId, controller.epoch, topicAndPartition))
     try {
       var zookeeperPathUpdateSucceeded: Boolean = false
       var newLeaderAndIsr: LeaderAndIsr = null
@@ -277,10 +290,14 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
         val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
         val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
-        if(controllerEpoch > controller.epoch)
-          throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
-            "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
-            "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
+        if (controllerEpoch > controller.epoch) {
+          val failMsg = ("aborted leader election for partition [%s,%d] since the LeaderAndIsr path was " +
+                         "already written by another controller. This probably means that the current controller %d went through " +
+                         "a soft failure and another controller was elected with epoch %d.")
+                           .format(topic, partition, controllerId, controllerEpoch)
+          stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
+          throw new StateChangeFailedException(failMsg)
+        }
         // elect new leader or throw exception
         val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
         val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
@@ -294,7 +311,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
       controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
-      info("Elected leader %d for Offline partition %s".format(newLeaderAndIsr.leader, topicAndPartition))
+      stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
+                                .format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
@@ -302,8 +320,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas %s for partition %s are dead."
         .format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(","), topicAndPartition) +
         " Marking this partition offline", poe)
-      case sce => throw new StateChangeFailedException(("Error while electing leader for partition " +
-        " %s due to: %s.").format(topicAndPartition, sce.getMessage), sce)
+      case sce =>
+        val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
+        stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
+        throw new StateChangeFailedException(failMsg, sce)
     }
     debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
   }
@@ -321,8 +341,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
       case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
       case None =>
-        throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
-          "%s in %s state".format(topicAndPartition, partitionState(topicAndPartition)))
+        val failMsg = "LeaderAndIsr information doesn't exist for partition %s in %s state"
+                        .format(topicAndPartition, partitionState(topicAndPartition))
+        throw new StateChangeFailedException(failMsg)
     }
   }
 
@@ -362,7 +383,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   class PartitionChangeListener(topic: String) extends IZkChildListener with Logging {
-    this.logIdent = "[Controller " + controller.config.brokerId + "], "
+    this.logIdent = "[Controller " + controller.config.brokerId + "]: "
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, children : java.util.List[String]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 20d9c4f..43d60cf 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.{TopicAndPartition, StateChangeFailedException}
 import kafka.utils.{ZkUtils, Logging}
 import org.I0Itec.zkclient.IZkChildListener
+import org.apache.log4j.Logger
 
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and
@@ -37,12 +38,14 @@ import org.I0Itec.zkclient.IZkChildListener
  * 4. NonExistentReplica: If a replica is deleted, it is moved to this state. Valid previous state is OfflineReplica
  */
 class ReplicaStateMachine(controller: KafkaController) extends Logging {
-  this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: "
   private val controllerContext = controller.controllerContext
+  private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId)
   private val isShuttingDown = new AtomicBoolean(false)
+  this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
+  private val stateChangeLogger = Logger.getLogger("state.change.logger")
 
   /**
    * Invoked on successful controller election. First registers a broker change listener since that triggers all
@@ -117,17 +120,18 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put((topic, partition, replicaId), NewReplica)
-          info("Replica %d for partition %s state changed to NewReplica".format(replicaId, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NewReplica"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
         case NonExistentReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
           // send stop replica command
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
           // remove this replica from the assigned replicas list for its partition
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-          controllerContext.partitionReplicaAssignment.put(topicAndPartition,
-            currentAssignedReplicas.filterNot(_ == replicaId))
-          info("Replica %d for partition %s state changed to NonExistentReplica".format(replicaId, topicAndPartition))
+          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
           replicaState.remove((topic, partition, replicaId))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
         case OnlineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
           replicaState((topic, partition, replicaId)) match {
@@ -135,7 +139,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               // add this replica to the assigned replicas list for its partition
               val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
               controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
-              info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
+              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
+                                        .format(controllerId, controller.epoch, replicaId, topicAndPartition))
             case _ =>
               // check if the leader for this partition is alive or even exists
                 controllerContext.allLeaders.get(topicAndPartition) match {
@@ -146,7 +151,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                                                                           topic, partition, leaderIsrAndControllerEpoch,
                                                                           replicaAssignment.size)
                       replicaState.put((topic, partition, replicaId), OnlineReplica)
-                      info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
+                      stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
+                                                .format(controllerId, controller.epoch, replicaId, topicAndPartition))
                     case false => // ignore partitions whose leader is not alive
                   }
                 case None => // ignore partitions who don't have a leader yet
@@ -167,8 +173,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                         topic, partition, updatedLeaderIsrAndControllerEpoch,
                         replicaAssignment.size)
                       replicaState.put((topic, partition, replicaId), OfflineReplica)
-                      info("Replica %d for partition %s state changed to OfflineReplica".format(replicaId, topicAndPartition))
-                      info("Removed offline replica %d from ISR for partition %s".format(replicaId, topicAndPartition))
+                      stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
+                                                .format(controllerId, controller.epoch, replicaId, topicAndPartition))
                       false
                     case None =>
                       true
@@ -184,15 +190,16 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       }
     }
     catch {
-      case t: Throwable => error("Error while changing state of replica %d for partition ".format(replicaId) +
-        "[%s, %d] to %s".format(topic, partition, targetState), t)
+      case t: Throwable =>
+        stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] to %s failed"
+                                  .format(controllerId, controller.epoch, replicaId, topic, partition, targetState), t)
     }
   }
 
   private def assertValidPreviousStates(topic: String, partition: Int, replicaId: Int, fromStates: Seq[ReplicaState],
                                         targetState: ReplicaState) {
     assert(fromStates.contains(replicaState((topic, partition, replicaId))),
-      "Replica %s for partition [%s, %d] should be in the %s states before moving to %s state"
+      "Replica %s for partition [%s,%d] should be in the %s states before moving to %s state"
         .format(replicaId, topic, partition, fromStates.mkString(","), targetState) +
         ". Instead it is in %s state".format(replicaState((topic, partition, replicaId))))
   }
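
The transitions in this hunk all log through one shared log4j logger name rather than through the per-class Logging trait, and every entry carries the controller id and epoch so lines stay attributable after logs from several brokers are merged (see the StateChangeLogMerger tool later in this patch). A minimal standalone sketch of the pattern, with a hypothetical class and method name (the logger name is the one the patch actually uses):

    import org.apache.log4j.Logger

    // Hypothetical state machine illustrating the shared "state.change.logger" pattern.
    class StateChangeSketch(controllerId: Int, epoch: Int) {
      // One well-known logger name, shared by controller and brokers, so all state
      // change entries can be routed to a dedicated file and merged chronologically.
      private val stateChangeLogger = Logger.getLogger("state.change.logger")

      def transition(replicaId: Int, partition: String, targetState: String) {
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to %s"
                                  .format(controllerId, epoch, replicaId, partition, targetState))
      }
    }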

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 3d92569..5f80df7 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -21,9 +21,11 @@ import java.nio.ByteBuffer
 import scala.collection.JavaConversions
 
 class TopicMetadataRequest(val versionId: Short,
-                           val correlationId: Int,
+                           override val correlationId: Int,
                            val clientId: String,
-                           val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
+                           val topics: java.util.List[String])
+    extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {
+
   val underlying: kafka.api.TopicMetadataRequest =
     new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics))
 
@@ -36,4 +38,5 @@ class TopicMetadataRequest(val versionId: Short,
   def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
 
   def sizeInBytes: Int = underlying.sizeInBytes()
+
 }
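
The point of the change above is that correlationId now flows into the RequestOrResponse superclass instead of living only on the javaapi subclass, so callers can read it off the common supertype. A hedged usage sketch (topic, client id, and the correlation id value are made up):

    import java.util.Arrays

    object MetadataRequestSketch {
      def main(args: Array[String]) {
        val topics: java.util.List[String] = Arrays.asList("my-topic")
        val request = new kafka.javaapi.TopicMetadataRequest(0.toShort, 42, "sketch-client", topics)
        // After this change the correlation id is visible on the supertype:
        val common: kafka.api.RequestOrResponse = request
        assert(common.correlationId == 42)
      }
    }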

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f7fe0de..c10cbde 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit
 import kafka.common._
 import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 import kafka.controller.KafkaController
+import org.apache.log4j.Logger
 
 
 object ReplicaManager {
@@ -42,14 +43,16 @@ class ReplicaManager(val config: KafkaConfig,
                      val logManager: LogManager) extends Logging with KafkaMetricsGroup {
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
+  private val localBrokerId = config.brokerId
   private val allPartitions = new Pool[(String, Int), Partition]
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
-  this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
   private var hwThreadInitialized = false
+  this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
+  private val stateChangeLogger = Logger.getLogger("state.change.logger")
 
   newGauge(
     "LeaderCount",
@@ -102,7 +105,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
-    trace("Handling stop replica for partition [%s, %d]".format(topic, partitionId))
+    stateChangeLogger.trace("Broker %d handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
     val errorCode = ErrorMapping.NoError
     getReplica(topic, partitionId) match {
       case Some(replica) =>
@@ -114,18 +117,19 @@ class ReplicaManager(val config: KafkaConfig,
           leaderPartitions -= replica.partition
         }
         allPartitions.remove((topic, partitionId))
-        info("After removing partition (%s, %d), the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
+        info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
       case None => //do nothing if replica no longer exists
     }
-    trace("Finish handling stop replica [%s, %d]".format(topic, partitionId))
+    stateChangeLogger.trace("Broker %d finished handling stop replica [%s,%d]".format(localBrokerId, topic, partitionId))
     errorCode
   }
 
   def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
     if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
-      error("Received stop replica request from an old controller epoch %d.".format(stopReplicaRequest.controllerEpoch) +
-        " Latest known controller epoch is %d " + controllerEpoch)
+      stateChangeLogger.error("Broker %d received stop replica request from an old controller epoch %d."
+        .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
+            " Latest known controller epoch is %d " + controllerEpoch)
       (responseMap, ErrorMapping.StaleControllerEpochCode)
     } else {
       controllerEpoch = stopReplicaRequest.controllerEpoch
@@ -160,7 +164,7 @@ class ReplicaManager(val config: KafkaConfig,
     if(replicaOpt.isDefined)
       return replicaOpt.get
     else
-      throw new ReplicaNotAvailableException("Replica %d is not available for partiton [%s, %d] yet".format(config.brokerId, topic, partition))
+      throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d] yet".format(config.brokerId, topic, partition))
   }
 
   def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica =  {
@@ -187,13 +191,19 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
-    info("Handling leader and isr request %s".format(leaderAndISRRequest))
+    leaderAndISRRequest.partitionStateInfos.foreach(p =>
+      stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
+                                .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
+                                        leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2)))
+    info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest))
+
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
     if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-      error("Received leader and isr request from an old controller epoch %d.".format(leaderAndISRRequest.controllerEpoch) +
-        " Latest known controller epoch is %d " + controllerEpoch)
+      stateChangeLogger.error("Broker %d received LeaderAndIsr request correlationId %d with an old controllerEpoch %d, latest known controllerEpoch is %d"
+                                .format(localBrokerId, leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId, controllerEpoch))
       (responseMap, ErrorMapping.StaleControllerEpochCode)
     }else {
+      val controllerId = leaderAndISRRequest.controllerId
       controllerEpoch = leaderAndISRRequest.controllerEpoch
       for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
         var errorCode = ErrorMapping.NoError
@@ -203,17 +213,25 @@ class ReplicaManager(val config: KafkaConfig,
         val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
         try {
           if(requestedLeaderId == config.brokerId)
-            makeLeader(topic, partitionId, partitionStateInfo)
+            makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId)
           else
-            makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
+            makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
+                         leaderAndISRRequest.correlationId)
         } catch {
           case e =>
-            error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
+            val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
+                            "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
+                                                                leaderAndISRRequest.controllerEpoch, topicAndPartition)
+            stateChangeLogger.error(errorMsg, e)
             errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
         }
         responseMap.put(topicAndPartition, errorCode)
+        stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
+                                  .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
+                                          leaderAndISRRequest.controllerEpoch, topicAndPartition._1, topicAndPartition._2))
       }
-      info("Completed leader and isr request %s".format(leaderAndISRRequest))
+      info("Handled leader and isr request %s".format(leaderAndISRRequest))
      // we initialize the highwatermark thread after the first LeaderAndIsr request. This ensures that all the partitions
      // have been completely populated before starting the checkpointing, thereby avoiding weird race conditions
       if (!hwThreadInitialized) {
@@ -225,33 +243,38 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
+  private def makeLeader(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
+                         partitionStateInfo: PartitionStateInfo, correlationId: Int) = {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
-    info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
+    stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+                             "starting the become-leader transition for partition [%s,%d]")
+                               .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeader(topic, partitionId, leaderIsrAndControllerEpoch)) {
+    if (partition.makeLeader(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, correlationId)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
       } 
     }
-    info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
+    stateChangeLogger.trace("Broker %d completed become-leader transition for partition [%s,%d]".format(localBrokerId, topic, partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo,
-                           liveBrokers: Set[Broker]) {
+  private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
+                           partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker], correlationId: Int) {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
     val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader
-    info("Starting the follower state transition to follow leader %d for topic %s partition %d"
-                 .format(leaderBrokerId, topic, partitionId))
+    stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+                             "starting the become-follower transition for partition [%s,%d]")
+                               .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers)) {
+    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers, correlationId)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
       }
     }
+    stateChangeLogger.trace("Broker %d completed the become-follower transition for partition [%s,%d]".format(localBrokerId, topic, partitionId))
   }
 
   private def maybeShrinkIsr(): Unit = {
@@ -266,7 +289,7 @@ class ReplicaManager(val config: KafkaConfig,
     if(partitionOpt.isDefined) {
       partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
     } else {
-      warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId))
+      warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId))
     }
   }
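
ReplicaManager now writes to the same "state.change.logger" as the controller-side state machines, presumably routed to a dedicated appender in each broker's log4j configuration. A programmatic sketch of equivalent wiring, with an illustrative file location; the ISO8601 pattern matters because the StateChangeLogMerger below keys on "yyyy-MM-dd HH:mm:ss,SSS" timestamps:

    import org.apache.log4j.{FileAppender, Level, Logger, PatternLayout}

    object StateChangeLogWiringSketch {
      def wire(logDir: String) {
        val logger = Logger.getLogger("state.change.logger")
        logger.setLevel(Level.TRACE)
        logger.setAdditivity(false) // keep state change entries out of the main server log
        // ISO8601 renders as "yyyy-MM-dd HH:mm:ss,SSS", the format the merger's dateRegex expects.
        val layout = new PatternLayout("%d{ISO8601} %p %m (%c)%n")
        logger.addAppender(new FileAppender(layout, logDir + "/state-change.log"))
      }
    }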
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
new file mode 100644
index 0000000..97970fb
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import joptsimple._
+import scala.util.matching.Regex
+import collection.mutable
+import java.util.Date
+import java.text.SimpleDateFormat
+import kafka.utils.Logging
+import kafka.common.Topic
+import java.io.{BufferedOutputStream, OutputStream}
+
+/**
+ * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days).
+ *
+ * This utility expects exactly one of the following two arguments -
+ * 1. A list of state change log files
+ * 2. A regex to specify state change log file names.
+ *
+ * This utility optionally also accepts the following arguments -
+ * 1. The topic whose state change logs should be merged
+ * 2. A list of partitions whose state change logs should be merged (can be specified only when the topic argument
+ * is explicitly specified)
+ * 3. Start time from when the logs should be merged
+ * 4. End time until when the logs should be merged
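+ *
+ * An illustrative invocation (file names, topic, and time range here are hypothetical;
+ * the class can be run via bin/kafka-run-class.sh):
+ *
+ *   bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs-regex /tmp/state-change.log*
+ *       --topic my-topic --partitions 0,1 --start-time "2013-03-06 00:00:00,000"
+ *       --end-time "2013-03-06 23:59:59,999"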
+ */
+
+object StateChangeLogMerger extends Logging {
+
+  val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS"
+  val topicPartitionRegex = new Regex("\\[(" + Topic.legalChars + "+),( )*([0-9]+)\\]")
+  val dateRegex = new Regex("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}")
+  val dateFormat = new SimpleDateFormat(dateFormatString)
+  var files: List[String] = List()
+  var topic: String = null
+  var partitions: List[Int] = List()
+  var startDate: Date = null
+  var endDate: Date = null
+
+  def main(args: Array[String]) {
+
+    // Parse input arguments.
+    val parser = new OptionParser
+    val filesOpt = parser.accepts("logs", "Comma separated list of state change logs or a regex for the log file names")
+                              .withRequiredArg
+                              .describedAs("file1,file2,...")
+                              .ofType(classOf[String])
+    val regexOpt = parser.accepts("logs-regex", "Regex to match the state change log files to be merged")
+                              .withRequiredArg
+                              .describedAs("for example: /tmp/state-change.log*")
+                              .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "The topic whose state change logs should be merged")
+                              .withRequiredArg
+                              .describedAs("topic")
+                              .ofType(classOf[String])
+    val partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids whose state change logs should be merged")
+                              .withRequiredArg
+                              .describedAs("0,1,2,...")
+                              .ofType(classOf[String])
+    val startTimeOpt = parser.accepts("start-time", "The earliest timestamp of state change log entries to be merged")
+                              .withRequiredArg
+                              .describedAs("start timestamp in the format " + dateFormat)
+                              .ofType(classOf[String])
+                              .defaultsTo("0000-00-00 00:00:00,000")
+    val endTimeOpt = parser.accepts("end-time", "The latest timestamp of state change log entries to be merged")
+                              .withRequiredArg
+                              .describedAs("end timestamp in the format " + dateFormat)
+                              .ofType(classOf[String])
+                              .defaultsTo("9999-12-31 23:59:59,999")
+
+
+    val options = parser.parse(args : _*)
+    if ((!options.has(filesOpt) && !options.has(regexOpt)) || (options.has(filesOpt) && options.has(regexOpt))) {
+      System.err.println("Provide arguments to exactly one of the two options \"" + filesOpt + "\" or \"" + regexOpt + "\"")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+    if (options.has(partitionsOpt) && !options.has(topicOpt)) {
+      System.err.println("The option \"" + topicOpt + "\" needs to be provided an argument when specifying partition ids")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+
+    // Populate data structures.
+    if (options.has(filesOpt)) {
+      files :::= options.valueOf(filesOpt).split(",").toList
+    } else if (options.has(regexOpt)) {
+      val regex = options.valueOf(regexOpt)
+      val fileNameIndex = regex.lastIndexOf('/') + 1
+      val dirName = if (fileNameIndex == 0) "." else regex.substring(0, fileNameIndex - 1)
+      val fileNameRegex = new Regex(regex.substring(fileNameIndex))
+      files :::= new java.io.File(dirName).listFiles.filter(f => fileNameRegex.findFirstIn(f.getName) != None).map(dirName + "/" + _.getName).toList
+    }
+    if (options.has(topicOpt)) {
+      topic = options.valueOf(topicOpt)
+    }
+    if (options.has(partitionsOpt)) {
+      partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt)
+    }
+    startDate = dateFormat.parse(options.valueOf(startTimeOpt).replace('\"', ' ').trim)
+    endDate = dateFormat.parse(options.valueOf(endTimeOpt).replace('\"', ' ').trim)
+
+    /**
+     * n-way merge from m input files:
+     * 1. Read the first line that matches the specified topic/partitions and date range from every input file into a priority queue.
+     * 2. Take the line with the earliest date from the priority queue and write it to a buffered output stream.
+     * 3. Read another matching line from the file selected in step 2 and add it to the priority queue.
+     * 4. Flush the output buffer at the end. (The buffer is also flushed automatically whenever it fills.)
+     */
+    val pqueue = new mutable.PriorityQueue[LineIterator]()(dateBasedOrdering)
+    val output: OutputStream = new BufferedOutputStream(System.out, 1024*1024)
+    val lineIterators = files.map(io.Source.fromFile(_).getLines)
+    var lines: List[LineIterator] = List()
+
+    for (itr <- lineIterators) {
+      val lineItr = getNextLine(itr)
+      if (!lineItr.isEmpty)
+        lines ::= lineItr
+    }
+    if (!lines.isEmpty) pqueue.enqueue(lines:_*)
+
+    while (!pqueue.isEmpty) {
+      val lineItr = pqueue.dequeue()
+      output.write((lineItr.line + "\n").getBytes)
+      val nextLineItr = getNextLine(lineItr.itr)
+      if (!nextLineItr.isEmpty)
+        pqueue.enqueue(nextLineItr)
+    }
+
+    output.flush()
+  }
+
+  /**
+   * Scans the given line iterator for the next line that falls within the specified date range
+   * and matches the specified topic/partitions.
+   * @param itr Line iterator of a file
+   * @return A LineIterator holding the matching line and the iterator it came from, or an
+   *         empty LineIterator if the file is exhausted
+   */
+  def getNextLine(itr: Iterator[String]): LineIterator = {
+    while (itr != null && itr.hasNext) {
+      val nextLine = itr.next
+      dateRegex.findFirstIn(nextLine) match {
+        case Some(d) =>
+          val date = dateFormat.parse(d)
+          if ((date.equals(startDate) || date.after(startDate)) && (date.equals(endDate) || date.before(endDate))) {
+            topicPartitionRegex.findFirstMatchIn(nextLine) match {
+              case Some(matcher) =>
+                if ((topic == null || topic == matcher.group(1)) && (partitions.isEmpty || partitions.contains(matcher.group(3).toInt)))
+                  return new LineIterator(nextLine, itr)
+              case None =>
+            }
+          }
+        case None =>
+      }
+    }
+    new LineIterator()
+  }
+
+  class LineIterator(val line: String, val itr: Iterator[String]) {
+    def this() = this("", null)
+    def isEmpty = (line == "" && itr == null)
+  }
+
+  implicit object dateBasedOrdering extends Ordering[LineIterator] {
+    def compare(first: LineIterator, second: LineIterator) = {
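+      // Note: the comparison is deliberately reversed (second vs. first): scala's
+      // PriorityQueue is a max-heap, so reversing makes dequeue() yield the earliest
+      // timestamp first. Comparing the formatted strings directly is safe because
+      // "yyyy-MM-dd HH:mm:ss,SSS" sorts lexicographically in chronological order.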
+      val firstDate = dateRegex.findFirstIn(first.line).get
+      val secondDate = dateRegex.findFirstIn(second.line).get
+      secondDate.compareTo(firstDate)
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index d0c7b90..4c209f1 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -89,7 +89,7 @@ object SerializationTestUtils{
     val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1, 0)
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0)
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2457bc49/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 129bc56..ec1db2d 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -23,7 +23,7 @@ import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, Utils, TestUtils}
-import kafka.controller.{LeaderIsrAndControllerEpoch, ControllerChannelManager}
+import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch, ControllerChannelManager}
 import kafka.cluster.Broker
 import kafka.common.ErrorMapping
 import kafka.api._
@@ -120,18 +120,20 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
     assertEquals("First epoch value should be 0", 0, leaderEpoch1)
 
-
     // start another controller
-    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
+    val controllerId = 2
+    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
-    val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
+    val controllerContext = new ControllerContext(zkClient)
+    controllerContext.liveBrokers = brokers.toSet
+    val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
     controllerChannelManager.startup()
     val staleControllerEpoch = 0
     val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch]
     leaderAndIsr.put((topic, partitionId),
       new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
     val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
-    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch, 0)
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch, 0)
 
     controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
     TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)
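
The test above exercises the stale-epoch guard added to the brokers: a LeaderAndIsr request whose controllerEpoch is older than the latest one the broker has seen must be answered with StaleControllerEpochCode. A minimal standalone sketch of that guard (the error code value here is an assumption for illustration; the real constant lives in kafka.common.ErrorMapping):

    object EpochGuardSketch {
      val NoError: Short = 0
      val StaleControllerEpochCode: Short = 11 // assumed value, illustrative only

      @volatile private var latestKnownControllerEpoch = -1

      def onControllerRequest(requestEpoch: Int): Short = {
        if (requestEpoch < latestKnownControllerEpoch)
          StaleControllerEpochCode // reject: the sender was deposed by a newer controller
        else {
          latestKnownControllerEpoch = requestEpoch // adopt the newer epoch, then proceed
          NoError
        }
      }
    }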

