Date: Tue, 22 Aug 2017 07:09:10 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8

    [ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136440#comment-16136440 ]

ASF GitHub Bot commented on FLINK-6988:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4239#discussion_r134395636

    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -0,0 +1,1000 @@
    +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.NetUtils; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import 
java.util.concurrent.atomic.AtomicLong; + + import static java.util.Objects.requireNonNull; + + /** + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default the producer + * will use {@link Semantic#EXACTLY_ONCE} semantic. + + *

Implementation note: This producer is a hybrid between a regular + * {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} (a) and a custom operator (b). + + *

Details about approach (a): + * Because of the regular {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} API's limitations, this + * variant does not allow accessing the timestamp attached to the record. + + *

Details about approach (b): + * Kafka 0.11 supports writing the timestamp attached to a record to Kafka. When using the + * {@link FlinkKafkaProducer011#writeToKafkaWithTimestamps} method, the Kafka producer can access the internal + * timestamp of the record and write it to Kafka. + + *

All methods and constructors in this class are marked with the approach they are needed for. + */ +public class FlinkKafkaProducer011 + extends TwoPhaseCommitSinkFunction { + + /** + * Semantics that can be chosen. + *

  • {@link #EXACTLY_ONCE}
  • {@link #AT_LEAST_ONCE}
  • {@link #NONE}
 */ + public enum Semantic { + /** + * Semantic.EXACTLY_ONCE: the Flink producer will write all messages in a Kafka transaction that will be + * committed to Kafka on a checkpoint. + + *

    In this mode {@link FlinkKafkaProducer011} sets up a pool of {@link FlinkKafkaProducer}. Between each + * checkpoint a new Kafka transaction is created, which is committed on + * {@link FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete notifications are + * running late, {@link FlinkKafkaProducer011} can run out of {@link FlinkKafkaProducer}s in the pool. In that + * case any subsequent {@link FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail + * and {@link FlinkKafkaProducer011} will keep using the {@link FlinkKafkaProducer} from the previous checkpoint. + * To decrease the chances of failing checkpoints there are four options: + *
  • decrease the number of max concurrent checkpoints
  • make checkpoints more reliable (so that they complete faster)
  • increase the delay between checkpoints
  • increase the size of the {@link FlinkKafkaProducer}s pool
  • + */ + EXACTLY_ONCE, + /** + * Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the Kafka buffers + * to be acknowledged by the Kafka producer on a checkpoint. + */ + AT_LEAST_ONCE, + /** + * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or duplicated in case + * of failure. + */ + NONE + } + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class); + + private static final long serialVersionUID = 1L; + + /** + * Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}. + */ + public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; + + /** + * Configuration key for disabling the metrics reporting. + */ + public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; + + /** + * Descriptor of the transacionalIds list. + */ + private static final ListStateDescriptor TRANSACTIONAL_IDS_DESCRIPTOR = + new ListStateDescriptor<>("transactional-ids", TypeInformation.of(String.class)); + + /** + * Pool of transacional ids backed up in state. + */ + private ListState transactionalIdsState; + + /** + * Already used transactional ids. + */ + private final Set usedTransactionalIds = new HashSet<>(); + + /** + * Available to use transactional ids. + */ + private final BlockingDeque availableTransactionalIds = new LinkedBlockingDeque<>(); + + /** + * User defined properties for the Producer. + */ + private final Properties producerConfig; + + /** + * The name of the default topic this producer is writing data to. + */ + private final String defaultTopicId; + + /** + * (Serializable) SerializationSchema for turning objects used with Flink into. + * byte[] for Kafka. + */ + private final KeyedSerializationSchema schema; + + /** + * User-provided partitioner for assigning an object to a Kafka partition for each topic. + */ + private final FlinkKafkaPartitioner flinkKafkaPartitioner; + + /** + * Partitions of each topic. + */ + private final Map topicPartitionsMap; + + /** + * Max number of producers in the pool. If all producers are in use, snapshoting state will throw an exception. + */ + private final int kafkaProducersPoolSize; + + /** + * Flag controlling whether we are writing the Flink record's timestamp into Kafka. + */ + private boolean writeTimestampToKafka = false; + + /** + * Flag indicating whether to accept failures (and log them), or to fail on failures. + */ + private boolean logFailuresOnly; + + /** + * Semantic chosen for this instance. + */ + private Semantic semantic; + + /** + * Pool of KafkaProducers objects. + */ + private transient ProducersPool producersPool = new ProducersPool(); + + // -------------------------------- Runtime fields ------------------------------------------ + + /** The callback than handles error propagation or logging callbacks. */ + @Nullable + private transient Callback callback; + + /** Errors encountered in the async producer are stored here. */ + @Nullable + private transient volatile Exception asyncException; + + /** Lock for accessing the pending records. */ + private final SerializableObject pendingRecordsLock = new SerializableObject(); + + /** Number of unacknowledged records. */ + private final AtomicLong pendingRecords = new AtomicLong(); + + /** Cache of metrics to replace already registered metrics instead of overwriting existing ones. 
*/ + private final Map previouslyCreatedMetrics = new HashMap<>(); + + // ---------------------- "Constructors" for timestamp writing ------------------ + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + *

    This constructor allows writing timestamps to Kafka, it follows approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined serialization schema supporting key/value messages + * @param producerConfig Properties with the producer configuration. + */ + public static FlinkKafkaProducer011Configuration writeToKafkaWithTimestamps(DataStream inStream, + String topicId, + KeyedSerializationSchema serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + *

    This constructor allows writing timestamps to Kafka, it follows approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined (keyless) serialization schema. + * @param producerConfig Properties with the producer configuration. + */ + public static FlinkKafkaProducer011Configuration writeToKafkaWithTimestamps(DataStream inStream, + String topicId, + SerializationSchema serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + *

    This constructor allows writing timestamps to Kafka, it follows approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId The name of the target topic + * @param serializationSchema A serializable serialization schema for turning user objects into a + * kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only + * required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + */ + public static FlinkKafkaProducer011Configuration writeToKafkaWithTimestamps(DataStream inStream, + String topicId, + KeyedSerializationSchema serializationSchema, + Properties producerConfig, + FlinkKafkaPartitioner customPartitioner) { + return writeToKafkaWithTimestamps( + inStream, + topicId, + serializationSchema, + producerConfig, + customPartitioner, + Semantic.EXACTLY_ONCE, + DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + *

    This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId The name of the target topic + * @param serializationSchema A serializable serialization schema for turning user objects into a + * kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only + * required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}). + * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}). + */ + public static FlinkKafkaProducer011Configuration writeToKafkaWithTimestamps(DataStream inStream, + String topicId, + KeyedSerializationSchema serializationSchema, + Properties producerConfig, + FlinkKafkaPartitioner customPartitioner, + Semantic semantic, + int kafkaProducersPoolSize) { + + GenericTypeInfo objectTypeInfo = new GenericTypeInfo<>(Object.class); + FlinkKafkaProducer011 kafkaProducer = + new FlinkKafkaProducer011<>( + topicId, + serializationSchema, + producerConfig, + customPartitioner, + semantic, + kafkaProducersPoolSize); + KafkaStreamSink streamSink = new KafkaStreamSink(kafkaProducer); + SingleOutputStreamOperator transformation = inStream.transform("FlinKafkaProducer 0.11.x", objectTypeInfo, streamSink); + return new FlinkKafkaProducer011Configuration<>(transformation, streamSink); + } + + // ---------------------- Regular constructors w/o timestamp support ------------------ + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + */ + public FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema serializationSchema) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer011(String topicId, SerializationSchema serializationSchema, Properties producerConfig) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId The topic to write data to + * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. 
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) + */ + public FlinkKafkaProducer011(String topicId, SerializationSchema serializationSchema, Properties producerConfig, FlinkKafkaPartitioner customPartitioner) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); + } + + // ------------------- Key/Value serialization schema constructors ---------------------- + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined serialization schema supporting key/value messages + */ + public FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema serializationSchema) { + this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined serialization schema supporting key/value messages + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer011(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig) { + this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner()); + } + + /** + * The main constructor for creating a FlinkKafkaProducer. + * + *

    This constructor does not allow writing timestamps to Kafka, it follows approach (a) (see above) + * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner. + */ + public FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, FlinkKafkaPartitioner customPartitioner) { + this( + defaultTopicId, + serializationSchema, + producerConfig, + customPartitioner, + Semantic.EXACTLY_ONCE, + DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + } + + /** + * The main constructor for creating a FlinkKafkaProducer. + * + *

    This constructor does not allow writing timestamps to Kafka, it follows approach (a) (see above) + * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner. + * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}). + * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}). + */ + public FlinkKafkaProducer011( + String defaultTopicId, + KeyedSerializationSchema serializationSchema, + Properties producerConfig, + FlinkKafkaPartitioner customPartitioner, + Semantic semantic, + int kafkaProducersPoolSize) { + super( + TypeInformation.of(KafkaTransactionState.class), + TypeInformation.of(new TypeHint>() {})); + + requireNonNull(defaultTopicId, "TopicID not set"); + requireNonNull(serializationSchema, "serializationSchema not set"); + requireNonNull(producerConfig, "producerConfig not set");

    --- End diff --

    In Flink we usually use `Preconditions.checkNotNull` for this.


> Add Apache Kafka 0.11 connector
> -------------------------------
>
>                 Key: FLINK-6988
>                 URL: https://issues.apache.org/jira/browse/FLINK-6988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> Kafka 0.11 (which will be released very soon) adds support for transactions. Thanks to that, Flink should be able to implement a Kafka sink supporting "exactly-once" semantics. The API changes and the transaction support are described in [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new FlinkKafkaProducer011 would:
> * upon creation, begin a transaction, store the transaction identifiers in state, and write all incoming data to an output Kafka topic using that transaction
> * on `snapshotState`, flush the data and record in state that the current transaction is pending to be committed
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, either abort this pending transaction (if not every participant successfully saved the snapshot) or restore and commit it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
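The issue description quoted above maps directly onto the Kafka 0.11 transactional producer API. Below is a rough sketch of that lifecycle, written against the raw kafka-clients API rather than Flink's TwoPhaseCommitSinkFunction; the class and method names are illustrative only (not code from the pull request), and the supplied Properties are assumed to contain `transactional.id` plus key/value serializer settings.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /** Illustrative sketch of the transaction hand-off described in the issue. */
    class TransactionalSinkSketch implements AutoCloseable {
        private final KafkaProducer<byte[], byte[]> producer;

        TransactionalSinkSketch(Properties config) {
            // "upon creation begin transaction"
            producer = new KafkaProducer<>(config);
            producer.initTransactions();
            producer.beginTransaction();
        }

        void invoke(String topic, byte[] value) {
            producer.send(new ProducerRecord<>(topic, value));
        }

        void snapshotState() {
            // "on snapshotState, flush the data"; marking the transaction as
            // pending in Flink state is omitted here.
            producer.flush();
        }

        void notifyCheckpointComplete() {
            // "on notifyCheckpointComplete, commit this pending transaction",
            // then open a new transaction for the next checkpoint interval.
            producer.commitTransaction();
            producer.beginTransaction();
        }

        @Override
        public void close() {
            // On failure before the commit, the open transaction is aborted instead.
            producer.abortTransaction();
            producer.close();
        }
    }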
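For readers following the review comment above ("In Flink we usually use `Preconditions.checkNotNull` for this"): a minimal, self-contained sketch of what the suggested change could look like, assuming flink-core's org.apache.flink.util.Preconditions is on the classpath. The class name and the trimmed field set are illustrative only, not code from the pull request.

    import java.util.Properties;

    import static org.apache.flink.util.Preconditions.checkNotNull;

    /** Illustrative only: the constructor null checks rewritten with Flink's Preconditions. */
    class NullCheckExample {
        private final String defaultTopicId;
        private final Properties producerConfig;

        NullCheckExample(String defaultTopicId, Properties producerConfig) {
            // checkNotNull returns its argument, so the check can double as the assignment.
            this.defaultTopicId = checkNotNull(defaultTopicId, "TopicID not set");
            this.producerConfig = checkNotNull(producerConfig, "producerConfig not set");
        }
    }

Like Objects.requireNonNull, Preconditions.checkNotNull throws a NullPointerException with the given message, so the constructor's behaviour would not change.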
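To make approach (b) from the diff concrete, here is a short hypothetical usage sketch of the writeToKafkaWithTimestamps entry point. The topic name, broker address, and SimpleStringSchema are placeholders, and the exact generic signature may differ from what is ultimately merged.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class WriteWithTimestampsExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("a", "b", "c");

            Properties props = new Properties();
            // Per the Javadoc in the diff, 'bootstrap.servers' is the only required setting.
            props.setProperty("bootstrap.servers", "localhost:9092");

            // Approach (b): the static entry point attaches the sink as a custom operator,
            // which lets the record's internal timestamp be written to Kafka as well.
            FlinkKafkaProducer011.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), props);

            env.execute("Kafka 0.11 producer example");
        }
    }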