flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] twalthr commented on a change in pull request #16769: [FLINK-23639][connectors/kafka] Migrate Table API Kafka connector to use FLIP-143 KafkaSink
Date Wed, 11 Aug 2021 11:17:33 GMT

twalthr commented on a change in pull request #16769:
URL: https://github.com/apache/flink/pull/16769#discussion_r686728164



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
##########
@@ -535,6 +538,15 @@ private static void autoCompleteSubject(
         }
     }
 
+    static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
+        if (tableOptions.getOptional(DELIVERY_GUARANTEE).orElse(DELIVERY_GUARANTEE.defaultValue())

Review comment:
       replace with `tableOptions.get(DELIVERY_GUARANTEE))`

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -240,6 +235,18 @@
                                                     + "must be set to be greater than zero
to enable sink buffer flushing.")
                                     .build());
 
+    public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE =

Review comment:
       please also regenerate the table connector docs

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link
+ * org.apache.flink.streaming.connectors.kafka.sink.KafkaSink}.
+ */
+class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData>
{
+
+    private final String topic;
+    private final FlinkKafkaPartitioner<RowData> partitioner;
+    private final SerializationSchema<RowData> keySerialization;
+    private final SerializationSchema<RowData> valueSerialization;
+    private final RowData.FieldGetter[] keyFieldGetters;
+    private final RowData.FieldGetter[] valueFieldGetters;
+    private final boolean hasMetadata;
+    private final int[] metadataPositions;
+    private final boolean upsertMode;
+
+    private transient boolean isPartitionerOpen = false;
+
+    DynamicKafkaRecordSerializationSchema(
+            String topic,
+            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable SerializationSchema<RowData> keySerialization,
+            SerializationSchema<RowData> valueSerialization,
+            RowData.FieldGetter[] keyFieldGetters,
+            RowData.FieldGetter[] valueFieldGetters,
+            boolean hasMetadata,
+            int[] metadataPositions,
+            boolean upsertMode) {
+        if (upsertMode) {
+            Preconditions.checkArgument(
+                    keySerialization != null && keyFieldGetters.length > 0,
+                    "Key must be set in upsert mode for serialization schema.");
+        }
+        this.topic = topic;
+        this.partitioner = partitioner;
+        this.keySerialization = keySerialization;
+        this.valueSerialization = valueSerialization;
+        this.keyFieldGetters = keyFieldGetters;
+        this.valueFieldGetters = valueFieldGetters;
+        this.hasMetadata = hasMetadata;
+        this.metadataPositions = metadataPositions;
+        this.upsertMode = upsertMode;
+    }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(
+            RowData consumedRow, KafkaSinkContext context, Long timestamp) {
+        if (partitioner != null && !isPartitionerOpen) {
+            partitioner.open(
+                    context.getParallelInstanceId(), context.getNumberOfParallelInstances());
+            isPartitionerOpen = true;
+        }
+        // shortcut in case no input projection is required
+        if (keySerialization == null && !hasMetadata) {

Review comment:
       No that doesn't hold. This shortcut is for a simple Kafka topic scan. Where the topic
is used as an insert-only log without reading the key. E.g. for batch use cases.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -240,6 +235,18 @@
                                                     + "must be set to be greater than zero
to enable sink buffer flushing.")
                                     .build());
 
+    public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE =
+            ConfigOptions.key("sink.delivery-guarantee")
+                    .enumType(DeliveryGuarantee.class)
+                    .defaultValue(DeliveryGuarantee.NONE)
+                    .withDescription("Optional delivery guarantee when committing.");
+
+    public static final ConfigOption<String> TRANSACTIONAL_ID_PREFIX =
+            ConfigOptions.key("sink.transactional-id-prefix")
+                    .stringType()
+                    .defaultValue(null)
+                    .withDescription("Optional delivery guarantee when committing.");

Review comment:
       please keep in mind that table users are even less skilled than DataStream API users
when explaining this option. we have the infrastructure to add multiple paragraphs and code
examples if it helps.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -172,20 +185,43 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         final SerializationSchema<RowData> valueSerialization =
                 createSerialization(context, valueEncodingFormat, valueProjection, null);
 
-        final FlinkKafkaProducer<RowData> kafkaProducer =
-                createKafkaProducer(keySerialization, valueSerialization);
-
         if (flushMode.isEnabled() && upsertMode) {
             BufferedUpsertSinkFunction buffedSinkFunction =

Review comment:
       >Kafka producer is doing internally already
   
   Are you sure about that? This feature has been added recently to avoid too many tombstone
messages in Kafka log. It tries to normalize the changelog by a reducing step.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -240,6 +235,18 @@
                                                     + "must be set to be greater than zero
to enable sink buffer flushing.")
                                     .build());
 
+    public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE =
+            ConfigOptions.key("sink.delivery-guarantee")
+                    .enumType(DeliveryGuarantee.class)
+                    .defaultValue(DeliveryGuarantee.NONE)
+                    .withDescription("Optional delivery guarantee when committing.");
+
+    public static final ConfigOption<String> TRANSACTIONAL_ID_PREFIX =
+            ConfigOptions.key("sink.transactional-id-prefix")
+                    .stringType()
+                    .defaultValue(null)

Review comment:
       use `noDefaultValue()` instead?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -195,12 +196,6 @@
                                                     "custom class name (use custom FlinkKafkaPartitioner
subclass)"))
                                     .build());
 
-    public static final ConfigOption<SinkSemantic> SINK_SEMANTIC =

Review comment:
       The class itself become public in 1.14 but the string key itself is public for a long
time. We should at least swallow it and warn in the log which happens when using the deprecated
keys feature of config options.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
##########
@@ -535,6 +538,15 @@ private static void autoCompleteSubject(
         }
     }
 
+    static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
+        if (tableOptions.getOptional(DELIVERY_GUARANTEE).orElse(DELIVERY_GUARANTEE.defaultValue())
+                        == DeliveryGuarantee.EXACTLY_ONCE
+                && !tableOptions.getOptional(TRANSACTIONAL_ID_PREFIX).isPresent())
{
+            throw new ValidationException(
+                    "sink.transactional-id-prefix must be specified when using DeliveryGuarantee.EXACTLY_ONCE.");

Review comment:
       I'm sure we could create one with a lot of parameters such as configuration and plan,
but manually specifying it might be the best option for now. Would it make sense to allow
this on a per record basis using a metadata column, I guess not? And job graph id is not an
option?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message