From commits-return-43074-archive-asf-public=cust-asf.ponee.io@flink.apache.org Wed Sep 1 09:44:58 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 13AC4180634 for ; Wed, 1 Sep 2021 11:44:58 +0200 (CEST) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id A968466D48 for ; Wed, 1 Sep 2021 09:42:50 +0000 (UTC) Received: (qmail 81744 invoked by uid 500); 1 Sep 2021 09:42:46 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 81666 invoked by uid 99); 1 Sep 2021 09:42:45 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Sep 2021 09:42:45 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id B30E381EFC; Wed, 1 Sep 2021 09:42:45 +0000 (UTC) Date: Wed, 01 Sep 2021 09:42:43 +0000 To: "commits@flink.apache.org" Subject: [flink] branch master updated: [FLINK-23971][tests] fix connector testing framework error when compare records in different splits MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <163048936223.9819.11277554308697893731@gitbox.apache.org> From: jqin@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: flink X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: d09ef68549fcc7422e4d6bc0f0537c9abcfc81f1 X-Git-Newrev: ad052cc056c7d6e63d8356dbd22d6a98b54743c3 X-Git-Rev: ad052cc056c7d6e63d8356dbd22d6a98b54743c3 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new ad052cc [FLINK-23971][tests] fix connector testing framework error when compare records in different splits ad052cc is described below commit ad052cc056c7d6e63d8356dbd22d6a98b54743c3 Author: Hang Ruan AuthorDate: Tue Aug 31 15:53:28 2021 +0800 [FLINK-23971][tests] fix connector testing framework error when compare records in different splits Add split index parameter to generate test data, make sure T.equals(object) return false when records come from differernt splits. --- .../testutils/KafkaSingleTopicExternalContext.java | 8 ++++---- .../pulsar/testutils/PulsarTestContext.java | 4 ++-- .../cases/MultipleTopicConsumingContext.java | 4 ++-- .../cases/SingleTopicConsumingContext.java | 4 ++-- .../test/common/external/ExternalContext.java | 6 +++++- .../test/common/testsuites/SourceTestSuiteBase.java | 21 ++++++++++++--------- 6 files changed, 27 insertions(+), 20 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java index 81240cf..ad5e31d 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java @@ -155,7 +155,7 @@ public class KafkaSingleTopicExternalContext implements ExternalContext } @Override - public Collection generateTestData(long seed) { + public Collection generateTestData(int splitIndex, long seed) { Random random = new Random(seed); List randomStringRecords = new ArrayList<>(); int recordNum = @@ -163,15 +163,15 @@ public class KafkaSingleTopicExternalContext implements ExternalContext + NUM_RECORDS_LOWER_BOUND; for (int i = 0; i < recordNum; i++) { int stringLength = random.nextInt(50) + 1; - randomStringRecords.add(generateRandomString(stringLength, random)); + randomStringRecords.add(generateRandomString(splitIndex, stringLength, random)); } return randomStringRecords; } - private String generateRandomString(int length, Random random) { + private String generateRandomString(int splitIndex, int length, Random random) { String alphaNumericString = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789"; - StringBuilder sb = new StringBuilder(); + StringBuilder sb = new StringBuilder().append(splitIndex).append("-"); for (int i = 0; i < length; ++i) { sb.append(alphaNumericString.charAt(random.nextInt(alphaNumericString.length()))); } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java index 6733439..a80d721 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java @@ -43,7 +43,7 @@ public abstract class PulsarTestContext implements ExternalContext { // Helper methods for generating data. - protected List generateStringTestData(long seed) { + protected List generateStringTestData(int splitIndex, long seed) { Random random = new Random(seed); int recordNum = random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND) @@ -52,7 +52,7 @@ public abstract class PulsarTestContext implements ExternalContext { for (int i = 0; i < recordNum; i++) { int stringLength = random.nextInt(50) + 1; - records.add(randomAlphanumeric(stringLength)); + records.add(splitIndex + "-" + randomAlphanumeric(stringLength)); } return records; diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java index 60a0bfba..7ce676c 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java @@ -98,8 +98,8 @@ public class MultipleTopicConsumingContext extends PulsarTestContext { } @Override - public Collection generateTestData(long seed) { - return generateStringTestData(seed); + public Collection generateTestData(int splitIndex, long seed) { + return generateStringTestData(splitIndex, seed); } @Override diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java index 288ecf3..cb1b582 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java @@ -97,8 +97,8 @@ public class SingleTopicConsumingContext extends PulsarTestContext { } @Override - public Collection generateTestData(long seed) { - return generateStringTestData(seed); + public Collection generateTestData(int splitIndex, long seed) { + return generateStringTestData(splitIndex, seed); } @Override diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java index dacc3c1..5ed06a9 100644 --- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java @@ -59,10 +59,14 @@ public interface ExternalContext extends Serializable, AutoCloseable { /** * Generate test data. * + *

Make sure that the {@link T#equals(Object)} returns false when the records in different + * splits. + * + * @param splitIndex index of the split. * @param seed Seed for generating random test data set. * @return Collection of generated test data. */ - Collection generateTestData(long seed); + Collection generateTestData(int splitIndex, long seed); /** * Factory for {@link ExternalContext}. diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java index 02e7a41..5aff6e6 100644 --- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java @@ -41,7 +41,6 @@ import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFacto import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; import org.apache.flink.util.CloseableIterator; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestTemplate; @@ -103,7 +102,7 @@ public abstract class SourceTestSuiteBase { throws Exception { // Write test data to external system - final Collection testRecords = generateAndWriteTestData(externalContext); + final Collection testRecords = generateAndWriteTestData(0, externalContext); // Build and execute Flink job StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(); @@ -140,7 +139,7 @@ public abstract class SourceTestSuiteBase { final int splitNumber = 4; final List> testRecordCollections = new ArrayList<>(); for (int i = 0; i < splitNumber; i++) { - testRecordCollections.add(generateAndWriteTestData(externalContext)); + testRecordCollections.add(generateAndWriteTestData(i, externalContext)); } LOG.debug("Build and execute Flink job"); @@ -174,14 +173,13 @@ public abstract class SourceTestSuiteBase { */ @TestTemplate @DisplayName("Test source with at least one idle parallelism") - @Disabled public void testIdleReader(TestEnvironment testEnv, ExternalContext externalContext) throws Exception { final int splitNumber = 4; final List> testRecordCollections = new ArrayList<>(); for (int i = 0; i < splitNumber; i++) { - testRecordCollections.add(generateAndWriteTestData(externalContext)); + testRecordCollections.add(generateAndWriteTestData(i, externalContext)); } try (CloseableIterator resultIterator = @@ -216,9 +214,11 @@ public abstract class SourceTestSuiteBase { ExternalContext externalContext, ClusterControllable controller) throws Exception { + int splitIndex = 0; final Collection testRecordsBeforeFailure = - externalContext.generateTestData(ThreadLocalRandom.current().nextLong()); + externalContext.generateTestData( + splitIndex, ThreadLocalRandom.current().nextLong()); final SourceSplitDataWriter sourceSplitDataWriter = externalContext.createSourceSplitDataWriter(); sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure); @@ -267,7 +267,8 @@ public abstract class SourceTestSuiteBase { Deadline.fromNow(Duration.ofSeconds(30))); final Collection testRecordsAfterFailure = - externalContext.generateTestData(ThreadLocalRandom.current().nextLong()); + externalContext.generateTestData( + splitIndex, ThreadLocalRandom.current().nextLong()); sourceSplitDataWriter.writeRecords(testRecordsAfterFailure); assertThat( @@ -291,9 +292,11 @@ public abstract class SourceTestSuiteBase { * @param externalContext External context * @return Collection of generated test records */ - protected Collection generateAndWriteTestData(ExternalContext externalContext) { + protected Collection generateAndWriteTestData( + int splitIndex, ExternalContext externalContext) { final Collection testRecordCollection = - externalContext.generateTestData(ThreadLocalRandom.current().nextLong()); + externalContext.generateTestData( + splitIndex, ThreadLocalRandom.current().nextLong()); LOG.debug("Writing {} records to external system", testRecordCollection.size()); externalContext.createSourceSplitDataWriter().writeRecords(testRecordCollection); return testRecordCollection;