kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3960; Committed offset not set after first assign [Forced Update!]
Date Sat, 23 Jul 2016 10:01:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 031bbe58e -> 17914453e (forced update)


KAFKA-3960; Committed offset not set after first assign

Author: Alexey Romanchuk <al.romanchuk@2gis.ru>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1629 from 13h3r/kafka-3960

(cherry picked from commit 932bb84c837807eeca3600007ae3030561fdcb37)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/17914453
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/17914453
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/17914453

Branch: refs/heads/0.10.0
Commit: 17914453e35cc29396685b7fd7665918861b9805
Parents: 24daae7
Author: Alexey Romanchuk <al.romanchuk@2gis.ru>
Authored: Sat Jul 23 09:31:16 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Jul 23 11:00:15 2016 +0100

----------------------------------------------------------------------
 .../consumer/internals/SubscriptionState.java   |  1 +
 .../clients/consumer/KafkaConsumerTest.java     | 60 ++++++++++++++++++++
 .../internals/SubscriptionStateTest.java        |  2 +
 3 files changed, 63 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/17914453/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 2412d36..38660e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -170,6 +170,7 @@ public class SubscriptionState {
         this.assignment.keySet().retainAll(this.userAssignment);
 
         this.needsPartitionAssignment = false;
+        this.needsFetchCommittedOffsets = true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/17914453/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 694faf2..d846a69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -59,7 +60,9 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -322,6 +325,55 @@ public class KafkaConsumerTest {
     }
 
     @Test
+    public void testCommitsFetchedDuringAssign() {
+        String topic = "topic";
+        final TopicPartition partition1 = new TopicPartition(topic, 0);
+        final TopicPartition partition2 = new TopicPartition(topic, 1);
+
+        long offset1 = 10000;
+        long offset2 = 20000;
+
+        int sessionTimeoutMs = 3000;
+        int heartbeatIntervalMs = 2000;
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+        consumer.assign(Arrays.asList(partition1));
+
+        // lookup coordinator
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        // fetch offset for one topic
+        client.prepareResponseFrom(
+                offsetResponse(Collections.singletonMap(partition1, offset1), Errors.NONE.code()),
+                coordinator);
+
+        assertEquals(offset1, consumer.committed(partition1).offset());
+
+        consumer.assign(Arrays.asList(partition1, partition2));
+
+        // fetch offset for two topics
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        offsets.put(partition1, offset1);
+        offsets.put(partition2, offset2);
+        client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE.code()), coordinator);
+
+        assertEquals(offset1, consumer.committed(partition1).offset());
+        assertEquals(offset2, consumer.committed(partition2).offset());
+    }
+
+    @Test
     public void testAutoCommitSentBeforePositionUpdate() {
         String topic = "topic";
         final TopicPartition partition = new TopicPartition(topic, 0);
@@ -479,6 +531,14 @@ public class KafkaConsumerTest {
         return new SyncGroupResponse(error, buf).toStruct();
     }
 
+    private Struct offsetResponse(Map<TopicPartition, Long> offsets, short error) {
+        Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionData = new HashMap<>();
+        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
+            partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(), "", error));
+        }
+        return new OffsetFetchResponse(partitionData).toStruct();
+    }
+
     private Struct fetchResponse(TopicPartition tp, long fetchOffset, int count) {
         MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
         for (int i = 0; i < count; i++)

http://git-wip-us.apache.org/repos/asf/kafka/blob/17914453/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 439ded7..3b4b10e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -46,6 +46,8 @@ public class SubscriptionStateTest {
         state.assignFromUser(Arrays.asList(tp0));
         assertEquals(Collections.singleton(tp0), state.assignedPartitions());
         assertFalse(state.partitionAssignmentNeeded());
+        assertFalse(state.hasAllFetchPositions());
+        assertTrue(state.refreshCommitsNeeded());
         state.committed(tp0, new OffsetAndMetadata(1));
         state.seek(tp0, 1);
         assertTrue(state.isFetchable(tp0));


Mime
View raw message