From: tzulitai@apache.org
To: commits@flink.apache.org
Date: Fri, 19 May 2017 06:40:13 -0000
Subject: [3/6] flink git commit: [FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics

[FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ed9b683
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ed9b683
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ed9b683

Branch: refs/heads/master
Commit: 9ed9b68397b51bfd2b0f6e532212a82f771641bd
Parents: ffa9aa0
Author: zjureel
Authored: Mon May 15 17:41:47 2017 +0800
Committer: Tzu-Li (Gordon) Tai
Committed: Fri May 19 14:38:48 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java |  82 ++++++++++--
 .../connectors/kafka/Kafka010ITCase.java        |   6 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   8 +-
 .../connectors/kafka/FlinkKafkaProducer.java    |   9 +-
 .../connectors/kafka/FlinkKafkaProducer08.java  |  40 +++++-
 .../connectors/kafka/Kafka08JsonTableSink.java  |  17 +++
 .../kafka/Kafka08JsonTableSinkTest.java         |  14 ++
 .../connectors/kafka/KafkaProducerTest.java     |   5 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   6 +-
 .../connectors/kafka/FlinkKafkaProducer09.java  |  42 +++++-
 .../connectors/kafka/Kafka09JsonTableSink.java  |  29 +++++
 .../kafka/Kafka09JsonTableSinkTest.java         |  15 +++
 .../connectors/kafka/KafkaProducerTest.java     |   6 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  10 +-
 .../kafka/FlinkKafkaProducerBase.java           | 127 ++++++++++++-------
 .../connectors/kafka/KafkaJsonTableSink.java    |  16 ++-
 .../connectors/kafka/KafkaTableSink.java        |  35 ++++-
 .../kafka/partitioner/FixedPartitioner.java     |   3 +-
 .../partitioner/FlinkFixedPartitioner.java      |  71 +++++++++++
 .../FlinkKafkaDelegatePartitioner.java          |  47 +++++++
 .../partitioner/FlinkKafkaPartitioner.java      |  39 ++++++
 .../kafka/partitioner/KafkaPartitioner.java     |  14 ++
 .../connectors/kafka/KafkaConsumerTestBase.java | 106 ++++++++--------
 .../connectors/kafka/KafkaProducerTestBase.java |  11 +-
 .../kafka/KafkaTableSinkTestBase.java           |  56 +++++++-
 .../connectors/kafka/KafkaTestEnvironment.java  |  14 +-
 .../kafka/TestFlinkFixedPartitioner.java        | 104 +++++++++++++++
 .../TestFlinkKafkaDelegatePartitioner.java      | 111 ++++++++++++++++
 .../kafka/testutils/DataGenerators.java         |  20 +--
 .../kafka/testutils/Tuple2FlinkPartitioner.java |  45 +++++++
 .../kafka/testutils/Tuple2Partitioner.java      |   9 +-
 31 files changed, 937 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index cc0194b..7addafa 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,6 +17,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.util.Properties;
+
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -27,7 +29,8 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -35,8 +38,6 @@
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
-import java.util.Properties;
-
 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
 
@@ -87,7 +88,7 @@ public class FlinkKafkaProducer010 extends StreamSink implements SinkFunct
 			String topicId,
 			KeyedSerializationSchema serializationSchema,
 			Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner());
+		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner());
 	}
 
@@ -106,7 +107,7 @@ public class FlinkKafkaProducer010 extends StreamSink implements SinkFunct
 			String topicId,
 			SerializationSchema serializationSchema,
 			Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner());
+		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -120,7 +121,9 @@ public class FlinkKafkaProducer010 extends StreamSink implements SinkFunct
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaProducer010Configuration#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public static FlinkKafkaProducer010Configuration writeToKafkaWithTimestamps(DataStream inStream,
 			String topicId,
 			KeyedSerializationSchema serializationSchema,
@@ -133,6 +136,30 @@ public class FlinkKafkaProducer010 extends StreamSink implements SinkFunct
 		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This constructor allows writing timestamps to Kafka, it follows approach (b) (see above)
+	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId The name of the target topic
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public static FlinkKafkaProducer010Configuration writeToKafkaWithTimestamps(DataStream inStream,
+			String topicId,
+			KeyedSerializationSchema serializationSchema,
+			Properties producerConfig,
+			FlinkKafkaPartitioner customPartitioner) {
+
+		GenericTypeInfo objectTypeInfo = new GenericTypeInfo<>(Object.class);
+		FlinkKafkaProducer010 kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
+		SingleOutputStreamOperator transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+	}
+
 	// ---------------------- Regular constructors w/o timestamp support ------------------
 
 	/**
@@ -147,7 +174,7 @@ public class FlinkKafkaProducer010 extends StreamSink implements SinkFunct
 	 *			User defined (keyless) serialization schema.
 	 */
 	public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema serializationSchema) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -162,7 +189,7 @@ public class FlinkKafkaProducer010 extends StreamSink implements SinkFunct
 	 *			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer010(String topicId, SerializationSchema serializationSchema, Properties producerConfig) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -173,11 +200,26 @@ public class FlinkKafkaProducer010 extends StreamSink implements SinkFunct
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer010(String topicId, SerializationSchema serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 */
+	public FlinkKafkaProducer010(String topicId, SerializationSchema serializationSchema, Properties producerConfig, FlinkKafkaPartitioner customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
 	// ------------------- Key/Value serialization schema constructors ----------------------
 
 	/**
@@ -192,7 +234,7 @@ public class FlinkKafkaProducer010 extends StreamSink implements SinkFunct
 	 *			User defined serialization schema supporting key/value messages
 	 */
 	public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner());
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -207,19 +249,32 @@ public class FlinkKafkaProducer010 extends StreamSink implements SinkFunct
 	 *			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, new FixedPartitioner());
+		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner());
 	}
 
 	/**
 	 * Create Kafka producer
 	 *
 	 * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+	 * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
 		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
 		// invoke call.
 		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
 	}
+
+	/**
+	 * Create Kafka producer
+	 *
+	 * This constructor does not allow writing timestamps to Kafka, it follows approach (a) (see above)
+	 */
+	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, FlinkKafkaPartitioner customPartitioner) {
+		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
+		// invoke call.
+		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+	}
 
 	// ----------------------------- Generic element processing ---------------------------
 
@@ -243,10 +298,15 @@ public class FlinkKafkaProducer010 extends StreamSink implements SinkFunct
 		}
 
 		ProducerRecord record;
-		if (internalProducer.partitioner == null) {
+		int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
+		if (null == partitions) {
+			partitions = internalProducer.getPartitionsByTopic(targetTopic, internalProducer.producer);
+			internalProducer.topicPartitionsMap.put(targetTopic, partitions);
+		}
+		if (internalProducer.flinkKafkaPartitioner == null) {
 			record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
 		} else {
-			record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
+			record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
 		}
 		if (internalProducer.flushOnCheckpoint) {
 			synchronized (internalProducer.pendingRecordsLock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 39b2b8f..add623e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -33,7 +33,7 @@
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -208,11 +208,11 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		});
 
 		final TypeInformationSerializationSchema longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.parse("Long"), env.getConfig());
-		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner() {
+		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new FlinkKafkaPartitioner() {
 			private static final long serialVersionUID = -6730989584364230617L;
 
 			@Override
-			public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+			public int partition(Long next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 				return (int)(next % 3);
 			}
 		});

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index d27e53a..c88c858 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -30,8 +30,8 @@
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -46,9 +46,9 @@
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -116,7 +116,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public StreamSink getProducerSink(String topic, KeyedSerializationSchema serSchema, Properties props, KafkaPartitioner partitioner) {
+	public StreamSink getProducerSink(String topic, KeyedSerializationSchema serSchema, Properties props, FlinkKafkaPartitioner partitioner) {
 		FlinkKafkaProducer010 prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return new StreamSink<>(prod);
@@ -124,7 +124,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	@Override
-	public DataStreamSink produceIntoKafka(DataStream stream, String topic, KeyedSerializationSchema serSchema, Properties props, KafkaPartitioner partitioner) {
+	public DataStreamSink produceIntoKafka(DataStream stream, String topic, KeyedSerializationSchema serSchema, Properties props, FlinkKafkaPartitioner partitioner) {
 		FlinkKafkaProducer010 prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index a7b89f8..98dac3e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -17,6 +17,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -37,7 +38,7 @@ public class FlinkKafkaProducer extends FlinkKafkaProducer08 {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema serializationSchema) {
-		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null);
+		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner)null);
 	}
 
 	/**
@@ -45,7 +46,7 @@ public class FlinkKafkaProducer extends FlinkKafkaProducer08 {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String topicId, SerializationSchema serializationSchema, Properties producerConfig) {
-		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
+		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, (FlinkKafkaPartitioner)null);
 	}
 
 	/**
@@ -62,7 +63,7 @@ public class FlinkKafkaProducer extends FlinkKafkaProducer08 {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema serializationSchema) {
-		super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+		super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner)null);
 	}
 
 	/**
@@ -70,7 +71,7 @@ public class FlinkKafkaProducer extends FlinkKafkaProducer08 {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig) {
-		super(topicId, serializationSchema, producerConfig, null);
+		super(topicId, serializationSchema, producerConfig, (FlinkKafkaPartitioner)null);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 65de5fc..64d3716 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -17,7 +17,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -50,7 +51,7 @@ public class FlinkKafkaProducer08 extends FlinkKafkaProducerBase {
 	 *			User defined (keyless) serialization schema.
 	 */
 	public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema serializationSchema) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -65,7 +66,7 @@ public class FlinkKafkaProducer08 extends FlinkKafkaProducerBase {
 	 *			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer08(String topicId, SerializationSchema serializationSchema, Properties producerConfig) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -75,12 +76,27 @@ public class FlinkKafkaProducer08 extends FlinkKafkaProducerBase {
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer08(String topicId, SerializationSchema serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 	}
 
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer08(String topicId, SerializationSchema serializationSchema, Properties producerConfig, FlinkKafkaPartitioner customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+	}
+
 	// ------------------- Key/Value serialization schema constructors ----------------------
 
 	/**
@@ -95,7 +111,7 @@ public class FlinkKafkaProducer08 extends FlinkKafkaProducerBase {
 	 *			User defined serialization schema supporting key/value messages
 	 */
 	public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner());
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -110,7 +126,7 @@ public class FlinkKafkaProducer08 extends FlinkKafkaProducerBase {
 	 *			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, new FixedPartitioner());
+		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -120,11 +136,25 @@ public class FlinkKafkaProducer08 extends FlinkKafkaProducerBase {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, FlinkKafkaPartitioner customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	}
+
 	@Override
 	protected void flush() {
 		// The Kafka 0.8 producer doesn't support flushing, we wait here

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 839388f..5a066ec0 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -38,6 +39,17 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner) {
 		super(topic, properties, partitioner);
 	}
+
+	/**
+	 * Creates {@link KafkaTableSink} for Kafka 0.8
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 */
+	public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner partitioner) {
+		super(topic, properties, partitioner);
+	}
 
 	@Override
 	protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, SerializationSchema serializationSchema, KafkaPartitioner partitioner) {
@@ -45,6 +57,11 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	}
 
 	@Override
+	protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, SerializationSchema serializationSchema, FlinkKafkaPartitioner partitioner) {
+		return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+	}
+
+	@Override
 	protected Kafka08JsonTableSink createCopy() {
 		return new Kafka08JsonTableSink(topic, properties, partitioner);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 0ac452e..164c162 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
@@ -40,6 +41,19 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 	}
 
 	@Override
+	protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner partitioner,
+			final FlinkKafkaProducerBase kafkaProducer) {
+
+		return new Kafka08JsonTableSink(topic, properties, partitioner) {
+			@Override
+			protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties,
+					SerializationSchema serializationSchema, FlinkKafkaPartitioner partitioner) {
+				return kafkaProducer;
+			}
+		};
+	}
+
+	@Override
 	@SuppressWarnings("unchecked")
 	protected SerializationSchema getSerializationSchema() {
 		return new JsonRowSerializationSchema(FIELD_NAMES);

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 65d7596..c7da5af 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
@@ -83,7 +84,7 @@ public class KafkaProducerTest extends TestLogger {
 
 		// (1) producer that propagates errors
 		FlinkKafkaProducer08 producerPropagating = new FlinkKafkaProducer08<>(
-			"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+			"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null);
 
 		OneInputStreamOperatorTestHarness testHarness =
 			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
@@ -106,7 +107,7 @@ public class KafkaProducerTest extends TestLogger {
 
 		// (2) producer that only logs errors
 		FlinkKafkaProducer08 producerLogging = new FlinkKafkaProducer08<>(
-			"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+			"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null);
 		producerLogging.setLogFailuresOnly(true);
 
 		testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 643ee8e..2419b53 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,8 +36,8 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -109,7 +109,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			String topic,
 			KeyedSerializationSchema serSchema,
 			Properties props,
-			KafkaPartitioner partitioner) {
+			FlinkKafkaPartitioner partitioner) {
 		FlinkKafkaProducer08 prod = new FlinkKafkaProducer08<>(
 			topic,
 			serSchema,
@@ -120,7 +120,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public DataStreamSink produceIntoKafka(DataStream stream, String topic, KeyedSerializationSchema serSchema, Properties props, KafkaPartitioner partitioner) {
+	public DataStreamSink produceIntoKafka(DataStream stream, String topic, KeyedSerializationSchema serSchema, Properties props, FlinkKafkaPartitioner partitioner) {
 		FlinkKafkaProducer08 prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 2a3e39d..4f41c43 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -17,7 +17,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -51,7 +52,7 @@ public class FlinkKafkaProducer09 extends FlinkKafkaProducerBase {
 	 *			User defined (keyless) serialization schema.
 	 */
 	public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema serializationSchema) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -66,7 +67,7 @@ public class FlinkKafkaProducer09 extends FlinkKafkaProducerBase {
 	 *			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer09(String topicId, SerializationSchema serializationSchema, Properties producerConfig) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -77,12 +78,28 @@ public class FlinkKafkaProducer09 extends FlinkKafkaProducerBase {
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer09(String topicId, SerializationSchema serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 */
+	public FlinkKafkaProducer09(String topicId, SerializationSchema serializationSchema, Properties producerConfig, FlinkKafkaPartitioner customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+	}
+
 	// ------------------- Key/Value serialization schema constructors ----------------------
 
 	/**
@@ -97,7 +114,7 @@ public class FlinkKafkaProducer09 extends FlinkKafkaProducerBase {
 	 *			User defined serialization schema supporting key/value messages
 	 */
 	public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner());
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -112,7 +129,7 @@ public class FlinkKafkaProducer09 extends FlinkKafkaProducerBase {
 	 *			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, new FixedPartitioner());
+		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner());
 	}
 
 	/**
@@ -123,11 +140,26 @@ public class FlinkKafkaProducer09 extends FlinkKafkaProducerBase {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, FlinkKafkaPartitioner customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	}
+
 	@Override
 	protected void flush() {
 		if (this.producer != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index edbebd0..b82ebc4 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -33,17 +34,45 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
+	 * @deprecated Use {@link Kafka09JsonTableSink#Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner partitioner) {
 		super(topic, properties, partitioner);
 	}
+
+	/**
+	 * Creates {@link KafkaTableSink} for Kafka 0.9
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 */
+	public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner partitioner) {
+		super(topic, properties, partitioner);
+	}
 
+	/**
+	 *
+	 * @param topic Kafka topic to produce to.
+	 * @param properties Properties for the Kafka producer.
+	 * @param serializationSchema Serialization schema to use to create Kafka records.
+	 * @param partitioner Partitioner to select Kafka partition.
+ * @return The version-specific Kafka producer + * @deprecated Use {@link Kafka09JsonTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead + */ + @Deprecated @Override protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, SerializationSchema serializationSchema, KafkaPartitioner partitioner) { return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner); } @Override + protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, SerializationSchema serializationSchema, FlinkKafkaPartitioner partitioner) { + return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner); + } + + @Override protected Kafka09JsonTableSink createCopy() { return new Kafka09JsonTableSink(topic, properties, partitioner); } http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index df84a0f..ad8f623 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.types.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; @@ -26,6 +27,7 @@ import java.util.Properties; public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { + @Deprecated @Override protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner partitioner, final FlinkKafkaProducerBase kafkaProducer) { @@ -40,6 +42,19 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { } @Override + protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner partitioner, + final FlinkKafkaProducerBase kafkaProducer) { + + return new Kafka09JsonTableSink(topic, properties, partitioner) { + @Override + protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, + SerializationSchema serializationSchema, FlinkKafkaPartitioner partitioner) { + return kafkaProducer; + } + }; + } + + @Override @SuppressWarnings("unchecked") protected SerializationSchema getSerializationSchema() { return new JsonRowSerializationSchema(FIELD_NAMES); http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java index 18b2aec..e9a4947 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; @@ -83,9 +84,8 @@ public class KafkaProducerTest extends TestLogger { whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock); // (1) producer that propagates errors - FlinkKafkaProducer09 producerPropagating = new FlinkKafkaProducer09<>( - "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); + "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null); OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating)); @@ -106,7 +106,7 @@ public class KafkaProducerTest extends TestLogger { // (2) producer that only logs errors FlinkKafkaProducer09 producerLogging = new FlinkKafkaProducer09<>( - "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); + "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null); producerLogging.setLogFailuresOnly(true); testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging)); http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index c9ef6da..84fdbf8 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.kafka; import kafka.admin.AdminUtils; -import kafka.common.KafkaException; import kafka.api.PartitionMetadata; +import kafka.common.KafkaException; import kafka.network.SocketServer; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; @@ -32,8 +32,8 @@ import org.apache.curator.test.TestingServer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; -import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.util.NetUtils; @@ -48,9 +48,9 @@ import scala.collection.Seq; import java.io.File; import java.net.BindException; import java.util.ArrayList; -import java.util.Map; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -106,14 +106,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { String topic, KeyedSerializationSchema serSchema, Properties props, - KafkaPartitioner partitioner) { + FlinkKafkaPartitioner partitioner) { FlinkKafkaProducer09 prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner); prod.setFlushOnCheckpoint(true); return new StreamSink<>(prod); } @Override - public DataStreamSink produceIntoKafka(DataStream stream, String topic, KeyedSerializationSchema serSchema, Properties props, KafkaPartitioner partitioner) { + public DataStreamSink produceIntoKafka(DataStream stream, String topic, KeyedSerializationSchema serSchema, Properties props, FlinkKafkaPartitioner partitioner) { FlinkKafkaProducer09 prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner); prod.setFlushOnCheckpoint(true); return stream.addSink(prod); http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java index 6a7b17f..f9a1e41 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -17,6 +17,14 @@ package org.apache.flink.streaming.connectors.kafka; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.OperatorStateStore; @@ -30,6 +38,8 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.util.NetUtils; @@ -45,13 +55,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.Properties; -import java.util.Collections; -import java.util.Comparator; - import static java.util.Objects.requireNonNull; @@ -76,12 +79,6 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; /** - * Array with the partition ids of the given defaultTopicId - * The size of this array is the number of partitions - */ - protected int[] partitions; - - /** * User defined properties for the Producer */ protected final Properties producerConfig; @@ -98,9 +95,14 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im protected final KeyedSerializationSchema schema; /** - * User-provided partitioner for assigning an object to a Kafka partition. + * User-provided partitioner for assigning an object to a Kafka partition for each topic. + */ + protected final FlinkKafkaPartitioner flinkKafkaPartitioner; + + /** + * Partition IDs for each topic, cached after the first lookup. */ - protected final KafkaPartitioner partitioner; + protected final Map topicPartitionsMap; /** * Flag indicating whether to accept failures (and log them), or to fail on failures @@ -111,7 +113,12 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im * If true, the producer will wait until all outstanding records have been sent to the broker. */ protected boolean flushOnCheckpoint; - + + /** + * Number of retries when fetching Kafka metadata. + */ + protected long kafkaMetaRetryTimes; + // -------------------------------- Runtime fields ------------------------------------------ /** KafkaProducer instance */ @@ -133,14 +140,28 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im /** - * The main constructor for creating a FlinkKafkaProducer. + * The main constructor for creating a FlinkKafkaProducer. For the customPartitioner parameter, use a {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} instead. * * @param defaultTopicId The default topic to write data to * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner. + * @deprecated Use {@link FlinkKafkaProducerBase#FlinkKafkaProducerBase(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead */ + @Deprecated public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) { + this(defaultTopicId, serializationSchema, producerConfig, customPartitioner == null ? null : new FlinkKafkaDelegatePartitioner<>(customPartitioner)); + } + + /** + * The main constructor for creating a FlinkKafkaProducer. + * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' 
is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner. + */ + public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, FlinkKafkaPartitioner customPartitioner) { requireNonNull(defaultTopicId, "TopicID not set"); requireNonNull(serializationSchema, "serializationSchema not set"); requireNonNull(producerConfig, "producerConfig not set"); @@ -150,6 +171,7 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im this.defaultTopicId = defaultTopicId; this.schema = serializationSchema; this.producerConfig = producerConfig; + this.flinkKafkaPartitioner = customPartitioner; // set the producer configuration properties for kafka record key value serializers. if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { @@ -169,7 +191,7 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties."); } - this.partitioner = customPartitioner; + this.topicPartitionsMap = new HashMap<>(); } // ---------------------------------- Properties -------------------------- @@ -177,9 +199,9 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im /** * Defines whether the producer should fail on errors, or only log them. * If this is set to true, then exceptions will be only logged, if set to false, - * exceptions will be eventually thrown and cause the streaming program to + * exceptions will be eventually thrown and cause the streaming program to * fail (and enter recovery). - * + * * @param logFailuresOnly The flag to indicate logging-only on exceptions. */ public void setLogFailuresOnly(boolean logFailuresOnly) { @@ -205,7 +227,7 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im } // ----------------------------------- Utilities -------------------------- - + /** * Initializes the connection to Kafka. 
*/ @@ -214,27 +236,14 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im producer = getKafkaProducer(this.producerConfig); RuntimeContext ctx = getRuntimeContext(); - if (partitioner != null) { - // the fetched list is immutable, so we're creating a mutable copy in order to sort it - List partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId)); - - // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks - Collections.sort(partitionsList, new Comparator() { - @Override - public int compare(PartitionInfo o1, PartitionInfo o2) { - return Integer.compare(o1.partition(), o2.partition()); - } - }); - - partitions = new int[partitionsList.size()]; - for (int i = 0; i < partitions.length; i++) { - partitions[i] = partitionsList.get(i).partition(); + if(null != flinkKafkaPartitioner) { + if(flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) { + ((FlinkKafkaDelegatePartitioner)flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer)); } - - partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions); + flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks()); } - LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", + LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}", ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId); // register Kafka metrics to Flink accumulators @@ -281,6 +290,26 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im } } + protected int[] getPartitionsByTopic(String topic, KafkaProducer producer) { + // the fetched list is immutable, so we're creating a mutable copy in order to sort it + List partitionsList = new ArrayList<>(producer.partitionsFor(topic)); + + // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks + Collections.sort(partitionsList, new Comparator() { + @Override + public int compare(PartitionInfo o1, PartitionInfo o2) { + return Integer.compare(o1.partition(), o2.partition()); + } + }); + + int[] partitions = new int[partitionsList.size()]; + for (int i = 0; i < partitions.length; i++) { + partitions[i] = partitionsList.get(i).partition(); + } + + return partitions; + } + /** * Called when new data arrives to the sink, and forwards it to Kafka. 
* @@ -299,11 +328,17 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im targetTopic = defaultTopicId; } + int[] partitions = this.topicPartitionsMap.get(targetTopic); + if (partitions == null) { + partitions = this.getPartitionsByTopic(targetTopic, producer); + this.topicPartitionsMap.put(targetTopic, partitions); + } + ProducerRecord record; - if (partitioner == null) { + if (flinkKafkaPartitioner == null) { record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue); } else { - record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue); + record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), serializedKey, serializedValue); } if (flushOnCheckpoint) { synchronized (pendingRecordsLock) { @@ -319,7 +354,7 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im if (producer != null) { producer.close(); } - + // make sure we propagate pending errors checkErroneous(); } @@ -376,15 +411,15 @@ public abstract class FlinkKafkaProducerBase extends RichSinkFunction im throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e); } } - + public static Properties getPropertiesFromBrokerList(String brokerList) { String[] elements = brokerList.split(","); - + // validate the broker addresses for (String broker: elements) { NetUtils.getCorrectHostnamePort(broker); } - + Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); return props;
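
The per-topic caching above is the heart of FLINK-6288: invoke() now fetches and caches the partition array for whichever topic a record is routed to, instead of relying on a single array fetched once for the default topic. The following sketch shows a serialization schema that exercises this path; the Event POJO and the topic names "alerts" and "events" are invented for illustration and are not part of this commit:

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class EventRoutingSchema implements KeyedSerializationSchema<EventRoutingSchema.Event> {

	/** Minimal event POJO, defined here only to keep the sketch self-contained. */
	public static class Event {
		public String id;
		public String payload;
		public boolean alert;
	}

	@Override
	public byte[] serializeKey(Event element) {
		return element.id.getBytes(StandardCharsets.UTF_8);
	}

	@Override
	public byte[] serializeValue(Event element) {
		return element.payload.getBytes(StandardCharsets.UTF_8);
	}

	@Override
	public String getTargetTopic(Event element) {
		// records fan out over two topics; the producer looks up (and caches)
		// the partition array for whichever topic is returned here before
		// calling flinkKafkaPartitioner.partition(...)
		return element.alert ? "alerts" : "events";
	}
}
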
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index 27c4de7..a0b5033 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -17,10 +17,11 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.types.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.types.Row; import java.util.Properties; @@ -35,10 +36,23 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink { * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner + * @deprecated Use {@link KafkaJsonTableSink#KafkaJsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead */ + @Deprecated public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { super(topic, properties, partitioner); } + + /** + * Creates a KafkaJsonTableSink. + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + * @param partitioner Kafka partitioner + */ + public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { + super(topic, properties, partitioner); + } @Override protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) { http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index a8a2fd0..0a937d6 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.types.Row; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.sinks.AppendStreamTableSink; @@ -39,7 +41,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink { protected final String topic; protected final Properties properties; protected SerializationSchema<Row> serializationSchema; - protected final KafkaPartitioner<Row> partitioner; + protected final FlinkKafkaPartitioner<Row> partitioner; protected String[] fieldNames; protected TypeInformation[] fieldTypes; @@ -49,12 +51,27 @@ public abstract class KafkaTableSink implements AppendStreamTableSink { * @param topic Kafka topic to write to. * @param properties Properties for the Kafka producer. * @param partitioner Partitioner to select Kafka partition for each item + * @deprecated Use {@link KafkaTableSink#KafkaTableSink(String, Properties, FlinkKafkaPartitioner)} instead */ + @Deprecated public KafkaTableSink( String topic, Properties properties, KafkaPartitioner<Row> partitioner) { + this(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner)); + } + + /** + * Creates a KafkaTableSink. + * + * @param topic Kafka topic to write to. + * @param properties Properties for the Kafka producer. + * @param partitioner Partitioner to select Kafka partition for each item + */ + public KafkaTableSink( + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner) { this.topic = Preconditions.checkNotNull(topic, "topic"); this.properties = Preconditions.checkNotNull(properties, "properties"); this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner"); @@ -68,13 +85,29 @@ public abstract class KafkaTableSink implements AppendStreamTableSink { * @param serializationSchema Serialization schema to use to create Kafka records. * @param partitioner Partitioner to select Kafka partition. 
* @return The version-specific Kafka producer + * @deprecated Use {@link KafkaTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead */ + @Deprecated protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer( String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner); /** + * Returns the version-specific Kafka producer. + * + * @param topic Kafka topic to produce to. + * @param properties Properties for the Kafka producer. + * @param serializationSchema Serialization schema to use to create Kafka records. + * @param partitioner Partitioner to select Kafka partition. + * @return The version-specific Kafka producer + */ + protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer( + String topic, Properties properties, + SerializationSchema<Row> serializationSchema, + FlinkKafkaPartitioner<Row> partitioner); + + /** * Create serialization schema for converting table rows into bytes. * * @param fieldNames Field names in table rows. http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java index 9b848e0..edabfe0 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java @@ -48,9 +48,10 @@ import java.io.Serializable; * Not all Kafka partitions contain data * To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this will * cause a lot of network connections between all the Flink instances and all the Kafka brokers). - * + * @deprecated Use {@link FlinkFixedPartitioner} instead. * */ +@Deprecated public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable { private static final long serialVersionUID = 1627268846962918126L;
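
Migrating a custom partitioner to the new API is mostly a signature change: partition() now also receives the target topic and the concrete partition IDs of that topic, so one partitioner instance works for every topic the sink writes to. A before/after sketch under that assumption (the key-modulo logic and both class names are invented; the old variant additionally assumes partition IDs run from 0 to numPartitions - 1):

import java.util.Arrays;

// Before: the legacy interface only sees a partition *count*, implicitly the default topic's.
class KeyModuloPartitioner extends KafkaPartitioner<String> {
	@Override
	public int partition(String next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
		// mask the sign bit so the index is never negative
		return (Arrays.hashCode(serializedKey) & 0x7fffffff) % numPartitions;
	}
}

// After: the same logic expressed against the new interface, valid per target topic.
class KeyModuloFlinkPartitioner extends FlinkKafkaPartitioner<String> {
	@Override
	public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
		// index into the actual partition IDs of this record's topic
		return partitions[(Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
	}
}
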
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
new file mode 100644
index 0000000..d2eb7af
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
+ *
+ * Note: one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ * # More Flink partitions than Kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	---------------->	1
+ * 			2   --------------/
+ * 			3   -------------/
+ * 			4	------------/
+ * </pre>
+ * Some (or all) Kafka partitions contain the output of more than one Flink partition.
+ *
+ * # Fewer Flink partitions than Kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	---------------->	1
+ * 			2	---------------->	2
+ * 										3
+ * 										4
+ * 										5
+ * </pre>
+ *
+ * In this case, not all Kafka partitions contain data.
+ * To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this will
+ * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
+ */
+public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+	private int parallelInstanceId;
+
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances) {
+		if (parallelInstanceId < 0 || parallelInstances <= 0) {
+			throw new IllegalArgumentException("Invalid parallel instance id or parallelism");
+		}
+		this.parallelInstanceId = parallelInstanceId;
+	}
+
+	@Override
+	public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+		if (partitions == null || partitions.length == 0) {
+			throw new IllegalArgumentException("Partitions of the target topic must not be null or empty.");
+		}
+		// each parallel sink subtask sticks to a single Kafka partition
+		return partitions[parallelInstanceId % partitions.length];
+	}
+}
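
To make the mapping in the diagrams concrete, here is a small sketch (not part of the commit) that drives the partitioner exactly as the producer does, with four sink subtasks writing to a two-partition topic; the topic name and partition IDs are made up:

public class FlinkFixedPartitionerDemo {
	public static void main(String[] args) {
		int[] kafkaPartitions = {0, 1}; // partition IDs fetched for the target topic
		for (int subtask = 0; subtask < 4; subtask++) {
			FlinkFixedPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
			partitioner.open(subtask, 4); // called once per parallel instance
			int target = partitioner.partition("some-record", null, null, "demo-topic", kafkaPartitions);
			// prints: subtasks 0 and 2 -> partition 0, subtasks 1 and 3 -> partition 1
			System.out.println("subtask " + subtask + " -> partition " + target);
		}
	}
}
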
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
new file mode 100644
index 0000000..469fd1b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * Delegate for the legacy {@link KafkaPartitioner}.
+ * @param <T> the type of records written to Kafka
+ * @deprecated Use {@link FlinkKafkaPartitioner} instead
+ */
+@Deprecated
+public class FlinkKafkaDelegatePartitioner<T> extends FlinkKafkaPartitioner<T> {
+	private final KafkaPartitioner<T> kafkaPartitioner;
+	private int[] partitions;
+
+	public FlinkKafkaDelegatePartitioner(KafkaPartitioner<T> kafkaPartitioner) {
+		this.kafkaPartitioner = kafkaPartitioner;
+	}
+
+	public void setPartitions(int[] partitions) {
+		this.partitions = partitions;
+	}
+
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances) {
+		this.kafkaPartitioner.open(parallelInstanceId, parallelInstances, partitions);
+	}
+
+	@Override
+	public int partition(T next, byte[] serializedKey, byte[] serializedValue, String targetTopic, int[] partitions) {
+		// the wrapped partitioner only knows the partition set it was opened with;
+		// the per-topic targetTopic/partitions arguments are deliberately ignored
+		return this.kafkaPartitioner.partition(next, serializedKey, serializedValue, this.partitions.length);
+	}
+}
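
Existing user code keeps working because the deprecated constructors wrap legacy partitioners in this delegate. A sketch of what that wiring amounts to (variable names invented; the setPartitions call is quoted from the FlinkKafkaProducerBase.open() diff above):

// what the deprecated FlinkKafkaProducerBase constructor effectively does:
KafkaPartitioner<String> legacy = new FixedPartitioner<>();          // old-style partitioner
FlinkKafkaPartitioner<String> wrapped = new FlinkKafkaDelegatePartitioner<>(legacy);
// open() later injects only the *default* topic's partitions:
//   setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer))
// so a wrapped legacy partitioner keeps the old single-topic behaviour by design,
// which is exactly why the legacy interface cannot partition correctly per topic.
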
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
new file mode 100644
index 0000000..e074b9b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * Base class for partitioners that assign records to Kafka partitions per target topic.
+ * It contains an open() method which is called on each parallel instance.
+ * Partitioners must be serializable!
+ */
+public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+	private static final long serialVersionUID = -9086719227828020494L;
+
+	/**
+	 * Initializer for the partitioner.
+	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+	 * @param parallelInstances the total number of parallel instances
+	 */
+	public void open(int parallelInstanceId, int parallelInstances) {
+		// overwrite this method if needed.
+	}
+
+	/**
+	 * Determines the id of the partition that the record should be written to.
+	 *
+	 * @param record the record value
+	 * @param key serialized key of the record
+	 * @param value serialized value of the record
+	 * @param targetTopic target topic for the record
+	 * @param partitions partition IDs of the target topic
+	 * @return the id of the target partition
+	 */
+	public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 37e2ef6..7c82bd1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -22,7 +22,9 @@ import java.io.Serializable;
 /**
  * It contains an open() method which is called on each parallel instance.
  * Partitioners must be serializable!
+ * @deprecated Use {@link FlinkKafkaPartitioner} instead.
  */
+@Deprecated
 public abstract class KafkaPartitioner<T> implements Serializable {
 	private static final long serialVersionUID = -1974260817778593473L;
@@ -32,10 +34,22 @@ public abstract class KafkaPartitioner<T> implements Serializable {
 	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
 	 * @param parallelInstances the total number of parallel instances
 	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaPartitioner#open(int, int)} instead.
 	 */
+	@Deprecated
 	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
 		// overwrite this method if needed.
 	}
+	/**
+	 * @param next the record to partition
+	 * @param serializedKey serialized key of the record
+	 * @param serializedValue serialized value of the record
+	 * @param numPartitions number of partitions of the default topic
+	 * @return the id of the target partition
+	 * @deprecated Use {@link FlinkKafkaPartitioner#partition(T, byte[], byte[], String, int[])} instead.
+	 */
+	@Deprecated
 	public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
 }
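
Taken together, a user-defined partitioner against the new API can make per-topic decisions, which the old single-partition-count interface could not express. A closing sketch under invented assumptions (the class name, the "alerts" topic, and the routing rule are illustrative, not part of this commit):

import java.util.Arrays;

public class TopicAwarePartitioner extends FlinkKafkaPartitioner<String> {
	private static final long serialVersionUID = 1L;

	@Override
	public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
		// 'partitions' is the sorted partition array of *this* record's topic,
		// fetched and cached by FlinkKafkaProducerBase.invoke()
		if ("alerts".equals(targetTopic)) {
			return partitions[0]; // funnel alerts into a single partition
		}
		// otherwise spread by key; mask the sign bit so the index is never negative
		int idx = (key == null) ? 0 : (Arrays.hashCode(key) & 0x7fffffff) % partitions.length;
		return partitions[idx];
	}
}
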