flink-commits mailing list archives

From rmetz...@apache.org
Subject [7/9] flink git commit: [FLINK-3058] Add support for Kafka 0.9.0.0
Date Wed, 20 Jan 2016 19:31:52 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
new file mode 100644
index 0000000..9faa249
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
+ * Apache Kafka 0.9.x. The consumer can run in multiple parallel instances, each of which will pull
+ * data from one or more Kafka partitions. 
+ * 
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once". 
+ * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ *
+ * <p>Please refer to Kafka's documentation for the available configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
+ *
+ * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
+ * is constructed. That means that the client that submits the program needs to be able to
+ * reach the Kafka brokers or ZooKeeper.</p>
+ */
+public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
+
+	// ------------------------------------------------------------------------
+	
+	private static final long serialVersionUID = 2324564345203409112L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class);
+
+	/**  Configuration key to change the polling timeout **/
+	public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
+
+	/** Boolean configuration key to disable metrics tracking **/
+	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+	/**
+	 * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+	 * available. If 0, returns immediately with any records that are available now.
+	 */
+	public static final long DEFAULT_POLL_TIMEOUT = 100L;
+
+	/** User-supplied properties for Kafka **/
+	private final Properties properties;
+	/** Ordered list of all partitions available in all subscribed topics **/
+	private final List<KafkaTopicPartition> partitionInfos;
+
+	// ------  Runtime State  -------
+
+	/** The partitions actually handled by this consumer at runtime */
+	private transient List<TopicPartition> subscribedPartitions;
+	/** For performance reasons, we are keeping two representations of the subscribed partitions **/
+	private transient List<KafkaTopicPartition> subscribedPartitionsAsFlink;
+	/** The Kafka Consumer instance**/
+	private transient KafkaConsumer<byte[], byte[]> consumer;
+	/** The thread running Kafka's consumer **/
+	private transient ConsumerThread<T> consumerThread;
+	/** Exception set from the ConsumerThread */
+	private transient Throwable consumerThreadException;
+	/** If the consumer doesn't have a Kafka partition assigned at runtime, it'll block on this waitThread **/
+	private transient Thread waitThread;
+
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		this(Collections.singletonList(topic), valueDeserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+	 *
+	 * This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		this(Collections.singletonList(topic), deserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+	 *
+	 * This constructor allows passing multiple topics to the consumer.
+	 *
+	 * @param topics
+	 *           The Kafka topics to read from.
+	 * @param deserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 */
+	public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
+		this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x
+	 *
+	 * This constructor allows passing multiple topics and a key/value deserialization schema.
+	 *
+	 * @param topics
+	 *           The Kafka topics to read from.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 */
+	public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		super(deserializer, props);
+		checkNotNull(topics, "topics");
+		this.properties = checkNotNull(props, "props");
+		setDeserializer(this.properties);
+		KafkaConsumer<byte[], byte[]> consumer = null;
+		try {
+			consumer = new KafkaConsumer<>(this.properties);
+			this.partitionInfos = new ArrayList<>();
+			for (final String topic: topics) {
+				// get partitions for each topic
+				List<PartitionInfo> partitionsForTopic = null;
+				for(int tri = 0; tri < 10; tri++) {
+					LOG.info("Trying to get partitions for topic {}", topic);
+					try {
+						partitionsForTopic = consumer.partitionsFor(topic);
+						if(partitionsForTopic != null && partitionsForTopic.size() > 0) {
+							break; // it worked
+						}
+					} catch (NullPointerException npe) {
+						// workaround for KAFKA-2880: Fetcher.getTopicMetadata NullPointerException when broker cannot be reached
+						// we ignore the NPE.
+					}
+					// create a new consumer
+					consumer.close();
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+					}
+					consumer = new KafkaConsumer<>(properties);
+				}
+				// for non existing topics, the list might be null.
+				if(partitionsForTopic != null) {
+					partitionInfos.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
+				}
+			}
+		} finally {
+			if(consumer != null) {
+				consumer.close();
+			}
+		}
+		if(partitionInfos.isEmpty()) {
+			throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
+		}
+
+		// we now have a list of partitions which is the same for all parallel consumer instances.
+		LOG.info("Got {} partitions from these topics: {}", partitionInfos.size(), topics);
+
+		if (LOG.isInfoEnabled()) {
+			logPartitionInfo(partitionInfos);
+		}
+	}
+
+
+	/**
+	 * Converts a list of Kafka PartitionInfos to Flink's KafkaTopicPartitions (which are serializable)
+	 * @param partitions A list of Kafka PartitionInfos.
+	 * @return A list of KafkaTopicPartitions
+	 */
+	public static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
+		checkNotNull(partitions, "The given list of partitions was null");
+		List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size());
+		for(PartitionInfo pi: partitions) {
+			ret.add(new KafkaTopicPartition(pi.topic(), pi.partition()));
+		}
+		return ret;
+	}
+
+	public static List<TopicPartition> convertToKafkaTopicPartition(List<KafkaTopicPartition> partitions) {
+		List<TopicPartition> ret = new ArrayList<>(partitions.size());
+		for(KafkaTopicPartition ktp: partitions) {
+			ret.add(new TopicPartition(ktp.getTopic(), ktp.getPartition()));
+		}
+		return ret;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Source life cycle
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+
+		final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
+		final int thisConsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+		// pick which partitions we work on
+		this.subscribedPartitionsAsFlink = assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex);
+		if(this.subscribedPartitionsAsFlink.isEmpty()) {
+			LOG.info("This consumer doesn't have any partitions assigned");
+			this.offsetsState = null;
+			return;
+		} else {
+			StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
+			// if checkpointing is enabled, we are not automatically committing to Kafka.
+			properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(!streamingRuntimeContext.isCheckpointingEnabled()));
+			this.consumer = new KafkaConsumer<>(properties);
+		}
+		subscribedPartitions = convertToKafkaTopicPartition(subscribedPartitionsAsFlink);
+
+		this.consumer.assign(this.subscribedPartitions);
+
+		// register Kafka metrics to Flink accumulators
+		if(!Boolean.parseBoolean(properties.getProperty(KEY_DISABLE_METRICS, "false"))) {
+			Map<MetricName, ? extends Metric> metrics = this.consumer.metrics();
+			if(metrics == null) {
+				// MapR's Kafka implementation returns null here.
+				LOG.info("Consumer implementation does not support metrics");
+			} else {
+				for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
+					String name = "consumer-" + metric.getKey().name();
+					DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue());
+					// best effort: we only add the accumulator if available.
+					if (kafkaAccumulator != null) {
+						getRuntimeContext().addAccumulator(name, kafkaAccumulator);
+					}
+				}
+			}
+		}
+
+		// check if we need to explicitly seek to a specific offset (restore case)
+		if(restoreToOffset != null) {
+			// we are in a recovery scenario
+			for(Map.Entry<KafkaTopicPartition, Long> offset: restoreToOffset.entrySet()) {
+				// seek all offsets to the right position
+				this.consumer.seek(new TopicPartition(offset.getKey().getTopic(), offset.getKey().getPartition()), offset.getValue() + 1);
+			}
+			this.offsetsState = restoreToOffset;
+		} else {
+			this.offsetsState = new HashMap<>();
+		}
+	}
+
+
+
+	@Override
+	public void run(SourceContext<T> sourceContext) throws Exception {
+		if(consumer != null) {
+			consumerThread = new ConsumerThread<>(this, sourceContext);
+			consumerThread.start();
+			// wait for the consumer to stop
+			while(consumerThread.isAlive()) {
+				if(consumerThreadException != null) {
+					throw new RuntimeException("ConsumerThread threw an exception", consumerThreadException);
+				}
+				try {
+					consumerThread.join(50);
+				} catch (InterruptedException ie) {
+					consumerThread.shutdown();
+				}
+			}
+			// check again for an exception
+			if(consumerThreadException != null) {
+				throw new RuntimeException("ConsumerThread threw an exception", consumerThreadException);
+			}
+		} else {
+			// this source never completes, so emit a Long.MAX_VALUE watermark
+			// to not block watermark forwarding
+			if (getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) {
+				sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+			}
+
+			final Object waitLock = new Object();
+			this.waitThread = Thread.currentThread();
+			while (running) {
+				// wait until we are canceled
+				try {
+					//noinspection SynchronizationOnLocalVariableOrMethodParameter
+					synchronized (waitLock) {
+						waitLock.wait();
+					}
+				}
+				catch (InterruptedException e) {
+					// do nothing, check our "running" status
+				}
+			}
+		}
+		// close the context after the work was done. this can actually only
+		// happen when the fetcher decides to stop fetching
+		sourceContext.close();
+	}
+
+	@Override
+	public void cancel() {
+		// set ourselves as not running
+		running = false;
+		if(this.consumerThread != null) {
+			this.consumerThread.shutdown();
+		} else {
+			// the consumer thread is not running, so we have to interrupt our own thread
+			if(waitThread != null) {
+				waitThread.interrupt();
+			}
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		cancel();
+		super.close();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpoint and restore
+	// ------------------------------------------------------------------------
+
+
+	@Override
+	protected void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
+		Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = convertToCommitMap(checkpointOffsets);
+		synchronized (this.consumer) {
+			this.consumer.commitSync(kafkaCheckpointOffsets);
+		}
+	}
+
+	public static Map<TopicPartition, OffsetAndMetadata> convertToCommitMap(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
+		Map<TopicPartition, OffsetAndMetadata> ret = new HashMap<>(checkpointOffsets.size());
+		for(Map.Entry<KafkaTopicPartition, Long> partitionOffset: checkpointOffsets.entrySet()) {
+			ret.put(new TopicPartition(partitionOffset.getKey().getTopic(), partitionOffset.getKey().getPartition()),
+					new OffsetAndMetadata(partitionOffset.getValue(), ""));
+		}
+		return ret;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Miscellaneous utilities 
+	// ------------------------------------------------------------------------
+
+
+	protected static void setDeserializer(Properties props) {
+		if (!props.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+			props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+		}
+
+		if (!props.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+			props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+		}
+	}
+
+	/**
+	 * We use a separate thread for executing the KafkaConsumer.poll(timeout) call because Kafka does not
+	 * handle interrupts properly. On an interrupt (which Flink triggers automatically if the task
+	 * does not react to cancel() calls), the poll() method might never return.
+	 * On cancel, we wake up the poll() call and wait for it to return.
+	 */
+	private static class ConsumerThread<T> extends Thread {
+		private final FlinkKafkaConsumer09<T> flinkKafkaConsumer;
+		private final SourceContext<T> sourceContext;
+		private boolean running = true;
+
+		public ConsumerThread(FlinkKafkaConsumer09<T> flinkKafkaConsumer, SourceContext<T> sourceContext) {
+			this.flinkKafkaConsumer = flinkKafkaConsumer;
+			this.sourceContext = sourceContext;
+		}
+
+		@Override
+		public void run() {
+			try {
+				long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT)));
+				pollLoop: while (running) {
+					ConsumerRecords<byte[], byte[]> records;
+					//noinspection SynchronizeOnNonFinalField
+					synchronized (flinkKafkaConsumer.consumer) {
+						try {
+							records = flinkKafkaConsumer.consumer.poll(pollTimeout);
+						} catch (WakeupException we) {
+							if (running) {
+								throw we;
+							}
+							// leave loop
+							continue;
+						}
+					}
+					// get the records for each topic partition
+					for (int i = 0; i < flinkKafkaConsumer.subscribedPartitions.size(); i++) {
+						TopicPartition partition = flinkKafkaConsumer.subscribedPartitions.get(i);
+						KafkaTopicPartition flinkPartition = flinkKafkaConsumer.subscribedPartitionsAsFlink.get(i);
+						List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition);
+						//noinspection ForLoopReplaceableByForEach
+						for (int j = 0; j < partitionRecords.size(); j++) {
+							ConsumerRecord<byte[], byte[]> record = partitionRecords.get(j);
+							T value = flinkKafkaConsumer.deserializer.deserialize(record.key(), record.value(), record.topic(), record.partition(),record.offset());
+							if(flinkKafkaConsumer.deserializer.isEndOfStream(value)) {
+								// end of stream signaled
+								running = false;
+								break pollLoop;
+							}
+							synchronized (sourceContext.getCheckpointLock()) {
+								sourceContext.collect(value);
+								flinkKafkaConsumer.offsetsState.put(flinkPartition, record.offset());
+							}
+						}
+					}
+				}
+			} catch(Throwable t) {
+				if(running) {
+					this.flinkKafkaConsumer.stopWithError(t);
+				} else {
+				LOG.debug("ConsumerThread threw an exception after it was shut down", t);
+				}
+			} finally {
+				try {
+					flinkKafkaConsumer.consumer.close();
+				} catch(Throwable t) {
+					LOG.warn("Error while closing consumer", t);
+				}
+			}
+		}
+
+		/**
+		 * Try to shutdown the thread
+		 */
+		public void shutdown() {
+			this.running = false;
+			this.flinkKafkaConsumer.consumer.wakeup();
+		}
+	}
+
+	private void stopWithError(Throwable t) {
+		this.consumerThreadException = t;
+	}
+}
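
The class Javadoc above describes the checkpointing behaviour and the two Flink-specific Properties keys (flink.poll-timeout, flink.disable-metrics). The following is a minimal usage sketch, an editorial illustration rather than part of the commit; the topic name, broker address, group id and class name are placeholders:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka09ConsumerSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// with checkpointing enabled, open() disables Kafka's auto-commit and Flink snapshots the offsets itself
		env.enableCheckpointing(5000);

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");            // placeholder broker address
		props.setProperty("group.id", "sketch-consumer");                    // placeholder group id
		props.setProperty(FlinkKafkaConsumer09.KEY_POLL_TIMEOUT, "250");     // raise the 100 ms default poll timeout
		props.setProperty(FlinkKafkaConsumer09.KEY_DISABLE_METRICS, "true"); // do not forward Kafka metrics to accumulators

		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer09<>("sketch-topic", new SimpleStringSchema(), props));

		stream.print();
		env.execute("Kafka 0.9 consumer sketch");
	}
}

As the Javadoc notes, the offsets committed back to Kafka in this setup only mirror the checkpointed progress for external monitoring.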

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
new file mode 100644
index 0000000..6f7f687
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.9.
+ *
+ * Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	// ------------------- Keyless serialization schema constructors ----------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink writes a DataStream to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined (keyless) serialization schema.
+	 */
+	public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink writes a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined (keyless) serialization schema.
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink writes a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a Kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 */
+	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+	}
+
+	// ------------------- Key/Value serialization schema constructors ----------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink writes a DataStream to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 */
+	public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink writes a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink writes a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a Kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	}
+}
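
The producer constructors above come in broker-list and Properties variants; the simplest keyless one takes a broker list, a topic and a SerializationSchema and wraps them with getPropertiesFromBrokerList() and the FixedPartitioner. A minimal sketch follows, again an editorial illustration with placeholder broker address, topic and class name:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka09ProducerSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> messages = env.fromElements("a", "b", "c");

		// keyless broker-list constructor: the schema is wrapped in a
		// KeyedSerializationSchemaWrapper and the FixedPartitioner is used (see the constructor above)
		messages.addSink(new FlinkKafkaProducer09<>(
				"localhost:9092",           // placeholder broker list
				"sketch-topic",             // placeholder topic
				new SimpleStringSchema()));

		env.execute("Kafka 0.9 producer sketch");
	}
}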

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java
new file mode 100644
index 0000000..643da66
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.examples;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+
+/**
+ * Read Strings from Kafka and print them to standard out.
+ * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file!
+ *
+ * Please pass the following arguments to run the example:
+ * 	--topic test --bootstrap.servers localhost:9092 --group.id myconsumer
+ *
+ */
+public class ReadFromKafka {
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.setNumberOfExecutionRetries(4);
+		env.enableCheckpointing(5000);
+		env.setParallelism(2);
+
+		ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		DataStream<String> messageStream = env
+				.addSource(new FlinkKafkaConsumer09<>(
+						parameterTool.getRequired("topic"),
+						new SimpleStringSchema(),
+						parameterTool.getProperties()));
+
+		messageStream.print();
+
+		env.execute("Read from Kafka example");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java
new file mode 100644
index 0000000..fbe53fa
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.examples;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+
+/**
+ * Generate a String every 500 ms and write it into a Kafka topic
+ *
+ * Please pass the following arguments to run the example:
+ * 	--topic test --bootstrap.servers localhost:9092
+ *
+ */
+public class WriteIntoKafka {
+
+	public static void main(String[] args) throws Exception {
+		StreamExecutionEnvironment env =
+				StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.setNumberOfExecutionRetries(4);
+		env.setParallelism(2);
+
+		ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		// very simple data generator
+		DataStream<String> messageStream = env.addSource(new SourceFunction<String>() {
+			public boolean running = true;
+
+			@Override
+			public void run(SourceContext<String> ctx) throws Exception {
+				long i = 0;
+				while(this.running) {
+					ctx.collect("Element - " + i++);
+					Thread.sleep(500);
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
+		// write data into Kafka
+		messageStream.addSink(new FlinkKafkaProducer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
+
+		env.execute("Write into Kafka example");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
new file mode 100644
index 0000000..55abaaa
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Test;
+
+
+public class Kafka09ITCase extends KafkaConsumerTestBase {
+
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	// ------------------------------------------------------------------------
+
+	@Test(timeout = 60000)
+	public void testCheckpointing() throws Exception {
+		runCheckpointingTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testFailOnNoBroker() throws Exception {
+		runFailOnNoBrokerTest();
+	}
+
+
+	@Test(timeout = 60000)
+	public void testConcurrentProducerConsumerTopology() throws Exception {
+		runSimpleConcurrentProducerConsumerTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testKeyValueSupport() throws Exception {
+		runKeyValueTest();
+	}
+
+	// --- canceling / failures ---
+
+	@Test(timeout = 60000)
+	public void testCancelingEmptyTopic() throws Exception {
+		runCancelingOnEmptyInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testCancelingFullTopic() throws Exception {
+		runCancelingOnFullInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testFailOnDeploy() throws Exception {
+		runFailOnDeployTest();
+	}
+
+
+	// --- source to partition mappings and exactly once ---
+
+	@Test(timeout = 60000)
+	public void testOneToOneSources() throws Exception {
+		runOneToOneExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testOneSourceMultiplePartitions() throws Exception {
+		runOneSourceMultiplePartitionsExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleSourcesOnePartition() throws Exception {
+		runMultipleSourcesOnePartitionExactlyOnceTest();
+	}
+
+	// --- broker failure ---
+
+	@Test(timeout = 60000)
+	public void testBrokerFailure() throws Exception {
+		runBrokerFailureTest();
+	}
+
+	// --- special executions ---
+
+	@Test(timeout = 60000)
+	public void testBigRecordJob() throws Exception {
+		runBigRecordTestTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleTopics() throws Exception {
+		runConsumeMultipleTopics();
+	}
+
+	@Test(timeout = 60000)
+	public void testAllDeletes() throws Exception {
+		runAllDeletesTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMetricsAndEndOfStream() throws Exception {
+		runMetricsAndEndOfStreamTest();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
new file mode 100644
index 0000000..1288347
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.junit.Test;
+
+
+@SuppressWarnings("serial")
+public class Kafka09ProducerITCase extends KafkaProducerTestBase {
+
+	@Test
+	public void testCustomPartitioning() {
+		runCustomPartitioningTest();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..a2c4f73
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducerBase.class)
+public class KafkaProducerTest extends TestLogger {
+	
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testPropagateExceptions() {
+		try {
+			// mock kafka producer
+			KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
+			
+			// partition setup
+			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+					Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));
+
+			// failure when trying to send an element
+			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
+				.thenAnswer(new Answer<Future<RecordMetadata>>() {
+					@Override
+					public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+						Callback callback = (Callback) invocation.getArguments()[1];
+						callback.onCompletion(null, new Exception("Test error"));
+						return null;
+					}
+				});
+			
+			// make sure the FlinkKafkaProducer instantiates our mock producer
+			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+			
+			// (1) producer that propagates errors
+
+			FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
+					"mock_topic", new SimpleStringSchema(), new Properties(), null);
+
+			producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
+			producerPropagating.open(new Configuration());
+			
+			try {
+				producerPropagating.invoke("value");
+				producerPropagating.invoke("value");
+				fail("This should fail with an exception");
+			}
+			catch (Exception e) {
+				assertNotNull(e.getCause());
+				assertNotNull(e.getCause().getMessage());
+				assertTrue(e.getCause().getMessage().contains("Test error"));
+			}
+
+			// (2) producer that only logs errors
+
+			FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
+					"mock_topic", new SimpleStringSchema(), new Properties(), null);
+			producerLogging.setLogFailuresOnly(true);
+			
+			producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
+			producerLogging.open(new Configuration());
+
+			producerLogging.invoke("value");
+			producerLogging.invoke("value");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..0855ba6
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.consumer.ConsumerConfig;
+import kafka.api.PartitionMetadata;
+import kafka.network.SocketServer;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the KafkaServerProvider for Kafka 0.9
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+	private File tmpZkDir;
+	private File tmpKafkaParent;
+	private List<File> tmpKafkaDirs;
+	private List<KafkaServer> brokers;
+	private TestingServer zookeeper;
+	private String zookeeperConnectionString;
+	private String brokerConnectionString = "";
+	private Properties standardProps;
+	private ConsumerConfig standardCC;
+
+
+	public String getBrokerConnectionString() {
+		return brokerConnectionString;
+	}
+
+	@Override
+	public ConsumerConfig getStandardConsumerConfig() {
+		return standardCC;
+	}
+
+	@Override
+	public Properties getStandardProperties() {
+		return standardProps;
+	}
+
+	@Override
+	public String getVersion() {
+		return "0.9";
+	}
+
+	@Override
+	public List<KafkaServer> getBrokers() {
+		return brokers;
+	}
+
+	@Override
+	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
+		return new FlinkKafkaConsumer09<>(topics, readSchema, props);
+	}
+
+	@Override
+	public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+		return new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+	}
+
+	@Override
+	public void restartBroker(int leaderId) throws Exception {
+		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId), KAFKA_HOST, zookeeperConnectionString));
+	}
+
+	@Override
+	public int getLeaderToShutDown(String topic) throws Exception {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			PartitionMetadata firstPart = null;
+			do {
+				if (firstPart != null) {
+					LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+					// not the first try. Sleep a bit
+					Thread.sleep(150);
+				}
+
+				Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata();
+				firstPart = partitionMetadata.head();
+			}
+			while (firstPart.errorCode() != 0);
+
+			return firstPart.leader().get().id();
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	@Override
+	public int getBrokerId(KafkaServer server) {
+		return server.config().brokerId();
+	}
+
+	@Override
+	public void prepare(int numKafkaServers) {
+		File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
+		for (int i = 0; i < numKafkaServers; i++) {
+			File tmpDir = new File(tmpKafkaParent, "server-" + i);
+			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+			tmpKafkaDirs.add(tmpDir);
+		}
+
+		zookeeper = null;
+		brokers = null;
+
+		try {
+			LOG.info("Starting Zookeeper");
+			zookeeper = new TestingServer(-1, tmpZkDir);
+			zookeeperConnectionString = zookeeper.getConnectString();
+
+			LOG.info("Starting KafkaServer");
+			brokers = new ArrayList<>(numKafkaServers);
+
+			for (int i = 0; i < numKafkaServers; i++) {
+				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i), KafkaTestEnvironment.KAFKA_HOST, zookeeperConnectionString));
+
+				SocketServer socketServer = brokers.get(i).socketServer();
+				brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, socketServer.boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+			}
+
+			LOG.info("ZK and KafkaServer started.");
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			fail("Test setup failed: " + t.getMessage());
+		}
+
+		standardProps = new Properties();
+		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+		standardProps.setProperty("group.id", "flink-tests");
+		standardProps.setProperty("auto.commit.enable", "false");
+		standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
+		standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
+		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
+		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+
+		Properties consumerConfigProps = new Properties();
+		consumerConfigProps.putAll(standardProps);
+		consumerConfigProps.setProperty("auto.offset.reset", "smallest");
+		standardCC = new ConsumerConfig(consumerConfigProps);
+	}
+
+	@Override
+	public void shutdown() {
+		for (KafkaServer broker : brokers) {
+			if (broker != null) {
+				broker.shutdown();
+			}
+		}
+		brokers.clear();
+
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+			}
+			catch (Exception e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+			zookeeper = null;
+		}
+
+		// clean up the temp spaces
+
+		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpKafkaParent);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+		if (tmpZkDir != null && tmpZkDir.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpZkDir);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+	}
+
+	public ZkUtils getZkUtils() {
+		ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
+				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+		return ZkUtils.apply(creator, false);
+	}
+
+	@Override
+	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
+		// create topic with one client
+		Properties topicConfig = new Properties();
+		LOG.info("Creating topic {}", topic);
+
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig);
+		} finally {
+			zkUtils.close();
+		}
+
+		// validate that the topic has been created
+		final long deadline = System.currentTimeMillis() + 30000;
+		do {
+			try {
+				Thread.sleep(100);
+			} catch (InterruptedException e) {
+				// restore interrupted state
+			}
+			// we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
+			// not always correct.
+
+			// create a new ZK utils connection
+			ZkUtils checkZKConn = getZkUtils();
+			if(AdminUtils.topicExists(checkZKConn, topic)) {
+				checkZKConn.close();
+				return;
+			}
+			checkZKConn.close();
+		}
+		while (System.currentTimeMillis() < deadline);
+		fail("Test topic could not be created");
+	}
+
+	@Override
+	public void deleteTestTopic(String topic) {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			LOG.info("Deleting topic {}", topic);
+
+			AdminUtils.deleteTopic(zkUtils, topic);
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	/**
+	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+	 */
+	protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
+												String kafkaHost,
+												String zookeeperConnectionString) throws Exception {
+		Properties kafkaProperties = new Properties();
+
+		// properties have to be Strings
+		kafkaProperties.put("advertised.host.name", kafkaHost);
+		kafkaProperties.put("broker.id", Integer.toString(brokerId));
+		kafkaProperties.put("log.dir", tmpFolder.toString());
+		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+		kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+
+		// for CI stability, increase zookeeper session timeout
+		kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
+
+		final int numTries = 5;
+
+		for (int i = 1; i <= numTries; i++) {
+			int kafkaPort = NetUtils.getAvailablePort();
+			kafkaProperties.put("port", Integer.toString(kafkaPort));
+			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+			try {
+				scala.Option<String> stringNone = scala.Option.apply(null);
+				KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+				server.startup();
+				return server;
+			}
+			catch (KafkaException e) {
+				if (e.getCause() instanceof BindException) {
+					// port conflict, retry...
+					LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+				}
+				else {
+					throw e;
+				}
+			}
+		}
+
+		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+	}
+
+}
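
For readers following the test setup above, here is a minimal standalone sketch (an illustration, not part of the patch) of how the standard consumer properties map onto the plain Kafka 0.9 consumer API. The broker address and topic name are placeholders, and the ZooKeeper-related keys are left out because the new consumer is configured through bootstrap.servers only:

	import java.util.Collections;
	import java.util.Properties;

	import org.apache.kafka.clients.consumer.ConsumerRecord;
	import org.apache.kafka.clients.consumer.ConsumerRecords;
	import org.apache.kafka.clients.consumer.KafkaConsumer;
	import org.apache.kafka.common.serialization.ByteArrayDeserializer;

	public class EmbeddedKafkaReadSketch {
		public static void main(String[] args) {
			Properties props = new Properties();
			// placeholder address; the test environment would use its brokerConnectionString here
			props.setProperty("bootstrap.servers", "localhost:9092");
			props.setProperty("group.id", "flink-tests");
			// the 0.9 consumer key is "enable.auto.commit" (the old high-level consumer used "auto.commit.enable")
			props.setProperty("enable.auto.commit", "false");
			props.setProperty("auto.offset.reset", "earliest");

			try (KafkaConsumer<byte[], byte[]> consumer =
					new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
				consumer.subscribe(Collections.singletonList("test-topic"));
				ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
				for (ConsumerRecord<byte[], byte[]> record : records) {
					System.out.println("offset " + record.offset() + ": "
							+ (record.value() == null ? 0 : record.value().length) + " bytes");
				}
			}
		}
	}
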

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
new file mode 100644
index 0000000..354d353
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
@@ -0,0 +1,169 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>1.0-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-kafka-base</artifactId>
+	<name>flink-connector-kafka-base</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<kafka.version>0.8.2.0</kafka.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_${scala.binary.version}</artifactId>
+			<version>${kafka.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>net.sf.jopt-simple</groupId>
+					<artifactId>jopt-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-reflect</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.yammer.metrics</groupId>
+					<artifactId>metrics-annotation</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- force using the latest zkclient -->
+		<dependency>
+			<groupId>com.101tec</groupId>
+			<artifactId>zkclient</artifactId>
+			<version>0.7</version>
+			<type>jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>com.101tec</groupId>
+				<artifactId>zkclient</artifactId>
+				<version>0.7</version>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
+	
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-failsafe-plugin</artifactId>
+				<configuration>
+					<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+					<forkCount>1</forkCount>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+	
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
new file mode 100644
index 0000000..3c36586
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
+		implements CheckpointNotifier, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T> {
+
+	// ------------------------------------------------------------------------
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+
+	private static final long serialVersionUID = -6272159445203409112L;
+
+	/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
+	public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+
+	/** The schema to convert between Kafka's byte messages and Flink's objects */
+	protected final KeyedDeserializationSchema<T> deserializer;
+
+	// ------  Runtime State  -------
+
+	/** Data for pending but uncommitted checkpoints */
+	protected final LinkedMap pendingCheckpoints = new LinkedMap();
+
+	/** The offsets of the last returned elements */
+	protected transient HashMap<KafkaTopicPartition, Long> offsetsState;
+
+	/** The offsets to restore to, if the consumer restores state from a checkpoint */
+	protected transient HashMap<KafkaTopicPartition, Long> restoreToOffset;
+
+	/** Flag indicating whether the consumer is still running **/
+	protected volatile boolean running = true;
+
+	// ------------------------------------------------------------------------
+
+
+	/**
+	 * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
+	 *
+	 * <p>To determine which kind of fetcher and offset handler to use, please refer to the docs
+	 * at the beginning of this class.</p>
+	 *
+	 * @param deserializer
+	 *           The deserializer to turn raw byte messages into Java/Scala objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 */
+	public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer, Properties props) {
+		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
+	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpoint and restore
+	// ------------------------------------------------------------------------
+
+	@Override
+	public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		if (offsetsState == null) {
+			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
+			return null;
+		}
+		if (!running) {
+			LOG.debug("snapshotState() called on closed source");
+			return null;
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
+					KafkaTopicPartition.toString(offsetsState), checkpointId, checkpointTimestamp);
+		}
+
+		// the use of clone() is okay here; we just need a new map, and the keys are not changed
+		//noinspection unchecked
+		HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) offsetsState.clone();
+
+		// the map cannot be asynchronously updated, because only one checkpoint call can happen
+		// on this function at a time: either snapshotState() or notifyCheckpointComplete()
+		pendingCheckpoints.put(checkpointId, currentOffsets);
+			
+		while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+			pendingCheckpoints.remove(0);
+		}
+
+		return currentOffsets;
+	}
+
+	@Override
+	public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
+		LOG.info("Setting restore state in Kafka");
+		restoreToOffset = restoredOffsets;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		if (offsetsState == null) {
+			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+			return;
+		}
+		if (!running) {
+			LOG.debug("notifyCheckpointComplete() called on closed source");
+			return;
+		}
+		
+		// only one commit operation must be in progress
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Committing offsets externally for checkpoint {}", checkpointId);
+		}
+
+		try {
+			HashMap<KafkaTopicPartition, Long> checkpointOffsets;
+	
+			// the map may be asynchronously updated while snapshotting state, so we synchronize
+			synchronized (pendingCheckpoints) {
+				final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+				if (posInMap == -1) {
+					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+					return;
+				}
+
+				//noinspection unchecked
+				checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
+
+				
+				// remove older checkpoints in map
+				for (int i = 0; i < posInMap; i++) {
+					pendingCheckpoints.remove(0);
+				}
+			}
+			if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
+				LOG.debug("Checkpoint state was empty.");
+				return;
+			}
+			commitOffsets(checkpointOffsets);
+		}
+		catch (Exception e) {
+			if (running) {
+				throw e;
+			}
+			// else ignore exception if we are no longer running
+		}
+	}
+
+	protected abstract void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) throws Exception;
+
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return deserializer.getProducedType();
+	}
+
+	protected static <T> List<T> assignPartitions(List<T> partitions, int numConsumers, int consumerIndex) {
+		checkArgument(numConsumers > 0);
+		checkArgument(consumerIndex < numConsumers);
+
+		List<T> partitionsToSub = new ArrayList<>();
+
+		for (int i = 0; i < partitions.size(); i++) {
+			if (i % numConsumers == consumerIndex) {
+				partitionsToSub.add(partitions.get(i));
+			}
+		}
+		return partitionsToSub;
+	}
+
+	/**
+	 * Method to log partition information.
+	 * @param partitionInfos List of subscribed partitions
+	 */
+	public static void logPartitionInfo(List<KafkaTopicPartition> partitionInfos) {
+		Map<String, Integer> countPerTopic = new HashMap<>();
+		for (KafkaTopicPartition partition : partitionInfos) {
+			Integer count = countPerTopic.get(partition.getTopic());
+			if (count == null) {
+				count = 1;
+			} else {
+				count++;
+			}
+			countPerTopic.put(partition.getTopic(), count);
+		}
+		StringBuilder sb = new StringBuilder();
+		for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
+			sb.append(e.getKey()).append(" (").append(e.getValue()).append("), ");
+		}
+		LOG.info("Consumer is going to read the following topics (with number of partitions): {}", sb.toString());
+	}
+
+
+}
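
To make the pendingCheckpoints bookkeeping above easier to follow, here is a small standalone sketch (an illustration, not part of the patch) of the same LinkedMap pattern that snapshotState() and notifyCheckpointComplete() rely on. Partition keys are replaced by plain strings, and printing stands in for the actual commitOffsets() call:

	import java.util.HashMap;

	import org.apache.commons.collections.map.LinkedMap;

	public class PendingCheckpointsSketch {
		public static void main(String[] args) {
			LinkedMap pendingCheckpoints = new LinkedMap();

			// snapshotState(1..3): park a copy of the current offsets under each checkpoint id
			for (long checkpointId = 1; checkpointId <= 3; checkpointId++) {
				HashMap<String, Long> offsets = new HashMap<>();
				offsets.put("topic-partition-0", 10L * checkpointId);
				pendingCheckpoints.put(checkpointId, offsets);
			}

			// notifyCheckpointComplete(2): look up the offsets stored for that checkpoint ...
			long completedId = 2;
			int posInMap = pendingCheckpoints.indexOf(completedId);
			@SuppressWarnings("unchecked")
			HashMap<String, Long> toCommit = (HashMap<String, Long>) pendingCheckpoints.remove(posInMap);

			// ... and drop all older pending entries, mirroring the cleanup in notifyCheckpointComplete()
			for (int i = 0; i < posInMap; i++) {
				pendingCheckpoints.remove(0);
			}

			System.out.println("committing " + toCommit);                     // offsets of checkpoint 2
			System.out.println(pendingCheckpoints.size() + " still pending"); // only checkpoint 3 remains
		}
	}
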

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
new file mode 100644
index 0000000..ebc02c9
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic.
+ *
+ * Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>  {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Configuration key for disabling the metrics reporting
+	 */
+	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+	/**
+	 * Array with the partition ids of the given topicId
+	 * The size of this array is the number of partitions
+	 */
+	protected final int[] partitions;
+
+	/**
+	 * User defined properties for the Producer
+	 */
+	protected final Properties producerConfig;
+
+	/**
+	 * The name of the topic this producer is writing data to
+	 */
+	protected final String topicId;
+
+	/**
+	 * (Serializable) SerializationSchema for turning objects used with Flink into
+	 * byte[] for Kafka.
+	 */
+	protected final KeyedSerializationSchema<IN> schema;
+
+	/**
+	 * User-provided partitioner for assigning an object to a Kafka partition.
+	 */
+	protected final KafkaPartitioner<IN> partitioner;
+
+	/**
+	 * Flag indicating whether to accept failures (and log them), or to fail on failures
+	 */
+	protected boolean logFailuresOnly;
+	
+	// -------------------------------- Runtime fields ------------------------------------------
+
+	/** KafkaProducer instance */
+	protected transient KafkaProducer<byte[], byte[]> producer;
+
+	/** The callback that handles error propagation or logging */
+	protected transient Callback callback;
+	
+	/** Errors encountered in the async producer are stored here */
+	protected transient volatile Exception asyncException;
+
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner
+	 */
+	public FlinkKafkaProducerBase(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		Preconditions.checkNotNull(topicId, "TopicID not set");
+		Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
+		Preconditions.checkNotNull(producerConfig, "producerConfig not set");
+		ClosureCleaner.ensureSerializable(customPartitioner);
+		ClosureCleaner.ensureSerializable(serializationSchema);
+
+		this.topicId = topicId;
+		this.schema = serializationSchema;
+		this.producerConfig = producerConfig;
+
+		// set the producer configuration properties.
+
+		if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+			this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+		}
+
+		if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+			this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+		}
+
+
+		// create a local KafkaProducer to get the list of partitions.
+		// this will also ensure locally that all required ProducerConfig values are set.
+		try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) {
+			List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
+
+			this.partitions = new int[partitionsList.size()];
+			for (int i = 0; i < partitions.length; i++) {
+				partitions[i] = partitionsList.get(i).partition();
+			}
+			getPartitionsProd.close();
+		}
+
+		this.partitioner = customPartitioner;
+	}
+
+	// ---------------------------------- Properties --------------------------
+
+	/**
+	 * Defines whether the producer should fail on errors, or only log them.
+	 * If this is set to true, exceptions will only be logged; if set to false,
+	 * exceptions will eventually be thrown and cause the streaming program to
+	 * fail (and enter recovery).
+	 * 
+	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+	 */
+	public void setLogFailuresOnly(boolean logFailuresOnly) {
+		this.logFailuresOnly = logFailuresOnly;
+	}
+
+	// ----------------------------------- Utilities --------------------------
+	
+	/**
+	 * Initializes the connection to Kafka.
+	 */
+	@Override
+	public void open(Configuration configuration) {
+		producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
+
+		RuntimeContext ctx = getRuntimeContext();
+		if(partitioner != null) {
+			partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
+		}
+
+		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", 
+				ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), topicId);
+
+		// register Kafka metrics to Flink accumulators
+		if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
+			Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
+
+			if(metrics == null) {
+				// MapR's Kafka implementation returns null here.
+				LOG.info("Producer implementation does not support metrics");
+			} else {
+				for(Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+					String name = "producer-" + metric.getKey().name();
+					DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue());
+					// best effort: we only add the accumulator if available.
+					if(kafkaAccumulator != null) {
+						getRuntimeContext().addAccumulator(name, kafkaAccumulator);
+					}
+				}
+			}
+		}
+
+		if (logFailuresOnly) {
+			callback = new Callback() {
+				
+				@Override
+				public void onCompletion(RecordMetadata metadata, Exception e) {
+					if (e != null) {
+						LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
+					}
+				}
+			};
+		}
+		else {
+			callback = new Callback() {
+				@Override
+				public void onCompletion(RecordMetadata metadata, Exception exception) {
+					if (exception != null && asyncException == null) {
+						asyncException = exception;
+					}
+				}
+			};
+		}
+	}
+
+	/**
+	 * Called when new data arrives at the sink, and forwards it to Kafka.
+	 *
+	 * @param next
+	 * 		The incoming data
+	 */
+	@Override
+	public void invoke(IN next) throws Exception {
+		// propagate asynchronous errors
+		checkErroneous();
+
+		byte[] serializedKey = schema.serializeKey(next);
+		byte[] serializedValue = schema.serializeValue(next);
+		ProducerRecord<byte[], byte[]> record;
+		if(partitioner == null) {
+			record = new ProducerRecord<>(topicId, serializedKey, serializedValue);
+		} else {
+			record = new ProducerRecord<>(topicId, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
+		}
+
+		producer.send(record, callback);
+	}
+
+
+	@Override
+	public void close() throws Exception {
+		if (producer != null) {
+			producer.close();
+		}
+		
+		// make sure we propagate pending errors
+		checkErroneous();
+	}
+
+
+	// ----------------------------------- Utilities --------------------------
+
+	protected void checkErroneous() throws Exception {
+		Exception e = asyncException;
+		if (e != null) {
+			// prevent double throwing
+			asyncException = null;
+			throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+		}
+	}
+	
+	public static Properties getPropertiesFromBrokerList(String brokerList) {
+		String[] elements = brokerList.split(",");
+		
+		// validate the broker addresses
+		for (String broker: elements) {
+			NetUtils.getCorrectHostnamePort(broker);
+		}
+		
+		Properties props = new Properties();
+		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+		return props;
+	}
+}
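
As a small usage sketch for the helper above (an illustration, not part of the patch): getPropertiesFromBrokerList() turns a comma-separated broker list into the producer configuration, and optional keys such as KEY_DISABLE_METRICS can be added on top. The broker addresses are placeholders, and no producer is instantiated here since the constructor already needs a reachable broker to query partitions:

	import java.util.Properties;

	import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
	import org.apache.kafka.clients.producer.ProducerConfig;

	public class ProducerConfigSketch {
		public static void main(String[] args) {
			// each comma-separated entry is validated as host:port
			Properties props = FlinkKafkaProducerBase.getPropertiesFromBrokerList("broker1:9092,broker2:9092");

			// optionally skip registering the Kafka metrics as Flink accumulators
			props.setProperty(FlinkKafkaProducerBase.KEY_DISABLE_METRICS, "true");

			// key.serializer / value.serializer default to ByteArraySerializer unless set explicitly;
			// log-and-continue behaviour would be enabled via setLogFailuresOnly(true) on the concrete producer
			System.out.println(props.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
		}
	}
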

