kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-901: Follow up minor changes
Date Fri, 17 May 2013 23:59:47 GMT
Updated Branches:
  refs/heads/0.8 cfdc403e1 -> eff59330f


KAFKA-901: Follow up minor changes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eff59330
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eff59330
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eff59330

Branch: refs/heads/0.8
Commit: eff59330f2ce19e7074d74f4d268774dd260bca7
Parents: cfdc403
Author: Neha Narkhede <nehanarkhede@apache.org>
Authored: Fri May 17 16:59:40 2013 -0700
Committer: Neha Narkhede <nehanarkhede@apache.org>
Committed: Fri May 17 16:59:40 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala   |   50 ++++++---------
 .../unit/kafka/producer/AsyncProducerTest.scala    |    4 +-
 2 files changed, 23 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eff59330/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7642179..0c5c4d5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -435,37 +435,29 @@ class KafkaApis(val requestChannel: RequestChannel,
            val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
               val replicas = leaderCache(topicAndPartition).allReplicas
               var replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
-              val partitionStateOpt = leaderCache.get(topicAndPartition)
               var leaderInfo: Option[Broker] = None
               var isrInfo: Seq[Broker] = Nil
-              partitionStateOpt match {
-                case Some(partitionState) =>
-                  val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-                  val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-                  val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-                  debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
-                  try {
-                    if(aliveBrokers.keySet.contains(leader))
-                      leaderInfo = Some(aliveBrokers(leader))
-                    else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition))
-                    isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
-                    if(replicaInfo.size < replicas.size)
-                      throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                        replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-                    if(isrInfo.size < isr.size)
-                      throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                        isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-                    new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-                  } catch {
-                    case e =>
-                      error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
-                      new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
-                        ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-                  }
-                case None => // it is possible that for a newly created topic/partition, its replicas are assigned, but a
-                  // leader hasn't been assigned yet
-                  debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = None, leader = None")
-                  new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.LeaderNotAvailableCode)
+              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+              debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
+              try {
+                if(aliveBrokers.keySet.contains(leader))
+                  leaderInfo = Some(aliveBrokers(leader))
+                else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition))
+                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+                if(replicaInfo.size < replicas.size)
+                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                if(isrInfo.size < isr.size)
+                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+              } catch {
+                case e =>
+                  error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
+                  new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
+                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
               }
             }
             new TopicMetadata(topic, partitionMetadata)

http://git-wip-us.apache.org/repos/asf/kafka/blob/eff59330/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 458b9ad..1781bc0 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -262,8 +262,8 @@ class AsyncProducerTest extends JUnit3Suite {
       handler.partitionAndCollate(producerDataList)
     }
     catch {
-      // should not throw UnknownTopicOrPartitionException to allow resend
-      case e: UnknownTopicOrPartitionException => fail("Should not throw UnknownTopicOrPartitionException")
+      // should not throw any exception
+      case e => fail("Should not throw any exception")
 
     }
   }


Mime
View raw message