kafka-commits mailing list archives

From: jun...@apache.org
Subject: git commit: kafka-649; Cleanup log4j logging; patched by Jun Rao; reviewed by Jay Kreps
Date: Tue, 09 Apr 2013 22:37:57 GMT
Updated Branches:
  refs/heads/0.8 ef123c20b -> ae362b086


kafka-649; Cleanup log4j logging; patched by Jun Rao; reviewed by Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ae362b08
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ae362b08
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ae362b08

Branch: refs/heads/0.8
Commit: ae362b0864cab835088fc1cd2faf8f646a502af6
Parents: ef123c2
Author: Jun Rao <junrao@gmail.com>
Authored: Tue Apr 9 15:37:33 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Apr 9 15:37:33 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Partition.scala  |    5 +++--
 .../scala/kafka/consumer/ConsumerIterator.scala    |    2 +-
 .../kafka/controller/PartitionLeaderSelector.scala |   10 +++++-----
 core/src/main/scala/kafka/log/FileMessageSet.scala |    2 +-
 core/src/main/scala/kafka/log/Log.scala            |    2 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala    |    2 +-
 .../kafka/producer/async/DefaultEventHandler.scala |    2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   12 +++++++-----
 .../main/scala/kafka/server/ReplicaManager.scala   |    6 +++---
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    8 ++++----
 10 files changed, 27 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
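
Most of the changes below either lower the level of routine messages (info to debug, error to warn) or add topic/partition and correlation-id context to the messages that remain. Both kinds of change are cheap at runtime because these calls go through Kafka's kafka.utils.Logging trait, which takes the message as a by-name parameter, so the String.format argument is only evaluated when the level is actually enabled. A minimal sketch of that helper pattern (illustrative, not the exact trait source):

    import org.apache.log4j.Logger

    // Minimal sketch of the lazy-logging helpers used throughout core:
    // msg is by-name, so an expensive .format(...) argument is never
    // evaluated unless the level is enabled.
    trait Logging {
      lazy val logger = Logger.getLogger(this.getClass.getName)

      def debug(msg: => String) {
        if (logger.isDebugEnabled)
          logger.debug(msg)
      }

      def info(msg: => String) {
        if (logger.isInfoEnabled)
          logger.info(msg)
      }

      def warn(msg: => String) {
        logger.warn(msg)
      }

      def error(msg: => String, e: => Throwable) {
        logger.error(msg, e)
      }
    }

With this pattern, demoting a message from info to debug removes its formatting cost from the hot path whenever debug is off.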


http://git-wip-us.apache.org/repos/asf/kafka/blob/ae362b08/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d5f5a4e..6146c6e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -220,7 +220,8 @@ class Partition(val topic: String,
          if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
            // expand ISR
            val newInSyncReplicas = inSyncReplicas + replica
-            info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(", ")))
+            info("Expanding ISR for topic %s partition %d from %s to %s"
+                 .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in ZK and cache
             updateIsr(newInSyncReplicas)
             replicaManager.isrExpandRate.mark()
@@ -315,7 +316,7 @@ class Partition(val topic: String,
   }
 
   private def updateIsr(newIsr: Set[Replica]) {
-    info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(",
")))
+    debug("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(",")))
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r =>
r.brokerId).toList, zkVersion)
     // use the epoch of the controller that made the leadership decision, instead of the
current controller epoch
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae362b08/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index a504534..b80c0b0 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -109,7 +109,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
 
   def clearCurrentChunk() {
     try {
-      info("Clearing the current data chunk for this consumer iterator")
+      debug("Clearing the current data chunk for this consumer iterator")
       current.set(null)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae362b08/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 7a06c24..21b0e24 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -53,8 +53,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
         val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
         val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
           case true =>
-            debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s"
-              .format(liveAssignedReplicasToThisPartition.mkString(",")))
+            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
+              .format(topicAndPartition, liveAssignedReplicasToThisPartition.mkString(",")))
             liveAssignedReplicasToThisPartition.isEmpty match {
               case true =>
                 throw new NoReplicaOnlineException(("No replica for partition " +
@@ -63,13 +63,13 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
               case false =>
                 ControllerStats.uncleanLeaderElectionRate.mark()
                 val newLeader = liveAssignedReplicasToThisPartition.head
-                warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
-                  "There's potential data loss")
+                warn("No broker in ISR is alive for %s. Elect leader from broker %s. There's potential data loss."
+                     .format(topicAndPartition, newLeader))
                 new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
             }
           case false =>
             val newLeader = liveBrokersInIsr.head
-            debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
+            debug("Some broker in ISR is alive for %s. Select %d from ISR to be the leader.".format(topicAndPartition, newLeader))
             new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
         }
         info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae362b08/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index ce27a19..c4397b7 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -44,7 +44,7 @@ class FileMessageSet private[kafka](val file: File,
   private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)
 
   if (initChannelPositionToEnd) {
-    info("Creating or reloading log segment %s".format(file.getAbsolutePath))
+    debug("Creating or reloading log segment %s".format(file.getAbsolutePath))
     /* set the file position to the last byte in the file */
     channel.position(channel.size)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae362b08/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 7d71451..6ac6545 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -127,7 +127,7 @@ private[kafka] class Log(val dir: File,
   /* Calculate the offset of the next message */
   private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
   
-  debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))
+  info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
   newGauge(name + "-" + "NumLogSegments",
            new Gauge[Int] { def getValue = numberOfSegments })

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae362b08/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index e806da9..60ebc52 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -90,7 +90,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   /* the last offset in the index */
   var lastOffset = readLastOffset()
   
-  info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset
= %d, file position = %d"
+  debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset
= %d, file position = %d"
     .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
 
   /* the maximum number of entries this index can hold */

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae362b08/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 27b16e3..89cb27d 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -135,7 +135,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           } else {
            // currently, if in async mode, we just log the serialization error. We need to revisit
             // this when doing kafka-496
-            error("Error serializing message ", t)
+            error("Error serializing message for topic %s".format(e.topic), t)
           }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae362b08/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 87ca6b0..7ee81a1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -205,10 +205,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           Runtime.getRuntime.halt(1)
           null
         case utpe: UnknownTopicOrPartitionException =>
-          warn(utpe.getMessage)
+          warn("Produce request: " + utpe.getMessage)
           new ProduceResult(topicAndPartition, utpe)
         case nle: NotLeaderForPartitionException =>
-          warn(nle.getMessage)
+          warn("Produce request: " + nle.getMessage)
           new ProduceResult(topicAndPartition, nle)
         case e =>
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
@@ -291,15 +291,17 @@ class KafkaApis(val requestChannel: RequestChannel,
            // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
             // for a partition it is the leader for
             case utpe: UnknownTopicOrPartitionException =>
-              warn(utpe.getMessage)
+              warn("Fetch request: " + utpe.getMessage)
              new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
             case nle: NotLeaderForPartitionException =>
-              warn(nle.getMessage)
+              warn("Fetch request: " + nle.getMessage)
              new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
             case t =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
-              error("error when processing request " + (topic, partition, offset, fetchSize),
t)
+              error("Error when processing fetch request for topic %s partition %d offset
%d from %s with correlation id %d"
+                    .format(topic, partition, offset, if (isFetchFromFollower) "follower"
else "consumer", fetchRequest.correlationId),
+                    t)
               new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
-1L, MessageSet.Empty)
           }
         (TopicAndPartition(topic, partition), partitionData)
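
The produce and fetch handlers above now follow one convention: conditions a client is expected to hit and recover from (unknown topic/partition, stale leadership) are logged at warn with the request type prefixed, while unexpected failures mark the failure-rate meters and are logged at error with the stack trace and full request context. A hedged sketch of that dispatch; handleExpectedErrors is a hypothetical helper, not a KafkaApis method:

    import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException}

    // Hypothetical helper illustrating the warn-vs-error convention above;
    // warn and error come from the Logging trait.
    def handleExpectedErrors(requestType: String, correlationId: Int, t: Throwable) {
      t match {
        case e: UnknownTopicOrPartitionException =>
          // Expected with stale client metadata: warn, no stack trace.
          warn(requestType + " request: " + e.getMessage)
        case e: NotLeaderForPartitionException =>
          // Expected during leader movement: the client refreshes metadata and retries.
          warn(requestType + " request: " + e.getMessage)
        case e =>
          // Unexpected: keep the stack trace and correlation id at error.
          error("Error when processing %s request with correlation id %d"
                .format(requestType, correlationId), e)
      }
    }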

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae362b08/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4a41bde..8b0f797 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -131,7 +131,7 @@ class ReplicaManager(val config: KafkaConfig,
  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
     if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
-      stateChangeLogger.error("Broker %d received stop replica request from an old controller
epoch %d."
+      stateChangeLogger.warn("Broker %d received stop replica request from an old controller
epoch %d."
         .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
             " Latest known controller epoch is %d " + controllerEpoch)
       (responseMap, ErrorMapping.StaleControllerEpochCode)
@@ -196,14 +196,14 @@ class ReplicaManager(val config: KafkaConfig,
 
  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
     leaderAndISRRequest.partitionStateInfos.foreach(p =>
-      stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
+      stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlation id %d received from controller %d epoch %d for partition [%s,%d]"
                                  .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
                                         leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2)))
     info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest))
 
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
     if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-      stateChangeLogger.error("Broker %d received LeaderAndIsr request correlationId %d with
an old controllerEpoch %d, latest known controllerEpoch is %d"
+      stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d with
an old controller epoch %d. Latest known controller epoch is %d"
                                 .format(localBrokerId, leaderAndISRRequest.controllerEpoch,
leaderAndISRRequest.correlationId, controllerEpoch))
       (responseMap, ErrorMapping.StaleControllerEpochCode)
     }else {
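
Both ReplicaManager handlers apply the same fencing rule, now logged at warn rather than error since it is expected during controller failover: a request stamped with a controller epoch older than the broker's latest known epoch is rejected with StaleControllerEpochCode. A minimal sketch of the check; requestEpoch and latestKnownEpoch stand in for the epoch carried on the request and the broker's cached controllerEpoch:

    // Illustrative fencing predicate, not a ReplicaManager method.
    def fromStaleController(requestEpoch: Int, latestKnownEpoch: Int): Boolean = {
      if (requestEpoch < latestKnownEpoch) {
        warn("Received request from an old controller epoch %d. Latest known controller epoch is %d"
             .format(requestEpoch, latestKnownEpoch))
        true  // caller responds with ErrorMapping.StaleControllerEpochCode
      } else
        false // request is from the current (or a newer) controller
    }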

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae362b08/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 5673ae2..bd93ff1 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -328,12 +328,12 @@ object ZkUtils extends Logging {
  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
     try {
       val stat = client.writeData(path, data, expectVersion)
-      info("Conditional update of zkPath %s with value %s and expected version %d succeeded,
returning the new version: %d"
+      debug("Conditional update of path %s with value %s and expected version %d succeeded,
returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
     } catch {
       case e: Exception =>
-        error("Conditional update of zkPath %s with data %s and expected version %d failed".format(path,
data,
+        error("Conditional update of path %s with data %s and expected version %d failed".format(path,
data,
           expectVersion), e)
         (false, -1)
     }
@@ -346,13 +346,13 @@ object ZkUtils extends Logging {
  def conditionalUpdatePersistentPathIfExists(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
     try {
       val stat = client.writeData(path, data, expectVersion)
-      info("Conditional update of zkPath %s with value %s and expected version %d succeeded,
returning the new version: %d"
+      debug("Conditional update of path %s with value %s and expected version %d succeeded,
returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
     } catch {
       case nne: ZkNoNodeException => throw nne
       case e: Exception =>
-        error("Conditional update of zkPath %s with data %s and expected version %d failed".format(path,
data,
+        error("Conditional update of path %s with data %s and expected version %d failed".format(path,
data,
           expectVersion), e)
         (false, -1)
     }
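
For context, conditionalUpdatePersistentPath is the optimistic-concurrency primitive behind the ISR update in Partition.updateIsr above: the caller passes the ZooKeeper version it last read and gets back (succeeded, newVersion). A hedged usage sketch; readIsrFromZk and buildNewIsrJson are hypothetical helpers, not part of this commit:

    // Illustrative retry loop around the conditional update. Only
    // ZkUtils.conditionalUpdatePersistentPath is real (signature in the
    // diff above); the read/build helpers are placeholders.
    var done = false
    while (!done) {
      val (currentJson, zkVersion) = readIsrFromZk(zkClient, isrPath)  // hypothetical
      val newJson = buildNewIsrJson(currentJson)                       // hypothetical
      val (succeeded, _) =
        ZkUtils.conditionalUpdatePersistentPath(zkClient, isrPath, newJson, zkVersion)
      // On failure another writer won at this version; re-read and retry.
      done = succeeded
    }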

