Date: Wed, 7 Feb 2018 22:09:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: jira@kafka.apache.org
Subject: [jira] [Commented] (KAFKA-4641) Improve test coverage of StreamsThread

    [ https://issues.apache.org/jira/browse/KAFKA-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356147#comment-16356147 ]

ASF GitHub Bot commented on KAFKA-4641:
---------------------------------------

guozhangwang closed pull request #4531: KAFKA-4641: Add more unit test for stream thread
URL: https://github.com/apache/kafka/pull/4531

This is a PR merged from a forked repository.
As GitHub hides the original diff once a pull request from a fork is merged, it is reproduced below for provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 064a2935515..5e25d02973a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1183,8 +1183,12 @@ public String toString(final String indent) {
         return sb.toString();
     }
 
-    // this is for testing only
+    // the following are for testing only
     TaskManager taskManager() {
         return taskManager;
     }
+
+    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
+        return standbyRecords;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e67fe14503c..cc056044792 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -40,7 +41,13 @@
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -100,13 +107,16 @@ public void setUp() {
     }
 
     private final String topic1 = "topic1";
+    private final String topic2 = "topic2";
 
     private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
     private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
+    private final TopicPartition t2p1 = new TopicPartition(topic2, 1);
 
     // task0 is unused
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
+    private final TaskId task3 = new TaskId(1, 1);
 
     private Properties configProps(final boolean enableEos) {
         return new Properties() {
@@ -129,7 +139,7 @@ private Properties configProps(final boolean enableEos) {
     public void testPartitionAssignmentChangeForSingleGroup() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
-        final StreamThread thread = getStreamThread();
+        final StreamThread thread = createStreamThread(clientId, config, false);
 
         final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
@@ -685,10 +695,6 @@ public void onChange(final Thread thread, final ThreadStateTransitionValidator n
         }
     }
 
-    private StreamThread getStreamThread() {
-        return createStreamThread(clientId, config, false);
-    }
-
     @Test
     public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
@@ -759,6 +765,151 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState() throws Interrupte
         assertTrue(threadMetadata.activeTasks().isEmpty());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldUpdateStandbyTask() {
+        final String storeName1 = "count-one";
+        final String storeName2 = "table-two";
+        final String changelogName = applicationId + "-" + storeName1 + "-changelog";
+        final TopicPartition partition1 = new TopicPartition(changelogName, 1);
+        final TopicPartition partition2 = t2p1;
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+            .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1));
+        internalStreamsBuilder.table(topic2, new ConsumedInternal<>(), new MaterializedInternal<>(Materialized.as(storeName2), internalStreamsBuilder, ""));
+
+        final StreamThread thread = createStreamThread(clientId, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+        restoreConsumer.updatePartitions(changelogName,
+            Collections.singletonList(new PartitionInfo(changelogName,
+                1,
+                null,
+                new Node[0],
+                new Node[0])));
+
+        restoreConsumer.assign(Utils.mkSet(partition1, partition2));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
+        // let the store1 be restored from 0 to 10; store2 be restored from 0 to (committed offset) 5
+        clientSupplier.consumer.assign(Utils.mkSet(partition2));
+        clientSupplier.consumer.commitSync(Collections.singletonMap(partition2, new OffsetAndMetadata(5L, "")));
+
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(topic2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+        }
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        // assign single partition
+        standbyTasks.put(task1, Collections.singleton(t1p1));
+        standbyTasks.put(task3, Collections.singleton(t2p1));
+
+        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks);
+
+        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
+
+        thread.runOnce(-1);
+
+        final StandbyTask standbyTask1 = thread.taskManager().standbyTask(partition1);
+        final StandbyTask standbyTask2 = thread.taskManager().standbyTask(partition2);
+        final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+        final KeyValueStore<Object, Object> store2 = (KeyValueStore<Object, Object>) standbyTask2.getStore(storeName2);
+
+        assertEquals(10L, store1.approximateNumEntries());
+        assertEquals(5L, store2.approximateNumEntries());
+        assertEquals(Collections.singleton(partition2), restoreConsumer.paused());
+        assertEquals(1, thread.standbyRecords().size());
+        assertEquals(5, thread.standbyRecords().get(partition2).size());
+    }
+
+    @Test
+    public void shouldPunctuateActiveTask() {
+        final List<Long> punctuatedStreamTime = new ArrayList<>();
+        final List<Long> punctuatedWallClockTime = new ArrayList<>();
+        final ProcessorSupplier<Object, Object> punctuateProcessor = new ProcessorSupplier<Object, Object>() {
+            @Override
+            public Processor<Object, Object> get() {
+                return new Processor<Object, Object>() {
+                    @Override
+                    public void init(ProcessorContext context) {
+                        context.schedule(100L, PunctuationType.STREAM_TIME, new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                punctuatedStreamTime.add(timestamp);
+                            }
+                        });
+                        context.schedule(100L, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                punctuatedWallClockTime.add(timestamp);
+                            }
+                        });
+                    }
+
+                    @Override
+                    public void process(Object key, Object value) { }
+
+                    @SuppressWarnings("deprecation")
+                    @Override
+                    public void punctuate(long timestamp) { }
+
+                    @Override
+                    public void close() { }
+                };
+            }
+        };
+
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
+
+        final StreamThread thread = createStreamThread(clientId, config, false);
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        clientSupplier.consumer.assign(assignedPartitions);
+        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+
+        thread.runOnce(-1);
+
+        assertEquals(0, punctuatedStreamTime.size());
+        assertEquals(0, punctuatedWallClockTime.size());
+
+        mockTime.sleep(100L);
+        for (long i = 0L; i < 10L; i++) {
+            clientSupplier.consumer.addRecord(new ConsumerRecord<>(topic1, 1, i, i * 100L, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ("K" + i).getBytes().length, ("V" + i).getBytes().length, ("K" + i).getBytes(), ("V" + i).getBytes()));
+        }
+
+        thread.runOnce(-1);
+
+        assertEquals(1, punctuatedStreamTime.size());
+        assertEquals(1, punctuatedWallClockTime.size());
+
+        mockTime.sleep(100L);
+
+        thread.runOnce(-1);
+
+        // we should skip stream time punctuation, only trigger wall-clock time punctuation
+        assertEquals(1, punctuatedStreamTime.size());
+        assertEquals(2, punctuatedWallClockTime.size());
+    }
+
     @Test
     public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws InterruptedException {
         final StreamThread thread = createStreamThread(clientId, config, false);

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

> Improve test coverage of StreamsThread
> --------------------------------------
>
>                 Key: KAFKA-4641
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4641
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.11.0.0
>            Reporter: Damian Guy
>            Priority: Minor
>              Labels: newbie
>
> Some methods in {{StreamThread}} have little or no coverage.
> In particular:
> {{maybeUpdateStandbyTasks}} has little to no coverage
> Committing of StandbyTasks in {{commitAll}}
> {{maybePunctuate}}
> {{commitOne}} - no tests for exceptions
> {{unAssignChangeLogPartitions}} - no tests for exceptions
> {{addStreamsTask}} - no tests for exceptions
> {{runLoop}}
> Please see the coverage report attached to the parent issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
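
For anyone extending coverage of {{maybePunctuate}} beyond the merged test, the scheduling pattern it exercises can be isolated in a plain processor, as sketched below. This is a minimal sketch only, assuming the Kafka 1.0-era context.schedule(interval, type, punctuator) API used in the diff above; the class name PunctuationSketch and the String key/value types are illustrative and not part of the PR:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;

// Illustrative class, not from the PR: schedules both punctuation types,
// as the merged shouldPunctuateActiveTask test does inline.
public class PunctuationSketch extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // STREAM_TIME: fires only when incoming record timestamps push
        // stream time across each 100 ms interval boundary.
        context.schedule(100L, PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long timestamp) {
                // timestamp is the stream time that triggered the callback
            }
        });
        // WALL_CLOCK_TIME: fires as system time passes, even with no input.
        context.schedule(100L, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
            @Override
            public void punctuate(final long timestamp) {
                // timestamp is the current wall-clock time
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        // no per-record logic is needed to observe punctuation
    }
}

The asymmetry the merged test asserts at its end falls out of this pattern: stream time advances only when new records arrive, so over an idle interval the STREAM_TIME callback stays silent while the WALL_CLOCK_TIME callback keeps firing.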