kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject [1/2] git commit: change to standardize on [%s,%d] for partition
Date Tue, 30 Apr 2013 00:04:52 GMT
Updated Branches:
  refs/heads/0.8 e37a46442 -> 103aef8ca


change to standardize on [%s,%d] for partition
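
For context: the commit replaces the assorted phrasings "topic %s partition %d", "topic %s and partition %d", and bare "%s %d" in log and exception messages with the single convention "partition [%s,%d]". A minimal Scala sketch of the before/after output (the topic name and partition id below are illustrative, not from the patch):

    object PartitionFormatExample extends App {
      val topic = "my-topic" // illustrative topic name
      val partitionId = 3    // illustrative partition id

      // Old style, removed by this commit:
      println("Expanding ISR for topic %s partition %d".format(topic, partitionId))
      // prints: Expanding ISR for topic my-topic partition 3

      // New standardized style:
      println("Expanding ISR for partition [%s,%d]".format(topic, partitionId))
      // prints: Expanding ISR for partition [my-topic,3]
    }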


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a61e7381
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a61e7381
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a61e7381

Branch: refs/heads/0.8
Commit: a61e7381382ada885debef15379675fea6974ec1
Parents: e37a464
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Mon Apr 29 11:09:14 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Mon Apr 29 11:09:19 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AdminUtils.scala   |    2 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |   21 ++++++++------
 core/src/main/scala/kafka/cluster/Replica.scala    |   10 +++---
 .../kafka/consumer/ConsumerFetcherThread.scala     |    2 +-
 core/src/main/scala/kafka/log/LogManager.scala     |    2 +-
 .../scala/kafka/producer/BrokerPartitionInfo.scala |    4 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |    6 ++--
 .../kafka/server/HighwaterMarkCheckpoint.scala     |    5 +--
 core/src/main/scala/kafka/server/KafkaApis.scala   |    9 +++---
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    2 +-
 .../kafka/tools/VerifyConsumerRebalance.scala      |   10 +++---
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    8 +++---
 .../consumer/ZookeeperConsumerConnectorTest.scala  |    4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   12 ++++----
 14 files changed, 50 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 63f5bc8..c7652ad 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -117,7 +117,7 @@ object AdminUtils extends Logging {
               try {
                 Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
               } catch {
-                case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
+                case e => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
               }
             case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9a29fb2..02d2c44 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -221,7 +221,7 @@ class Partition(val topic: String,
           if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
             // expand ISR
             val newInSyncReplicas = inSyncReplicas + replica
-            info("Expanding ISR for topic %s partition %d from %s to %s"
+            info("Expanding ISR for partition [%s,%d] from %s to %s"
                  .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in ZK and cache
             updateIsr(newInSyncReplicas)
@@ -270,10 +270,10 @@ class Partition(val topic: String,
     val oldHighWatermark = leaderReplica.highWatermark
     if(newHighWatermark > oldHighWatermark) {
       leaderReplica.highWatermark = newHighWatermark
-      debug("Highwatermark for topic %s partition %d updated to %d".format(topic, partitionId,
newHighWatermark))
+      debug("Highwatermark for partition [%s,%d] updated to %d".format(topic, partitionId,
newHighWatermark))
     }
     else
-      debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s"
+      debug("Old hw for partition [%s,%d] is %d. New hw is %d. All leo's are %s"
         .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
   }
 
@@ -285,7 +285,7 @@ class Partition(val topic: String,
           if(outOfSyncReplicas.size > 0) {
             val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
             assert(newInSyncReplicas.size > 0)
-            info("Shrinking ISR for topic %s partition %d from %s to %s".format(topic, partitionId,
+            info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
               inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in zk and in cache
             updateIsr(newInSyncReplicas)
@@ -310,13 +310,16 @@ class Partition(val topic: String,
     val candidateReplicas = inSyncReplicas - leaderReplica
     // Case 1 above
     val possiblyStuckReplicas = candidateReplicas.filter(r => r.logEndOffset < leaderLogEndOffset)
-    debug("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId,
-      possiblyStuckReplicas.map(_.brokerId).mkString(",")))
+    if(possiblyStuckReplicas.size > 0)
+      debug("Possibly stuck replicas for partition [%s,%d] are %s".format(topic, partitionId,
+        possiblyStuckReplicas.map(_.brokerId).mkString(",")))
     val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs))
-    debug("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
+    if(stuckReplicas.size > 0)
+      debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
     // Case 2 above
     val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages)
-    debug("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
+    if(slowReplicas.size > 0)
+      debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
     stuckReplicas ++ slowReplicas
   }
 
@@ -338,7 +341,7 @@ class Partition(val topic: String,
   }
 
   private def updateIsr(newIsr: Set[Replica]) {
-    debug("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(",")))
+    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
     // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 321ab58..5e659b4 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -40,10 +40,10 @@ class Replica(val brokerId: Int,
     if (!isLocal) {
       logEndOffsetValue.set(newLogEndOffset)
       logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
-      trace("Setting log end offset for replica %d for topic %s partition %d to %d"
+      trace("Setting log end offset for replica %d for partition [%s,%d] to %d"
             .format(brokerId, topic, partitionId, logEndOffsetValue.get()))
     } else
-      throw new KafkaException("Shouldn't set logEndOffset for replica %d topic %s partition
%d since it's local"
+      throw new KafkaException("Shouldn't set logEndOffset for replica %d partition [%s,%d]
since it's local"
           .format(brokerId, topic, partitionId))
 
   }
@@ -66,11 +66,11 @@ class Replica(val brokerId: Int,
 
   def highWatermark_=(newHighWatermark: Long) {
     if (isLocal) {
-      trace("Setting hw for replica %d topic %s partition %d on broker %d to %d"
+      trace("Setting hw for replica %d partition [%s,%d] on broker %d to %d"
               .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
       highWatermarkValue.set(newHighWatermark)
     } else
-      throw new KafkaException("Unable to set highwatermark for replica %d topic %s partition
%d since it's not local"
+      throw new KafkaException("Unable to set highwatermark for replica %d partition [%s,%d]
since it's not local"
               .format(brokerId, topic, partitionId))
   }
 
@@ -78,7 +78,7 @@ class Replica(val brokerId: Int,
     if (isLocal)
       highWatermarkValue.get()
     else
-      throw new KafkaException("Unable to get highwatermark for replica %d topic %s partition
%d since it's not local"
+      throw new KafkaException("Unable to get highwatermark for replica %d partition [%s,%d]
since it's not local"
               .format(brokerId, topic, partitionId))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 80df1b5..5f9c902 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -45,7 +45,7 @@ class ConsumerFetcherThread(name: String,
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
     val pti = partitionMap(topicAndPartition)
     if (pti.getFetchOffset != fetchOffset)
-      throw new RuntimeException("Offset doesn't match for topic %s partition: %d pti offset: %d fetch offset: %d"
+      throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
                                 .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
     pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 497cfdd..4771d11 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -197,7 +197,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
                     config.logIndexIntervalBytes, 
                     time, 
                     config.brokerId)
-      info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic,
topicAndPartition.partition, dataDir.getAbsolutePath))
+      info("Created log for partition [%s,%d] in %s.".format(topicAndPartition.topic, topicAndPartition.partition,
dataDir.getAbsolutePath))
       logs.put(topicAndPartition, log)
       log
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 72597ef..82e6e4d 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -57,10 +57,10 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
     partitionMetadata.map { m =>
       m.leader match {
         case Some(leader) =>
-          debug("Topic %s partition %d has leader %d".format(topic, m.partitionId, leader.id))
+          debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, leader.id))
           new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
         case None =>
-          debug("Topic %s partition %d does not have a leader yet".format(topic, m.partitionId))
+          debug("Partition [%s,%d] does not have a leader yet".format(topic, m.partitionId))
           new PartitionAndLeader(topic, m.partitionId, None)
       }
     }.sortWith((s, t) => s.partitionId < t.partitionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2ac7a17..162c749 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -144,15 +144,15 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                   try {
                     val newOffset = handleOffsetOutOfRange(topicAndPartition)
                     partitionMap.put(topicAndPartition, newOffset)
-                    warn("current offset %d for topic %s partition %d out of range; reset
offset to %d"
+                    warn("current offset %d for partition [%s,%d] out of range; reset offset
to %d"
                       .format(currentOffset.get, topic, partitionId, newOffset))
                   } catch {
                     case e =>
-                      warn("error getting offset for %s %d to broker %d".format(topic, partitionId,
sourceBroker.id), e)
+                      warn("error getting offset for partition [%s,%d] to broker %d".format(topic,
partitionId, sourceBroker.id), e)
                       partitionsWithError += topicAndPartition
                   }
                 case _ =>
-                  warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
+                  warn("error for partition [%s,%d] to broker %d".format(topic, partitionId,
sourceBroker.id),
                     ErrorMapping.exceptionFor(partitionData.error))
                   partitionsWithError += topicAndPartition
               }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
index 5aa0141..30caec1 100644
--- a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
@@ -77,8 +77,7 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging {
     try {
       hwFile.length() match {
         case 0 => 
-          warn("No highwatermark file is found. Returning 0 as the highwatermark for topic
%s partition %d."
-            .format(topic, partition))
+          warn("No highwatermark file is found. Returning 0 as the highwatermark for partition
[%s,%d]".format(topic, partition))
           0L
         case _ =>
           val hwFileReader = new BufferedReader(new FileReader(hwFile))
@@ -99,7 +98,7 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging {
               val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition))
               hwOpt match {
                 case Some(hw) => 
-                  debug("Read hw %d for topic %s partition %d from highwatermark checkpoint
file".format(hw, topic, partition))
+                  debug("Read hw %d for partition [%s,%d] from highwatermark checkpoint file".format(hw,
topic, partition))
                   hw
                 case None => 
                   warn("No previously checkpointed highwatermark value found for topic %s
".format(topic) +

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6b6f8f2..bb178d6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -252,7 +252,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       val response = new FetchResponse(fetchRequest.correlationId, dataRead)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
-      debug("Putting fetch request into purgatory")
+      debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId,
+        fetchRequest.clientId))
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
       val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable)
@@ -285,7 +286,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             if (!isFetchFromFollower) {
               new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
             } else {
-              debug("Leader %d for topic %s partition %d received fetch request from follower
%d"
+              debug("Leader %d for partition [%s,%d] received fetch request from follower
%d"
                             .format(brokerId, topic, partition, fetchRequest.replicaId))
               new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
             }
@@ -302,7 +303,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             case t =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
-              error("Error when processing fetch request for topic %s partition %d offset
%d from %s with correlation id %d"
+              error("Error when processing fetch request for partition [%s,%d] offset %d
from %s with correlation id %d"
                     .format(topic, partition, offset, if (isFetchFromFollower) "follower"
else "consumer", fetchRequest.correlationId),
                     t)
               new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
-1L, MessageSet.Empty)
@@ -334,7 +335,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case Some(log) =>
         log.read(offset, maxSize, maxOffsetOpt)
       case None =>
-        error("Leader for topic %s partition %d on broker %d does not have a local log".format(topic,
partition, brokerId))
+        error("Leader for partition [%s,%d] on broker %d does not have a local log".format(topic,
partition, brokerId))
         MessageSet.Empty
     }
     (messages, localReplica.highWatermark)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b733fa3..74073d0 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -54,7 +54,7 @@ class ReplicaFetcherThread(name:String,
             .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes))
       val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
       replica.highWatermark = followerHighWatermark
-      trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
+      trace("Follower %d set replica highwatermark for partition [%s,%d] to %d"
             .format(replica.brokerId, topic, partitionId, followerHighWatermark))
     } catch {
       case e: KafkaStorageException =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index dc6d066..eac9af2 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -99,7 +99,7 @@ object VerifyConsumerRebalance extends Logging {
       partitions.foreach { partition =>
       // check if there is a node for [partition]
         if(!partitionsWithOwners.exists(p => p.equals(partition))) {
-          error("No owner for topic %s partition %s".format(topic, partition))
+          error("No owner for partition [%s,%d]".format(topic, partition))
           rebalanceSucceeded = false
         }
         // try reading the partition owner path for see if a valid consumer id exists there
@@ -109,7 +109,7 @@ object VerifyConsumerRebalance extends Logging {
           case None => null
         }
         if(partitionOwner == null) {
-          error("No owner for topic %s partition %s".format(topic, partition))
+          error("No owner for partition [%s,%d]".format(topic, partition))
           rebalanceSucceeded = false
         }
         else {
@@ -117,12 +117,12 @@ object VerifyConsumerRebalance extends Logging {
           consumerIdsForTopic match {
             case Some(consumerIds) =>
               if(!consumerIds.contains(partitionOwner)) {
-                error("Owner %s for topic %s partition %s is not a valid member of consumer
" +
-                  "group %s".format(partitionOwner, topic, partition, group))
+                error(("Owner %s for partition [%s,%d] is not a valid member of consumer
" +
+                  "group %s").format(partitionOwner, topic, partition, group))
                 rebalanceSucceeded = false
               }
               else
-                info("Owner of topic %s partition %s is %s".format(topic, partition, partitionOwner))
+                info("Owner of partition [%s,%d] is %s".format(topic, partition, partitionOwner))
             case None => {
               error("No consumer ids registered for topic " + topic)
               rebalanceSucceeded = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 7971a09..4f6fcd4 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -101,7 +101,7 @@ object ZkUtils extends Logging {
         val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
         val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
         val zkPathVersion = stat.getVersion
-        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition
%d".format(leader, epoch,
+        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for partition [%s,%d]".format(leader,
epoch,
           isr.toString(), zkPathVersion, topic, partition))
         Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion),
controllerEpoch))
       case None => None
@@ -131,10 +131,10 @@ object ZkUtils extends Logging {
     leaderAndIsrOpt match {
       case Some(leaderAndIsr) =>
         Json.parseFull(leaderAndIsr) match {
-          case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
+          case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for partition [%s,%d] is invalid".format(topic, partition))
           case Some(m) => m.asInstanceOf[Map[String, Any]].get("leader_epoch").get.asInstanceOf[Int]
         }
-      case None => throw new NoEpochForPartitionException("No epoch, ISR path for topic %s partition %d is empty"
+      case None => throw new NoEpochForPartitionException("No epoch, ISR path for partition [%s,%d] is empty"
         .format(topic, partition))
     }
   }
@@ -177,7 +177,7 @@ object ZkUtils extends Logging {
 
   def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
     val replicas = getReplicasForPartition(zkClient, topic, partition)
-    debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition,
replicas))
+    debug("The list of replicas for partition [%s,%d] is %s".format(topic, partition, replicas))
     replicas.contains(brokerId.toString)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 86d30ad..4d989e4 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -338,7 +338,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props))
     val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x)
     producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
-    debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition))
+    debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition))
     producer.close()
     ms.toList
   }
@@ -359,7 +359,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
       val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x)
       producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
       messages ++= ms
-      debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size,
config.brokerId, topic, partition))
+      debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId,
topic, partition))
     }
     producer.close()
     messages

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 68c134e..f9c9e64 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -408,7 +408,7 @@ object TestUtils extends Logging {
           ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
             ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
         } catch {
-          case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
+          case oe => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe)
         }
       }
     }
@@ -419,9 +419,9 @@ object TestUtils extends Logging {
     val leaderExistsOrChanged = leaderLock.newCondition()
 
     if(oldLeaderOpt == None)
-      info("Waiting for leader to be elected for topic %s partition %d".format(topic, partition))
+      info("Waiting for leader to be elected for partition [%s,%d]".format(topic, partition))
     else
-      info("Waiting for leader for topic %s partition %d to be changed from old leader %d".format(topic,
partition, oldLeaderOpt.get))
+      info("Waiting for leader for partition [%s,%d] to be changed from old leader %d".format(topic,
partition, oldLeaderOpt.get))
 
     leaderLock.lock()
     try {
@@ -432,10 +432,10 @@ object TestUtils extends Logging {
       leader match {
         case Some(l) =>
           if(oldLeaderOpt == None)
-            info("Leader %d is elected for topic %s partition %d".format(l, topic, partition))
+            info("Leader %d is elected for partition [%s,%d]".format(l, topic, partition))
           else
-            info("Leader for topic %s partition %d is changed from %d to %d".format(topic,
partition, oldLeaderOpt.get, l))
-        case None => error("Timing out after %d ms since leader is not elected for topic
%s partition %d"
+            info("Leader for partition [%s,%d] is changed from %d to %d".format(topic, partition,
oldLeaderOpt.get, l))
+        case None => error("Timing out after %d ms since leader is not elected for partition
[%s,%d]"
                                    .format(timeoutMs, topic, partition))
       }
       leader

