From: tzulitai@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Wed, 07 Jun 2017 17:22:56 -0000
In-Reply-To: <7bee20c886c540fd9c56125559f1d972@git.apache.org>
Subject: [09/12] flink git commit: [FLINK-6830] [kafka] Port migration tests for FlinkKafkaConsumerBase to Flink 1.3

[FLINK-6830] [kafka] Port migration tests for FlinkKafkaConsumerBase to Flink 1.3

This commit also consolidates all migration tests from Flink 1.1 and 1.2 for the
FlinkKafkaConsumerBase into a single class, FlinkKafkaConsumerBaseMigrationTest.
Parameterization is used to test migration from savepoints taken with different
Flink versions.
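For readers skimming the diff below: the consolidation relies on the standard JUnit 4 parameterized-runner pattern, with the savepoint version handed to the test constructor once per run. The following is only a minimal, self-contained sketch of that pattern; the nested Version enum is a stand-in for Flink's MigrationVersion test enum (its string form is assumed to render as "1.1"/"1.2"/"1.3", matching the snapshot resource names in this commit), and the resource-name scheme mirrors the one used in the real test further down.

	import java.util.Arrays;
	import java.util.Collection;

	import org.junit.Test;
	import org.junit.runner.RunWith;
	import org.junit.runners.Parameterized;

	import static org.junit.Assert.assertTrue;

	/** Sketch only: illustrates the parameterization used by FlinkKafkaConsumerBaseMigrationTest. */
	@RunWith(Parameterized.class)
	public class MigrationTestParameterizationSketch {

		/** Stand-in for org.apache.flink.streaming.util.migration.MigrationVersion. */
		public enum Version {
			v1_1("1.1"), v1_2("1.2"), v1_3("1.3");

			private final String versionString;

			Version(String versionString) {
				this.versionString = versionString;
			}

			@Override
			public String toString() {
				return versionString;
			}
		}

		private final Version testMigrateVersion;

		// JUnit instantiates the test class once per parameter value,
		// so every test method runs once per savepoint version.
		@Parameterized.Parameters(name = "Migration Savepoint: {0}")
		public static Collection<Version> parameters() {
			return Arrays.asList(Version.v1_1, Version.v1_2, Version.v1_3);
		}

		public MigrationTestParameterizationSketch(Version testMigrateVersion) {
			this.testMigrateVersion = testMigrateVersion;
		}

		@Test
		public void restoresFromVersionedSnapshot() {
			// The real test derives the snapshot resource name from the version,
			// e.g. "kafka-consumer-migration-test-flink1.3-snapshot", and restores
			// the consumer operator from it.
			String resourceName =
				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot";
			assertTrue(resourceName.contains(testMigrateVersion.toString()));
		}
	}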
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74bb9a83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74bb9a83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74bb9a83

Branch: refs/heads/release-1.3
Commit: 74bb9a83a923a76443426e15ad4b4285bcfeb69d
Parents: 0544b44
Author: Tzu-Li (Gordon) Tai
Authored: Sun Jun 4 00:18:26 2017 +0200
Committer: Tzu-Li (Gordon) Tai
Committed: Wed Jun 7 19:08:03 2017 +0200

----------------------------------------------------------------------
 ...inkKafkaConsumerBaseFrom11MigrationTest.java | 347 -----------------
 ...inkKafkaConsumerBaseFrom12MigrationTest.java | 338 -----------------
 .../FlinkKafkaConsumerBaseMigrationTest.java    | 374 +++++++++++++++++++
 ...migration-test-flink1.3-empty-state-snapshot | Bin 0 -> 473 bytes
 ...ka-consumer-migration-test-flink1.3-snapshot | Bin 0 -> 1255 bytes
 5 files changed, 374 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74bb9a83/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
deleted file mode 100644
index c07ebd5..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode; -import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.util.SerializedValue; -import org.junit.Assert; -import org.junit.Test; - -import java.net.URL; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; -import java.util.HashMap; -import java.util.List; - -import static org.mockito.Mockito.mock; - -/** - * Tests for checking whether {@link FlinkKafkaConsumerBase} can restore from snapshots that were - * done using the Flink 1.1 {@link FlinkKafkaConsumerBase}. - * - *

For regenerating the binary snapshot file you have to run the commented out portion - * of each test on a checkout of the Flink 1.1 branch. - */ -public class FlinkKafkaConsumerBaseFrom11MigrationTest { - - /** Test restoring from an legacy empty state, when no partitions could be found for topics. */ - @Test - public void testRestoreFromFlink11WithEmptyStateNoPartitions() throws Exception { - final DummyFlinkKafkaConsumer consumerFunction = - new DummyFlinkKafkaConsumer<>(Collections.emptyList()); - - StreamSource> consumerOperator = new StreamSource<>(consumerFunction); - - final AbstractStreamOperatorTestHarness testHarness = - new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); - - testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - testHarness.setup(); - // restore state from binary snapshot file using legacy method - testHarness.initializeStateFromLegacyCheckpoint( - getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot")); - testHarness.open(); - - // assert that no partitions were found and is empty - Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); - Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); - - // assert that no state was restored - Assert.assertTrue(consumerFunction.getRestoredState() == null); - - consumerOperator.close(); - consumerOperator.cancel(); - } - - /** Test restoring from an empty state taken using Flink 1.1, when some partitions could be found for topics. */ - @Test - public void testRestoreFromFlink11WithEmptyStateWithPartitions() throws Exception { - final List partitions = new ArrayList<>(); - partitions.add(new KafkaTopicPartition("abc", 13)); - partitions.add(new KafkaTopicPartition("def", 7)); - - final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>(partitions); - - StreamSource> consumerOperator = - new StreamSource<>(consumerFunction); - - final AbstractStreamOperatorTestHarness testHarness = - new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); - - testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - testHarness.setup(); - // restore state from binary snapshot file using legacy method - testHarness.initializeStateFromLegacyCheckpoint( - getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot")); - testHarness.open(); - - // the expected state in "kafka-consumer-migration-test-flink1.1-empty-state-snapshot"; - // since the state is empty, the consumer should reflect on the startup mode to determine start offsets. 
- final HashMap expectedSubscribedPartitionsWithStartOffsets = new HashMap<>(); - expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("abc", 13), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); - expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("def", 7), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); - - // assert that there are partitions and is identical to expected list - Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); - Assert.assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); - Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets()); - - // assert that no state was restored - Assert.assertTrue(consumerFunction.getRestoredState() == null); - - consumerOperator.close(); - consumerOperator.cancel(); - } - - /** Test restoring from a non-empty state taken using Flink 1.1, when some partitions could be found for topics. */ - @Test - public void testRestoreFromFlink11() throws Exception { - final List partitions = new ArrayList<>(); - partitions.add(new KafkaTopicPartition("abc", 13)); - partitions.add(new KafkaTopicPartition("def", 7)); - - final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>(partitions); - - StreamSource> consumerOperator = - new StreamSource<>(consumerFunction); - - final AbstractStreamOperatorTestHarness testHarness = - new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); - - testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - testHarness.setup(); - // restore state from binary snapshot file using legacy method - testHarness.initializeStateFromLegacyCheckpoint( - getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot")); - testHarness.open(); - - // the expected state in "kafka-consumer-migration-test-flink1.1-snapshot" - final HashMap expectedState = new HashMap<>(); - expectedState.put(new KafkaTopicPartition("abc", 13), 16768L); - expectedState.put(new KafkaTopicPartition("def", 7), 987654321L); - - // assert that there are partitions and is identical to expected list - Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); - Assert.assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); - - // on restore, subscribedPartitionsToStartOffsets should be identical to the restored state - Assert.assertEquals(expectedState, consumerFunction.getSubscribedPartitionsToStartOffsets()); - - // assert that state is correctly restored from legacy checkpoint - Assert.assertTrue(consumerFunction.getRestoredState() != null); - Assert.assertEquals(expectedState, consumerFunction.getRestoredState()); - - consumerOperator.close(); - consumerOperator.cancel(); - } - - // ------------------------------------------------------------------------ - - private static String getResourceFilename(String filename) { - ClassLoader cl = FlinkKafkaConsumerBaseFrom11MigrationTest.class.getClassLoader(); - URL resource = cl.getResource(filename); - if (resource == null) { - throw new NullPointerException("Missing snapshot resource."); - } - return resource.getFile(); - } - - private static class DummyFlinkKafkaConsumer extends FlinkKafkaConsumerBase { - private static final long serialVersionUID = 1L; - - private final List partitions; - - @SuppressWarnings("unchecked") - DummyFlinkKafkaConsumer(List partitions) { - super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) 
mock(KeyedDeserializationSchema.class)); - this.partitions = partitions; - } - - @Override - protected AbstractFetcher createFetcher( - SourceContext sourceContext, - Map thisSubtaskPartitionsWithStartOffsets, - SerializedValue> watermarksPeriodic, - SerializedValue> watermarksPunctuated, - StreamingRuntimeContext runtimeContext, - OffsetCommitMode offsetCommitMode) throws Exception { - return mock(AbstractFetcher.class); - } - - @Override - protected List getKafkaPartitions(List topics) { - return partitions; - } - - @Override - protected boolean getIsAutoCommitEnabled() { - return false; - } - } -} - -/* - THE CODE FOR FLINK 1.1 - - @Test - public void testRestoreFromFlink11() throws Exception { - // -------------------------------------------------------------------- - // prepare fake states - // -------------------------------------------------------------------- - - final HashMap state1 = new HashMap<>(); - state1.put(new KafkaTopicPartition("abc", 13), 16768L); - state1.put(new KafkaTopicPartition("def", 7), 987654321L); - - final OneShotLatch latch = new OneShotLatch(); - final AbstractFetcher fetcher = mock(AbstractFetcher.class); - - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - latch.trigger(); - return null; - } - }).when(fetcher).runFetchLoop(); - - when(fetcher.snapshotCurrentState()).thenReturn(state1); - - final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>( - new FetcherFactory() { - private static final long serialVersionUID = -2803131905656983619L; - - @Override - public AbstractFetcher createFetcher() { - return fetcher; - } - }); - - StreamSource> consumerOperator = - new StreamSource<>(consumerFunction); - - final OneInputStreamOperatorTestHarness testHarness = - new OneInputStreamOperatorTestHarness<>(consumerOperator); - - testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - testHarness.setup(); - testHarness.open(); - - final Throwable[] error = new Throwable[1]; - - // run the source asynchronously - Thread runner = new Thread() { - @Override - public void run() { - try { - consumerFunction.run(new DummySourceContext() { - @Override - public void collect(String element) { - latch.trigger(); - } - }); - } - catch (Throwable t) { - t.printStackTrace(); - error[0] = t; - } - } - }; - runner.start(); - - if (!latch.isTriggered()) { - latch.await(); - } - - StreamTaskState snapshot = testHarness.snapshot(0L, 0L); - testHarness.snaphotToFile(snapshot, "src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-2"); - consumerOperator.run(new Object()); - - consumerOperator.close(); - runner.join(); - - System.out.println("Killed"); - } - - private static abstract class DummySourceContext - implements SourceFunction.SourceContext { - - private final Object lock = new Object(); - - @Override - public void collectWithTimestamp(String element, long timestamp) { - } - - @Override - public void emitWatermark(Watermark mark) { - } - - @Override - public Object getCheckpointLock() { - return lock; - } - - @Override - public void close() { - } - } - - - // ------------------------------------------------------------------------ - - private interface FetcherFactory extends Serializable { - AbstractFetcher createFetcher(); - } - - private static class DummyFlinkKafkaConsumer extends FlinkKafkaConsumerBase { - private static final long serialVersionUID = 1L; - - private final FetcherFactory fetcherFactory; - - @SuppressWarnings("unchecked") - public 
DummyFlinkKafkaConsumer(FetcherFactory fetcherFactory) { - super((KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class)); - - final List partitions = new ArrayList<>(); - partitions.add(new KafkaTopicPartition("dummy-topic", 0)); - setSubscribedPartitions(partitions); - - this.fetcherFactory = fetcherFactory; - } - - @Override - protected AbstractFetcher createFetcher(SourceContext sourceContext, List thisSubtaskPartitions, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { - return fetcherFactory.createFetcher(); - } - } -* */ http://git-wip-us.apache.org/repos/asf/flink/blob/74bb9a83/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java deleted file mode 100644 index f11bf9f..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.connectors.kafka; - -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode; -import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OperatorSnapshotUtil; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.util.SerializedValue; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * Tests for checking whether {@link FlinkKafkaConsumerBase} can restore from snapshots that were - * done using the Flink 1.2 {@link FlinkKafkaConsumerBase}. - * - *

For regenerating the binary snapshot files run {@link #writeSnapshot()} on the Flink 1.2 - * branch. - */ -public class FlinkKafkaConsumerBaseFrom12MigrationTest { - - final static HashMap PARTITION_STATE = new HashMap<>(); - - static { - PARTITION_STATE.put(new KafkaTopicPartition("abc", 13), 16768L); - PARTITION_STATE.put(new KafkaTopicPartition("def", 7), 987654321L); - } - - /** - * Manually run this to write binary snapshot data. - */ - @Ignore - @Test - public void writeSnapshot() throws Exception { - writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot", PARTITION_STATE); - - final HashMap emptyState = new HashMap<>(); - writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot", emptyState); - } - - private void writeSnapshot(String path, HashMap state) throws Exception { - - final OneShotLatch latch = new OneShotLatch(); - final AbstractFetcher fetcher = mock(AbstractFetcher.class); - - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - latch.trigger(); - return null; - } - }).when(fetcher).runFetchLoop(); - - when(fetcher.snapshotCurrentState()).thenReturn(state); - - final List partitions = new ArrayList<>(PARTITION_STATE.keySet()); - - final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>(fetcher, partitions); - - StreamSource> consumerOperator = - new StreamSource<>(consumerFunction); - - - final AbstractStreamOperatorTestHarness testHarness = - new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); - - testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - testHarness.setup(); - testHarness.open(); - - final Throwable[] error = new Throwable[1]; - - // run the source asynchronously - Thread runner = new Thread() { - @Override - public void run() { - try { - consumerFunction.run(new DummySourceContext() { - @Override - public void collect(String element) { - - } - }); - } - catch (Throwable t) { - t.printStackTrace(); - error[0] = t; - } - } - }; - runner.start(); - - if (!latch.isTriggered()) { - latch.await(); - } - - final OperatorStateHandles snapshot; - synchronized (testHarness.getCheckpointLock()) { - snapshot = testHarness.snapshot(0L, 0L); - } - - OperatorSnapshotUtil.writeStateHandle(snapshot, path); - - consumerOperator.close(); - runner.join(); - } - - /** - * Test restoring from an legacy empty state, when no partitions could be found for topics. 
- */ - @Test - public void testRestoreFromEmptyStateNoPartitions() throws Exception { - final DummyFlinkKafkaConsumer consumerFunction = - new DummyFlinkKafkaConsumer<>(Collections.emptyList()); - - StreamSource> consumerOperator = new StreamSource<>(consumerFunction); - - final AbstractStreamOperatorTestHarness testHarness = - new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); - - testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - testHarness.setup(); - // restore state from binary snapshot file - testHarness.initializeState( - OperatorSnapshotUtil.readStateHandle( - OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-empty-state-snapshot"))); - testHarness.open(); - - // assert that no partitions were found and is empty - assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); - assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); - - // assert that no state was restored - assertTrue(consumerFunction.getRestoredState() == null); - - consumerOperator.close(); - consumerOperator.cancel(); - } - - /** - * Test restoring from an empty state taken using Flink 1.2, when some partitions could be - * found for topics. - */ - @Test - public void testRestoreFromEmptyStateWithPartitions() throws Exception { - final List partitions = new ArrayList<>(PARTITION_STATE.keySet()); - - final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>(partitions); - - StreamSource> consumerOperator = - new StreamSource<>(consumerFunction); - - final AbstractStreamOperatorTestHarness testHarness = - new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); - - testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - testHarness.setup(); - // restore state from binary snapshot file - testHarness.initializeState( - OperatorSnapshotUtil.readStateHandle( - OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-empty-state-snapshot"))); - testHarness.open(); - - // the expected state in "kafka-consumer-migration-test-flink1.2-empty-state-snapshot"; - // since the state is empty, the consumer should reflect on the startup mode to determine start offsets. - final HashMap expectedSubscribedPartitionsWithStartOffsets = new HashMap<>(); - for (KafkaTopicPartition partition : PARTITION_STATE.keySet()) { - expectedSubscribedPartitionsWithStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); - } - - // assert that there are partitions and is identical to expected list - assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); - assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); - Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets()); - - assertTrue(consumerFunction.getRestoredState() == null); - - consumerOperator.close(); - consumerOperator.cancel(); - } - - /** - * Test restoring from a non-empty state taken using Flink 1.2, when some partitions could be - * found for topics. 
- */ - @Test - public void testRestore() throws Exception { - final List partitions = new ArrayList<>(PARTITION_STATE.keySet()); - - final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>(partitions); - - StreamSource> consumerOperator = - new StreamSource<>(consumerFunction); - - final AbstractStreamOperatorTestHarness testHarness = - new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); - - testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - testHarness.setup(); - // restore state from binary snapshot file - testHarness.initializeState( - OperatorSnapshotUtil.readStateHandle( - OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-snapshot"))); - testHarness.open(); - - // assert that there are partitions and is identical to expected list - assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); - assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); - - // on restore, subscribedPartitionsToStartOffsets should be identical to the restored state - Assert.assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets()); - - // assert that state is correctly restored from legacy checkpoint - assertTrue(consumerFunction.getRestoredState() != null); - Assert.assertEquals(PARTITION_STATE, consumerFunction.getRestoredState()); - - consumerOperator.close(); - consumerOperator.cancel(); - } - - // ------------------------------------------------------------------------ - - private static class DummyFlinkKafkaConsumer extends FlinkKafkaConsumerBase { - private static final long serialVersionUID = 1L; - - private final List partitions; - - private final AbstractFetcher fetcher; - - @SuppressWarnings("unchecked") - DummyFlinkKafkaConsumer(AbstractFetcher fetcher, List partitions) { - super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class)); - this.fetcher = fetcher; - this.partitions = partitions; - } - - DummyFlinkKafkaConsumer(List partitions) { - super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class)); - this.fetcher = mock(AbstractFetcher.class); - this.partitions = partitions; - } - - @Override - protected AbstractFetcher createFetcher( - SourceContext sourceContext, - Map thisSubtaskPartitionsWithStartOffsets, - SerializedValue> watermarksPeriodic, - SerializedValue> watermarksPunctuated, - StreamingRuntimeContext runtimeContext, - OffsetCommitMode offsetCommitMode) throws Exception { - return fetcher; - } - - @Override - protected List getKafkaPartitions(List topics) { - return partitions; - } - - @Override - protected boolean getIsAutoCommitEnabled() { - return false; - } - } - - - private static abstract class DummySourceContext - implements SourceFunction.SourceContext { - - private final Object lock = new Object(); - - @Override - public void collectWithTimestamp(String element, long timestamp) { - } - - @Override - public void emitWatermark(Watermark mark) { - } - - @Override - public Object getCheckpointLock() { - return lock; - } - - @Override - public void close() { - } - - @Override - public void markAsTemporarilyIdle() { - - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/74bb9a83/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java new file mode 100644 index 0000000..70e60f3 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.apache.flink.streaming.util.migration.MigrationTestUtil; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.util.SerializedValue; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Collection; + +/** + * Tests for checking whether {@link FlinkKafkaConsumerBase} can restore from snapshots that were + * done using previous Flink versions' {@link FlinkKafkaConsumerBase}. + * + *

For regenerating the binary snapshot files run {@link #writeSnapshot()} on the corresponding + * Flink release-* branch. + */ +@RunWith(Parameterized.class) +public class FlinkKafkaConsumerBaseMigrationTest { + + /** + * TODO change this to the corresponding savepoint version to be written (e.g. {@link MigrationVersion#v1_3} for 1.3) + * TODO and remove all @Ignore annotations on write*Snapshot() methods to generate savepoints + */ + private final MigrationVersion flinkGenerateSavepointVersion = null; + + final static HashMap PARTITION_STATE = new HashMap<>(); + + static { + PARTITION_STATE.put(new KafkaTopicPartition("abc", 13), 16768L); + PARTITION_STATE.put(new KafkaTopicPartition("def", 7), 987654321L); + } + + private final MigrationVersion testMigrateVersion; + + @Parameterized.Parameters(name = "Migration Savepoint: {0}") + public static Collection parameters () { + return Arrays.asList(MigrationVersion.v1_1, MigrationVersion.v1_2, MigrationVersion.v1_3); + } + + public FlinkKafkaConsumerBaseMigrationTest(MigrationVersion testMigrateVersion) { + this.testMigrateVersion = testMigrateVersion; + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeSnapshot() throws Exception { + writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink" + flinkGenerateSavepointVersion + "-snapshot", PARTITION_STATE); + + final HashMap emptyState = new HashMap<>(); + writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink" + flinkGenerateSavepointVersion + "-empty-state-snapshot", emptyState); + } + + private void writeSnapshot(String path, HashMap state) throws Exception { + + final OneShotLatch latch = new OneShotLatch(); + final AbstractFetcher fetcher = mock(AbstractFetcher.class); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + latch.trigger(); + return null; + } + }).when(fetcher).runFetchLoop(); + + when(fetcher.snapshotCurrentState()).thenReturn(state); + + final List partitions = new ArrayList<>(PARTITION_STATE.keySet()); + + final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>(fetcher, partitions); + + StreamSource> consumerOperator = + new StreamSource<>(consumerFunction); + + + final AbstractStreamOperatorTestHarness testHarness = + new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); + + testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + testHarness.setup(); + testHarness.open(); + + final Throwable[] error = new Throwable[1]; + + // run the source asynchronously + Thread runner = new Thread() { + @Override + public void run() { + try { + consumerFunction.run(new DummySourceContext() { + @Override + public void collect(String element) { + + } + }); + } + catch (Throwable t) { + t.printStackTrace(); + error[0] = t; + } + } + }; + runner.start(); + + if (!latch.isTriggered()) { + latch.await(); + } + + final OperatorStateHandles snapshot; + synchronized (testHarness.getCheckpointLock()) { + snapshot = testHarness.snapshot(0L, 0L); + } + + OperatorSnapshotUtil.writeStateHandle(snapshot, path); + + consumerOperator.close(); + runner.join(); + } + + /** + * Test restoring from an legacy empty state, when no partitions could be found for topics. 
+ */ + @Test + public void testRestoreFromEmptyStateNoPartitions() throws Exception { + final DummyFlinkKafkaConsumer consumerFunction = + new DummyFlinkKafkaConsumer<>(Collections.emptyList()); + + StreamSource> consumerOperator = new StreamSource<>(consumerFunction); + + final AbstractStreamOperatorTestHarness testHarness = + new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); + + testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + testHarness.setup(); + + // restore state from binary snapshot file + MigrationTestUtil.restoreFromSnapshot( + testHarness, + OperatorSnapshotUtil.getResourceFilename( + "kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"), + testMigrateVersion); + + testHarness.open(); + + // assert that no partitions were found and is empty + assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); + assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); + + // assert that no state was restored + assertTrue(consumerFunction.getRestoredState() == null); + + consumerOperator.close(); + consumerOperator.cancel(); + } + + /** + * Test restoring from an empty state taken using a previous Flink version, when some partitions could be + * found for topics. + */ + @Test + public void testRestoreFromEmptyStateWithPartitions() throws Exception { + final List partitions = new ArrayList<>(PARTITION_STATE.keySet()); + + final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>(partitions); + + StreamSource> consumerOperator = + new StreamSource<>(consumerFunction); + + final AbstractStreamOperatorTestHarness testHarness = + new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); + + testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + testHarness.setup(); + + // restore state from binary snapshot file + MigrationTestUtil.restoreFromSnapshot( + testHarness, + OperatorSnapshotUtil.getResourceFilename( + "kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"), + testMigrateVersion); + + testHarness.open(); + + // the expected state in "kafka-consumer-migration-test-flink*-empty-state-snapshot"; + // since the state is empty, the consumer should reflect on the startup mode to determine start offsets. + final HashMap expectedSubscribedPartitionsWithStartOffsets = new HashMap<>(); + for (KafkaTopicPartition partition : PARTITION_STATE.keySet()) { + expectedSubscribedPartitionsWithStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + } + + // assert that there are partitions and is identical to expected list + assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); + assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); + Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets()); + + assertTrue(consumerFunction.getRestoredState() == null); + + consumerOperator.close(); + consumerOperator.cancel(); + } + + /** + * Test restoring from a non-empty state taken using a previous Flink version, when some partitions could be + * found for topics. 
+ */ + @Test + public void testRestore() throws Exception { + final List partitions = new ArrayList<>(PARTITION_STATE.keySet()); + + final DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer<>(partitions); + + StreamSource> consumerOperator = + new StreamSource<>(consumerFunction); + + final AbstractStreamOperatorTestHarness testHarness = + new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0); + + testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + testHarness.setup(); + + // restore state from binary snapshot file + MigrationTestUtil.restoreFromSnapshot( + testHarness, + OperatorSnapshotUtil.getResourceFilename( + "kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"), + testMigrateVersion); + + testHarness.open(); + + // assert that there are partitions and is identical to expected list + assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); + assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); + + // on restore, subscribedPartitionsToStartOffsets should be identical to the restored state + Assert.assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets()); + + // assert that state is correctly restored from legacy checkpoint + assertTrue(consumerFunction.getRestoredState() != null); + Assert.assertEquals(PARTITION_STATE, consumerFunction.getRestoredState()); + + consumerOperator.close(); + consumerOperator.cancel(); + } + + // ------------------------------------------------------------------------ + + private static class DummyFlinkKafkaConsumer extends FlinkKafkaConsumerBase { + private static final long serialVersionUID = 1L; + + private final List partitions; + + private final AbstractFetcher fetcher; + + @SuppressWarnings("unchecked") + DummyFlinkKafkaConsumer(AbstractFetcher fetcher, List partitions) { + super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class)); + this.fetcher = fetcher; + this.partitions = partitions; + } + + DummyFlinkKafkaConsumer(List partitions) { + super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class)); + this.fetcher = mock(AbstractFetcher.class); + this.partitions = partitions; + } + + @Override + protected AbstractFetcher createFetcher( + SourceContext sourceContext, + Map thisSubtaskPartitionsWithStartOffsets, + SerializedValue> watermarksPeriodic, + SerializedValue> watermarksPunctuated, + StreamingRuntimeContext runtimeContext, + OffsetCommitMode offsetCommitMode) throws Exception { + return fetcher; + } + + @Override + protected List getKafkaPartitions(List topics) { + return partitions; + } + + @Override + protected boolean getIsAutoCommitEnabled() { + return false; + } + } + + + private static abstract class DummySourceContext + implements SourceFunction.SourceContext { + + private final Object lock = new Object(); + + @Override + public void collectWithTimestamp(String element, long timestamp) { + } + + @Override + public void emitWatermark(Watermark mark) { + } + + @Override + public Object getCheckpointLock() { + return lock; + } + + @Override + public void close() { + } + + @Override + public void markAsTemporarilyIdle() { + + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/74bb9a83/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot ---------------------------------------------------------------------- diff 
--git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot
new file mode 100644
index 0000000..1a5aad1
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-empty-state-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/74bb9a83/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot
new file mode 100644
index 0000000..dc820ef
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.3-snapshot differ
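A detail of the writeSnapshot() helper in the new test that is easy to miss when reading the flattened diff: the source function runs on a separate thread, the test waits on a one-shot latch until the mocked fetch loop has actually started, and the snapshot is taken only while holding the harness's checkpoint lock. The sketch below reproduces just that coordination pattern with plain JDK types (CountDownLatch standing in for Flink's OneShotLatch, a plain Object for the checkpoint lock), so it compiles without the Flink test utilities; it is an illustration of the pattern, not the test itself.

	import java.util.concurrent.CountDownLatch;

	public class SnapshotCoordinationSketch {

		public static void main(String[] args) throws InterruptedException {
			// Stand-in for Flink's OneShotLatch: released once the "fetch loop" is running.
			final CountDownLatch latch = new CountDownLatch(1);
			// Stand-in for the test harness's checkpoint lock.
			final Object checkpointLock = new Object();
			final Throwable[] error = new Throwable[1];

			// Run the "source" asynchronously, as writeSnapshot() does with consumerFunction.run(...).
			Thread runner = new Thread(() -> {
				try {
					latch.countDown(); // signal that the fetch loop has started
					// ... the real source would now block inside runFetchLoop()
				} catch (Throwable t) {
					error[0] = t;
				}
			});
			runner.start();

			// Wait until the source is actually running before taking a snapshot.
			latch.await();

			// Snapshot only while holding the checkpoint lock, mirroring
			// synchronized (testHarness.getCheckpointLock()) { snapshot = testHarness.snapshot(0L, 0L); }
			synchronized (checkpointLock) {
				// snapshot = testHarness.snapshot(0L, 0L);
			}

			runner.join();
			if (error[0] != null) {
				throw new RuntimeException(error[0]);
			}
		}
	}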