flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [flink] branch master updated: [FLINK-23971][tests] fix connector testing framework error when compare records in different splits
Date Wed, 01 Sep 2021 09:42:43 GMT
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ad052cc  [FLINK-23971][tests] fix connector testing framework error when compare
records in different splits
ad052cc is described below

commit ad052cc056c7d6e63d8356dbd22d6a98b54743c3
Author: Hang Ruan <ruanhang1993@hotmail.com>
AuthorDate: Tue Aug 31 15:53:28 2021 +0800

    [FLINK-23971][tests] fix connector testing framework error when compare records in different
splits
    
    Add split index parameter to generate test data, make sure T.equals(object) return false
when records come from differernt splits.
---
 .../testutils/KafkaSingleTopicExternalContext.java  |  8 ++++----
 .../pulsar/testutils/PulsarTestContext.java         |  4 ++--
 .../cases/MultipleTopicConsumingContext.java        |  4 ++--
 .../cases/SingleTopicConsumingContext.java          |  4 ++--
 .../test/common/external/ExternalContext.java       |  6 +++++-
 .../test/common/testsuites/SourceTestSuiteBase.java | 21 ++++++++++++---------
 6 files changed, 27 insertions(+), 20 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
index 81240cf..ad5e31d 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
@@ -155,7 +155,7 @@ public class KafkaSingleTopicExternalContext implements ExternalContext<String>
     }
 
     @Override
-    public Collection<String> generateTestData(long seed) {
+    public Collection<String> generateTestData(int splitIndex, long seed) {
         Random random = new Random(seed);
         List<String> randomStringRecords = new ArrayList<>();
         int recordNum =
@@ -163,15 +163,15 @@ public class KafkaSingleTopicExternalContext implements ExternalContext<String>
                         + NUM_RECORDS_LOWER_BOUND;
         for (int i = 0; i < recordNum; i++) {
             int stringLength = random.nextInt(50) + 1;
-            randomStringRecords.add(generateRandomString(stringLength, random));
+            randomStringRecords.add(generateRandomString(splitIndex, stringLength, random));
         }
         return randomStringRecords;
     }
 
-    private String generateRandomString(int length, Random random) {
+    private String generateRandomString(int splitIndex, int length, Random random) {
         String alphaNumericString =
                 "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789";
-        StringBuilder sb = new StringBuilder();
+        StringBuilder sb = new StringBuilder().append(splitIndex).append("-");
         for (int i = 0; i < length; ++i) {
             sb.append(alphaNumericString.charAt(random.nextInt(alphaNumericString.length())));
         }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index 6733439..a80d721 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -43,7 +43,7 @@ public abstract class PulsarTestContext<T> implements ExternalContext<T>
{
 
     // Helper methods for generating data.
 
-    protected List<String> generateStringTestData(long seed) {
+    protected List<String> generateStringTestData(int splitIndex, long seed) {
         Random random = new Random(seed);
         int recordNum =
                 random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
@@ -52,7 +52,7 @@ public abstract class PulsarTestContext<T> implements ExternalContext<T>
{
 
         for (int i = 0; i < recordNum; i++) {
             int stringLength = random.nextInt(50) + 1;
-            records.add(randomAlphanumeric(stringLength));
+            records.add(splitIndex + "-" + randomAlphanumeric(stringLength));
         }
 
         return records;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
index 60a0bfba..7ce676c 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
@@ -98,8 +98,8 @@ public class MultipleTopicConsumingContext extends PulsarTestContext<String>
{
     }
 
     @Override
-    public Collection<String> generateTestData(long seed) {
-        return generateStringTestData(seed);
+    public Collection<String> generateTestData(int splitIndex, long seed) {
+        return generateStringTestData(splitIndex, seed);
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
index 288ecf3..cb1b582 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
@@ -97,8 +97,8 @@ public class SingleTopicConsumingContext extends PulsarTestContext<String>
{
     }
 
     @Override
-    public Collection<String> generateTestData(long seed) {
-        return generateStringTestData(seed);
+    public Collection<String> generateTestData(int splitIndex, long seed) {
+        return generateStringTestData(splitIndex, seed);
     }
 
     @Override
diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
index dacc3c1..5ed06a9 100644
--- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
+++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
@@ -59,10 +59,14 @@ public interface ExternalContext<T> extends Serializable, AutoCloseable
{
     /**
      * Generate test data.
      *
+     * <p>Make sure that the {@link T#equals(Object)} returns false when the records
in different
+     * splits.
+     *
+     * @param splitIndex index of the split.
      * @param seed Seed for generating random test data set.
      * @return Collection of generated test data.
      */
-    Collection<T> generateTestData(long seed);
+    Collection<T> generateTestData(int splitIndex, long seed);
 
     /**
      * Factory for {@link ExternalContext}.
diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
index 02e7a41..5aff6e6 100644
--- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
+++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
@@ -41,7 +41,6 @@ import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFacto
 import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
 import org.apache.flink.util.CloseableIterator;
 
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestTemplate;
@@ -103,7 +102,7 @@ public abstract class SourceTestSuiteBase<T> {
             throws Exception {
 
         // Write test data to external system
-        final Collection<T> testRecords = generateAndWriteTestData(externalContext);
+        final Collection<T> testRecords = generateAndWriteTestData(0, externalContext);
 
         // Build and execute Flink job
         StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment();
@@ -140,7 +139,7 @@ public abstract class SourceTestSuiteBase<T> {
         final int splitNumber = 4;
         final List<Collection<T>> testRecordCollections = new ArrayList<>();
         for (int i = 0; i < splitNumber; i++) {
-            testRecordCollections.add(generateAndWriteTestData(externalContext));
+            testRecordCollections.add(generateAndWriteTestData(i, externalContext));
         }
 
         LOG.debug("Build and execute Flink job");
@@ -174,14 +173,13 @@ public abstract class SourceTestSuiteBase<T> {
      */
     @TestTemplate
     @DisplayName("Test source with at least one idle parallelism")
-    @Disabled
     public void testIdleReader(TestEnvironment testEnv, ExternalContext<T> externalContext)
             throws Exception {
 
         final int splitNumber = 4;
         final List<Collection<T>> testRecordCollections = new ArrayList<>();
         for (int i = 0; i < splitNumber; i++) {
-            testRecordCollections.add(generateAndWriteTestData(externalContext));
+            testRecordCollections.add(generateAndWriteTestData(i, externalContext));
         }
 
         try (CloseableIterator<T> resultIterator =
@@ -216,9 +214,11 @@ public abstract class SourceTestSuiteBase<T> {
             ExternalContext<T> externalContext,
             ClusterControllable controller)
             throws Exception {
+        int splitIndex = 0;
 
         final Collection<T> testRecordsBeforeFailure =
-                externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
+                externalContext.generateTestData(
+                        splitIndex, ThreadLocalRandom.current().nextLong());
         final SourceSplitDataWriter<T> sourceSplitDataWriter =
                 externalContext.createSourceSplitDataWriter();
         sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure);
@@ -267,7 +267,8 @@ public abstract class SourceTestSuiteBase<T> {
                 Deadline.fromNow(Duration.ofSeconds(30)));
 
         final Collection<T> testRecordsAfterFailure =
-                externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
+                externalContext.generateTestData(
+                        splitIndex, ThreadLocalRandom.current().nextLong());
         sourceSplitDataWriter.writeRecords(testRecordsAfterFailure);
 
         assertThat(
@@ -291,9 +292,11 @@ public abstract class SourceTestSuiteBase<T> {
      * @param externalContext External context
      * @return Collection of generated test records
      */
-    protected Collection<T> generateAndWriteTestData(ExternalContext<T> externalContext)
{
+    protected Collection<T> generateAndWriteTestData(
+            int splitIndex, ExternalContext<T> externalContext) {
         final Collection<T> testRecordCollection =
-                externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
+                externalContext.generateTestData(
+                        splitIndex, ThreadLocalRandom.current().nextLong());
         LOG.debug("Writing {} records to external system", testRecordCollection.size());
         externalContext.createSourceSplitDataWriter().writeRecords(testRecordCollection);
         return testRecordCollection;

Mime
View raw message