flink-commits mailing list archives

From se...@apache.org
Subject [5/5] flink git commit: [FLINK-2372] Add new FlinkKafkaProducer based on the new producer API
Date Mon, 07 Sep 2015 15:35:20 GMT
[FLINK-2372] Add new FlinkKafkaProducer based on the new producer API

This closes #1082


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97ad55f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97ad55f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97ad55f1

Branch: refs/heads/master
Commit: 97ad55f1d89839dc9df0066540b5fbc260ea3f4a
Parents: 9292e5c
Author: Robert Metzger <rmetzger@apache.org>
Authored: Fri Aug 28 14:33:49 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Sep 7 15:24:02 2015 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer.java    | 225 +++++++++++++++++++
 .../streaming/connectors/kafka/KafkaSink.java   | 187 ---------------
 .../kafka/SerializableKafkaPartitioner.java     |  25 ---
 .../connectors/kafka/api/KafkaSink.java         |   3 +-
 .../kafka/partitioner/FixedPartitioner.java     |  80 +++++++
 .../kafka/partitioner/KafkaPartitioner.java     |  42 ++++
 .../connectors/kafka/KafkaConsumerTestBase.java |  23 +-
 .../connectors/kafka/KafkaProducerITCase.java   |   8 +-
 .../connectors/kafka/KafkaTestBase.java         |  14 +-
 .../connectors/kafka/TestFixedPartitioner.java  | 104 +++++++++
 .../kafka/testutils/DataGenerators.java         |  19 +-
 .../kafka/testutils/Tuple2Partitioner.java      |   6 +-
 .../util/serialization/SerializationSchema.java |   2 +-
 13 files changed, 493 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
new file mode 100644
index 0000000..3d666ee
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.List;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic.
+ *
+ * Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Array with the partition IDs of the given topicId.
+	 * The size of this array is the number of partitions.
+	 */
+	private final int[] partitions;
+
+	/**
+	 * User defined properties for the Producer
+	 */
+	private final Properties producerConfig;
+
+	/**
+	 * The name of the topic this producer is writing data to
+	 */
+	private String topicId;
+
+	/**
+	 * (Serializable) SerializationSchema for turning objects used with Flink into
+	 * byte[] for Kafka.
+	 */
+	private SerializationSchema<IN, byte[]> schema;
+
+	/**
+	 * User-provided partitioner for assigning an object to a Kafka partition.
+	 */
+	private KafkaPartitioner partitioner;
+
+	// -------------------------------- Runtime fields ------------------------------------------
+
+	/**
+	 * KafkaProducer instance.
+	 */
+	private transient KafkaProducer<byte[], byte[]> producer;
+
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma-separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema.
+	 */
+	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema.
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, null);
+	}
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a Kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
+		Preconditions.checkNotNull(topicId, "TopicID not set");
+		Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
+		Preconditions.checkNotNull(producerConfig, "producerConfig not set");
+		ClosureCleaner.ensureSerializable(customPartitioner);
+		ClosureCleaner.ensureSerializable(serializationSchema);
+
+		this.topicId = topicId;
+		this.schema = serializationSchema;
+		this.producerConfig = producerConfig;
+
+		// set the producer configuration properties.
+
+		if(!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+			this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+		}
+
+		if(!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+			this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+		}
+
+
+		// create a local KafkaProducer to get the list of partitions.
+		// this will also ensure locally that all required ProducerConfig values are set.
+		{
+			KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig);
+			List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
+
+			this.partitions = new int[partitionsList.size()];
+			for (int i = 0; i < partitions.length; i++) {
+				partitions[i] = partitionsList.get(i).partition();
+			}
+			getPartitionsProd.close();
+		}
+
+		if(customPartitioner == null) {
+			this.partitioner = new FixedPartitioner();
+		} else {
+			this.partitioner = customPartitioner;
+		}
+	}
+
+
+	/**
+	 * Initializes the connection to Kafka.
+	 */
+	@Override
+	public void open(Configuration configuration) {
+		producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
+
+		partitioner.open(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), partitions);
+
+		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), topicId);
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Kafka.
+	 *
+	 * @param next
+	 * 		The incoming data
+	 */
+	@Override
+	public void invoke(IN next) {
+		byte[] serialized = schema.serialize(next);
+
+		producer.send(new ProducerRecord<byte[], byte[]>(topicId,
+			partitioner.partition(next, partitions.length),
+			null,
+			serialized),
+			new ErrorLoggingCallback(topicId, null, serialized, false));
+	}
+
+
+	@Override
+	public void close() {
+		if (producer != null) {
+			producer.close();
+		}
+	}
+
+
+	// ----------------------------------- Utilities --------------------------
+
+	public static Properties getPropertiesFromBrokerList(String brokerList) {
+		String[] elements = brokerList.split(",");
+		for(String broker: elements) {
+			NetUtils.getCorrectHostnamePort(broker);
+		}
+		Properties props = new Properties();
+		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+		return props;
+	}
+}
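
For reference, a minimal usage sketch of the new producer (broker address, topic name, and the
sample elements are illustrative, not part of this commit; JavaDefaultStringSchema is the stock
String schema also used by the tests further down):

	import org.apache.flink.streaming.api.datastream.DataStream;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
	import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
	import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;

	public class ProducerExample {
		public static void main(String[] args) throws Exception {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
			DataStream<String> stream = env.fromElements("a", "b", "c");
			// keys are left null, so the FixedPartitioner picks the target Kafka partition.
			// note: the constructor already contacts the broker to fetch the topic's partitions
			stream.addSink(new FlinkKafkaProducer<String>(
					"localhost:9092", "test-topic", new JavaDefaultStringSchema()));
			env.execute("FlinkKafkaProducer example");
		}
	}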

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
deleted file mode 100644
index d115913..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.google.common.base.Preconditions;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.DefaultEncoder;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.PartitionerWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-
-/**
- * Sink that emits its inputs to a Kafka topic.
- *
- * @param <IN>
- * 		Type of the sink input
- */
-public class KafkaSink<IN> extends RichSinkFunction<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
-
-	private Producer<IN, byte[]> producer;
-	private Properties userDefinedProperties;
-	private String topicId;
-	private String brokerList;
-	private SerializationSchema<IN, byte[]> schema;
-	private SerializableKafkaPartitioner partitioner;
-	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
-
-	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param brokerList
-	 *			Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 */
-	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
-		this(brokerList, topicId, new Properties(), serializationSchema);
-	}
-
-	/**
-	 * Creates a KafkaSink for a given topic with custom Producer configuration.
-	 * If you use this constructor, the broker should be set with the "metadata.broker.list"
-	 * configuration.
-	 *
-	 * @param brokerList
-	 * 		Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param producerConfig
-	 * 		Configurations of the Kafka producer
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 */
-	public KafkaSink(String brokerList, String topicId, Properties producerConfig, SerializationSchema<IN, byte[]> serializationSchema) {
-		String[] elements = brokerList.split(",");
-		for(String broker: elements) {
-			NetUtils.getCorrectHostnamePort(broker);
-		}
-		Preconditions.checkNotNull(topicId, "TopicID not set");
-
-		this.brokerList = brokerList;
-		this.topicId = topicId;
-		this.schema = serializationSchema;
-		this.partitionerClass = null;
-		this.userDefinedProperties = producerConfig;
-	}
-
-	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param brokerList
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 * @param partitioner
-	 * 		User defined partitioner.
-	 */
-	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
-		this(brokerList, topicId, serializationSchema);
-		ClosureCleaner.ensureSerializable(partitioner);
-		this.partitioner = partitioner;
-	}
-
-	public KafkaSink(String brokerList,
-					String topicId,
-					SerializationSchema<IN, byte[]> serializationSchema,
-					Class<? extends SerializableKafkaPartitioner> partitioner) {
-		this(brokerList, topicId, serializationSchema);
-		this.partitionerClass = partitioner;
-	}
-
-	/**
-	 * Initializes the connection to Kafka.
-	 */
-	@Override
-	public void open(Configuration configuration) {
-
-		Properties properties = new Properties();
-
-		properties.put("metadata.broker.list", brokerList);
-		properties.put("request.required.acks", "-1");
-		properties.put("message.send.max.retries", "10");
-
-		properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
-
-		// this will not be used as the key will not be serialized
-		properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
-
-		for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
-			properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
-		}
-
-		if (partitioner != null) {
-			properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
-			// java serialization will do the rest.
-			properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
-		}
-		if (partitionerClass != null) {
-			properties.put("partitioner.class", partitionerClass);
-		}
-
-		ProducerConfig config = new ProducerConfig(properties);
-
-		try {
-			producer = new Producer<IN, byte[]>(config);
-		} catch (NullPointerException e) {
-			throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
-		}
-	}
-
-	/**
-	 * Called when new data arrives to the sink, and forwards it to Kafka.
-	 *
-	 * @param next
-	 * 		The incoming data
-	 */
-	@Override
-	public void invoke(IN next) {
-		byte[] serialized = schema.serialize(next);
-
-		// Sending message without serializable key.
-		producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
-	}
-
-	@Override
-	public void close() {
-		if (producer != null) {
-			producer.close();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
deleted file mode 100644
index 7b9f991..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.producer.Partitioner;
-
-import java.io.Serializable;
-
-public interface SerializableKafkaPartitioner extends Serializable, Partitioner {
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index c8400a5..f856926 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -17,6 +17,7 @@
 package org.apache.flink.streaming.connectors.kafka.api;
 
 
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 /**
@@ -26,7 +27,7 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
  * This class will be removed in future releases of Flink.
  */
 @Deprecated
-public class KafkaSink<IN> extends org.apache.flink.streaming.connectors.kafka.KafkaSink<IN> {
+public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
 	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
 		super(brokerList, topicId, serializationSchema);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
new file mode 100644
index 0000000..346a7d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
+ *
+ * Note that one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ * 	# More Flink partitions than Kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	---------------->	1
+ * 			2   --------------/
+ * 			3   -------------/
+ * 			4	------------/
+ * </pre>
+ * 	--> Some (or all) Kafka partitions contain the output of more than one Flink partition.
+ *
+ * 	# Fewer Flink partitions than Kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	---------------->	1
+ * 			2	---------------->	2
+ * 									3
+ * 									4
+ * 									5
+ * </pre>
+ *
+ *  --> Not all Kafka partitions contain data.
+ *  To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner. (Note that this
+ *  will cause a lot of network connections between all the Flink instances and all the Kafka brokers.)
+ *
+ */
+public class FixedPartitioner extends KafkaPartitioner implements Serializable {
+	private static final long serialVersionUID = 1627268846962918126L;
+
+	int targetPartition = -1;
+
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		int p = 0;
+		for(int i = 0; i < parallelInstances; i++) {
+			if(i == parallelInstanceId) {
+				targetPartition = partitions[p];
+				return;
+			}
+			if(++p == partitions.length) {
+				p = 0;
+			}
+		}
+	}
+
+	@Override
+	public int partition(Object element, int numPartitions) {
+		if(targetPartition == -1) {
+			throw new RuntimeException("The partitioner has not been initialized properly");
+		}
+		return targetPartition;
+	}
+}
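
The Javadoc above suggests a round-robin partitioner to avoid leaving Kafka partitions empty. A
possible sketch of such a partitioner (an illustration, not part of this commit; as the Javadoc
warns, it makes every Flink instance connect to every Kafka broker):

	import java.io.Serializable;
	import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

	public class RoundRobinPartitioner extends KafkaPartitioner implements Serializable {
		private static final long serialVersionUID = 1L;

		private int[] partitions;
		private int next;

		@Override
		public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
			this.partitions = partitions;
			// stagger the starting offset so the parallel instances do not all begin at partition 0
			this.next = parallelInstanceId % partitions.length;
		}

		@Override
		public int partition(Object element, int numPartitions) {
			// cycle through all available partitions, one element at a time
			int p = partitions[next];
			next = (next + 1) % partitions.length;
			return p;
		}
	}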

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
new file mode 100644
index 0000000..55519f0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+
+import kafka.producer.Partitioner;
+
+import java.io.Serializable;
+
+/**
+ * Extended Kafka Partitioner.
+ * It contains an open() method which is called on each parallel instance.
+ * Partitioners have to be serializable!
+ */
+public abstract class KafkaPartitioner implements Partitioner, Serializable {
+
+	private static final long serialVersionUID = -1974260817778593473L;
+
+	/**
+	 * Initializer for the Partitioner.
+	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+	 * @param parallelInstances the total number of parallel instances
+	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
+	 */
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		// overwrite this method if needed.
+	}
+}
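
A custom partitioner is handed to the sink through the four-argument FlinkKafkaProducer
constructor. A hypothetical wiring example (broker address and topic are placeholders,
RoundRobinPartitioner refers to the sketch above), given a DataStream<String> named stream:

	// validate the broker list and turn it into a Properties object with 'bootstrap.servers' set
	Properties props = FlinkKafkaProducer.getPropertiesFromBrokerList("localhost:9092");
	stream.addSink(new FlinkKafkaProducer<String>(
			"test-topic", new JavaDefaultStringSchema(), props, new RoundRobinPartitioner()));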

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index effb3c8..f105183 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-
 import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
 import kafka.consumer.Consumer;
@@ -67,6 +66,7 @@ import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.util.Collector;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.Assert;
 
 import scala.collection.Seq;
@@ -290,7 +290,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				running = false;
 			}
 		});
-		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, sinkSchema));
+		stream.addSink(new FlinkKafkaProducer<>(brokerConnectionStrings, topic, sinkSchema));
 
 		// ----------- add consumer dataflow ----------
 
@@ -722,7 +722,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// add producing topology
 		Properties producerProps = new Properties();
-		producerProps.setProperty("max.message.size", Integer.toString(1024 * 1024 * 30));
+		producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30));
+		producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
 
 		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
 
@@ -760,8 +761,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}
 		});
 
-		stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
-				producerProps, deserSchema));
+		stream.addSink(new FlinkKafkaProducer<>(topic, deserSchema, producerProps));
 
 		tryExecute(env, "big topology test");
 
@@ -806,6 +806,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		zkClient.close();
 
 		final String leaderToShutDown = firstPart.leader().get().connectionString();
+		final int leaderIdToShutDown = firstPart.leader().get().id();
 		LOG.info("Leader to shutdown {}", leaderToShutDown);
 
 
@@ -832,10 +833,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		BrokerKillingMapper.killedLeaderBefore = false;
 		tryExecute(env, "One-to-one exactly once test");
 
-		// this cannot be reliably checked, as checkpoints come in time intervals, and
-		// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					BrokerKillingMapper.hasBeenCheckpointedBeforeFailure);
+		// start a new broker:
+		brokers.set(leaderIdToShutDown, getKafkaServer(leaderIdToShutDown, tmpKafkaDirs.get(leaderIdToShutDown), kafkaHost, zookeeperConnectionString));
 
 		LOG.info("finished runBrokerFailureTest()");
 	}
@@ -920,9 +919,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}
 		}).setParallelism(parallelism);
 		
-		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
-				topicName,
-				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
+		stream.addSink(new FlinkKafkaProducer<>(topicName,
+				new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
+				FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings),
 				new Tuple2Partitioner(parallelism)
 		)).setParallelism(parallelism);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
index 5903844..5001364 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
@@ -26,11 +26,14 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
 import org.junit.Test;
 
+import java.io.Serializable;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -100,8 +103,7 @@ public class KafkaProducerITCase extends KafkaTestBase {
 			.setParallelism(1);
 			
 			// sink partitions into 
-			stream.addSink(new KafkaSink<Tuple2<Long, String>>(
-					brokerConnectionStrings, topic,serSchema, new CustomPartitioner(parallelism)))
+			stream.addSink(new FlinkKafkaProducer<>(topic, serSchema, FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings), new CustomPartitioner(parallelism)))
 			.setParallelism(parallelism);
 
 			// ------ consuming topology ---------
@@ -165,7 +167,7 @@ public class KafkaProducerITCase extends KafkaTestBase {
 	
 	// ------------------------------------------------------------------------
 
-	public static class CustomPartitioner implements SerializableKafkaPartitioner {
+	public static class CustomPartitioner extends KafkaPartitioner implements Serializable {
 
 		private final int expectedPartitions;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 1f44dc2..271ae74 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -96,9 +96,11 @@ public abstract class KafkaTestBase extends TestLogger {
 	protected static int flinkPort;
 
 	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-	
-	
-	
+
+	protected static List<File> tmpKafkaDirs;
+
+	protected static String kafkaHost = "localhost";
+
 	// ------------------------------------------------------------------------
 	//  Setup and teardown of the mini clusters
 	// ------------------------------------------------------------------------
@@ -119,14 +121,14 @@ public abstract class KafkaTestBase extends TestLogger {
 		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
 
-		List<File> tmpKafkaDirs = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
+		tmpKafkaDirs = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
 		for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
 			File tmpDir = new File(tmpKafkaParent, "server-" + i);
 			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
 			tmpKafkaDirs.add(tmpDir);
 		}
 
-		String kafkaHost = "localhost";
+
 		int zkPort = NetUtils.getAvailablePort();
 		zookeeperConnectionString = "localhost:" + zkPort;
 
@@ -241,7 +243,7 @@ public abstract class KafkaTestBase extends TestLogger {
 	/**
 	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
 	 */
-	private static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
+	protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
 												String kafkaHost,
 												String zookeeperConnectionString) throws Exception {
 		Properties kafkaProperties = new Properties();

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
new file mode 100644
index 0000000..75fdd46
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+
+public class TestFixedPartitioner {
+
+
+	/**
+	 * <pre>
+	 *   		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2   --------------/
+	 * 			3   -------------/
+	 * 			4	------------/
+	 * </pre>
+	 */
+	@Test
+	public void testMoreFlinkThanBrokers() {
+		FixedPartitioner part = new FixedPartitioner();
+
+		int[] partitions = new int[]{0};
+
+		part.open(0, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+		part.open(1, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc2", partitions.length));
+
+		part.open(2, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc3", partitions.length));
+		Assert.assertEquals(0, part.partition("abc3", partitions.length)); // check if it is changing ;)
+
+		part.open(3, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc4", partitions.length));
+	}
+
+	/**
+	 *
+	 * <pre>
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2	---------------->	2
+	 * 									3
+	 * 									4
+	 * 									5
+	 *
+	 * </pre>
+	 */
+	@Test
+	public void testFewerPartitions() {
+		FixedPartitioner part = new FixedPartitioner();
+
+		int[] partitions = new int[]{0, 1, 2, 3, 4};
+		part.open(0, 2, partitions);
+		Assert.assertEquals(0, part.partition("abc1", partitions.length));
+		Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+		part.open(1, 2, partitions);
+		Assert.assertEquals(1, part.partition("abc1", partitions.length));
+		Assert.assertEquals(1, part.partition("abc1", partitions.length));
+	}
+
+	/*
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	------------>--->	1
+	 * 			2	-----------/----> 	2
+	 * 			3	----------/
+	 */
+	@Test
+	public void testMixedCase() {
+		FixedPartitioner part = new FixedPartitioner();
+		int[] partitions = new int[]{0,1};
+
+		part.open(0, 3, partitions);
+		Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+		part.open(1, 3, partitions);
+		Assert.assertEquals(1, part.partition("abc1", partitions.length));
+
+		part.open(2, 3, partitions);
+		Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index a887e4f..32377ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -26,8 +26,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.KafkaSink;
-import org.apache.flink.streaming.connectors.kafka.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
@@ -69,8 +69,9 @@ public class DataGenerators {
 					}
 				});
 
-		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnection, topic,
-				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
+		stream.addSink(new FlinkKafkaProducer<>(topic,
+				new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
+				FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
 				new Tuple2Partitioner(numPartitions)
 		));
 
@@ -131,9 +132,10 @@ public class DataGenerators {
 
 		stream
 				.rebalance()
-				.addSink(new KafkaSink<>(brokerConnection, topic,
+				.addSink(new FlinkKafkaProducer<>(topic,
 						new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
-						new SerializableKafkaPartitioner() {
+						FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
+						new KafkaPartitioner() {
 							@Override
 							public int partition(Object key, int numPartitions) {
 								return ((Integer) key) % numPartitions;
@@ -164,9 +166,10 @@ public class DataGenerators {
 		@Override
 		public void run() {
 			// we manually feed data into the Kafka sink
-			KafkaSink<String> producer = null;
+			FlinkKafkaProducer<String> producer = null;
 			try {
-				producer = new KafkaSink<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
+				producer = new FlinkKafkaProducer<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
+				producer.setRuntimeContext(new MockRuntimeContext(1,0));
 				producer.open(new Configuration());
 				
 				final StringBuilder bld = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
index 6ffaac4..b762e21 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -19,13 +19,15 @@
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.connectors.kafka.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+
+import java.io.Serializable;
 
 /**
  * Special partitioner that uses the first field of a 2-tuple as the partition,
  * and that expects a specific number of partitions.
  */
-public class Tuple2Partitioner implements SerializableKafkaPartitioner {
+public class Tuple2Partitioner extends KafkaPartitioner implements Serializable {
 	
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97ad55f1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
index 553ad61..21342b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
@@ -36,5 +36,5 @@ public interface SerializationSchema<T, R> extends Serializable {
 	 *            The incoming element to be serialized
 	 * @return The serialized element.
 	 */
-	public R serialize(T element);
+	R serialize(T element);
 }
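
An implementing class is unaffected by dropping the redundant modifier. For illustration, a
hypothetical schema that serializes Strings as UTF-8 (not part of this commit):

	import java.nio.charset.StandardCharsets;
	import org.apache.flink.streaming.util.serialization.SerializationSchema;

	public class Utf8StringSchema implements SerializationSchema<String, byte[]> {
		private static final long serialVersionUID = 1L;

		@Override
		public byte[] serialize(String element) {
			// turn the incoming element into Kafka-consumable bytes
			return element.getBytes(StandardCharsets.UTF_8);
		}
	}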

