kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4641) Improve test coverage of StreamsThread
Date Wed, 07 Feb 2018 22:09:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356147#comment-16356147 ]

ASF GitHub Bot commented on KAFKA-4641:
---------------------------------------

guozhangwang closed pull request #4531: KAFKA-4641: Add more unit test for stream thread
URL: https://github.com/apache/kafka/pull/4531
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the original
diff of a foreign (forked) pull request once it is merged, the diff is reproduced
below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 064a2935515..5e25d02973a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1183,8 +1183,12 @@ public String toString(final String indent) {
         return sb.toString();
     }
 
-    // this is for testing only
+    // the following are for testing only
     TaskManager taskManager() {
         return taskManager;
     }
+
+    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
+        return standbyRecords;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e67fe14503c..cc056044792 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -40,7 +41,13 @@
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -100,13 +107,16 @@ public void setUp() {
     }
 
     private final String topic1 = "topic1";
+    private final String topic2 = "topic2";
 
     private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
     private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
+    private final TopicPartition t2p1 = new TopicPartition(topic2, 1);
 
     // task0 is unused
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
+    private final TaskId task3 = new TaskId(1, 1);
 
     private Properties configProps(final boolean enableEos) {
         return new Properties() {
@@ -129,7 +139,7 @@ private Properties configProps(final boolean enableEos) {
     public void testPartitionAssignmentChangeForSingleGroup() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
-        final StreamThread thread = getStreamThread();
+        final StreamThread thread = createStreamThread(clientId, config, false);
 
         final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
@@ -685,10 +695,6 @@ public void onChange(final Thread thread, final ThreadStateTransitionValidator n
         }
     }
 
-    private StreamThread getStreamThread() {
-        return createStreamThread(clientId, config, false);
-    }
-
     @Test
     public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
@@ -759,6 +765,151 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState() throws Interrupte
         assertTrue(threadMetadata.activeTasks().isEmpty());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldUpdateStandbyTask() {
+        final String storeName1 = "count-one";
+        final String storeName2 = "table-two";
+        final String changelogName = applicationId + "-" + storeName1 + "-changelog";
+        final TopicPartition partition1 = new TopicPartition(changelogName, 1);
+        final TopicPartition partition2 = t2p1;
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+                .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1));
+        internalStreamsBuilder.table(topic2, new ConsumedInternal(), new MaterializedInternal(Materialized.as(storeName2), internalStreamsBuilder, ""));
+
+        final StreamThread thread = createStreamThread(clientId, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+        restoreConsumer.updatePartitions(changelogName,
+                Collections.singletonList(new PartitionInfo(changelogName,
+                        1,
+                        null,
+                        new Node[0],
+                        new Node[0])));
+
+        restoreConsumer.assign(Utils.mkSet(partition1, partition2));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
+        // let the store1 be restored from 0 to 10; store2 be restored from 0 to (committed offset) 5
+        clientSupplier.consumer.assign(Utils.mkSet(partition2));
+        clientSupplier.consumer.commitSync(Collections.singletonMap(partition2, new OffsetAndMetadata(5L, "")));
+
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(topic2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+        }
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        // assign single partition
+        standbyTasks.put(task1, Collections.singleton(t1p1));
+        standbyTasks.put(task3, Collections.singleton(t2p1));
+
+        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks);
+
+        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+
+        thread.runOnce(-1);
+
+        final StandbyTask standbyTask1 = thread.taskManager().standbyTask(partition1);
+        final StandbyTask standbyTask2 = thread.taskManager().standbyTask(partition2);
+        final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+        final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+
+        assertEquals(10L, store1.approximateNumEntries());
+        assertEquals(5L, store2.approximateNumEntries());
+        assertEquals(Collections.singleton(partition2), restoreConsumer.paused());
+        assertEquals(1, thread.standbyRecords().size());
+        assertEquals(5, thread.standbyRecords().get(partition2).size());
+    }
+
+    @Test
+    public void shouldPunctuateActiveTask() {
+        final List<Long> punctuatedStreamTime = new ArrayList<>();
+        final List<Long> punctuatedWallClockTime = new ArrayList<>();
+        final ProcessorSupplier<Object, Object> punctuateProcessor = new ProcessorSupplier<Object, Object>() {
+            @Override
+            public Processor<Object, Object> get() {
+                return new Processor<Object, Object>() {
+                    @Override
+                    public void init(ProcessorContext context) {
+                        context.schedule(100L, PunctuationType.STREAM_TIME, new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                punctuatedStreamTime.add(timestamp);
+                            }
+                        });
+                        context.schedule(100L, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                punctuatedWallClockTime.add(timestamp);
+                            }
+                        });
+                    }
+
+                    @Override
+                    public void process(Object key, Object value) { }
+
+                    @SuppressWarnings("deprecation")
+                    @Override
+                    public void punctuate(long timestamp) { }
+
+                    @Override
+                    public void close() { }
+                };
+            }
+        };
+
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
+
+        final StreamThread thread = createStreamThread(clientId, config, false);
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        clientSupplier.consumer.assign(assignedPartitions);
+        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+
+        thread.runOnce(-1);
+
+        assertEquals(0, punctuatedStreamTime.size());
+        assertEquals(0, punctuatedWallClockTime.size());
+
+        mockTime.sleep(100L);
+        for (long i = 0L; i < 10L; i++) {
+            clientSupplier.consumer.addRecord(new ConsumerRecord<>(topic1, 1, i, i * 100L, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ("K" + i).getBytes().length, ("V" + i).getBytes().length, ("K" + i).getBytes(), ("V" + i).getBytes()));
+        }
+
+        thread.runOnce(-1);
+
+        assertEquals(1, punctuatedStreamTime.size());
+        assertEquals(1, punctuatedWallClockTime.size());
+
+        mockTime.sleep(100L);
+
+        thread.runOnce(-1);
+
+        // we should skip stream time punctuation, only trigger wall-clock time punctuation
+        assertEquals(1, punctuatedStreamTime.size());
+        assertEquals(2, punctuatedWallClockTime.size());
+    }
+
     @Test
     public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws InterruptedException {
         final StreamThread thread = createStreamThread(clientId, config, false);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Improve test coverage of StreamsThread
> --------------------------------------
>
>                 Key: KAFKA-4641
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4641
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.11.0.0
>            Reporter: Damian Guy
>            Priority: Minor
>              Labels: newbie
>
> Some methods in {{StreamThread}} have little or no coverage.
> In particular:
> {{maybeUpdateStandbyTasks}} has little to no coverage
> Committing of StandbyTasks in {{commitAll}}
> {{maybePunctuate}}
> {{commitOne}} - no tests for exceptions
> {{unAssignChangeLogPartitions}} - no tests for exceptions
> {{addStreamsTask}} - no tests for exceptions
> {{runLoop}}
> Please see the coverage report attached to the parent issue.
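
For readers picking up the remaining items from the coverage list above (e.g. exception
paths in {{commitOne}} or {{addStreamsTask}}): the tests added in the merged PR follow one
common setup pattern, condensed in the sketch below. This is only an illustration, not the
committed test code; it assumes the existing StreamThreadTest fixtures and imports shown in
the diff (createStreamThread, clientSupplier, internalStreamsBuilder, consumed, config,
clientId, topic1, t1p1, task1), and the method name is made up for this example.

    @Test
    public void shouldFollowCommonStreamThreadTestSetupPattern() {
        // 1. build a topology against the test's InternalStreamsBuilder fixture
        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed);

        // 2. create the thread under test via the StreamThreadTest helper
        final StreamThread thread = createStreamThread(clientId, config, false);
        thread.setState(StreamThread.State.RUNNING);

        // 3. simulate a rebalance that hands the thread a single active partition
        thread.rebalanceListener.onPartitionsRevoked(null);
        thread.taskManager().setAssignmentMetadata(
                Collections.singletonMap(task1, Collections.singleton(t1p1)),
                Collections.<TaskId, Set<TopicPartition>>emptyMap());
        thread.rebalanceListener.onPartitionsAssigned(Collections.singletonList(t1p1));
        clientSupplier.consumer.assign(Collections.singletonList(t1p1));
        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));

        // 4. run one iteration of the poll/process loop, then assert on observable
        //    state, e.g. via thread.taskManager() or the new thread.standbyRecords()
        thread.runOnce(-1);
    }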



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
