flink-commits mailing list archives

From: ar...@apache.org
Subject: [flink] 05/11: [FLINK-23678][tests] Re-enable KafkaSinkITCase and optimize it.
Date: Wed, 01 Sep 2021 06:29:27 GMT
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9a51b06981206678123b66d9139f7826c05b1f4
Author: Arvid Heise <arvid@ververica.com>
AuthorDate: Mon Aug 30 21:14:52 2021 +0200

    [FLINK-23678][tests] Re-enable KafkaSinkITCase and optimize it.
---
 .../connector/kafka/sink/KafkaSinkITCase.java      | 91 ++++++++++++----------
 1 file changed, 48 insertions(+), 43 deletions(-)
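
The central optimization in the patch below concerns topic administration: instead of constructing a fresh AdminClient for every createTestTopic/deleteTestTopic call, one client is created per test class and closed after the last test. A minimal JUnit 4 sketch of that pattern, independent of the Flink test (the class name, topic name, and the localhost:9092 bootstrap address are illustrative assumptions):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class SharedAdminClientSketch {
        private static AdminClient admin;

        @BeforeClass
        public static void setupAdmin() {
            // One client for the whole class; each AdminClient owns network
            // threads and connections, so per-operation creation is wasteful.
            Map<String, Object> properties = new HashMap<>();
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            admin = AdminClient.create(properties);
        }

        @AfterClass
        public static void teardownAdmin() {
            admin.close();
        }

        @Test
        public void createAndDeleteTopic() throws Exception {
            // Same create/delete calls the test uses, bounded by a one-minute wait.
            admin.createTopics(
                            Collections.singletonList(new NewTopic("demo-topic", 1, (short) 1)))
                    .all()
                    .get(1, TimeUnit.MINUTES);
            admin.deleteTopics(Collections.singletonList("demo-topic"))
                    .all()
                    .get(1, TimeUnit.MINUTES);
        }
    }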

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 614773f..a7995e2 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -48,8 +48,6 @@ import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
-
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -63,9 +61,10 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -101,6 +100,7 @@ import java.util.stream.LongStream;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -115,6 +115,7 @@ public class KafkaSinkITCase extends TestLogger {
     private static final int ZK_TIMEOUT_MILLIS = 30000;
     private static final short TOPIC_REPLICATION_FACTOR = 1;
     private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1);
+    private static AdminClient admin;
 
     private String topic;
     private SharedReference<AtomicLong> emittedRecordsCount;
@@ -126,16 +127,12 @@ public class KafkaSinkITCase extends TestLogger {
     public static final KafkaContainer KAFKA_CONTAINER =
             new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))
                     .withEmbeddedZookeeper()
+                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
+                    .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
                     .withEnv(
-                            ImmutableMap.of(
-                                    "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR",
-                                    "1",
-                                    "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
-                                    String.valueOf(Duration.ofHours(2).toMillis()),
-                                    "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR",
-                                    "1",
-                                    "KAFKA_MIN_INSYNC_REPLICAS",
-                                    "1"))
+                            "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
+                            String.valueOf(Duration.ofHours(2).toMillis()))
                     .withNetwork(NETWORK)
                     .withLogConsumer(LOG_CONSUMER)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
@@ -144,6 +141,20 @@ public class KafkaSinkITCase extends TestLogger {
 
     @Rule public final TemporaryFolder temp = new TemporaryFolder();
 
+    @BeforeClass
+    public static void setupAdmin() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put(
+                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                KAFKA_CONTAINER.getBootstrapServers());
+        admin = AdminClient.create(properties);
+    }
+
+    @AfterClass
+    public static void teardownAdmin() {
+        admin.close();
+    }
+
     @Before
     public void setUp() throws ExecutionException, InterruptedException, TimeoutException {
         emittedRecordsCount = sharedObjects.add(new AtomicLong());
@@ -170,7 +181,6 @@ public class KafkaSinkITCase extends TestLogger {
     }
 
     @Test
-    @Ignore
     public void testWriteRecordsToKafkaWithExactlyOnceGuarantee() throws Exception {
         writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, emittedRecordsWithCheckpoint);
     }
@@ -226,11 +236,12 @@ public class KafkaSinkITCase extends TestLogger {
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, "newPrefix");
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic);
-        assertEquals(
+        assertThat(
                 deserializeValues(collectedRecords),
-                LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
-                        .boxed()
-                        .collect(Collectors.toList()));
+                contains(
+                        LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
+                                .boxed()
+                                .toArray()));
     }
 
     @Test
@@ -254,11 +265,12 @@ public class KafkaSinkITCase extends TestLogger {
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, null);
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic);
-        assertEquals(
+        assertThat(
                 deserializeValues(collectedRecords),
-                LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
-                        .boxed()
-                        .collect(Collectors.toList()));
+                contains(
+                        LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
+                                .boxed()
+                                .toArray()));
     }
 
     private void executeWithMapper(
@@ -312,7 +324,9 @@ public class KafkaSinkITCase extends TestLogger {
     private void writeRecordsToKafka(
             DeliveryGuarantee deliveryGuarantee, SharedReference<AtomicLong> expectedRecords)
             throws Exception {
-        final StreamExecutionEnvironment env = new LocalStreamEnvironment();
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        final StreamExecutionEnvironment env = new LocalStreamEnvironment(config);
         env.enableCheckpointing(100L);
         final DataStream<Long> source =
                 env.addSource(
@@ -331,9 +345,9 @@ public class KafkaSinkITCase extends TestLogger {
                 drainAllRecordsFromTopic(topic);
         final long recordsCount = expectedRecords.get().get();
         assertEquals(collectedRecords.size(), recordsCount);
-        assertEquals(
+        assertThat(
                 deserializeValues(collectedRecords),
-                LongStream.range(1, recordsCount + 1).boxed().collect(Collectors.toList()));
+                contains(LongStream.range(1, recordsCount + 1).boxed().toArray()));
         checkProducerLeak();
     }
 
@@ -375,29 +389,17 @@ public class KafkaSinkITCase extends TestLogger {
 
     private void createTestTopic(String topic, int numPartitions, short replicationFactor)
             throws ExecutionException, InterruptedException, TimeoutException {
-        Map<String, Object> properties = new HashMap<>();
-        properties.put(
-                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
-                KAFKA_CONTAINER.getBootstrapServers());
-        try (AdminClient admin = AdminClient.create(properties)) {
-            final CreateTopicsResult result =
-                    admin.createTopics(
-                            Collections.singletonList(
-                                    new NewTopic(topic, numPartitions, replicationFactor)));
-            result.all().get(1, TimeUnit.MINUTES);
-        }
+        final CreateTopicsResult result =
+                admin.createTopics(
+                        Collections.singletonList(
+                                new NewTopic(topic, numPartitions, replicationFactor)));
+        result.all().get(1, TimeUnit.MINUTES);
     }
 
     private void deleteTestTopic(String topic)
             throws ExecutionException, InterruptedException, TimeoutException {
-        Map<String, Object> properties = new HashMap<>();
-        properties.put(
-                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
-                KAFKA_CONTAINER.getBootstrapServers());
-        try (AdminClient admin = AdminClient.create(properties)) {
-            final DeleteTopicsResult result = admin.deleteTopics(Collections.singletonList(topic));
-            result.all().get(1, TimeUnit.MINUTES);
-        }
+        final DeleteTopicsResult result = admin.deleteTopics(Collections.singletonList(topic));
+        result.all().get(1, TimeUnit.MINUTES);
     }
 
     private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(String topic) {
@@ -563,6 +565,7 @@ public class KafkaSinkITCase extends TestLogger {
 
         @Override
         public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            LOG.info("notifyCheckpointComplete {} @ {}", checkpointedRecord, checkpointId);
             lastCheckpointId = checkpointId;
             emittedBetweenCheckpoint.set(0);
             lastCheckpointedRecord.get().set(checkpointedRecord);
@@ -570,6 +573,7 @@ public class KafkaSinkITCase extends TestLogger {
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            LOG.info("snapshotState {} @ {}", lastSeenRecord, context.getCheckpointId());
             checkpointedRecord = lastSeenRecord;
         }
 
@@ -612,6 +616,7 @@ public class KafkaSinkITCase extends TestLogger {
             while (running) {
                 synchronized (lock) {
                     ctx.collect(emittedRecordsCount.get().addAndGet(1));
+                    Thread.sleep(1);
                 }
             }
         }
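
The assertion changes above replace assertEquals between two lists with assertThat plus Hamcrest's contains matcher; contains checks the same ordered, exact-length equality, but on failure it points at the first mismatching element instead of dumping both lists wholesale. A self-contained sketch of the matcher (the values are illustrative):

    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.Matchers.contains;

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.LongStream;

    public class ContainsMatcherSketch {
        public static void main(String[] args) {
            List<Long> actual =
                    LongStream.range(1, 6).boxed().collect(Collectors.toList());
            // Passes only if actual is exactly [1, 2, 3, 4, 5] in this order;
            // a mismatch reports the offending position, not the whole list.
            assertThat(actual, contains(LongStream.range(1, 6).boxed().toArray()));
        }
    }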

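writeRecordsToKafka now also enables ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, the Flink 1.14 (FLIP-147) switch that keeps checkpoints running after bounded tasks finish, so an EXACTLY_ONCE sink can still commit its final transaction. A minimal sketch of wiring that option into a local environment, assuming Flink 1.14; the bounded pipeline at the end is illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointsAfterFinishSketch {
        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();
            // Without this flag, a bounded job's final transactional commit can
            // be skipped because no checkpoint runs once the source finishes.
            config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironment(config);
            env.enableCheckpointing(100L);

            // Illustrative bounded pipeline; the real test writes to Kafka.
            env.fromSequence(1, 100).map(x -> x * 2).print();
            env.execute("checkpoints-after-tasks-finish-sketch");
        }
    }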