From: arvid@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Wed, 01 Sep 2021 06:29:31 +0000
Subject: [flink] 09/11: [FLINK-23854][connectors/kafka] Add FlinkKafkaInternalProducer#setTransactionalId.
Message-Id: <20210901062929.2EA7581EFC@gitbox.apache.org>
In-Reply-To: <163047776084.23457.752556586781185374@gitbox.apache.org>

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31fdc2328f67c1551497443b07e33963d364f121
Author: Arvid Heise
AuthorDate: Fri Aug 27 08:46:18 2021 +0200

    [FLINK-23854][connectors/kafka] Add FlinkKafkaInternalProducer#setTransactionalId.

    This will allow reuse of the producer in the next commit which will also
    include FlinkKafkaInternalProducerITCase.
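The reuse this change enables can be illustrated with a minimal sketch. The pool class below is hypothetical; only the two-argument FlinkKafkaInternalProducer constructor, initTransactions(), and initTransactionId() come from this commit, and same-package access is assumed since the class is package-private.

    // A minimal sketch, assuming a caller that cycles through transactional ids
    // (for example one id per checkpoint). The pool itself is made up.
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Properties;

    class ProducerPool {
        private final Properties config;
        private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> idle = new ArrayDeque<>();

        ProducerPool(Properties config) {
            this.config = config;
        }

        FlinkKafkaInternalProducer<byte[], byte[]> acquire(String transactionalId) {
            FlinkKafkaInternalProducer<byte[], byte[]> producer = idle.poll();
            if (producer == null) {
                // First use: the new constructor takes the transactional id explicitly.
                producer = new FlinkKafkaInternalProducer<>(config, transactionalId);
                producer.initTransactions();
            } else {
                // Reuse: initTransactionId swaps the id in place (via reflection) and
                // re-initializes, instead of opening a fresh broker connection.
                producer.initTransactionId(transactionalId);
            }
            return producer;
        }

        void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
            idle.push(producer);
        }
    }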
---
 .../kafka/sink/FlinkKafkaInternalProducer.java     | 60 +++++++++++++++++-----
 .../connector/kafka/sink/KafkaCommittable.java     |  5 +-
 .../flink/connector/kafka/sink/KafkaCommitter.java | 10 +---
 .../flink/connector/kafka/sink/KafkaWriter.java    | 38 ++++----------
 .../connector/kafka/sink/KafkaWriterITCase.java    | 17 +++---
 5 files changed, 66 insertions(+), 64 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index d3b6667..dadee58 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.util.Preconditions;
-
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.internals.TransactionManager;
@@ -36,6 +34,8 @@ import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state.
  */
@@ -47,14 +47,23 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
             "org.apache.kafka.clients.producer.internals.TransactionManager$State";
     private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
 
-    private final Properties kafkaProducerConfig;
-    @Nullable private final String transactionalId;
+    @Nullable private String transactionalId;
     private volatile boolean inTransaction;
 
-    public FlinkKafkaInternalProducer(Properties properties) {
-        super(properties);
-        this.kafkaProducerConfig = properties;
-        this.transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+    public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
+        super(withTransactionalId(properties, transactionalId));
+        this.transactionalId = transactionalId;
+    }
+
+    private static Properties withTransactionalId(
+            Properties properties, @Nullable String transactionalId) {
+        if (transactionalId == null) {
+            return properties;
+        }
+        Properties props = new Properties();
+        props.putAll(properties);
+        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        return props;
     }
 
     @Override
@@ -92,8 +101,9 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         super.close(Duration.ZERO);
     }
 
-    public Properties getKafkaProducerConfig() {
-        return kafkaProducerConfig;
+    @Nullable
+    public String getTransactionalId() {
+        return transactionalId;
     }
 
     public short getEpoch() {
@@ -108,6 +118,28 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
         return (long) getField(producerIdAndEpoch, "producerId");
     }
 
+    public void initTransactionId(String transactionalId) {
+        if (!transactionalId.equals(this.transactionalId)) {
+            setTransactionId(transactionalId);
+            initTransactions();
+        }
+    }
+
+    public void setTransactionId(String transactionalId) {
+        if (!transactionalId.equals(this.transactionalId)) {
+            checkState(!inTransaction);
+            Object transactionManager = getTransactionManager();
+            synchronized (transactionManager) {
+                setField(transactionManager, "transactionalId", transactionalId);
+                setField(
+                        transactionManager,
+                        "currentState",
+                        getTransactionManagerState("UNINITIALIZED"));
+                this.transactionalId = transactionalId;
+            }
+        }
+    }
+
     /**
      * Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction}
      * is also adding new partitions to the transaction. flushNewPartitions method is moving this
@@ -214,7 +246,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
      * https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
      */
     public void resumeTransaction(long producerId, short epoch) {
-        Preconditions.checkState(
+        checkState(
                 producerId >= 0 && epoch >= 0,
                 "Incorrect values for producerId %s and epoch %s",
                 producerId,
@@ -267,8 +299,12 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
      * reflection.
      */
     private static void setField(Object object, String fieldName, Object value) {
+        setField(object, object.getClass(), fieldName, value);
+    }
+
+    private static void setField(Object object, Class<?> clazz, String fieldName, Object value) {
         try {
-            Field field = object.getClass().getDeclaredField(fieldName);
+            Field field = clazz.getDeclaredField(fieldName);
             field.setAccessible(true);
             field.set(object, value);
         } catch (NoSuchFieldException | IllegalAccessException e) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
index e8825fd..38579a3 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.kafka.clients.producer.ProducerConfig;
-
 import javax.annotation.Nullable;
 
 import java.util.Objects;
@@ -50,8 +48,7 @@ class KafkaCommittable {
         return new KafkaCommittable(
                 producer.getProducerId(),
                 producer.getEpoch(),
-                producer.getKafkaProducerConfig()
-                        .getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG),
+                producer.getTransactionalId(),
                 producer);
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index f4c852a..d9c0d29 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -19,7 +19,6 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.api.connector.sink.Committer;
 
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.slf4j.Logger;
@@ -79,15 +78,8 @@ class KafkaCommitter implements Committer<KafkaCommittable> {
     private FlinkKafkaInternalProducer<?, ?> createProducer(KafkaCommittable committable) {
         FlinkKafkaInternalProducer<?, ?> producer =
                 new FlinkKafkaInternalProducer<>(
-                        createKafkaProducerConfig(committable.getTransactionalId()));
+                        kafkaProducerConfig, committable.getTransactionalId());
         producer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
        return producer;
     }
-
-    private Properties createKafkaProducerConfig(String transactionalId) {
-        final Properties copy = new Properties();
-        copy.putAll(kafkaProducerConfig);
-        copy.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        return copy;
-    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index e221b6f..ae2c922 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -33,7 +33,6 @@ import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -229,13 +228,11 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSt
             // -> create an internal kafka producer on our own and do not rely
             // on
             //    initTransactionalProducer().
-            final Properties myConfig = new Properties();
-            myConfig.putAll(kafkaProducerConfig);
-            myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction);
             LOG.info("Aborting Kafka transaction {}.", transaction);
             FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
             try {
-                kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig);
+                kafkaProducer =
+                        new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transaction);
                 // it suffices to call initTransactions - this will abort any
                 // lingering transactions
                 kafkaProducer.initTransactions();
@@ -276,7 +273,7 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSt
         final FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                new FlinkKafkaInternalProducer<>(kafkaProducerConfig);
+                new FlinkKafkaInternalProducer<>(kafkaProducerConfig, null);
         initMetrics(producer);
         closer.register(producer);
         return producer;
@@ -336,38 +333,23 @@ class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterSt
     private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() {
         final long transactionalIdOffset = kafkaWriterState.getTransactionalIdOffset() + 1;
-        final Properties copiedProducerConfig = new Properties();
-        copiedProducerConfig.putAll(kafkaProducerConfig);
-        initTransactionalProducerConfig(
-                copiedProducerConfig,
-                transactionalIdOffset,
-                transactionalIdPrefix,
-                kafkaSinkContext.getParallelInstanceId());
+        String transactionalId =
+                TransactionalIdFactory.buildTransactionalId(
+                        transactionalIdPrefix,
+                        kafkaSinkContext.getParallelInstanceId(),
+                        transactionalIdOffset);
         final FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                new FlinkKafkaInternalProducer<>(copiedProducerConfig);
+                new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
         producer.initTransactions();
         kafkaWriterState =
                 new KafkaWriterState(
                         transactionalIdPrefix,
                         kafkaSinkContext.getParallelInstanceId(),
                         transactionalIdOffset);
-        LOG.info(
-                "Created new transactional producer {}",
-                copiedProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+        LOG.info("Created new transactional producer {}", transactionalId);
         return producer;
     }
 
-    private static void initTransactionalProducerConfig(
-            Properties producerConfig,
-            long transactionalIdOffset,
-            String transactionalIdPrefix,
-            int subtaskId) {
-        producerConfig.put(
-                ProducerConfig.TRANSACTIONAL_ID_CONFIG,
-                TransactionalIdFactory.buildTransactionalId(
-                        transactionalIdPrefix, subtaskId, transactionalIdOffset));
-    }
-
     private void initMetrics(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
         byteOutMetric =
                 MetricUtil.getKafkaMetric(
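A note on the setField overload added in FlinkKafkaInternalProducer above: Class#getDeclaredField only sees fields declared directly on the given class, not inherited ones, which is presumably why the new overload takes the declaring Class<?> explicitly. A self-contained sketch of the difference (the demo classes are made up):

    import java.lang.reflect.Field;

    class Base {
        protected String txnId = "original";
    }

    class Derived extends Base {}

    public class DeclaredFieldDemo {
        public static void main(String[] args) throws Exception {
            Derived target = new Derived();
            try {
                // Fails: txnId is declared on Base, so Derived does not "declare" it.
                Derived.class.getDeclaredField("txnId");
            } catch (NoSuchFieldException e) {
                System.out.println("txnId not visible via Derived.class");
            }
            // Works when the declaring class is passed explicitly, as in the new overload.
            Field field = Base.class.getDeclaredField("txnId");
            field.setAccessible(true);
            field.set(target, "swapped");
            System.out.println(target.txnId); // prints "swapped"
        }
    }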
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index d0b4601..1a75cca 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -36,7 +36,6 @@ import org.apache.flink.util.UserCodeClassLoader;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.AfterAll;
@@ -185,28 +184,24 @@ public class KafkaWriterITCase extends TestLogger {
      */
     @Test
     void testAbortOnClose() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
         try (final KafkaWriter<Integer> writer =
-                createWriterWithConfiguration(
-                        getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
+                createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) {
             writer.write(1, SINK_WRITER_CONTEXT);
-            assertThat(drainAllRecordsFromTopic(topic, getKafkaClientConfiguration()), hasSize(0));
+            assertThat(drainAllRecordsFromTopic(topic, properties), hasSize(0));
         }
 
         try (final KafkaWriter<Integer> writer =
-                createWriterWithConfiguration(
-                        getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
+                createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) {
             writer.write(2, SINK_WRITER_CONTEXT);
             List<KafkaCommittable> committables = writer.prepareCommit(false);
             writer.snapshotState(1L);
 
             // manually commit here, which would only succeed if the first transaction was aborted
             assertThat(committables, hasSize(1));
-            Properties properties = getKafkaClientConfiguration();
-            properties.setProperty(
-                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
-                    committables.get(0).getTransactionalId());
+            String transactionalId = committables.get(0).getTransactionalId();
             try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                    new FlinkKafkaInternalProducer<>(properties)) {
+                    new FlinkKafkaInternalProducer<>(properties, transactionalId)) {
                 producer.resumeTransaction(
                         committables.get(0).getProducerId(), committables.get(0).getEpoch());
                 producer.commitTransaction();
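For completeness, a sketch of the recovery path that both KafkaCommitter and the test above rely on. The helper is hypothetical; it assumes same-package access (FlinkKafkaInternalProducer is package-private) and a config that already carries the serializer settings.

    import java.util.Properties;

    final class TxnRecoverySketch {
        /** Resume a transaction from state captured in a KafkaCommittable and commit it. */
        static void recoverAndCommit(
                Properties config, String transactionalId, long producerId, short epoch) {
            try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
                    new FlinkKafkaInternalProducer<>(config, transactionalId)) {
                // Re-attach to the pre-failure transaction; initTransactions() is deliberately
                // not called here, since it would abort the very transaction we want to commit.
                producer.resumeTransaction(producerId, epoch);
                producer.commitTransaction();
            }
        }
    }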