kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject git commit: KAFKA-1491; Always read coordinator information in consumer metadata response; reviewed by Neha Narkhede.
Date Fri, 27 Jun 2014 18:09:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 62f208704 -> c4b95641e


KAFKA-1491; Always read coordinator information in consumer metadata response; reviewed by
Neha Narkhede.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4b95641
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4b95641
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4b95641

Branch: refs/heads/trunk
Commit: c4b95641ea295478f9480b264652b7b52d8f71c5
Parents: 62f2087
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Fri Jun 27 11:09:18 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Fri Jun 27 11:09:18 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala | 3 ++-
 core/src/main/scala/kafka/client/ClientUtils.scala           | 5 +++++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4b95641/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
index f8cf6c3..c72ca14 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -29,8 +29,9 @@ object ConsumerMetadataResponse {
   def readFrom(buffer: ByteBuffer) = {
     val correlationId = buffer.getInt
     val errorCode = buffer.getShort
+    val broker = Broker.readFrom(buffer)
     val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
-      Some(Broker.readFrom(buffer))
+      Some(broker)
     else
       None
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4b95641/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ba5fbdc..ce7ede3 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -160,6 +160,11 @@ object ClientUtils extends Logging{
            debug("Consumer metadata response: " + consumerMetadataResponse.toString)
            if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
              coordinatorOpt = consumerMetadataResponse.coordinatorOpt
+           else {
+             debug("Query to %s:%d to locate offset manager for %s failed - will retry in
%d milliseconds."
+                  .format(queryChannel.host, queryChannel.port, group, retryBackOffMs))
+             Thread.sleep(retryBackOffMs)
+           }
          }
          catch {
            case ioe: IOException =>


Mime
View raw message