flink-commits mailing list archives

From ar...@apache.org
Subject [flink] 07/11: [FLINK-23854][connectors/kafka] Transfer KafkaProducer from writer to committer.
Date Wed, 01 Sep 2021 06:29:29 GMT
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ca63128a849905f3bcb69ee4e2f908a3a5d10dd8
Author: Arvid Heise <arvid@ververica.com>
AuthorDate: Thu Aug 19 14:06:42 2021 +0200

    [FLINK-23854][connectors/kafka] Transfer KafkaProducer from writer to committer.
---
 .../kafka/sink/FlinkKafkaInternalProducer.java     |  6 +++
 .../connector/kafka/sink/KafkaCommittable.java     | 18 +++++++-
 .../kafka/sink/KafkaCommittableSerializer.java     |  2 +-
 .../flink/connector/kafka/sink/KafkaCommitter.java | 15 +++++-
 .../flink/connector/kafka/sink/KafkaWriter.java    | 54 ++++++++++------------
 .../kafka/sink/KafkaCommittableSerializerTest.java |  2 +-
 6 files changed, 61 insertions(+), 36 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index 992bf04..d3b6667 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -33,6 +33,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.time.Duration;
 import java.util.Properties;
 
 /**
@@ -86,6 +87,11 @@ class FlinkKafkaInternalProducer<K, V> {
         return inTransaction;
     }
 
+    @Override
+    public void close() {
+        super.close(Duration.ZERO);
+    }
+
     public Properties getKafkaProducerConfig() {
         return kafkaProducerConfig;
     }
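
The close() override above exists because KafkaProducer's no-argument close() blocks until all in-flight records are delivered; close(Duration.ZERO) returns immediately, which is what a producer whose transaction is committed elsewhere needs. A minimal standalone sketch of the pattern, assuming only kafka-clients on the classpath (the class name is hypothetical, not part of this commit):

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;

    // Sketch only: a producer whose default close() never blocks.
    class NonBlockingCloseProducer<K, V> extends KafkaProducer<K, V> {
        NonBlockingCloseProducer(Properties config) {
            super(config);
        }

        @Override
        public void close() {
            super.close(Duration.ZERO); // do not wait for in-flight records
        }
    }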
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
index 0d5bdb2..e8825fd 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
@@ -19,7 +19,10 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 
+import javax.annotation.Nullable;
+
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * This class holds the necessary information to construct a new {@link FlinkKafkaInternalProducer}
@@ -30,11 +33,17 @@ class KafkaCommittable {
     private final long producerId;
     private final short epoch;
     private final String transactionalId;
+    @Nullable private FlinkKafkaInternalProducer<?, ?> producer;
 
-    public KafkaCommittable(long producerId, short epoch, String transactionalId) {
+    public KafkaCommittable(
+            long producerId,
+            short epoch,
+            String transactionalId,
+            @Nullable FlinkKafkaInternalProducer<?, ?> producer) {
         this.producerId = producerId;
         this.epoch = epoch;
         this.transactionalId = transactionalId;
+        this.producer = producer;
     }
 
     public static KafkaCommittable of(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
@@ -42,7 +51,8 @@ class KafkaCommittable {
                 producer.getProducerId(),
                 producer.getEpoch(),
                 producer.getKafkaProducerConfig()
-                        .getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+                        .getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG),
+                producer);
     }
 
     public long getProducerId() {
@@ -57,6 +67,10 @@ class KafkaCommittable {
         return transactionalId;
     }
 
+    public Optional<FlinkKafkaInternalProducer<?, ?>> getProducer() {
+        return Optional.ofNullable(producer);
+    }
+
     @Override
     public String toString() {
         return "KafkaCommittable{"
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializer.java
index 7426991..78f1472 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializer.java
@@ -51,7 +51,7 @@ class KafkaCommittableSerializer implements SimpleVersionedSerializer<KafkaCommi
             final short epoch = in.readShort();
             final long producerId = in.readLong();
             final String transactionalId = in.readUTF();
-            return new KafkaCommittable(producerId, epoch, transactionalId);
+            return new KafkaCommittable(producerId, epoch, transactionalId, null);
         }
     }
 }
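
The serializer deliberately passes null for the producer: the live handle cannot survive a checkpoint or failover, so a deserialized committable always comes back without it, and the committer must be able to rebuild a producer on its own (see KafkaCommitter below). A small self-contained sketch of this carry-if-available contract, with hypothetical names:

    import java.util.Optional;

    import javax.annotation.Nullable;

    // Sketch of the pattern used by KafkaCommittable: a value object that may
    // carry a live, non-serializable handle. After a serialization round trip
    // the handle is always absent, so readers go through the Optional accessor.
    final class HandleCarrier<T> {
        @Nullable private final T liveHandle;

        HandleCarrier(@Nullable T liveHandle) {
            this.liveHandle = liveHandle;
        }

        Optional<T> getLiveHandle() {
            return Optional.ofNullable(liveHandle);
        }
    }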
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index 4263656..705ae2f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -58,8 +58,7 @@ class KafkaCommitter implements Committer<KafkaCommittable> {
         final String transactionalId = committable.getTransactionalId();
         LOG.debug("Committing Kafka transaction {}", transactionalId);
         try (FlinkKafkaInternalProducer<?, ?> producer =
-                new FlinkKafkaInternalProducer<>(createKafkaProducerConfig(transactionalId))) {
-            producer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
+                committable.getProducer().orElseGet(() -> createProducer(committable))) {
             producer.commitTransaction();
         } catch (InvalidTxnStateException | ProducerFencedException e) {
             // That means we have committed this transaction before.
@@ -71,6 +70,18 @@ class KafkaCommitter implements Committer<KafkaCommittable> {
         }
     }
 
+    /**
+     * Creates a producer that can commit into the same transaction as the upstream producer that
+     * was serialized into {@link KafkaCommittable}.
+     */
+    private FlinkKafkaInternalProducer<?, ?> createProducer(KafkaCommittable committable) {
+        FlinkKafkaInternalProducer<?, ?> producer =
+                new FlinkKafkaInternalProducer<>(
+                        createKafkaProducerConfig(committable.getTransactionalId()));
+        producer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
+        return producer;
+    }
+
     private Properties createKafkaProducerConfig(String transactionalId) {
         final Properties copy = new Properties();
         copy.putAll(kafkaProducerConfig);
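
The committer now has two commit paths: in the common same-JVM case it reuses the live producer handed over by the writer inside the committable; after a restore, where only producerId/epoch/transactionalId survived serialization, it builds a fresh producer and re-attaches it to the pending transaction with resumeTransaction. A compilable model of that control flow (all names here are illustrative stand-ins, not the actual Flink classes):

    import java.util.Optional;

    // Stand-in for FlinkKafkaInternalProducer.
    interface TxProducer extends AutoCloseable {
        void resumeTransaction(long producerId, short epoch); // re-attach to pending txn
        void commitTransaction();

        @Override
        void close(); // narrowed to not throw checked exceptions
    }

    final class CommitterSketch {
        void commit(long producerId, short epoch, Optional<TxProducer> fromWriter) {
            // Mirrors KafkaCommitter#commit: whichever path supplied the
            // producer, try-with-resources closes it after the commit.
            try (TxProducer producer =
                    fromWriter.orElseGet(() -> recoverProducer(producerId, epoch))) {
                producer.commitTransaction();
            }
        }

        private TxProducer recoverProducer(long producerId, short epoch) {
            TxProducer producer = newProducer(); // hypothetical factory
            producer.resumeTransaction(producerId, epoch);
            return producer;
        }

        private TxProducer newProducer() {
            throw new UnsupportedOperationException("sketch only");
        }
    }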
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index f1aa69c..e221b6f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetric
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
@@ -80,17 +81,17 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
     private final Callback deliveryCallback;
     private final AtomicLong pendingRecords = new AtomicLong();
     private final KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext;
-    private final List<FlinkKafkaInternalProducer<byte[], byte[]>> producers = new ArrayList<>();
     private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
     private final SinkWriterMetricGroup metricGroup;
     private final Counter numBytesOutCounter;
     private final Sink.ProcessingTimeService timeService;
     private final boolean disabledMetrics;
 
-    private transient Metric byteOutMetric;
-    private transient FlinkKafkaInternalProducer<byte[], byte[]> currentProducer;
-    private transient KafkaWriterState kafkaWriterState;
-    @Nullable private transient volatile Exception producerAsyncException;
+    private Metric byteOutMetric;
+    private FlinkKafkaInternalProducer<byte[], byte[]> currentProducer;
+    private KafkaWriterState kafkaWriterState;
+    private final Closer closer = Closer.create();
+    @Nullable private volatile Exception producerAsyncException;
 
     private boolean closed = false;
     private long lastSync = System.currentTimeMillis();
@@ -152,8 +153,7 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
         }
         this.kafkaWriterState =
                 recoverAndInitializeState(checkNotNull(recoveredStates, "recoveredStates"));
-        this.currentProducer = beginTransaction();
-        producers.add(currentProducer);
+        this.currentProducer = createProducer();
         registerMetricSync();
     }
 
@@ -167,13 +167,10 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
     }
 
     @Override
-    public List<KafkaCommittable> prepareCommit(boolean flush) throws IOException {
+    public List<KafkaCommittable> prepareCommit(boolean flush) {
         flushRecords(flush);
-        if (!flush) {
-            currentProducer = beginTransaction();
-        }
-        final List<KafkaCommittable> committables = commit();
-        producers.add(currentProducer);
+        List<KafkaCommittable> committables = precommit();
+        currentProducer = createProducer();
         return committables;
     }
 
@@ -189,7 +186,7 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
         }
         currentProducer.flush();
         closed = true;
-        currentProducer.close(Duration.ZERO);
+        closer.close();
     }
 
     private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
@@ -263,27 +260,26 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
         }
     }
 
-    private FlinkKafkaInternalProducer<byte[], byte[]> beginTransaction() {
+    private FlinkKafkaInternalProducer<byte[], byte[]> createProducer() {
         switch (deliveryGuarantee) {
             case EXACTLY_ONCE:
-                if (currentProducer != null) {
-                    currentProducer.close(Duration.ZERO);
-                }
                 final FlinkKafkaInternalProducer<byte[], byte[]> transactionalProducer =
                         createTransactionalProducer();
                 initMetrics(transactionalProducer);
                 transactionalProducer.beginTransaction();
+                closer.register(transactionalProducer);
                 return transactionalProducer;
             case AT_LEAST_ONCE:
             case NONE:
-                if (currentProducer == null) {
-                    final FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                            new FlinkKafkaInternalProducer<>(kafkaProducerConfig);
-                    initMetrics(producer);
-                    return producer;
+                if (currentProducer != null) {
+                    LOG.debug("Reusing existing KafkaProducer");
+                    return currentProducer;
                 }
-                LOG.debug("Reusing existing KafkaProducer");
-                return currentProducer;
+                final FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                        new FlinkKafkaInternalProducer<>(kafkaProducerConfig);
+                initMetrics(producer);
+                closer.register(producer);
+                return producer;
             default:
                 throw new UnsupportedOperationException(
                         "Unsupported Kafka writer semantic " + deliveryGuarantee);
@@ -315,17 +311,15 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
         checkErroneous();
     }
 
-    private List<KafkaCommittable> commit() {
+    private List<KafkaCommittable> precommit() {
         final List<KafkaCommittable> committables;
         switch (deliveryGuarantee) {
             case EXACTLY_ONCE:
-                committables =
-                        producers.stream().map(KafkaCommittable::of).collect(Collectors.toList());
-                producers.clear();
+                committables = Collections.singletonList(KafkaCommittable.of(currentProducer));
                 break;
             case AT_LEAST_ONCE:
             case NONE:
-                committables = new ArrayList<>();
+                committables = Collections.emptyList();
                 break;
             default:
                 throw new UnsupportedOperationException(
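
In place of the removed producers list, the writer now registers every producer it creates with a Guava Closer and releases them all in close(). A short sketch of that resource-tracking pattern (plain Guava here, whereas the writer uses Flink's shaded copy):

    import java.io.Closeable;
    import java.io.IOException;

    import com.google.common.io.Closer;

    // Sketch of the pattern KafkaWriter adopts for producer lifecycle.
    final class ResourceTracker {
        private final Closer closer = Closer.create();

        // Closer#register returns its argument, so tracking can be inlined
        // at the point where the resource is created.
        <C extends Closeable> C track(C resource) {
            return closer.register(resource);
        }

        // Closes every registered resource in reverse registration order;
        // the first close failure is rethrown, later ones are suppressed.
        void shutdown() throws IOException {
            closer.close();
        }
    }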
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializerTest.java
index 012af07..bcf06e9 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializerTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializerTest.java
@@ -37,7 +37,7 @@ public class KafkaCommittableSerializerTest extends TestLogger {
     public void testCommittableSerDe() throws IOException {
         final String transactionalId = "test-id";
         final short epoch = 5;
-        final KafkaCommittable committable = new KafkaCommittable(1L, epoch, transactionalId);
+        final KafkaCommittable committable = new KafkaCommittable(1L, epoch, transactionalId, null);
         final byte[] serialized = SERIALIZER.serialize(committable);
         assertEquals(committable, SERIALIZER.deserialize(1, serialized));
     }
