flink-commits mailing list archives

From tzuli...@apache.org
Subject [3/6] flink git commit: [FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics
Date Fri, 19 May 2017 06:42:43 GMT
[FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58c4eed5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58c4eed5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58c4eed5

Branch: refs/heads/release-1.3
Commit: 58c4eed5accb0912472735bb00c148f6344a5679
Parents: 1a43bad
Author: zjureel <zjureel@gmail.com>
Authored: Mon May 15 17:41:47 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Fri May 19 14:41:54 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java |  82 ++++++++++--
 .../connectors/kafka/Kafka010ITCase.java        |   6 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   8 +-
 .../connectors/kafka/FlinkKafkaProducer.java    |   9 +-
 .../connectors/kafka/FlinkKafkaProducer08.java  |  40 +++++-
 .../connectors/kafka/Kafka08JsonTableSink.java  |  17 +++
 .../kafka/Kafka08JsonTableSinkTest.java         |  14 ++
 .../connectors/kafka/KafkaProducerTest.java     |   5 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   6 +-
 .../connectors/kafka/FlinkKafkaProducer09.java  |  42 +++++-
 .../connectors/kafka/Kafka09JsonTableSink.java  |  29 +++++
 .../kafka/Kafka09JsonTableSinkTest.java         |  15 +++
 .../connectors/kafka/KafkaProducerTest.java     |   6 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  10 +-
 .../kafka/FlinkKafkaProducerBase.java           | 127 ++++++++++++-------
 .../connectors/kafka/KafkaJsonTableSink.java    |  16 ++-
 .../connectors/kafka/KafkaTableSink.java        |  35 ++++-
 .../kafka/partitioner/FixedPartitioner.java     |   3 +-
 .../partitioner/FlinkFixedPartitioner.java      |  71 +++++++++++
 .../FlinkKafkaDelegatePartitioner.java          |  47 +++++++
 .../partitioner/FlinkKafkaPartitioner.java      |  39 ++++++
 .../kafka/partitioner/KafkaPartitioner.java     |  14 ++
 .../connectors/kafka/KafkaConsumerTestBase.java | 106 ++++++++--------
 .../connectors/kafka/KafkaProducerTestBase.java |  11 +-
 .../kafka/KafkaTableSinkTestBase.java           |  56 +++++++-
 .../connectors/kafka/KafkaTestEnvironment.java  |  14 +-
 .../kafka/TestFlinkFixedPartitioner.java        | 104 +++++++++++++++
 .../TestFlinkKafkaDelegatePartitioner.java      | 111 ++++++++++++++++
 .../kafka/testutils/DataGenerators.java         |  20 +--
 .../kafka/testutils/Tuple2FlinkPartitioner.java |  45 +++++++
 .../kafka/testutils/Tuple2Partitioner.java      |   9 +-
 31 files changed, 937 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
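For orientation: the core of this change is the new FlinkKafkaPartitioner base class, whose partition() method receives the target topic and that topic's partition id array instead of a single global partition count (see the Kafka010ITCase hunk below). A minimal sketch of a custom partitioner against the new API — the class name and the key-hashing scheme are illustrative, not part of the commit:

	import java.util.Arrays;

	import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

	/** Illustrative partitioner: hashes the serialized key over the partitions of the record's target topic. */
	public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
		private static final long serialVersionUID = 1L;

		@Override
		public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
			// 'partitions' holds the partition ids of 'targetTopic', so a sink
			// writing to several topics picks from the correct set each time
			if (key == null) {
				return partitions[0];
			}
			return partitions[(Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
		}
	}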


http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index cc0194b..7addafa 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.util.Properties;
+
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -27,7 +29,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -35,8 +38,6 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
-import java.util.Properties;
-
 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
 
 
@@ -87,7 +88,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 																					String topicId,
 																					KeyedSerializationSchema<T> serializationSchema,
 																					Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
+		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
 	}
 
 
@@ -106,7 +107,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 																					String topicId,
 																					SerializationSchema<T> serializationSchema,
 																					Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
+		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
 	}
 
 	/**
@@ -120,7 +121,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *  @deprecated Use {@link FlinkKafkaProducer010Configuration#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
 																					String topicId,
 																					KeyedSerializationSchema<T> serializationSchema,
@@ -133,6 +136,30 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+	 *
+	 *  @param inStream The stream to write to Kafka
+	 *  @param topicId The name of the target topic
+	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig,
+																					FlinkKafkaPartitioner<T> customPartitioner) {
+
+		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
+		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinkKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+	}
+
 	// ---------------------- Regular constructors w/o timestamp support  ------------------
 
 	/**
@@ -147,7 +174,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * 			User defined (keyless) serialization schema.
 	 */
 	public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
 	}
 
 	/**
@@ -162,7 +189,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
 	}
 
 	/**
@@ -173,11 +200,26 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 */
+	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
 	// ------------------- Key/Value serialization schema constructors ----------------------
 
 	/**
@@ -192,7 +234,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * 			User defined serialization schema supporting key/value messages
 	 */
 	public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
 	}
 
 	/**
@@ -207,19 +249,32 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
+		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
 	}
 
 	/**
 	 * Create Kafka producer
 	 *
 	 * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+	 * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
 		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
 		// invoke call.
 		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
 	}
+	
+	/**
+	 * Create Kafka producer
+	 *
+	 * This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
+	 */
+	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
+		// invoke call.
+		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+	}
 
 
 	// ----------------------------- Generic element processing  ---------------------------
@@ -243,10 +298,15 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		}
 
 		ProducerRecord<byte[], byte[]> record;
-		if (internalProducer.partitioner == null) {
+		int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
+		if (partitions == null) {
+			partitions = internalProducer.getPartitionsByTopic(targetTopic, internalProducer.producer);
+			internalProducer.topicPartitionsMap.put(targetTopic, partitions);
+		}
+		if (internalProducer.flinkKafkaPartitioner == null) {
 			record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
 		} else {
-			record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
+			record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
 		}
 		if (internalProducer.flushOnCheckpoint) {
 			synchronized (internalProducer.pendingRecordsLock) {
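For context on the hunk above: the producer now caches an int[] of partition ids per target topic because a KeyedSerializationSchema can route each element to a different topic via getTargetTopic(); the old code sized everything off one partitions array for a single topic. A hedged sketch of such a multi-topic schema — the Tuple2 field layout is illustrative only:

	import org.apache.flink.api.java.tuple.Tuple2;
	import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

	/** Illustrative schema that routes each element to the topic named in its first field. */
	public class TopicRoutingSchema implements KeyedSerializationSchema<Tuple2<String, String>> {
		private static final long serialVersionUID = 1L;

		@Override
		public byte[] serializeKey(Tuple2<String, String> element) {
			return null; // keyless records; the Kafka message key stays empty
		}

		@Override
		public byte[] serializeValue(Tuple2<String, String> element) {
			return element.f1.getBytes();
		}

		@Override
		public String getTargetTopic(Tuple2<String, String> element) {
			return element.f0; // per-element topic; triggers the per-topic partition lookup above
		}
	}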

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 39b2b8f..add623e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -208,11 +208,11 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		});
 
 		final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
-		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() {
+		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new FlinkKafkaPartitioner<Long>() {
 			private static final long serialVersionUID = -6730989584364230617L;
 
 			@Override
-			public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+			public int partition(Long next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 				return (int)(next % 3);
 			}
 		});

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index d27e53a..c88c858 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -30,8 +30,8 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -46,9 +46,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -116,7 +116,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return new StreamSink<>(prod);
@@ -124,7 +124,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 
 	@Override
-	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index a7b89f8..98dac3e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -37,7 +38,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null);
+		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
 	}
 
 	/**
@@ -45,7 +46,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
+		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, (FlinkKafkaPartitioner<IN>)null);
 	}
 
 	/**
@@ -62,7 +63,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
-		super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+		super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
 	}
 
 	/**
@@ -70,7 +71,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		super(topicId, serializationSchema, producerConfig, null);
+		super(topicId, serializationSchema, producerConfig, (FlinkKafkaPartitioner<IN>)null);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 65de5fc..64d3716 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -17,7 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -50,7 +51,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * 			User defined (keyless) serialization schema.
 	 */
 	public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -65,7 +66,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -75,12 +76,27 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 
 	}
 
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+	}
+
 	// ------------------- Key/Value serialization schema constructors ----------------------
 
 	/**
@@ -95,7 +111,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * 			User defined serialization schema supporting key/value messages
 	 */
 	public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -110,7 +126,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -120,11 +136,25 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	}
+
 	@Override
 	protected void flush() {
 		// The Kafka 0.8 producer doesn't support flushing, we wait here
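A usage sketch of the non-deprecated 0.8 constructor added above — the broker address, topic name, and surrounding stream are placeholders, not part of the commit:

	import java.util.Properties;

	import org.apache.flink.streaming.api.datastream.DataStream;
	import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
	import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

	public static void sinkExample(DataStream<String> stream) {
		Properties props = FlinkKafkaProducerBase.getPropertiesFromBrokerList("localhost:9092");
		// Resolves to the new overload taking a FlinkKafkaPartitioner
		// instead of the deprecated KafkaPartitioner
		stream.addSink(new FlinkKafkaProducer08<>(
				"my-topic", new SimpleStringSchema(), props, new FlinkFixedPartitioner<String>()));
	}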

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 839388f..5a066ec0 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -38,6 +39,17 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
+	
+	/**
+	 * Creates {@link KafkaTableSink} for Kafka 0.8
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 */
+	public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+		super(topic, properties, partitioner);
+	}
 
 	@Override
 	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
@@ -45,6 +57,11 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	}
 
 	@Override
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+		return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+	}
+
+	@Override
 	protected Kafka08JsonTableSink createCopy() {
 		return new Kafka08JsonTableSink(topic, properties, partitioner);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 0ac452e..164c162 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
@@ -40,6 +41,19 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 	}
 
 	@Override
+	protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
+			final FlinkKafkaProducerBase<Row> kafkaProducer) {
+		
+		return new Kafka08JsonTableSink(topic, properties, partitioner) {
+			@Override
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+					SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+				return kafkaProducer;
+			}
+		};
+	}
+
+	@Override
 	@SuppressWarnings("unchecked")
 	protected SerializationSchema<Row> getSerializationSchema() {
 		return new JsonRowSerializationSchema(FIELD_NAMES);

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 65d7596..c7da5af 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
@@ -83,7 +84,7 @@ public class KafkaProducerTest extends TestLogger {
 			// (1) producer that propagates errors
 
 			FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null);
 
 			OneInputStreamOperatorTestHarness<String, Object> testHarness =
 					new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
@@ -106,7 +107,7 @@ public class KafkaProducerTest extends TestLogger {
 			// (2) producer that only logs errors
 
 			FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null);
 			producerLogging.setLogFailuresOnly(true);
 
 			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 643ee8e..2419b53 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,8 +36,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -109,7 +109,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			String topic,
 			KeyedSerializationSchema<T> serSchema,
 			Properties props,
-			KafkaPartitioner<T> partitioner) {
+			FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(
 				topic,
 				serSchema,
@@ -120,7 +120,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 2a3e39d..4f41c43 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -17,7 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -51,7 +52,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * 			User defined (keyless) serialization schema.
 	 */
 	public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -66,7 +67,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -77,12 +78,28 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 */
+	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+	}
+
 	// ------------------- Key/Value serialization schema constructors ----------------------
 
 	/**
@@ -97,7 +114,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * 			User defined serialization schema supporting key/value messages
 	 */
 	public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -112,7 +129,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -123,11 +140,26 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	}
+
 	@Override
 	protected void flush() {
 		if (this.producer != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index edbebd0..b82ebc4 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -33,17 +34,45 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
+	 * @deprecated Use {@link Kafka09JsonTableSink#Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
+	
+	/**
+	 * Creates {@link KafkaTableSink} for Kafka 0.9
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 */
+	public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+		super(topic, properties, partitioner);
+	}
 
+	/**
+	 *
+	 * @param topic               Kafka topic to produce to.
+	 * @param properties          Properties for the Kafka producer.
+	 * @param serializationSchema Serialization schema to use to create Kafka records.
+	 * @param partitioner         Partitioner to select Kafka partition.
+	 * @return The version-specific Kafka producer
+	 * @deprecated Use {@link Kafka09JsonTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
+	 */
+	@Deprecated
 	@Override
 	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
 		return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
 	}
 
 	@Override
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+		return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+	}
+
+	@Override
 	protected Kafka09JsonTableSink createCopy() {
 		return new Kafka09JsonTableSink(topic, properties, partitioner);
 	}
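And a brief sketch of constructing the 0.9 table sink through the new constructor added above — the topic name and broker address are placeholders:

	import java.util.Properties;

	import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
	import org.apache.flink.types.Row;

	public static KafkaTableSink createSink() {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		// Resolves to the new, non-deprecated Kafka09JsonTableSink constructor
		return new Kafka09JsonTableSink("clicks", props, new FlinkFixedPartitioner<Row>());
	}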

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index df84a0f..ad8f623 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
@@ -26,6 +27,7 @@ import java.util.Properties;
 
 public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 
+	@Deprecated
 	@Override
 	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
 			final FlinkKafkaProducerBase<Row> kafkaProducer) {
@@ -40,6 +42,19 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 	}
 
 	@Override
+	protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
+			final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+		return new Kafka09JsonTableSink(topic, properties, partitioner) {
+			@Override
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+					SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+				return kafkaProducer;
+			}
+		};
+	}
+
+	@Override
 	@SuppressWarnings("unchecked")
 	protected SerializationSchema<Row> getSerializationSchema() {
 		return new JsonRowSerializationSchema(FIELD_NAMES);

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 18b2aec..e9a4947 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
@@ -83,9 +84,8 @@ public class KafkaProducerTest extends TestLogger {
 			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
 			
 			// (1) producer that propagates errors
-
 			FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>)null);
 
 			OneInputStreamOperatorTestHarness<String, Object> testHarness =
 					new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
@@ -106,7 +106,7 @@ public class KafkaProducerTest extends TestLogger {
 			// (2) producer that only logs errors
 
 			FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>)null);
 			producerLogging.setLogFailuresOnly(true);
 
 			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c9ef6da..84fdbf8 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import kafka.admin.AdminUtils;
-import kafka.common.KafkaException;
 import kafka.api.PartitionMetadata;
+import kafka.common.KafkaException;
 import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
@@ -32,8 +32,8 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -48,9 +48,9 @@ import scala.collection.Seq;
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
-import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -106,14 +106,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			String topic,
 			KeyedSerializationSchema<T> serSchema,
 			Properties props,
-			KafkaPartitioner<T> partitioner) {
+			FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return new StreamSink<>(prod);
 	}
 
 	@Override
-	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 6a7b17f..f9a1e41 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -17,6 +17,14 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -30,6 +38,8 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -45,13 +55,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Collections;
-import java.util.Comparator;
-
 import static java.util.Objects.requireNonNull;
 
 
@@ -76,12 +79,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
 
 	/**
-	 * Array with the partition ids of the given defaultTopicId
-	 * The size of this array is the number of partitions
-	 */
-	protected int[] partitions;
-
-	/**
 	 * User defined properties for the Producer
 	 */
 	protected final Properties producerConfig;
@@ -98,9 +95,14 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	protected final KeyedSerializationSchema<IN> schema;
 
 	/**
-	 * User-provided partitioner for assigning an object to a Kafka partition.
+	 * User-provided partitioner for assigning an object to a Kafka partition for each topic.
+	 */
+	protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
+
+	/**
+	 * Cached partition IDs for each topic the sink has written to
 	 */
-	protected final KafkaPartitioner<IN> partitioner;
+	protected final Map<String, int[]> topicPartitionsMap;
 
 	/**
 	 * Flag indicating whether to accept failures (and log them), or to fail on failures
@@ -111,7 +113,12 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	 * If true, the producer will wait until all outstanding records have been sent to the broker.
 	 */
 	protected boolean flushOnCheckpoint;
-	
+
+	/**
+	 * Number of retries when fetching Kafka metadata
+	 */
+	protected long kafkaMetaRetryTimes;
+
 	// -------------------------------- Runtime fields ------------------------------------------
 
 	/** KafkaProducer instance */
@@ -133,14 +140,28 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * The main constructor for creating a FlinkKafkaProducer. For the customPartitioner parameter, prefer a {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} over the deprecated {@link KafkaPartitioner}.
 	 *
 	 * @param defaultTopicId The default topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 * @deprecated Use {@link FlinkKafkaProducerBase#FlinkKafkaProducerBase(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		this(defaultTopicId, serializationSchema, producerConfig, customPartitioner == null ? null : new FlinkKafkaDelegatePartitioner<>(customPartitioner));
+	}
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param defaultTopicId The default topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 */
+	public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
 		requireNonNull(defaultTopicId, "TopicID not set");
 		requireNonNull(serializationSchema, "serializationSchema not set");
 		requireNonNull(producerConfig, "producerConfig not set");
@@ -150,6 +171,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		this.defaultTopicId = defaultTopicId;
 		this.schema = serializationSchema;
 		this.producerConfig = producerConfig;
+		this.flinkKafkaPartitioner = customPartitioner;
 
 		// set the producer configuration properties for kafka record key value serializers.
 		if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
@@ -169,7 +191,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 			throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
 		}
 
-		this.partitioner = customPartitioner;
+		this.topicPartitionsMap = new HashMap<>();
 	}
 
 	// ---------------------------------- Properties --------------------------
@@ -177,9 +199,9 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	/**
 	 * Defines whether the producer should fail on errors, or only log them.
 	 * If this is set to true, then exceptions will be only logged, if set to false,
-	 * exceptions will be eventually thrown and cause the streaming program to 
+	 * exceptions will be eventually thrown and cause the streaming program to
 	 * fail (and enter recovery).
-	 * 
+	 *
 	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
 	 */
 	public void setLogFailuresOnly(boolean logFailuresOnly) {
@@ -205,7 +227,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	}
 
 	// ----------------------------------- Utilities --------------------------
-	
+
 	/**
 	 * Initializes the connection to Kafka.
 	 */
@@ -214,27 +236,14 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		producer = getKafkaProducer(this.producerConfig);
 
 		RuntimeContext ctx = getRuntimeContext();
-		if (partitioner != null) {
-			// the fetched list is immutable, so we're creating a mutable copy in order to sort it
-			List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
-
-			// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
-			Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
-				@Override
-				public int compare(PartitionInfo o1, PartitionInfo o2) {
-					return Integer.compare(o1.partition(), o2.partition());
-				}
-			});
-
-			partitions = new int[partitionsList.size()];
-			for (int i = 0; i < partitions.length; i++) {
-				partitions[i] = partitionsList.get(i).partition();
+		if (flinkKafkaPartitioner != null) {
+			if (flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
+				((FlinkKafkaDelegatePartitioner<IN>) flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer));
 			}
-
-			partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
+			flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
 		}
 
-		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", 
+		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
 				ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
 
 		// register Kafka metrics to Flink accumulators
@@ -281,6 +290,26 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		}
 	}
 
+	protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
+		// the fetched list is immutable, so we're creating a mutable copy in order to sort it
+		List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
+
+		// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+		Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+			@Override
+			public int compare(PartitionInfo o1, PartitionInfo o2) {
+				return Integer.compare(o1.partition(), o2.partition());
+			}
+		});
+
+		int[] partitions = new int[partitionsList.size()];
+		for (int i = 0; i < partitions.length; i++) {
+			partitions[i] = partitionsList.get(i).partition();
+		}
+
+		return partitions;
+	}
+
 	/**
 	 * Called when new data arrives to the sink, and forwards it to Kafka.
 	 *
@@ -299,11 +328,17 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 			targetTopic = defaultTopicId;
 		}
 
+		int[] partitions = this.topicPartitionsMap.get(targetTopic);
+		if (partitions == null) {
+			partitions = this.getPartitionsByTopic(targetTopic, producer);
+			this.topicPartitionsMap.put(targetTopic, partitions);
+		}
+
 		ProducerRecord<byte[], byte[]> record;
-		if (partitioner == null) {
+		if (flinkKafkaPartitioner == null) {
 			record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
 		} else {
-			record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
+			record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), serializedKey, serializedValue);
 		}
 		if (flushOnCheckpoint) {
 			synchronized (pendingRecordsLock) {
@@ -319,7 +354,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		if (producer != null) {
 			producer.close();
 		}
-		
+
 		// make sure we propagate pending errors
 		checkErroneous();
 	}
@@ -376,15 +411,15 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 			throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
 		}
 	}
-	
+
 	public static Properties getPropertiesFromBrokerList(String brokerList) {
 		String[] elements = brokerList.split(",");
-		
+
 		// validate the broker addresses
 		for (String broker: elements) {
 			NetUtils.getCorrectHostnamePort(broker);
 		}
-		
+
 		Properties props = new Properties();
 		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
 		return props;
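
To illustrate what the per-topic partition cache above enables: a KeyedSerializationSchema may
return a different target topic per record, and the producer now resolves and caches the
partition list for each topic on first use instead of assuming the single default topic. A
hedged sketch (the Event type and its accessors are hypothetical):

    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

    public class EventRoutingSchema implements KeyedSerializationSchema<Event> {

        @Override
        public byte[] serializeKey(Event element) {
            return element.getId().getBytes();
        }

        @Override
        public byte[] serializeValue(Event element) {
            return element.getPayload().getBytes();
        }

        @Override
        public String getTargetTopic(Event element) {
            // each returned topic gets its own entry in topicPartitionsMap,
            // filled lazily via getPartitionsByTopic() on the first record
            return "events-" + element.getType();
        }
    }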

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index 27c4de7..a0b5033 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -17,10 +17,11 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.types.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
@@ -35,10 +36,23 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink {
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
+	 * @deprecated Use {@link KafkaJsonTableSink#KafkaJsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
+	
+	/**
+	 * Creates a KafkaJsonTableSink.
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 */
+	public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+		super(topic, properties, partitioner);
+	}
 
 	@Override
 	protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index a8a2fd0..0a937d6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
@@ -39,7 +41,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	protected final String topic;
 	protected final Properties properties;
 	protected SerializationSchema<Row> serializationSchema;
-	protected final KafkaPartitioner<Row> partitioner;
+	protected final FlinkKafkaPartitioner<Row> partitioner;
 	protected String[] fieldNames;
 	protected TypeInformation[] fieldTypes;
 
@@ -49,12 +51,27 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	 * @param topic                 Kafka topic to write to.
 	 * @param properties            Properties for the Kafka consumer.
 	 * @param partitioner           Partitioner to select Kafka partition for each item
+	 * @deprecated Use {@link KafkaTableSink#KafkaTableSink(String, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public KafkaTableSink(
 			String topic,
 			Properties properties,
 			KafkaPartitioner<Row> partitioner) {
+		this(topic, properties, new FlinkKafkaDelegatePartitioner<Row>(partitioner));
+	}
 
+	/**
+	 * Creates a KafkaTableSink.
+	 * 
+	 * @param topic                 Kafka topic to write to.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param partitioner           Partitioner to select Kafka partition for each item
+	 */
+	public KafkaTableSink(
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner) {
 		this.topic = Preconditions.checkNotNull(topic, "topic");
 		this.properties = Preconditions.checkNotNull(properties, "properties");
 		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
@@ -68,13 +85,29 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	 * @param serializationSchema Serialization schema to use to create Kafka records.
 	 * @param partitioner         Partitioner to select Kafka partition.
 	 * @return The version-specific Kafka producer
+	 * @deprecated Use {@link KafkaTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
 		String topic, Properties properties,
 		SerializationSchema<Row> serializationSchema,
 		KafkaPartitioner<Row> partitioner);
 
 	/**
+	 * Returns the version-specific Kafka producer.
+	 *
+	 * @param topic               Kafka topic to produce to.
+	 * @param properties          Properties for the Kafka producer.
+	 * @param serializationSchema Serialization schema to use to create Kafka records.
+	 * @param partitioner         Partitioner to select Kafka partition.
+	 * @return The version-specific Kafka producer
+	 */
+	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
+		String topic, Properties properties,
+		SerializationSchema<Row> serializationSchema,
+		FlinkKafkaPartitioner<Row> partitioner);
+
+	/**
 	 * Create serialization schema for converting table rows into bytes.
 	 *
 	 * @param fieldNames Field names in table rows.
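
For a concrete picture of the new abstract method, a sketch of how a version-specific sink
would implement it (assuming the matching FlinkKafkaProducer09 constructor overload added
elsewhere in this commit):

    @Override
    protected FlinkKafkaProducerBase<Row> createKafkaProducer(
            String topic, Properties properties,
            SerializationSchema<Row> serializationSchema,
            FlinkKafkaPartitioner<Row> partitioner) {
        return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
    }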

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
index 9b848e0..edabfe0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -48,9 +48,10 @@ import java.io.Serializable;
  *  Not all Kafka partitions contain data
  *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
  *  cause a lot of network connections between all the Flink instances and all the Kafka brokers
- *
+ *  @deprecated Use {@link FlinkFixedPartitioner} instead.
  *
  */
+@Deprecated
 public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
 	private static final long serialVersionUID = 1627268846962918126L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
new file mode 100644
index 0000000..d2eb7af
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
+ *
+ * Note that one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ * 	# More Flink partitions than Kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	----------------&gt;	1
+ * 			2   --------------/
+ * 			3   -------------/
+ * 			4	------------/
+ * </pre>
+ * Some (or all) Kafka partitions contain the output of more than one Flink partition.
+ *
+ * 	# Fewer Flink partitions than Kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	----------------&gt;	1
+ * 			2	----------------&gt;	2
+ * 										3
+ * 										4
+ * 										5
+ * </pre>
+ *
+ * Not all Kafka partitions contain data.
+ * To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this
+ * will cause a lot of network connections between all the Flink instances and all the Kafka brokers).
+ */
+public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+	private int parallelInstanceId;
+
+	@Override
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances) {
+		if (parallelInstanceId < 0 || parallelInstances <= 0) {
+			throw new IllegalArgumentException("parallelInstanceId must be non-negative and parallelInstances must be positive");
+		}
+		this.parallelInstanceId = parallelInstanceId;
+	}
+
+	@Override
+	public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+		if (partitions == null || partitions.length == 0) {
+			throw new IllegalArgumentException("Partitions of the target topic are empty");
+		}
+
+		return partitions[parallelInstanceId % partitions.length];
+	}
+}
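
A small usage illustration of the fixed mapping (values chosen arbitrarily): subtask i always
writes to partitions[i % partitions.length], regardless of the record contents:

    FlinkFixedPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
    partitioner.open(2, 4);                    // parallel instance 2 of 4
    int[] partitions = {0, 1};                 // the target topic has two partitions
    int p = partitioner.partition("some-record", null, null, "my-topic", partitions);
    // p == 0, since partitions[2 % 2] == partitions[0]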

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
new file mode 100644
index 0000000..469fd1b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * Delegate that wraps a legacy {@link KafkaPartitioner} behind the new partitioner API.
+ * @param <T> the type of the records to partition
+ * @deprecated Use {@link FlinkKafkaPartitioner} instead
+ */
+@Deprecated
+public class FlinkKafkaDelegatePartitioner<T> extends FlinkKafkaPartitioner<T> {
+	private final KafkaPartitioner<T> kafkaPartitioner;
+	private int[] partitions;
+
+	public FlinkKafkaDelegatePartitioner(KafkaPartitioner<T> kafkaPartitioner) {
+		this.kafkaPartitioner = kafkaPartitioner;
+	}
+
+	public void setPartitions(int[] partitions) {
+		this.partitions = partitions;
+	}
+
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances) {
+		this.kafkaPartitioner.open(parallelInstanceId, parallelInstances, partitions);
+	}
+
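+	/**
+	 * Note: the {@code partitions} argument of the new API is ignored here; the legacy
+	 * partitioner only knows the default topic's partitions, which are injected once
+	 * via {@link #setPartitions(int[])}.
+	 */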
+	@Override
+	public int partition(T next, byte[] serializedKey, byte[] serializedValue, String targetTopic, int[] partitions) {
+		return this.kafkaPartitioner.partition(next, serializedKey, serializedValue, this.partitions.length);
+	}
+}
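
In other words, the bridge from the old API is mechanical; roughly what the deprecated
FlinkKafkaProducerBase constructor above does:

    KafkaPartitioner<String> legacy = new FixedPartitioner<>();
    FlinkKafkaPartitioner<String> bridged =
            new FlinkKafkaDelegatePartitioner<>(legacy);
    // before open(), the producer injects the default topic's partitions, since
    // the old interface never learned about per-topic partition lists:
    //   bridged.setPartitions(getPartitionsByTopic(defaultTopicId, producer));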

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
new file mode 100644
index 0000000..e074b9b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner for Flink Kafka producers, deciding the target Kafka partition for each record.
+ * It contains an open() method which is called on each parallel instance.
+ * Partitioners must be serializable!
+ */
+public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+	private static final long serialVersionUID = -9086719227828020494L;
+
+	/**
+	 * Initializer for the Partitioner.
+	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+	 * @param parallelInstances the total number of parallel instances
+	 */
+	public void open(int parallelInstanceId, int parallelInstances) {
+		// overwrite this method if needed.
+	}
+	
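+	/**
+	 * Determines the partition of the target topic to send the record to.
+	 *
+	 * @param record the record value
+	 * @param key serialized key of the record
+	 * @param value serialized value of the record
+	 * @param targetTopic the Kafka topic the record will be written to
+	 * @param partitions the partition IDs of the target topic
+	 * @return the id of the target partition
+	 */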
+	public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
+}
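
As a worked example of the new contract, a minimal key-hash partitioner; this is an
illustrative sketch, not part of this commit:

    import java.util.Arrays;

    public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            if (partitions == null || partitions.length == 0) {
                throw new IllegalArgumentException("Partitions of topic " + targetTopic + " are empty");
            }
            // hash the serialized key if present, otherwise the serialized value;
            // Math.floorMod keeps the index non-negative for negative hashes
            int hash = (key != null) ? Arrays.hashCode(key) : Arrays.hashCode(value);
            return partitions[Math.floorMod(hash, partitions.length)];
        }
    }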

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 37e2ef6..7c82bd1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -22,7 +22,9 @@ import java.io.Serializable;
 /**
  * It contains a open() method which is called on each parallel instance.
  * Partitioners must be serializable!
+ * @deprecated Use {@link FlinkKafkaPartitioner} instead.
  */
+@Deprecated
 public abstract class KafkaPartitioner<T> implements Serializable {
 
 	private static final long serialVersionUID = -1974260817778593473L;
@@ -32,10 +34,22 @@ public abstract class KafkaPartitioner<T> implements Serializable {
 	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
 	 * @param parallelInstances the total number of parallel instances
 	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaPartitioner#open(int, int)} instead.
 	 */
+	@Deprecated
 	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
 		// overwrite this method if needed.
 	}
 
+	/**
+	 * Determines the Kafka partition to send the record to.
+	 *
+	 * @param next the record value
+	 * @param serializedKey the serialized key of the record
+	 * @param serializedValue the serialized value of the record
+	 * @param numPartitions the total number of available partitions
+	 * @return the id of the target partition
+	 * @deprecated Use {@link FlinkKafkaPartitioner#partition(Object, byte[], byte[], String, int[])} instead.
+	 */
+	@Deprecated
 	public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
 }
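
Migration from the deprecated interface is a matter of moving from the one-time partition
array in open() to the per-call targetTopic/partitions arguments; a before/after sketch:

    // before (deprecated): partitions arrive once, for the single configured topic
    public class MyOldPartitioner<T> extends KafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;
        private int[] partitions;

        @Override
        public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
            this.partitions = partitions;
        }

        @Override
        public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
            return partitions[0];
        }
    }

    // after: the target topic and its partitions are passed with every record
    public class MyNewPartitioner<T> extends FlinkKafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            return partitions[0];
        }
    }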

