kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject kafka git commit: KAFKA-1883 Fix NullPointerException in RequestSendThread; reviewed by Neha Narkhede
Date Mon, 26 Jan 2015 02:55:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 995d0d369 -> 4aa3dab3d


KAFKA-1883 Fix NullPointerException in RequestSendThread; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4aa3dab3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4aa3dab3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4aa3dab3

Branch: refs/heads/trunk
Commit: 4aa3dab3de088096461941353ba27cb37f1bd9d1
Parents: 995d0d3
Author: jaikiran pai <jai.forums2013@gmail.com>
Authored: Sun Jan 25 18:54:51 2015 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Sun Jan 25 18:54:58 2015 -0800

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   | 32 +++++++++++---------
 1 file changed, 17 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4aa3dab3/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index eb492f0..fbef34c 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -125,7 +125,7 @@ class RequestSendThread(val controllerId: Int,
     try {
       lock synchronized {
         var isSendSuccessful = false
-        while(isRunning.get() && !isSendSuccessful) {
+        while (isRunning.get() && !isSendSuccessful) {
           // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
           // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
           try {
@@ -136,7 +136,7 @@ class RequestSendThread(val controllerId: Int,
             case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
               warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                 "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
-                request.toString, toBroker.toString()), e)
+                  request.toString, toBroker.toString()), e)
               channel.disconnect()
               connectToBroker(toBroker, channel)
               isSendSuccessful = false
@@ -144,20 +144,22 @@ class RequestSendThread(val controllerId: Int,
               Utils.swallow(Thread.sleep(300))
           }
         }
-        var response: RequestOrResponse = null
-        request.requestId.get match {
-          case RequestKeys.LeaderAndIsrKey =>
-            response = LeaderAndIsrResponse.readFrom(receive.buffer)
-          case RequestKeys.StopReplicaKey =>
-            response = StopReplicaResponse.readFrom(receive.buffer)
-          case RequestKeys.UpdateMetadataKey =>
-            response = UpdateMetadataResponse.readFrom(receive.buffer)
-        }
-        stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
-                                  .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
+        if (receive != null) {
+          var response: RequestOrResponse = null
+          request.requestId.get match {
+            case RequestKeys.LeaderAndIsrKey =>
+              response = LeaderAndIsrResponse.readFrom(receive.buffer)
+            case RequestKeys.StopReplicaKey =>
+              response = StopReplicaResponse.readFrom(receive.buffer)
+            case RequestKeys.UpdateMetadataKey =>
+              response = UpdateMetadataResponse.readFrom(receive.buffer)
+          }
+          stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
+            .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
 
-        if(callback != null) {
-          callback(response)
+          if (callback != null) {
+            callback(response)
+          }
         }
       }
     } catch {


Mime
View raw message