kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject kafka git commit: MINOR: JoinGroupRequest V0 invalid rebalance timeout
Date Thu, 11 May 2017 20:18:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 24a4e6146 -> 27107ee34


MINOR: JoinGroupRequest V0 invalid rebalance timeout

A JoinGroupRequest V0 built with the Builder had
a rebalance timeout of -1 rather than one equal to the session timeout,
as it would have been had it come from the wire and been deserialized
from a V0 Struct.

fix developed with mimaison

Author: Edoardo Comar <ecomar@uk.ibm.com>

Reviewers: Rajini Sivaram

Closes #2936 from edoardocomar/MINOR-JoinGroupRequestV0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27107ee3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27107ee3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27107ee3

Branch: refs/heads/trunk
Commit: 27107ee34d1df89035eb9b9b4e11036fca6cf723
Parents: 24a4e61
Author: Edoardo Comar <ecomar@uk.ibm.com>
Authored: Thu May 11 16:17:34 2017 -0400
Committer: Rajini Sivaram <rajinisivaram@googlemail.com>
Committed: Thu May 11 16:17:34 2017 -0400

----------------------------------------------------------------------
 .../apache/kafka/common/requests/JoinGroupRequest.java   |  3 ++-
 .../kafka/common/requests/RequestResponseTest.java       | 11 +++++++++--
 2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/27107ee3/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 1080fe7..ff07d13 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -90,7 +90,8 @@ public class JoinGroupRequest extends AbstractRequest {
         @Override
         public JoinGroupRequest build(short version) {
             if (version < 1) {
-                rebalanceTimeout = -1;
+                // v0 had no rebalance timeout but used session timeout implicitly
+                rebalanceTimeout = sessionTimeout;
             }
             return new JoinGroupRequest(version, groupId, sessionTimeout,
                     rebalanceTimeout, memberId, protocolType, groupProtocols);

http://git-wip-us.apache.org/repos/asf/kafka/blob/27107ee3/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 6443e4d..b1e83bf 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -511,7 +511,15 @@ public class RequestResponseTest {
         deserialized = (FetchRequest) deserialize(request, struct, request.version());
         assertEquals(request.isolationLevel(), deserialized.isolationLevel());
     }
-    
+
+    @Test
+    public void testJoinGroupRequestVersion0RebalanceTimeout() throws Exception {
+        final short version = 0;
+        JoinGroupRequest jgr = createJoinGroupRequest(version);
+        JoinGroupRequest jgr2 = new JoinGroupRequest(jgr.toStruct(), version);
+        assertEquals(jgr2.rebalanceTimeout(), jgr.rebalanceTimeout());
+    }
+
     private RequestHeader createRequestHeader() {
         return new RequestHeader((short) 10, (short) 1, "", 10);
     }
@@ -565,7 +573,6 @@ public class RequestResponseTest {
         return new HeartbeatResponse(Errors.NONE);
     }
 
-    @SuppressWarnings("deprecation")
     private JoinGroupRequest createJoinGroupRequest(int version) {
         ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
         List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();


Mime
View raw message