flink-commits mailing list archives

From: ar...@apache.org
Subject: [flink] 09/11: [FLINK-23854][connectors/kafka] Add FlinkKafkaInternalProducer#setTransactionalId.
Date: Wed, 01 Sep 2021 06:29:31 GMT
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31fdc2328f67c1551497443b07e33963d364f121
Author: Arvid Heise <arvid@ververica.com>
AuthorDate: Fri Aug 27 08:46:18 2021 +0200

    [FLINK-23854][connectors/kafka] Add FlinkKafkaInternalProducer#setTransactionalId.
    
    This will allow reuse of the producer in the next commit, which will also include FlinkKafkaInternalProducerITCase.
---
 .../kafka/sink/FlinkKafkaInternalProducer.java     | 60 +++++++++++++++++-----
 .../connector/kafka/sink/KafkaCommittable.java     |  5 +-
 .../flink/connector/kafka/sink/KafkaCommitter.java | 10 +---
 .../flink/connector/kafka/sink/KafkaWriter.java    | 38 ++++----------
 .../connector/kafka/sink/KafkaWriterITCase.java    | 17 +++---
 5 files changed, 66 insertions(+), 64 deletions(-)
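
A minimal sketch of the reuse pattern this enables. It is illustrative only: the
driver class, broker address, and transactional ids below are assumptions, and the
sketch must live in the producer's package because FlinkKafkaInternalProducer is
package-private.

    package org.apache.flink.connector.kafka.sink;

    import java.util.Properties;

    class ProducerReuseSketch { // hypothetical driver, not part of this commit
        public static void main(String[] args) {
            Properties config = new Properties();
            config.put("bootstrap.servers", "localhost:9092"); // assumed broker
            config.put("key.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            config.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");

            // First transaction under the first transactional id.
            try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
                    new FlinkKafkaInternalProducer<>(config, "prefix-0-1")) {
                producer.initTransactions();
                producer.beginTransaction();
                // ... send records ...
                producer.commitTransaction();

                // Reuse the same instance for the next id instead of building a
                // new producer; initTransactionId re-initializes transaction
                // state only when the id actually changes.
                producer.initTransactionId("prefix-0-2");
                producer.beginTransaction();
                // ... send records ...
                producer.commitTransaction();
            }
        }
    }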

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index d3b6667..dadee58 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.util.Preconditions;
-
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.internals.TransactionManager;
@@ -36,6 +34,8 @@ import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state.
  */
@@ -47,14 +47,23 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
             "org.apache.kafka.clients.producer.internals.TransactionManager$State";
     private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
 
-    private final Properties kafkaProducerConfig;
-    @Nullable private final String transactionalId;
+    @Nullable private String transactionalId;
     private volatile boolean inTransaction;
 
-    public FlinkKafkaInternalProducer(Properties properties) {
-        super(properties);
-        this.kafkaProducerConfig = properties;
-        this.transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+    public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
+        super(withTransactionalId(properties, transactionalId));
+        this.transactionalId = transactionalId;
+    }
+
+    private static Properties withTransactionalId(
+            Properties properties, @Nullable String transactionalId) {
+        if (transactionalId == null) {
+            return properties;
+        }
+        Properties props = new Properties();
+        props.putAll(properties);
+        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        return props;
     }
 
     @Override
@@ -92,8 +101,9 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         super.close(Duration.ZERO);
     }
 
-    public Properties getKafkaProducerConfig() {
-        return kafkaProducerConfig;
+    @Nullable
+    public String getTransactionalId() {
+        return transactionalId;
     }
 
     public short getEpoch() {
@@ -108,6 +118,28 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         return (long) getField(producerIdAndEpoch, "producerId");
     }
 
+    public void initTransactionId(String transactionalId) {
+        if (!transactionalId.equals(this.transactionalId)) {
+            setTransactionId(transactionalId);
+            initTransactions();
+        }
+    }
+
+    public void setTransactionId(String transactionalId) {
+        if (!transactionalId.equals(this.transactionalId)) {
+            checkState(!inTransaction);
+            Object transactionManager = getTransactionManager();
+            synchronized (transactionManager) {
+                setField(transactionManager, "transactionalId", transactionalId);
+                setField(
+                        transactionManager,
+                        "currentState",
+                        getTransactionManagerState("UNINITIALIZED"));
+                this.transactionalId = transactionalId;
+            }
+        }
+    }
+
     /**
      * Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction}
      * is also adding new partitions to the transaction. flushNewPartitions method is moving this
@@ -214,7 +246,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
      * https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
      */
     public void resumeTransaction(long producerId, short epoch) {
-        Preconditions.checkState(
+        checkState(
                 producerId >= 0 && epoch >= 0,
                 "Incorrect values for producerId %s and epoch %s",
                 producerId,
@@ -267,8 +299,12 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
      * reflection.
      */
     private static void setField(Object object, String fieldName, Object value) {
+        setField(object, object.getClass(), fieldName, value);
+    }
+
+    private static void setField(Object object, Class<?> clazz, String fieldName, Object value) {
         try {
-            Field field = object.getClass().getDeclaredField(fieldName);
+            Field field = clazz.getDeclaredField(fieldName);
             field.setAccessible(true);
             field.set(object, value);
         } catch (NoSuchFieldException | IllegalAccessException e) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
index e8825fd..38579a3 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.kafka.clients.producer.ProducerConfig;
-
 import javax.annotation.Nullable;
 
 import java.util.Objects;
@@ -50,8 +48,7 @@ class KafkaCommittable {
         return new KafkaCommittable(
                 producer.getProducerId(),
                 producer.getEpoch(),
-                producer.getKafkaProducerConfig()
-                        .getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG),
+                producer.getTransactionalId(),
                 producer);
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index f4c852a..d9c0d29 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -19,7 +19,6 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.api.connector.sink.Committer;
 
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.slf4j.Logger;
@@ -79,15 +78,8 @@ class KafkaCommitter implements Committer<KafkaCommittable> {
     private FlinkKafkaInternalProducer<?, ?> createProducer(KafkaCommittable committable) {
         FlinkKafkaInternalProducer<?, ?> producer =
                 new FlinkKafkaInternalProducer<>(
-                        createKafkaProducerConfig(committable.getTransactionalId()));
+                        kafkaProducerConfig, committable.getTransactionalId());
         producer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
         return producer;
     }
-
-    private Properties createKafkaProducerConfig(String transactionalId) {
-        final Properties copy = new Properties();
-        copy.putAll(kafkaProducerConfig);
-        copy.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        return copy;
-    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index e221b6f..ae2c922 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -33,7 +33,6 @@ import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -229,13 +228,11 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
                     // -> create an internal kafka producer on our own and do not rely
                     // on
                     //    initTransactionalProducer().
-                    final Properties myConfig = new Properties();
-                    myConfig.putAll(kafkaProducerConfig);
-                    myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction);
                     LOG.info("Aborting Kafka transaction {}.", transaction);
                     FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
                     try {
-                        kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig);
+                        kafkaProducer =
+                                new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transaction);
                         // it suffices to call initTransactions - this will abort any
                         // lingering transactions
                         kafkaProducer.initTransactions();
@@ -276,7 +273,7 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
                     return currentProducer;
                 }
                 final FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                        new FlinkKafkaInternalProducer<>(kafkaProducerConfig);
+                        new FlinkKafkaInternalProducer<>(kafkaProducerConfig, null);
                 initMetrics(producer);
                 closer.register(producer);
                 return producer;
@@ -336,38 +333,23 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSta
      */
     private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() {
         final long transactionalIdOffset = kafkaWriterState.getTransactionalIdOffset() + 1;
-        final Properties copiedProducerConfig = new Properties();
-        copiedProducerConfig.putAll(kafkaProducerConfig);
-        initTransactionalProducerConfig(
-                copiedProducerConfig,
-                transactionalIdOffset,
-                transactionalIdPrefix,
-                kafkaSinkContext.getParallelInstanceId());
+        String transactionalId =
+                TransactionalIdFactory.buildTransactionalId(
+                        transactionalIdPrefix,
+                        kafkaSinkContext.getParallelInstanceId(),
+                        transactionalIdOffset);
         final FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                new FlinkKafkaInternalProducer<>(copiedProducerConfig);
+                new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
         producer.initTransactions();
         kafkaWriterState =
                 new KafkaWriterState(
                         transactionalIdPrefix,
                         kafkaSinkContext.getParallelInstanceId(),
                         transactionalIdOffset);
-        LOG.info(
-                "Created new transactional producer {}",
-                copiedProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+        LOG.info("Created new transactional producer {}", transactionalId);
         return producer;
     }
 
-    private static void initTransactionalProducerConfig(
-            Properties producerConfig,
-            long transactionalIdOffset,
-            String transactionalIdPrefix,
-            int subtaskId) {
-        producerConfig.put(
-                ProducerConfig.TRANSACTIONAL_ID_CONFIG,
-                TransactionalIdFactory.buildTransactionalId(
-                        transactionalIdPrefix, subtaskId, transactionalIdOffset));
-    }
-
     private void initMetrics(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
         byteOutMetric =
                 MetricUtil.getKafkaMetric(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index d0b4601..1a75cca 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -36,7 +36,6 @@ import org.apache.flink.util.UserCodeClassLoader;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.AfterAll;
@@ -185,28 +184,24 @@ public class KafkaWriterITCase extends TestLogger {
      */
     @Test
     void testAbortOnClose() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
         try (final KafkaWriter<Integer> writer =
-                createWriterWithConfiguration(
-                        getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
+                createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) {
             writer.write(1, SINK_WRITER_CONTEXT);
-            assertThat(drainAllRecordsFromTopic(topic, getKafkaClientConfiguration()), hasSize(0));
+            assertThat(drainAllRecordsFromTopic(topic, properties), hasSize(0));
         }
 
         try (final KafkaWriter<Integer> writer =
-                createWriterWithConfiguration(
-                        getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
+                createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) {
             writer.write(2, SINK_WRITER_CONTEXT);
             List<KafkaCommittable> committables = writer.prepareCommit(false);
             writer.snapshotState(1L);
 
             // manually commit here, which would only succeed if the first transaction was aborted
             assertThat(committables, hasSize(1));
-            Properties properties = getKafkaClientConfiguration();
-            properties.setProperty(
-                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
-                    committables.get(0).getTransactionalId());
+            String transactionalId = committables.get(0).getTransactionalId();
             try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                    new FlinkKafkaInternalProducer<>(properties)) {
+                    new FlinkKafkaInternalProducer<>(properties, transactionalId)) {
                 producer.resumeTransaction(
                         committables.get(0).getProducerId(), committables.get(0).getEpoch());
                 producer.commitTransaction();

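
For context, the committer-side recovery path after this change reduces to the
following sketch. The names committable and kafkaProducerConfig stand in for the
KafkaCommittable being committed and the committer's producer configuration; the
try-with-resources framing is an assumption for the example.

    // Rebind a producer to the committable's transactional id, then resume and
    // commit the pre-committed transaction, mirroring KafkaCommitter#createProducer.
    try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
            new FlinkKafkaInternalProducer<>(
                    kafkaProducerConfig, committable.getTransactionalId())) {
        producer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
        producer.commitTransaction();
    }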