flink-commits mailing list archives

From: ar...@apache.org
Subject: [flink] 06/11: [FLINK-23854][connectors/kafka] Abort transactions on close in KafkaWriter.
Date: Wed, 01 Sep 2021 06:29:28 GMT
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fcba0b8007f7083b632c2459fd8f8b0e9cda4532
Author: Arvid Heise <arvid@ververica.com>
AuthorDate: Thu Aug 19 13:58:09 2021 +0200

    [FLINK-23854][connectors/kafka] Abort transactions on close in KafkaWriter.
    
    All transactions that have been opened since the last (pre)commit are transient by definition, so cancel them as soon as possible.
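
In essence, the producer now tracks whether a transaction is open, and the writer aborts any open transaction before closing. Below is a minimal standalone sketch of that pattern; the class name AbortOnCloseProducer is hypothetical, and the real implementation is split across FlinkKafkaInternalProducer and KafkaWriter in the diff that follows.

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;

    // Hypothetical illustration of the abort-on-close pattern this commit adds.
    class AbortOnCloseProducer<K, V> extends KafkaProducer<K, V> {
        // Volatile so a close() from another thread sees the latest state.
        private volatile boolean inTransaction;

        AbortOnCloseProducer(Properties config) {
            super(config);
        }

        @Override
        public void beginTransaction() {
            super.beginTransaction();
            inTransaction = true;
        }

        @Override
        public void commitTransaction() {
            super.commitTransaction();
            inTransaction = false;
        }

        @Override
        public void abortTransaction() {
            super.abortTransaction();
            inTransaction = false;
        }

        @Override
        public void close() {
            // A transaction still open at close time can never be committed by
            // this producer instance, so abort it rather than leave it dangling.
            if (inTransaction) {
                abortTransaction();
            }
            super.close(Duration.ZERO);
        }
    }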
---
 .../kafka/sink/FlinkKafkaInternalProducer.java     | 26 +++++++-
 .../flink/connector/kafka/sink/KafkaWriter.java    |  4 ++
 .../connector/kafka/sink/KafkaSinkITCase.java      | 13 +++-
 .../connector/kafka/sink/KafkaWriterITCase.java    | 75 ++++++++++++++++------
 4 files changed, 96 insertions(+), 22 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index 6866ceb..992bf04 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.internals.TransactionManager;
 import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
 
     private final Properties kafkaProducerConfig;
     @Nullable private final String transactionalId;
+    private volatile boolean inTransaction;
 
     public FlinkKafkaInternalProducer(Properties properties) {
         super(properties);
@@ -57,11 +59,33 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     @Override
     public void flush() {
         super.flush();
-        if (transactionalId != null) {
+        if (inTransaction) {
             flushNewPartitions();
         }
     }
 
+    @Override
+    public void beginTransaction() throws ProducerFencedException {
+        super.beginTransaction();
+        inTransaction = true;
+    }
+
+    @Override
+    public void abortTransaction() throws ProducerFencedException {
+        super.abortTransaction();
+        inTransaction = false;
+    }
+
+    @Override
+    public void commitTransaction() throws ProducerFencedException {
+        super.commitTransaction();
+        inTransaction = false;
+    }
+
+    public boolean isInTransaction() {
+        return inTransaction;
+    }
+
     public Properties getKafkaProducerConfig() {
         return kafkaProducerConfig;
     }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 3ace69e..f1aa69c 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -184,6 +184,10 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
 
     @Override
     public void close() throws Exception {
+        if (currentProducer.isInTransaction()) {
+            currentProducer.abortTransaction();
+        }
+        currentProducer.flush();
         closed = true;
         currentProducer.close(Duration.ZERO);
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index a7995e2..33ce934 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -376,9 +376,10 @@ public class KafkaSinkITCase extends TestLogger {
         return standardProps;
     }
 
-    private Consumer<byte[], byte[]> createTestConsumer(String topic) {
+    private static Consumer<byte[], byte[]> createTestConsumer(
+            String topic, Properties properties) {
         final Properties consumerConfig = new Properties();
-        consumerConfig.putAll(getKafkaClientConfiguration());
+        consumerConfig.putAll(properties);
         consumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
         consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
         consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
@@ -403,8 +404,14 @@ public class KafkaSinkITCase extends TestLogger {
     }
 
     private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(String topic) {
+        Properties properties = getKafkaClientConfiguration();
+        return drainAllRecordsFromTopic(topic, properties);
+    }
+
+    static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
+            String topic, Properties properties) {
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords = new ArrayList<>();
-        try (Consumer<byte[], byte[]> consumer = createTestConsumer(topic)) {
+        try (Consumer<byte[], byte[]> consumer = createTestConsumer(topic, properties)) {
             ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_DURATION);
             // Drain the kafka topic till all records are consumed
             while (!records.isEmpty()) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 063411b..d0b4601 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -35,16 +35,16 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.UserCodeClassLoader;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.hamcrest.MatcherAssert;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.slf4j.Logger;
@@ -58,13 +58,17 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Optional;
-import java.util.PriorityQueue;
 import java.util.OptionalLong;
+import java.util.PriorityQueue;
 import java.util.Properties;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.connector.kafka.sink.KafkaSinkITCase.drainAllRecordsFromTopic;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for the standalone KafkaWriter. */
@@ -76,6 +80,7 @@ public class KafkaWriterITCase extends TestLogger {
     private static final Network NETWORK = Network.newNetwork();
     private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total";
     private static final SinkWriter.Context SINK_WRITER_CONTEXT = new DummySinkWriterContext();
+    private String topic;
 
     private MetricListener metricListener;
     private TriggerTimeService timeService;
@@ -83,16 +88,12 @@ public class KafkaWriterITCase extends TestLogger {
     private static final KafkaContainer KAFKA_CONTAINER =
             new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))
                     .withEmbeddedZookeeper()
+                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
+                    .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
                     .withEnv(
-                            ImmutableMap.of(
-                                    "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR",
-                                    "1",
-                                    "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
-                                    String.valueOf(Duration.ofHours(2).toMillis()),
-                                    "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR",
-                                    "1",
-                                    "KAFKA_MIN_INSYNC_REPLICAS",
-                                    "1"))
+                            "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
+                            String.valueOf(Duration.ofHours(2).toMillis()))
                     .withNetwork(NETWORK)
                     .withLogConsumer(LOG_CONSUMER)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
@@ -108,9 +109,10 @@ public class KafkaWriterITCase extends TestLogger {
     }
 
     @BeforeEach
-    public void setUp() {
+    public void setUp(TestInfo testInfo) {
         metricListener = new MetricListener();
         timeService = new TriggerTimeService();
+        topic = testInfo.getDisplayName().replaceAll("\\W", "");
     }
 
     @ParameterizedTest
@@ -143,7 +145,7 @@ public class KafkaWriterITCase extends TestLogger {
             Assertions.assertEquals(numBytesOut.getCount(), 0L);
             writer.write(1, SINK_WRITER_CONTEXT);
             timeService.trigger();
-            MatcherAssert.assertThat(numBytesOut.getCount(), greaterThan(0L));
+            assertThat(numBytesOut.getCount(), greaterThan(0L));
         }
     }
 
@@ -173,7 +175,44 @@ public class KafkaWriterITCase extends TestLogger {
                                     throw new RuntimeException("Failed writing Kafka record.");
                                 }
                             });
-            MatcherAssert.assertThat(currentSendTime.get().getValue(), greaterThan(0L));
+            assertThat(currentSendTime.get().getValue(), greaterThan(0L));
+        }
+    }
+
+    /**
+     * Tests that open transactions are automatically aborted on close such that successive writes
+     * succeed.
+     */
+    @Test
+    void testAbortOnClose() throws Exception {
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
+            writer.write(1, SINK_WRITER_CONTEXT);
+            assertThat(drainAllRecordsFromTopic(topic, getKafkaClientConfiguration()), hasSize(0));
+        }
+
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
+            writer.write(2, SINK_WRITER_CONTEXT);
+            List<KafkaCommittable> committables = writer.prepareCommit(false);
+            writer.snapshotState(1L);
+
+            // manually commit here, which would only succeed if the first transaction was aborted
+            assertThat(committables, hasSize(1));
+            Properties properties = getKafkaClientConfiguration();
+            properties.setProperty(
+                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+                    committables.get(0).getTransactionalId());
+            try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                    new FlinkKafkaInternalProducer<>(properties)) {
+                producer.resumeTransaction(
+                        committables.get(0).getProducerId(), committables.get(0).getEpoch());
+                producer.commitTransaction();
+            }
+
+            assertThat(drainAllRecordsFromTopic(topic, properties), hasSize(1));
         }
     }
 
@@ -263,15 +302,15 @@ public class KafkaWriterITCase extends TestLogger {
 
         @Override
         public OptionalLong getRestoredCheckpointId() {
-            return OptionalLong.of(0L);
+            return OptionalLong.empty();
         }
     }
 
-    private static class DummyRecordSerializer implements KafkaRecordSerializationSchema<Integer> {
+    private class DummyRecordSerializer implements KafkaRecordSerializationSchema<Integer> {
         @Override
         public ProducerRecord<byte[], byte[]> serialize(
                 Integer element, KafkaSinkContext context, Long timestamp) {
-            return new ProducerRecord<>("topic", ByteBuffer.allocate(4).putInt(element).array());
+            return new ProducerRecord<>(topic, ByteBuffer.allocate(4).putInt(element).array());
         }
     }
 

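For context on why the tests above consume with read_committed isolation: such a consumer only returns records from committed transactions and cannot advance past the last stable offset while a transaction is still open, so a transaction left dangling by a closed writer would stall downstream consumers until transaction.timeout.ms expires. A minimal sketch of such a consumer, with placeholder broker address, group id, and topic name:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class ReadCommittedProbe {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "probe"); // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            // Only records of committed transactions are returned; an open
            // transaction blocks progress past the last stable offset.
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("topic")); // placeholder topic
                consumer.poll(Duration.ofSeconds(1)).forEach(r -> System.out.println(r.offset()));
            }
        }
    }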