kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject git commit: KAFKA-852, remove clientId from Offset{Fetch, Commit}Response. Reviewed by Jay.
Date Thu, 11 Jul 2013 22:37:31 GMT
Updated Branches:
  refs/heads/trunk db37ed005 -> c27c76846


KAFKA-852, remove clientId from Offset{Fetch,Commit}Response. Reviewed by Jay.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c27c7684
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c27c7684
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c27c7684

Branch: refs/heads/trunk
Commit: c27c768463a5dc6be113f2e5b3e00bf8d9d9d602
Parents: db37ed0
Author: David Arthur <mumrah@gmail.com>
Authored: Thu Jul 11 15:34:57 2013 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Thu Jul 11 15:34:57 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/api/OffsetCommitResponse.scala     | 17 +++--------------
 .../main/scala/kafka/api/OffsetFetchResponse.scala | 15 ++-------------
 core/src/main/scala/kafka/server/KafkaApis.scala   |  6 ++----
 3 files changed, 7 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c27c7684/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index cbb5fa1..ad54bd6 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -25,14 +25,9 @@ import kafka.utils.Logging
 
 object OffsetCommitResponse extends Logging {
   val CurrentVersion: Short = 0
-  val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
-    // Read values from the envelope
     val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
-
-    // Read the OffsetResponse 
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topic = readShortString(buffer)
@@ -43,23 +38,18 @@ object OffsetCommitResponse extends Logging {
         (TopicAndPartition(topic, partitionId), error)
       })
     })
-    OffsetCommitResponse(Map(pairs:_*), correlationId, clientId)
+    OffsetCommitResponse(Map(pairs:_*), correlationId)
   }
 }
 
 case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
-                               override val correlationId: Int = 0,
-                               clientId: String = OffsetCommitResponse.DefaultClientId)
-    extends RequestOrResponse(correlationId = correlationId) {
+                               override val correlationId: Int = 0)
+    extends RequestOrResponse(correlationId=correlationId) {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
   def writeTo(buffer: ByteBuffer) {
-    // Write envelope
     buffer.putInt(correlationId)
-    writeShortString(buffer, clientId)
-
-    // Write OffsetCommitResponse
     buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
     requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Short]
       writeShortString(buffer, t1._1) // topic
@@ -73,7 +63,6 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
 
   override def sizeInBytes = 
     4 + /* correlationId */
-    shortStringLength(clientId) +
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
       val (topic, offsets) = topicAndOffsets

http://git-wip-us.apache.org/repos/asf/kafka/blob/c27c7684/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
index 71c2efb..ce03a13 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -25,14 +25,9 @@ import kafka.utils.Logging
 
 object OffsetFetchResponse extends Logging {
   val CurrentVersion: Short = 0
-  val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): OffsetFetchResponse = {
-    // Read values from the envelope
     val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
-
-    // Read the OffsetResponse 
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topic = readShortString(buffer)
@@ -45,23 +40,18 @@ object OffsetFetchResponse extends Logging {
         (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error))
       })
     })
-    OffsetFetchResponse(Map(pairs:_*), correlationId, clientId)
+    OffsetFetchResponse(Map(pairs:_*), correlationId)
   }
 }
 
 case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
-                               override val correlationId: Int = 0,
-                               clientId: String = OffsetFetchResponse.DefaultClientId)
+                               override val correlationId: Int = 0)
     extends RequestOrResponse(correlationId = correlationId) {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
   def writeTo(buffer: ByteBuffer) {
-    // Write envelope
     buffer.putInt(correlationId)
-    writeShortString(buffer, clientId)
-
-    // Write OffsetFetchResponse
     buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
     requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
       writeShortString(buffer, t1._1) // topic
@@ -77,7 +67,6 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
 
   override def sizeInBytes = 
     4 + /* correlationId */
-    shortStringLength(clientId) +
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
       val (topic, offsets) = topicAndOffsets

http://git-wip-us.apache.org/repos/asf/kafka/blob/c27c7684/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 520bbf0..0ec031a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -608,8 +608,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
     val response = new OffsetCommitResponse(responseInfo, 
-                                            offsetCommitRequest.correlationId,
-                                            offsetCommitRequest.clientId)
+                                            offsetCommitRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
@@ -636,8 +635,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     })
     val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), 
-                                           offsetFetchRequest.correlationId,
-                                           offsetFetchRequest.clientId)
+                                           offsetFetchRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 


Mime
View raw message