flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzulitai <...@git.apache.org>
Subject [GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...
Date Tue, 22 Aug 2017 07:08:19 GMT
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4239#discussion_r134396137
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
---
    @@ -0,0 +1,1000 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.util.SerializableObject;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
    +import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.NetUtils;
    +
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.Metric;
    +import org.apache.kafka.common.MetricName;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.errors.InvalidTxnStateException;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.BlockingDeque;
    +import java.util.concurrent.LinkedBlockingDeque;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default
    + * the producer will use the {@link Semantic#EXACTLY_ONCE} semantic.
    + *
    + * <p>Implementation note: This producer is a hybrid between a regular
    + * {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} (a) and a custom operator (b).
    + *
    + * <p>Details about approach (a):
    + *  Because of limitations of the regular {@link org.apache.flink.streaming.api.functions.sink.SinkFunction}
    + *  API, this variant does not allow accessing the timestamp attached to the record.
    + *
    + * <p>Details about approach (b):
    + *  Kafka 0.11 supports writing the timestamp attached to a record to Kafka. When using the
    + *  {@link FlinkKafkaProducer011#writeToKafkaWithTimestamps} method, the Kafka producer can access the internal
    + *  record timestamp of the record and write it to Kafka.
    + *
    + * <p>All methods and constructors in this class are marked with the approach they are needed for.
    + */
    +public class FlinkKafkaProducer011<IN>
    +		extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState>
{
    +
    +	/**
    +	 * Semantics that can be chosen.
    +	 * <ul>
    +	 *  <li>{@link #EXACTLY_ONCE}</li>
    +	 *  <li>{@link #AT_LEAST_ONCE}</li>
    +	 *  <li>{@link #NONE}</li>
    +	 * </ul>
    +	 */
    +	public enum Semantic {
    +		/**
    +		 * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction that will be
    +		 * committed to Kafka on a checkpoint.
    +		 *
    +		 * <p>In this mode {@link FlinkKafkaProducer011} sets up a pool of {@link FlinkKafkaProducer}. Between each
    +		 * checkpoint a new Kafka transaction is created, which is committed on
    +		 * {@link FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete notifications are
    +		 * running late, {@link FlinkKafkaProducer011} can run out of {@link FlinkKafkaProducer}s in the pool. In that
    +		 * case any subsequent {@link FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail
    +		 * and {@link FlinkKafkaProducer011} will keep using the {@link FlinkKafkaProducer} from the previous checkpoint.
    +		 * To decrease the chances of failing checkpoints there are four options:
    +		 * <ul>
    +		 *  <li>decrease the number of max concurrent checkpoints</li>
    +		 *  <li>make checkpoints more reliable (so that they complete faster)</li>
    +		 *  <li>increase the delay between checkpoints</li>
    +		 *  <li>increase the size of the {@link FlinkKafkaProducer}s pool</li>
    +		 * </ul>
    +		 */
    +		EXACTLY_ONCE,
    +		/**
    +		 * Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the Kafka buffers
    +		 * to be acknowledged by the Kafka producer on a checkpoint.
    +		 */
    +		AT_LEAST_ONCE,
    +		/**
    +		 * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or duplicated in case
    +		 * of failure.
    +		 */
    +		NONE
    +	}
    +
    +	// Log under this class's own name (was FlinkKafkaProducerBase, which misattributed log output).
    +	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer011.class);
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	/**
    +	 * Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}.
    +	 */
    +	public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
    +
    +	/**
    +	 * Configuration key for disabling the metrics reporting.
    +	 */
    +	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    +
    +	/**
    +	 * Descriptor of the transactionalIds list.
    +	 */
    +	private static final ListStateDescriptor<String> TRANSACTIONAL_IDS_DESCRIPTOR =
    +		new ListStateDescriptor<>("transactional-ids", TypeInformation.of(String.class));
    +
    +	/**
    +	 * Pool of transactional ids backed up in state.
    +	 *
    +	 * <p>This is a runtime state handle, not configuration, so it is excluded from the serialized sink.
    +	 */
    +	private transient ListState<String> transactionalIdsState;
    +
    +	/**
    +	 * Already used transactional ids.
    +	 */
    +	private final Set<String> usedTransactionalIds = new HashSet<>();
    +
    +	/**
    +	 * Transactional ids that are available to use.
    +	 */
    +	private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
    +
    +	/**
    +	 * User defined properties for the Producer.
    +	 */
    +	private final Properties producerConfig;
    +
    +	/**
    +	 * The name of the default topic this producer is writing data to.
    +	 */
    +	private final String defaultTopicId;
    +
    +	/**
    +	 * (Serializable) SerializationSchema for turning objects used with Flink into
    +	 * byte[] for Kafka.
    +	 */
    +	private final KeyedSerializationSchema<IN> schema;
    +
    +	/**
    +	 * User-provided partitioner for assigning an object to a Kafka partition for each topic.
    +	 */
    +	private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
    +
    +	/**
    +	 * Partitions of each topic.
    +	 */
    +	private final Map<String, int[]> topicPartitionsMap;
    +
    +	/**
    +	 * Max number of producers in the pool. If all producers are in use, snapshotting state will throw an exception.
    +	 */
    +	private final int kafkaProducersPoolSize;
    +
    +	/**
    +	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
    +	 */
    +	private boolean writeTimestampToKafka = false;
    +
    +	/**
    +	 * Flag indicating whether to accept failures (and log them), or to fail on failures.
    +	 */
    +	private boolean logFailuresOnly;
    +
    +	/**
    +	 * Semantic chosen for this instance.
    +	 */
    +	private Semantic semantic;
    +
    +	/**
    +	 * Pool of KafkaProducer objects.
    +	 *
    +	 * <p>NOTE(review): this field is transient, so its initializer only runs when the sink is constructed.
    +	 * A copy obtained through Java deserialization starts with {@code null} here. Confirm that the pool
    +	 * is (re)created on the initialization/open path before it is used.
    +	 */
    +	private transient ProducersPool producersPool = new ProducersPool();
    +
    +	// -------------------------------- Runtime fields ------------------------------------------
    +
    +	/** The callback that handles error propagation or logging callbacks. */
    +	@Nullable
    +	private transient Callback callback;
    +
    +	/** Errors encountered in the async producer are stored here. */
    +	@Nullable
    +	private transient volatile Exception asyncException;
    +
    +	/** Lock for accessing the pending records. */
    +	private final SerializableObject pendingRecordsLock = new SerializableObject();
    +
    +	/** Number of unacknowledged records. */
    +	private final AtomicLong pendingRecords = new AtomicLong();
    +
    +	/** Cache of metrics to replace already registered metrics instead of overwriting existing ones. */
    +	private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>();
    +
    +	// ---------------------- "Constructors" for timestamp writing ------------------
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * <p>This factory method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +	 *
    +	 * @param inStream The stream to write to Kafka
    +	 * @param topicId ID of the Kafka topic.
    +	 * @param serializationSchema User defined serialization schema supporting key/value messages
    +	 * @param producerConfig Properties with the producer configuration.
    +	 */
    +	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
    +			DataStream<IN> inStream,
    +			String topicId,
    +			KeyedSerializationSchema<IN> serializationSchema,
    +			Properties producerConfig) {
    +		// No partitioner given: fall back to the FlinkFixedPartitioner.
    +		FlinkKafkaPartitioner<IN> fixedPartitioner = new FlinkFixedPartitioner<IN>();
    +		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, fixedPartitioner);
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * <p>This factory method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +	 *
    +	 * @param inStream The stream to write to Kafka
    +	 * @param topicId ID of the Kafka topic.
    +	 * @param serializationSchema User defined (keyless) serialization schema.
    +	 * @param producerConfig Properties with the producer configuration.
    +	 */
    +	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
    +			DataStream<IN> inStream,
    +			String topicId,
    +			SerializationSchema<IN> serializationSchema,
    +			Properties producerConfig) {
    +		// Adapt the keyless schema to the keyed schema interface and use the fixed partitioner.
    +		KeyedSerializationSchema<IN> keyedSchema = new KeyedSerializationSchemaWrapper<>(serializationSchema);
    +		return writeToKafkaWithTimestamps(
    +			inStream, topicId, keyedSchema, producerConfig, new FlinkFixedPartitioner<IN>());
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * <p>This factory method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +	 *
    +	 *  @param inStream The stream to write to Kafka
    +	 *  @param topicId The name of the target topic
    +	 *  @param serializationSchema A serializable serialization schema for turning user objects into a
    +	 *                             kafka-consumable byte[] supporting key/value messages
    +	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only
    +	 *                        required argument.
    +	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
    +	 */
    +	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
    +			DataStream<IN> inStream,
    +			String topicId,
    +			KeyedSerializationSchema<IN> serializationSchema,
    +			Properties producerConfig,
    +			FlinkKafkaPartitioner<IN> customPartitioner) {
    +		// Defaults: exactly-once semantic with the default producer pool size.
    +		final Semantic defaultSemantic = Semantic.EXACTLY_ONCE;
    +		final int defaultPoolSize = DEFAULT_KAFKA_PRODUCERS_POOL_SIZE;
    +		return writeToKafkaWithTimestamps(
    +			inStream, topicId, serializationSchema, producerConfig, customPartitioner, defaultSemantic, defaultPoolSize);
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * <p>This factory method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +	 *
    +	 *  @param inStream The stream to write to Kafka
    +	 *  @param topicId The name of the target topic
    +	 *  @param serializationSchema A serializable serialization schema for turning user objects into a
    +	 *                             kafka-consumable byte[] supporting key/value messages
    +	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only
    +	 *                        required argument.
    +	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
    +	 *  @param semantic Defines the semantic that will be used by this producer (see {@link Semantic}).
    +	 *  @param kafkaProducersPoolSize Overwrites the default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
    +	 */
    +	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(
    +			DataStream<IN> inStream,
    +			String topicId,
    +			KeyedSerializationSchema<IN> serializationSchema,
    +			Properties producerConfig,
    +			FlinkKafkaPartitioner<IN> customPartitioner,
    +			Semantic semantic,
    +			int kafkaProducersPoolSize) {
    +
    +		// transform() requires an output type; the sink operator is declared with a generic Object type.
    +		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
    +		FlinkKafkaProducer011<IN> kafkaProducer =
    +			new FlinkKafkaProducer011<>(
    +				topicId,
    +				serializationSchema,
    +				producerConfig,
    +				customPartitioner,
    +				semantic,
    +				kafkaProducersPoolSize);
    +		// Wrapping the producer in a custom operator gives it access to record timestamps (approach (b)).
    +		KafkaStreamSink streamSink = new KafkaStreamSink(kafkaProducer);
    +		// Operator name shown in the job graph / web UI (fixed typo: was "FlinKafkaProducer 0.11.x").
    +		SingleOutputStreamOperator<Object> transformation =
    +			inStream.transform("FlinkKafkaProducer 0.11.x", objectTypeInfo, streamSink);
    +		return new FlinkKafkaProducer011Configuration<>(transformation, streamSink);
    +	}
    +
    +	// ---------------------- Regular constructors w/o timestamp support  ------------------
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * @param brokerList
    +	 *			Comma separated addresses of the brokers
    +	 * @param topicId
    +	 * 			ID of the Kafka topic.
    +	 * @param serializationSchema
    +	 * 			User defined (keyless) serialization schema.
    +	 */
    +	public FlinkKafkaProducer011(
    +			String brokerList,
    +			String topicId,
    +			SerializationSchema<IN> serializationSchema) {
    +		// Wrap the keyless schema, derive the producer properties from the broker list,
    +		// and fall back to the fixed partitioner.
    +		this(
    +			topicId,
    +			new KeyedSerializationSchemaWrapper<>(serializationSchema),
    +			getPropertiesFromBrokerList(brokerList),
    +			new FlinkFixedPartitioner<IN>());
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * @param topicId
    +	 * 			ID of the Kafka topic.
    +	 * @param serializationSchema
    +	 * 			User defined (keyless) serialization schema.
    +	 * @param producerConfig
    +	 * 			Properties with the producer configuration.
    +	 */
    +	public FlinkKafkaProducer011(
    +			String topicId,
    +			SerializationSchema<IN> serializationSchema,
    +			Properties producerConfig) {
    +		// The keyless schema is wrapped; the fixed partitioner is the default.
    +		this(
    +			topicId,
    +			new KeyedSerializationSchemaWrapper<>(serializationSchema),
    +			producerConfig,
    +			new FlinkFixedPartitioner<IN>());
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * @param topicId The topic to write data to
    +	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
    +	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
    +	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
    +	 */
    +	public FlinkKafkaProducer011(
    +			String topicId,
    +			SerializationSchema<IN> serializationSchema,
    +			Properties producerConfig,
    +			FlinkKafkaPartitioner<IN> customPartitioner) {
    +		// Only the schema needs adapting; all other arguments are passed through unchanged.
    +		this(
    +			topicId,
    +			new KeyedSerializationSchemaWrapper<>(serializationSchema),
    +			producerConfig,
    +			customPartitioner);
    +	}
    +
    +	// ------------------- Key/Value serialization schema constructors ----------------------
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * @param brokerList
    +	 *			Comma separated addresses of the brokers
    +	 * @param topicId
    +	 * 			ID of the Kafka topic.
    +	 * @param serializationSchema
    +	 * 			User defined serialization schema supporting key/value messages
    +	 */
    +	public FlinkKafkaProducer011(
    +			String brokerList,
    +			String topicId,
    +			KeyedSerializationSchema<IN> serializationSchema) {
    +		// Producer properties come from the broker list; the fixed partitioner is the default.
    +		this(
    +			topicId,
    +			serializationSchema,
    +			getPropertiesFromBrokerList(brokerList),
    +			new FlinkFixedPartitioner<IN>());
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * @param topicId
    +	 * 			ID of the Kafka topic.
    +	 * @param serializationSchema
    +	 * 			User defined serialization schema supporting key/value messages
    +	 * @param producerConfig
    +	 * 			Properties with the producer configuration.
    +	 */
    +	public FlinkKafkaProducer011(
    +			String topicId,
    +			KeyedSerializationSchema<IN> serializationSchema,
    +			Properties producerConfig) {
    +		// No partitioner supplied: use the fixed partitioner.
    +		this(
    +			topicId,
    +			serializationSchema,
    +			producerConfig,
    +			new FlinkFixedPartitioner<IN>());
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer that uses the {@link Semantic#EXACTLY_ONCE} semantic and the default
    +	 * KafkaProducers pool size.
    +	 *
    +	 * <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
    +	 *
    +	 * @param defaultTopicId The default topic to write data to
    +	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
    +	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
    +	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
    +	 */
    +	public FlinkKafkaProducer011(
    +			String defaultTopicId,
    +			KeyedSerializationSchema<IN> serializationSchema,
    +			Properties producerConfig,
    +			FlinkKafkaPartitioner<IN> customPartitioner) {
    +		// Defaults: exactly-once semantic and the default producer pool size.
    +		this(defaultTopicId, serializationSchema, producerConfig, customPartitioner,
    +			Semantic.EXACTLY_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    +	}
    +
    +	/**
    +	 * The main constructor for creating a FlinkKafkaProducer.
    +	 *
    +	 * <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
    +	 *
    +	 * @param defaultTopicId The default topic to write data to
    +	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
    +	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
    +	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
    +	 * @param semantic Defines the semantic that will be used by this producer (see {@link Semantic}).
    +	 * @param kafkaProducersPoolSize Overwrites the default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
    +	 */
    +	public FlinkKafkaProducer011(
    +			String defaultTopicId,
    +			KeyedSerializationSchema<IN> serializationSchema,
    +			Properties producerConfig,
    +			FlinkKafkaPartitioner<IN> customPartitioner,
    +			Semantic semantic,
    +			int kafkaProducersPoolSize) {
    +		super(
    +			TypeInformation.of(KafkaTransactionState.class),
    +			TypeInformation.of(new TypeHint<List<KafkaTransactionState>>() {}));
    +
    +		requireNonNull(defaultTopicId, "TopicID not set");
    +		requireNonNull(serializationSchema, "serializationSchema not set");
    +		requireNonNull(producerConfig, "producerConfig not set");
    +		ClosureCleaner.clean(customPartitioner, true);
    +		ClosureCleaner.ensureSerializable(serializationSchema);
    +
    +		this.defaultTopicId = defaultTopicId;
    +		this.schema = serializationSchema;
    +		this.producerConfig = producerConfig;
    +		this.flinkKafkaPartitioner = customPartitioner;
    +		this.semantic = semantic;
    +		this.kafkaProducersPoolSize = kafkaProducersPoolSize;
    --- End diff --
    
    Check for negative / 0 pool sizes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message