flink-commits mailing list archives

From se...@apache.org
Subject [10/14] flink git commit: [FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition
Date Wed, 13 Apr 2016 08:31:08 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
deleted file mode 100644
index a38c3bd..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Hacky wrapper to send an object instance through a Properties - map.
- *
- * This works as follows:
- * The recommended way of creating a KafkaSink is specifying a classname for the partitioner.
- *
- * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink.
- * This is set in the key-value (java.util.Properties) map.
- * In addition to that, we use the Properties.put(Object, Object) to store the instance of the (serializable).
- * This is a hack because the put() method is called on the underlying Hashmap.
- *
- * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
- *
- * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
- */
-public class PartitionerWrapper implements Partitioner {
-	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
-
-	private Partitioner wrapped;
-	public PartitionerWrapper(VerifiableProperties properties) {
-		wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
-	}
-
-	@Override
-	public int partition(Object value, int numberOfPartitions) {
-		return wrapped.partition(value, numberOfPartitions);
-	}
-}
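
For context on the removal above: the class javadoc describes how a (serializable) Partitioner instance was smuggled to Kafka through a java.util.Properties map. Below is a minimal sketch of that mechanism, using the old Kafka 0.8 producer's "partitioner.class" setting and a hypothetical user partitioner; neither the sketch class nor the partitioner is part of this commit.

import java.io.Serializable;
import java.util.Properties;

import kafka.producer.Partitioner;
import org.apache.flink.streaming.connectors.kafka.internals.PartitionerWrapper;

public class PartitionerWrapperSketch {

	/** A hypothetical user-supplied, serializable partitioner. */
	public static class ModuloPartitioner implements Partitioner, Serializable {
		@Override
		public int partition(Object key, int numberOfPartitions) {
			// non-negative hash modulo the partition count
			return (key.hashCode() & 0x7fffffff) % numberOfPartitions;
		}
	}

	public static Properties buildProducerConfig() {
		Properties props = new Properties();
		// Kafka instantiates this class reflectively, handing it the VerifiableProperties.
		props.setProperty("partitioner.class", PartitionerWrapper.class.getName());
		// The actual instance is stored via put(Object, Object) on the underlying Hashtable,
		// which is the "hack" the javadoc above refers to.
		props.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, new ModuloPartitioner());
		return props;
	}
}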

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
new file mode 100644
index 0000000..6aaeca9
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A thread that periodically writes the current Kafka partition offsets to Zookeeper.
+ */
+public class PeriodicOffsetCommitter extends Thread {
+
+	/** The ZooKeeper handler */
+	private final ZookeeperOffsetHandler offsetHandler;
+	
+	private final KafkaTopicPartitionState<?>[] partitionStates;
+	
+	/** The proxy to forward exceptions to the main thread */
+	private final ExceptionProxy errorHandler;
+	
+	/** Interval in which to commit, in milliseconds */
+	private final long commitInterval;
+	
+	/** Flag to mark the periodic committer as running */
+	private volatile boolean running = true;
+
+	PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler,
+			KafkaTopicPartitionState<?>[] partitionStates,
+			ExceptionProxy errorHandler,
+			long commitInterval)
+	{
+		this.offsetHandler = checkNotNull(offsetHandler);
+		this.partitionStates = checkNotNull(partitionStates);
+		this.errorHandler = checkNotNull(errorHandler);
+		this.commitInterval = commitInterval;
+		
+		checkArgument(commitInterval > 0);
+	}
+
+	@Override
+	public void run() {
+		try {
+			while (running) {
+				Thread.sleep(commitInterval);
+
+				// create a deep copy of the current offsets
+				HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>(partitionStates.length);
+				for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
+					currentOffsets.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
+				}
+				
+				offsetHandler.writeOffsets(currentOffsets);
+			}
+		}
+		catch (Throwable t) {
+			if (running) {
+				errorHandler.reportError(
+						new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
+			}
+		}
+	}
+
+	public void shutdown() {
+		this.running = false;
+		this.interrupt();
+	}
+}
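
The committer above is a plain daemon-style thread driven by the owning fetcher. A minimal usage sketch follows, assuming the fetcher has already created the ZookeeperOffsetHandler, the partition state array, and the ExceptionProxy (the Kafka08Fetcher wiring itself is not part of this file):

// Sketch only; it would live in the same 'internals' package because the constructor is package-private.
static PeriodicOffsetCommitter startCommitter(
		ZookeeperOffsetHandler offsetHandler,
		KafkaTopicPartitionState<?>[] partitionStates,
		ExceptionProxy errorProxy,
		long commitIntervalMillis) {

	PeriodicOffsetCommitter committer =
			new PeriodicOffsetCommitter(offsetHandler, partitionStates, errorProxy, commitIntervalMillis);
	committer.setDaemon(true); // do not keep the JVM alive just for offset committing
	committer.start();
	return committer;
}

// On shutdown: committer.shutdown() flips the volatile flag and interrupts the sleep,
// so the thread exits promptly; committer.join() then waits for it to finish.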

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
new file mode 100644
index 0000000..491ffad
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.kafka.common.Node;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getIntFromConfig;
+
+/**
+ * This class implements a thread with a connection to a single Kafka broker. The thread
+ * pulls records for a set of topic partitions for which the connected broker is currently
+ * the leader. The thread deserializes these records and emits them. 
+ * 
+ * @param <T> The type of elements that this consumer thread creates from Kafka's byte messages
+ *            and emits into the Flink DataStream.
+ */
+class SimpleConsumerThread<T> extends Thread {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class);
+
+	private static final KafkaTopicPartitionState<TopicAndPartition> MARKER = Kafka08Fetcher.MARKER;
+	
+	// ------------------------------------------------------------------------
+
+	private final Kafka08Fetcher<T> owner;
+	
+	private final KeyedDeserializationSchema<T> deserializer;
+
+	private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions;
+
+	private final Node broker;
+
+	/** Queue containing new fetch partitions for the consumer thread */
+	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue;
+	
+	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions;
+	
+	private final ExceptionProxy errorHandler;
+	
+	private final long invalidOffsetBehavior;
+	
+	private volatile boolean running = true;
+	
+
+	// ----------------- Simple Consumer ----------------------
+	private volatile SimpleConsumer consumer;
+
+	private final int soTimeout;
+	private final int minBytes;
+	private final int maxWait;
+	private final int fetchSize;
+	private final int bufferSize;
+	private final int reconnectLimit;
+
+
+	// exceptions are thrown locally
+	public SimpleConsumerThread(
+			Kafka08Fetcher<T> owner,
+			ExceptionProxy errorHandler,
+			Properties config,
+			Node broker,
+			List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
+			ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions,
+			KeyedDeserializationSchema<T> deserializer,
+			long invalidOffsetBehavior)
+	{
+		this.owner = owner;
+		this.errorHandler = errorHandler;
+		this.broker = broker;
+		this.partitions = seedPartitions;
+		this.deserializer = requireNonNull(deserializer);
+		this.unassignedPartitions = requireNonNull(unassignedPartitions);
+		this.newPartitionsQueue = new ClosableBlockingQueue<>();
+		this.invalidOffsetBehavior = invalidOffsetBehavior;
+		
+		// these are the actual configuration values of Kafka + their original default values.
+		this.soTimeout = getIntFromConfig(config, "socket.timeout.ms", 30000);
+		this.minBytes = getIntFromConfig(config, "fetch.min.bytes", 1);
+		this.maxWait = getIntFromConfig(config, "fetch.wait.max.ms", 100);
+		this.fetchSize = getIntFromConfig(config, "fetch.message.max.bytes", 1048576);
+		this.bufferSize = getIntFromConfig(config, "socket.receive.buffer.bytes", 65536);
+		this.reconnectLimit = getIntFromConfig(config, "flink.simple-consumer-reconnectLimit", 3);
+	}
+
+	public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {
+		return newPartitionsQueue;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  main work loop
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void run() {
+		LOG.info("Starting to fetch from {}", this.partitions);
+
+		// set up the config values
+		final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
+
+		try {
+			// create the Kafka consumer that we actually use for fetching
+			consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+			
+			// make sure that all partitions have some offsets to start with
+			// those partitions that do not have an offset from a checkpoint need to get
+			// their start offset from ZooKeeper
+			getMissingOffsetsFromKafka(partitions);
+
+			// Now, the actual work starts :-)
+			int offsetOutOfRangeCount = 0;
+			int reconnects = 0;
+			while (running) {
+
+				// ----------------------------------- partitions list maintenance ----------------------------
+
+				// check queue for new partitions to read from:
+				List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch();
+				if (newPartitions != null) {
+					// found some new partitions for this thread's broker
+					
+					// check if the new partitions need an offset lookup
+					getMissingOffsetsFromKafka(newPartitions);
+					
+					// add the new partitions (and check they are not already in there)
+					for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
+						if (partitions.contains(newPartition)) {
+							throw new IllegalStateException("Adding partition " + newPartition + 
+									" to subscribed partitions even though it is already subscribed");
+						}
+						partitions.add(newPartition);
+					}
+					
+					LOG.info("Adding {} new partitions to consumer thread {}", newPartitions.size(), getName());
+					LOG.debug("Partitions list: {}", newPartitions);
+				}
+
+				if (partitions.size() == 0) {
+					if (newPartitionsQueue.close()) {
+						// close succeeded. Closing thread
+						running = false;
+						
+						LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.", 
+								getName());
+
+						// add the wake-up marker into the queue to make the main thread
+						// immediately wake up and terminate faster
+						unassignedPartitions.add(MARKER);
+
+						break;
+					} else {
+						// close failed: fetcher main thread concurrently added new partitions into the queue.
+						// go to top of loop again and get the new partitions
+						continue; 
+					}
+				}
+
+				// ----------------------------------- request / response with kafka ----------------------------
+
+				FetchRequestBuilder frb = new FetchRequestBuilder();
+				frb.clientId(clientId);
+				frb.maxWait(maxWait);
+				frb.minBytes(minBytes);
+
+				for (KafkaTopicPartitionState<?> partition : partitions) {
+					frb.addFetch(
+							partition.getKafkaTopicPartition().getTopic(),
+							partition.getKafkaTopicPartition().getPartition(),
+							partition.getOffset() + 1, // request the next record
+							fetchSize);
+				}
+				
+				kafka.api.FetchRequest fetchRequest = frb.build();
+				LOG.debug("Issuing fetch request {}", fetchRequest);
+
+				FetchResponse fetchResponse;
+				try {
+					fetchResponse = consumer.fetch(fetchRequest);
+				}
+				catch (Throwable cce) {
+					//noinspection ConstantConditions
+					if (cce instanceof ClosedChannelException) {
+						LOG.warn("Fetch failed because of ClosedChannelException.");
+						LOG.debug("Full exception", cce);
+						
+						// we don't know if the broker is overloaded or unavailable.
+						// retry a few times, then return ALL partitions for new leader lookup
+						if (++reconnects >= reconnectLimit) {
+							LOG.warn("Unable to reach broker after {} retries. Returning all current partitions", reconnectLimit);
+							for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) {
+								unassignedPartitions.add(fp);
+							}
+							this.partitions.clear();
+							continue; // jump to top of loop: will close thread or subscribe to new partitions
+						}
+						try {
+							consumer.close();
+						} catch (Throwable t) {
+							LOG.warn("Error while closing consumer connection", t);
+						}
+						// delay & retry
+						Thread.sleep(100);
+						consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+						continue; // retry
+					} else {
+						throw cce;
+					}
+				}
+				reconnects = 0;
+
+				// ---------------------------------------- error handling ----------------------------
+
+				if (fetchResponse == null) {
+					throw new IOException("Fetch from Kafka failed (request returned null)");
+				}
+				
+				if (fetchResponse.hasError()) {
+					String exception = "";
+					List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
+					
+					// iterate over partitions to get individual error codes
+					Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
+					boolean partitionsRemoved = false;
+					
+					while (partitionsIterator.hasNext()) {
+						final KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next();
+						short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition());
+
+						if (code == ErrorMapping.OffsetOutOfRangeCode()) {
+							// we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
+							// Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
+							partitionsToGetOffsetsFor.add(fp);
+						}
+						else if (code == ErrorMapping.NotLeaderForPartitionCode() ||
+								code == ErrorMapping.LeaderNotAvailableCode() ||
+								code == ErrorMapping.BrokerNotAvailableCode() ||
+								code == ErrorMapping.UnknownCode())
+						{
+							// the broker we are connected to is not the leader for the partition.
+							LOG.warn("{} is not the leader of {}. Reassigning leader for partition", broker, fp);
+							LOG.debug("Error code = {}", code);
+
+							unassignedPartitions.add(fp);
+
+							partitionsIterator.remove(); // unsubscribe the partition ourselves
+							partitionsRemoved = true;
+						}
+						else if (code != ErrorMapping.NoError()) {
+							exception += "\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " +
+									StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+						}
+					}
+					if (partitionsToGetOffsetsFor.size() > 0) {
+						// safeguard against an infinite loop.
+						if (offsetOutOfRangeCount++ > 3) {
+							throw new RuntimeException("Found invalid offsets more than three times in partitions "
+									+ partitionsToGetOffsetsFor + ". Exceptions: " + exception);
+						}
+						// get valid offsets for these partitions and try again.
+						LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
+						getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
+						
+						LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
+						continue; // jump back to create a new fetch request. The offset has not been touched.
+					}
+					else if (partitionsRemoved) {
+						continue; // create new fetch request
+					}
+					else {
+						// partitions failed on an error
+						throw new IOException("Error while fetching from broker '" + broker +"': " + exception);
+					}
+				} else {
+					// successful fetch, reset offsetOutOfRangeCount.
+					offsetOutOfRangeCount = 0;
+				}
+
+				// ----------------------------------- process fetch response ----------------------------
+
+				int messagesInFetch = 0;
+				int deletedMessages = 0;
+				Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
+				
+				partitionsLoop:
+				while (partitionsIterator.hasNext()) {
+					final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next();
+					
+					final ByteBufferMessageSet messageSet = fetchResponse.messageSet(
+							currentPartition.getTopic(), currentPartition.getPartition());
+
+					for (MessageAndOffset msg : messageSet) {
+						if (running) {
+							messagesInFetch++;
+							final ByteBuffer payload = msg.message().payload();
+							final long offset = msg.offset();
+							
+							if (offset <= currentPartition.getOffset()) {
+								// we have seen this message already
+								LOG.info("Skipping message with offset " + msg.offset()
+										+ " because we have seen messages until (including) "
+										+ currentPartition.getOffset()
+										+ " from topic/partition " + currentPartition.getTopic() + '/'
+										+ currentPartition.getPartition() + " already");
+								continue;
+							}
+
+							// If the message value is null, this represents a delete command for the message key.
+							// Log this and pass it on to the client who might want to also receive delete messages.
+							byte[] valueBytes;
+							if (payload == null) {
+								deletedMessages++;
+								valueBytes = null;
+							} else {
+								valueBytes = new byte[payload.remaining()];
+								payload.get(valueBytes);
+							}
+
+							// put key into byte array
+							byte[] keyBytes = null;
+							int keySize = msg.message().keySize();
+
+							if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization
+								ByteBuffer keyPayload = msg.message().key();
+								keyBytes = new byte[keySize];
+								keyPayload.get(keyBytes);
+							}
+
+							final T value = deserializer.deserialize(keyBytes, valueBytes, 
+									currentPartition.getTopic(), currentPartition.getPartition(), offset);
+							
+							if (deserializer.isEndOfStream(value)) {
+								// remove partition from subscribed partitions.
+								partitionsIterator.remove();
+								continue partitionsLoop;
+							}
+							
+							owner.emitRecord(value, currentPartition, offset);
+						}
+						else {
+							// no longer running
+							return;
+						}
+					}
+				}
+				LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages);
+			} // end of fetch loop
+
+			if (!newPartitionsQueue.close()) {
+				throw new Exception("Bug: Cleanly leaving fetcher thread without having a closed queue.");
+			}
+		}
+		catch (Throwable t) {
+			// report to the fetcher's error handler
+			errorHandler.reportError(t);
+		}
+		finally {
+			if (consumer != null) {
+				// closing the consumer should not fail the program
+				try {
+					consumer.close();
+				}
+				catch (Throwable t) {
+					LOG.error("Error while closing the Kafka simple consumer", t);
+				}
+			}
+		}
+	}
+
+	private void getMissingOffsetsFromKafka(
+			List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
+	{
+		// collect which partitions we should fetch offsets for
+		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
+		for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+			if (!part.isOffsetDefined()) {
+				// retrieve the offset from the consumer
+				partitionsToGetOffsetsFor.add(part);
+			}
+		}
+		
+		if (partitionsToGetOffsetsFor.size() > 0) {
+			getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
+			
+			LOG.info("No checkpoint/savepoint offsets found for some partitions. " +
+					"Fetched the following start offsets {}", partitionsToGetOffsetsFor);
+		}
+	}
+
+	/**
+	 * Cancels this fetch thread. The thread will release all resources and terminate.
+	 */
+	public void cancel() {
+		this.running = false;
+
+		// interrupt whatever the consumer is doing
+		if (consumer != null) {
+			consumer.close();
+		}
+
+		this.interrupt();
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Kafka Request Utils
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Request latest offsets for a set of partitions, via a Kafka consumer.
+	 *
+	 * <p>This method retries three times if the response has an error.
+	 *
+	 * @param consumer The consumer connected to the lead broker
+	 * @param partitions The list of partitions we need offsets for
+	 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
+	 */
+	private static void getLastOffsetFromKafka(
+			SimpleConsumer consumer,
+			List<KafkaTopicPartitionState<TopicAndPartition>> partitions,
+			long whichTime) throws IOException
+	{
+		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+		for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+			requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1));
+		}
+
+		int retries = 0;
+		OffsetResponse response;
+		while (true) {
+			kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+					requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+			response = consumer.getOffsetsBefore(request);
+
+			if (response.hasError()) {
+				StringBuilder exception = new StringBuilder();
+				for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+					short code;
+					if ((code = response.errorCode(part.getTopic(), part.getPartition())) != ErrorMapping.NoError()) {
+						exception.append("\nException for topic=").append(part.getTopic())
+								.append(" partition=").append(part.getPartition()).append(": ")
+								.append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
+					}
+				}
+				if (++retries >= 3) {
+					throw new IOException("Unable to get last offset for partitions " + partitions + ": "
+							+ exception.toString());
+				} else {
+					LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception);
+				}
+			} else {
+				break; // leave retry loop
+			}
+		}
+
+		for (KafkaTopicPartitionState<TopicAndPartition> part: partitions) {
+			final long offset = response.offsets(part.getTopic(), part.getPartition())[0];
+			
+			// the offset returned is that of the next record to fetch. because our state reflects the latest
+			// successfully emitted record, we subtract one
+			part.setOffset(offset - 1);
+		}
+	}
+}
\ No newline at end of file
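
A note on the offset bookkeeping in the thread above, since it is easy to misread: the partition state always holds the offset of the last record that was successfully emitted, while Kafka speaks in terms of the next offset to fetch. A small worked illustration (a sketch restating lines from the code above, not new behavior):

// Kafka reports that the next record to fetch is offset 42;
// getLastOffsetFromKafka() therefore stores 42 - 1 = 41 ("latest emitted" semantics):
part.setOffset(offset - 1);                                             // state = 41

// the fetch loop asks for the record after the last emitted one:
frb.addFetch(part.getTopic(), part.getPartition(), part.getOffset() + 1, fetchSize);  // fetches from 42

// and any record with msg.offset() <= currentPartition.getOffset() (i.e. <= 41)
// is a duplicate from an overlapping fetch and is skipped before deserialization.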

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index 328cab0..a1a81ed 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -24,7 +24,6 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import org.slf4j.Logger;
@@ -39,11 +38,9 @@ import java.util.Properties;
 /**
  * Handler for committing Kafka offsets to Zookeeper and to retrieve them again.
  */
-public class ZookeeperOffsetHandler implements OffsetHandler {
+public class ZookeeperOffsetHandler {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
-	
-	private static final long OFFSET_NOT_SET = FlinkKafkaConsumerBase.OFFSET_NOT_SET;
 
 	private final String groupId;
 
@@ -74,27 +71,40 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
 		curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
 		curatorClient.start();
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Offset access and manipulation
+	// ------------------------------------------------------------------------
 
-
-	@Override
-	public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception {
-		for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToCommit.entrySet()) {
+	/**
+	 * Writes given set of offsets for Kafka partitions to ZooKeeper.
+	 * 
+	 * @param offsetsToWrite The offsets for the partitions to write.
+	 * @throws Exception The method forwards exceptions.
+	 */
+	public void writeOffsets(Map<KafkaTopicPartition, Long> offsetsToWrite) throws Exception {
+		for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToWrite.entrySet()) {
 			KafkaTopicPartition tp = entry.getKey();
 			long offset = entry.getValue();
-			
+
 			if (offset >= 0) {
 				setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset);
 			}
 		}
 	}
 
-	@Override
+	/**
+	 * Reads the offsets for the given Kafka partitions from ZooKeeper.
+	 * @param partitions The partitions to read offsets for.
+	 * @return The mapping from partition to offset.
+	 * @throws Exception This method forwards exceptions.
+	 */
 	public Map<KafkaTopicPartition, Long> getOffsets(List<KafkaTopicPartition> partitions) throws Exception {
 		Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size());
 		for (KafkaTopicPartition tp : partitions) {
-			long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
+			Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
 
-			if (offset != OFFSET_NOT_SET) {
+			if (offset != null) {
 				LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.",
 						tp.getTopic(), tp.getPartition(), offset);
 				ret.put(tp, offset);
@@ -103,7 +113,11 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
 		return ret;
 	}
 
-	@Override
+	/**
+	 * Closes the offset handler.
+	 * 
+	 * @throws IOException Thrown, if the handler cannot be closed properly.
+	 */
 	public void close() throws IOException {
 		curatorClient.close();
 	}
@@ -120,7 +134,7 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
 		curatorClient.setData().forPath(path, data);
 	}
 
-	public static long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
+	public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
 		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
 		String path = topicDirs.consumerOffsetDir() + "/" + partition;
 		curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
@@ -128,18 +142,20 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
 		byte[] data = curatorClient.getData().forPath(path);
 		
 		if (data == null) {
-			return OFFSET_NOT_SET;
+			return null;
 		} else {
 			String asString = new String(data);
 			if (asString.length() == 0) {
-				return OFFSET_NOT_SET;
+				return null;
 			} else {
 				try {
-					return Long.parseLong(asString);
-				} catch (NumberFormatException e) {
-					throw new Exception(String.format(
-						"The offset in ZooKeeper for group '%s', topic '%s', partition %d is a malformed string: %s",
-						groupId, topic, partition, asString));
+					return Long.valueOf(asString);
+				}
+				catch (NumberFormatException e) {
+					LOG.error(
+							"The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
+						groupId, topic, partition, asString);
+					return null;
 				}
 			}
 		}
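
The signature change above (a primitive long with an OFFSET_NOT_SET sentinel becoming a nullable Long) moves the "no offset committed yet" case into the return type. A caller sketch under that contract follows; the helper name and the seeding logic are illustrative, not part of the commit:

// Sketch: seed a partition's state from the group offset committed in ZooKeeper, if any.
static void seedFromZooKeeper(
		CuratorFramework curatorClient,
		String groupId,
		KafkaTopicPartitionState<?> part) throws Exception {

	Long committed = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
			curatorClient, groupId, part.getTopic(), part.getPartition());

	if (committed != null) {
		// ZooKeeper stores the offset of the last record handed to user code,
		// so this seeds the state with the same "latest emitted" semantics.
		part.setOffset(committed);
	}
	// committed == null means no offset exists for the group (or the node was malformed);
	// the fetcher then falls back to its configured start-offset behavior.
}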

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index d6ee968..0aef3bd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -35,8 +34,11 @@ import org.junit.Test;
 
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class Kafka08ITCase extends KafkaConsumerTestBase {
 
@@ -45,11 +47,6 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 	// ------------------------------------------------------------------------
 
 	@Test(timeout = 60000)
-	public void testCheckpointing() throws Exception {
-		runCheckpointingTest();
-	}
-
-	@Test(timeout = 60000)
 	public void testFailOnNoBroker() throws Exception {
 		runFailOnNoBrokerTest();
 	}
@@ -60,15 +57,15 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		runSimpleConcurrentProducerConsumerTopology();
 	}
 
-	@Test(timeout = 60000)
-	public void testPunctuatedExplicitWMConsumer() throws Exception {
-		runExplicitPunctuatedWMgeneratingConsumerTest(false);
-	}
+//	@Test(timeout = 60000)
+//	public void testPunctuatedExplicitWMConsumer() throws Exception {
+//		runExplicitPunctuatedWMgeneratingConsumerTest(false);
+//	}
 
-	@Test(timeout = 60000)
-	public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-		runExplicitPunctuatedWMgeneratingConsumerTest(true);
-	}
+//	@Test(timeout = 60000)
+//	public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
+//		runExplicitPunctuatedWMgeneratingConsumerTest(true);
+//	}
 
 	@Test(timeout = 60000)
 	public void testKeyValueSupport() throws Exception {
@@ -164,7 +161,31 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		runMetricsAndEndOfStreamTest();
 	}
 
+	@Test
+	public void runOffsetManipulationInZooKeeperTest() {
+		try {
+			final String topicName = "ZookeeperOffsetHandlerTest-Topic";
+			final String groupId = "ZookeeperOffsetHandlerTest-Group";
 
+			final Long offset = (long) (Math.random() * Long.MAX_VALUE);
+
+			CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl) kafkaServer).createCuratorClient();
+			kafkaServer.createTestTopic(topicName, 3, 2);
+
+			ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset);
+
+			Long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
+
+			curatorFramework.close();
+
+			assertEquals(offset, fetchedOffset);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
 	/**
 	 * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
 	 *
@@ -202,15 +223,15 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 		CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
 
-		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0);
-		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1);
-		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2);
+		Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0);
+		Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1);
+		Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2);
 
 		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
 
-		assertTrue(o1 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-		assertTrue(o2 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100));
-		assertTrue(o3 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100));
+		assertTrue(o1 == null || (o1 >= 0 && o1 <= 100));
+		assertTrue(o2 == null || (o2 >= 0 && o2 <= 100));
+		assertTrue(o3 == null || (o3 >= 0 && o3 <= 100));
 
 		LOG.info("Manipulating offsets");
 
@@ -264,16 +285,16 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		// get the offset
 		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
 
-		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
-		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
-		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
+		Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
+		Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
+		Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
 		curatorFramework.close();
 		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
 
 		// ensure that the offset has been committed
-		boolean atLeastOneOffsetSet = (o1 > 0 && o1 <= 100) ||
-				(o2 > 0 && o2 <= 100) ||
-				(o3 > 0 && o3 <= 100);
+		boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) ||
+				(o2 != null && o2 > 0 && o2 <= 100) ||
+				(o3 != null && o3 > 0 && o3 <= 100);
 		assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet);
 
 		deleteTestTopic(topicName);
@@ -295,7 +316,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 	public void testKafkaOffsetRetrievalToZookeeper() throws Exception {
 		final String topicName = "testKafkaOffsetToZk";
 		final int parallelism = 3;
+		
 		createTestTopic(topicName, parallelism, 1);
+		
 		StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env1.getConfig().disableSysoutLogging();
 		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
@@ -305,9 +328,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		writeSequence(env1, topicName, 50, parallelism);
 
 
-		StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-
-		// enable checkpointing
+		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env2.getConfig().disableSysoutLogging();
 		env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		env2.setParallelism(parallelism);
@@ -320,78 +341,55 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		DataStream<String> stream = env2.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps));
 		stream.addSink(new DiscardingSink<String>());
 
-		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-		String consumerGroupDir = standardProps.getProperty("group.id");
-		TreeCache tc1 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/0");
-		TreeCache tc2 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/1");
-		TreeCache tc3 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/2");
-
-		// add listener to wait until first partition is updated in ZK
-		TreeCacheListener stopListener = new TreeCacheListener() {
-			AtomicInteger counter = new AtomicInteger(0);
-			@Override
-			public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
-				LOG.info("Updated {}", event);
-				if (event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
-					if(counter.incrementAndGet() == 3) {
-						// cancel job, node has been created
-						LOG.info("Cancelling job after all three ZK nodes were updated");
-						JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-					}
-				}
-			}
-		};
-		tc1.getListenable().addListener(stopListener);
-		tc1.start();
-		tc2.getListenable().addListener(stopListener);
-		tc2.start();
-		tc3.getListenable().addListener(stopListener);
-		tc3.start();
-
-		// the curator listener is not always working properly. Stop job after 10 seconds
-		final Tuple1<Throwable> error = new Tuple1<>();
-		Thread canceller = new Thread(new Runnable() {
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		final Thread runner = new Thread("runner") {
 			@Override
 			public void run() {
 				try {
-					Thread.sleep(10_000L);
-					LOG.info("Cancelling job after 10 seconds");
-					JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-				} catch (Throwable t) {
-					if (!(t instanceof InterruptedException)) {
-						error.f0 = t;
+					env2.execute();
+				}
+				catch (Throwable t) {
+					if (!(t.getCause() instanceof JobCancellationException)) {
+						errorRef.set(t);
 					}
 				}
 			}
-		});
-		canceller.start();
-
-		try {
-			env2.execute("Idlying Kafka source");
-		} catch( Throwable thr) {
-			if(!(thr.getCause() instanceof JobCancellationException)) {
-				throw thr;
+		};
+		runner.start();
+
+		final CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
+		final Long l49 = 49L;
+				
+		final long deadline = 30000 + System.currentTimeMillis();
+		do {
+			Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
+			Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
+			Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
+
+			if (l49.equals(o1) && l49.equals(o2) && l49.equals(o3)) {
+				break;
 			}
+			
+			Thread.sleep(100);
 		}
-		tc1.close();
-		tc2.close();
-		tc3.close();
-
-		canceller.interrupt();
-		canceller.join();
-		if(error.f0 != null) {
-			throw new RuntimeException("Delayed cancelling thread had an error", error.f0);
+		while (System.currentTimeMillis() < deadline);
+		
+		// cancel the job
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		
+		final Throwable t = errorRef.get();
+		if (t != null) {
+			throw new RuntimeException("Job failed with an exception", t);
 		}
 
 		// check if offsets are correctly in ZK
-		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
-		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
-		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
-		Assert.assertEquals(49L, o1);
-		Assert.assertEquals(49L, o2);
-		Assert.assertEquals(49L, o3);
+		Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
+		Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
+		Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
+		Assert.assertEquals(Long.valueOf(49L), o1);
+		Assert.assertEquals(Long.valueOf(49L), o2);
+		Assert.assertEquals(Long.valueOf(49L), o3);
 
 		curatorFramework.close();
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
new file mode 100644
index 0000000..36fb7e6
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+public class KafkaConsumer08Test {
+
+	@Test
+	public void testValidateZooKeeperConfig() {
+		try {
+			// empty
+			Properties emptyProperties = new Properties();
+			try {
+				FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties);
+				fail("should fail with an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+
+			// no connect string (only group string)
+			Properties noConnect = new Properties();
+			noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
+			try {
+				FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect);
+				fail("should fail with an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+
+			// no group string (only connect string)
+			Properties noGroup = new Properties();
+			noGroup.put("zookeeper.connect", "localhost:47574");
+			try {
+				FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup);
+				fail("should fail with an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCreateSourceWithoutCluster() {
+		try {
+			Properties props = new Properties();
+			props.setProperty("zookeeper.connect", "localhost:56794");
+			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
+			props.setProperty("group.id", "non-existent-group");
+
+			new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+			fail();
+		}
+		catch (Exception e) {
+			assertTrue(e.getMessage().contains("Unable to retrieve any partitions"));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
deleted file mode 100644
index 7337f65..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class KafkaConsumerTest {
-
-	@Test
-	public void testValidateZooKeeperConfig() {
-		try {
-			// empty
-			Properties emptyProperties = new Properties();
-			try {
-				FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no connect string (only group string)
-			Properties noConnect = new Properties();
-			noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
-			try {
-				FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no group string (only connect string)
-			Properties noGroup = new Properties();
-			noGroup.put("zookeeper.connect", "localhost:47574");
-			try {
-				FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSnapshot() {
-		try {
-			Field offsetsField = FlinkKafkaConsumerBase.class.getDeclaredField("partitionState");
-			Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
-			Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
-
-			offsetsField.setAccessible(true);
-			runningField.setAccessible(true);
-			mapField.setAccessible(true);
-
-			FlinkKafkaConsumer08<?> consumer = mock(FlinkKafkaConsumer08.class);
-			when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
-
-			HashMap<KafkaTopicPartition, KafkaPartitionState> testState = new HashMap<>();
-			HashMap<KafkaTopicPartition, Long> testOffsets = new HashMap<>();
-			long[] offsets = new long[] { 43, 6146, 133, 16, 162, 616 };
-			int j = 0;
-			for (long i: offsets) {
-				KafkaTopicPartition ktp = new KafkaTopicPartition("topic", j++);
-				testState.put(ktp, new KafkaPartitionState(ktp.getPartition(), i));
-				testOffsets.put(ktp, i);
-			}
-
-			LinkedMap map = new LinkedMap();
-
-			offsetsField.set(consumer, testState);
-			runningField.set(consumer, true);
-			mapField.set(consumer, map);
-
-			assertTrue(map.isEmpty());
-
-			// make multiple checkpoints
-			for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
-				HashMap<KafkaTopicPartition, Long> checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
-				assertEquals(testOffsets, checkpoint);
-
-				// change the offsets, make sure the snapshot did not change
-				HashMap<KafkaTopicPartition, Long> checkpointCopy = (HashMap<KafkaTopicPartition, Long>) checkpoint.clone();
-
-				for (Map.Entry<KafkaTopicPartition, Long> e: testOffsets.entrySet()) {
-					KafkaTopicPartition ktp = e.getKey();
-					testState.put(ktp, new KafkaPartitionState(ktp.getPartition(), e.getValue() + 1));
-					testOffsets.put(ktp, e.getValue() + 1);
-				}
-
-				assertEquals(checkpointCopy, checkpoint);
-
-				assertTrue(map.size() > 0);
-				assertTrue(map.size() <= FlinkKafkaConsumer08.MAX_NUM_PENDING_CHECKPOINTS);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	@Ignore("Kafka consumer internally makes an infinite loop")
-	public void testCreateSourceWithoutCluster() {
-		try {
-			Properties props = new Properties();
-			props.setProperty("zookeeper.connect", "localhost:56794");
-			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
-			props.setProperty("group.id", "non-existent-group");
-
-			new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
index 21140dd..c28799c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
+@SuppressWarnings("serial")
 public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
 
 	@Test(timeout=60000)
@@ -28,6 +29,6 @@ public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
 
 	@Test(timeout=60000)
 	public void testAutoOffsetResetNone() throws Exception {
-		runFailOnAutoOffsetResetNone();
+		runFailOnAutoOffsetResetNoneEager();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
deleted file mode 100644
index c99e133..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
-import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
-	
-	@Test
-	public void runOffsetManipulationinZooKeeperTest() {
-		try {
-			final String topicName = "ZookeeperOffsetHandlerTest-Topic";
-			final String groupId = "ZookeeperOffsetHandlerTest-Group";
-			
-			final long offset = (long) (Math.random() * Long.MAX_VALUE);
-
-			CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient();
-			kafkaServer.createTestTopic(topicName, 3, 2);
-
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset);
-	
-			long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
-
-			curatorFramework.close();
-			
-			assertEquals(offset, fetchedOffset);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
index 6bdfb48..fbeb110 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
@@ -25,5 +25,6 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 55f9875..d34cd2f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -17,38 +17,31 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.util.SerializedValue;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.UUID;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -73,10 +66,8 @@ import static java.util.Objects.requireNonNull;
  */
 public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 
-	// ------------------------------------------------------------------------
-	
 	private static final long serialVersionUID = 2324564345203409112L;
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class);
 
 	/**  Configuration key to change the polling timeout **/
@@ -85,35 +76,18 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 	/** Boolean configuration key to disable metrics tracking **/
 	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
 
-	/**
-	 * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
-	 * available. If 0, returns immediately with any records that are available now.
-	 */
+	/** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+	 * available. If 0, returns immediately with any records that are available now. */
 	public static final long DEFAULT_POLL_TIMEOUT = 100L;
 
+	// ------------------------------------------------------------------------
+
 	/** User-supplied properties for Kafka **/
 	private final Properties properties;
-	/** Ordered list of all partitions available in all subscribed partitions **/
-	private final List<KafkaTopicPartition> partitionInfos;
-
-	/** Unique ID identifying the consumer */
-	private final String consumerId;
-
-	// ------  Runtime State  -------
-
-	/** The partitions actually handled by this consumer at runtime */
-	private transient List<TopicPartition> subscribedPartitions;
-	/** For performance reasons, we are keeping two representations of the subscribed partitions **/
-	private transient List<KafkaTopicPartition> subscribedPartitionsAsFlink;
-	/** The Kafka Consumer instance**/
-	private transient KafkaConsumer<byte[], byte[]> consumer;
-	/** The thread running Kafka's consumer **/
-	private transient ConsumerThread<T> consumerThread;
-	/** Exception set from the ConsumerThread */
-	private transient Throwable consumerThreadException;
-	/** If the consumer doesn't have a Kafka partition assigned at runtime, it'll block on this waitThread **/
-	private transient Thread waitThread;
 
+	/** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+	 * available. If 0, returns immediately with any records that are available now */
+	private final long pollTimeout;
 
 	// ------------------------------------------------------------------------
 
@@ -177,14 +151,30 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 	 *           The properties that are used to configure both the fetcher and the offset handler.
 	 */
 	public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
-		super(deserializer, props);
-		requireNonNull(topics, "topics");
-		this.properties = requireNonNull(props, "props");
+		super(deserializer);
+
+		checkNotNull(topics, "topics");
+		this.properties = checkNotNull(props, "props");
 		setDeserializer(this.properties);
+
+		// configure the polling timeout
+		try {
+			if (properties.containsKey(KEY_POLL_TIMEOUT)) {
+				this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
+			} else {
+				this.pollTimeout = DEFAULT_POLL_TIMEOUT;
+			}
+		}
+		catch (Exception e) {
+			throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
+		}
+
+		// read the partitions that belong to the listed topics
+		final List<KafkaTopicPartition> partitions = new ArrayList<>();
 		KafkaConsumer<byte[], byte[]> consumer = null;
+
 		try {
 			consumer = new KafkaConsumer<>(this.properties);
-			this.partitionInfos = new ArrayList<>();
 			for (final String topic: topics) {
 				// get partitions for each topic
 				List<PartitionInfo> partitionsForTopic = null;
@@ -203,307 +193,93 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 					consumer.close();
 					try {
 						Thread.sleep(1000);
-					} catch (InterruptedException e) {
-					}
+					} catch (InterruptedException ignored) {}
+					
 					consumer = new KafkaConsumer<>(properties);
 				}
 				// for non existing topics, the list might be null.
-				if(partitionsForTopic != null) {
-					partitionInfos.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
+				if (partitionsForTopic != null) {
+					partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
 				}
 			}
-		} finally {
+		}
+		finally {
 			if(consumer != null) {
 				consumer.close();
 			}
 		}
-		if(partitionInfos.isEmpty()) {
+
+		if (partitions.isEmpty()) {
 			throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
 		}
 
 		// we now have a list of partitions which is the same for all parallel consumer instances.
-		LOG.info("Got {} partitions from these topics: {}", partitionInfos.size(), topics);
+		LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics);
 
 		if (LOG.isInfoEnabled()) {
-			logPartitionInfo(partitionInfos);
+			logPartitionInfo(LOG, partitions);
 		}
 
-		this.consumerId = UUID.randomUUID().toString();
+		// register these partitions
+		setSubscribedPartitions(partitions);
+	}
+
+	@Override
+	protected AbstractFetcher<T, ?> createFetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> thisSubtaskPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			StreamingRuntimeContext runtimeContext) throws Exception {
+
+		boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
+
+		return new Kafka09Fetcher<>(sourceContext, thisSubtaskPartitions,
+				watermarksPeriodic, watermarksPunctuated,
+				runtimeContext, deserializer,
+				properties, pollTimeout, useMetrics);
+		
 	}
 
+	// ------------------------------------------------------------------------
+	//  Utilities 
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable)
+	 * 
 	 * @param partitions A list of Kafka PartitionInfos.
 	 * @return A list of KafkaTopicPartitions
 	 */
-	public static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
-		requireNonNull(partitions, "The given list of partitions was null");
+	private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
+		checkNotNull(partitions);
+
 		List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size());
-		for(PartitionInfo pi: partitions) {
+		for (PartitionInfo pi : partitions) {
 			ret.add(new KafkaTopicPartition(pi.topic(), pi.partition()));
 		}
 		return ret;
 	}
 
-	public static List<TopicPartition> convertToKafkaTopicPartition(List<KafkaTopicPartition> partitions) {
-		List<TopicPartition> ret = new ArrayList<>(partitions.size());
-		for(KafkaTopicPartition ktp: partitions) {
-			ret.add(new TopicPartition(ktp.getTopic(), ktp.getPartition()));
-		}
-		return ret;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Source life cycle
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-
-		final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
-		final int thisConsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
-
-		// pick which partitions we work on
-		this.subscribedPartitionsAsFlink = assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex);
-		if(this.subscribedPartitionsAsFlink.isEmpty()) {
-			LOG.info("This consumer doesn't have any partitions assigned");
-			this.partitionState = null;
-			return;
-		} else {
-			StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
-			// if checkpointing is enabled, we are not automatically committing to Kafka.
-			properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(!streamingRuntimeContext.isCheckpointingEnabled()));
-			this.consumer = new KafkaConsumer<>(properties);
-		}
-		subscribedPartitions = convertToKafkaTopicPartition(subscribedPartitionsAsFlink);
-
-		this.consumer.assign(this.subscribedPartitions);
-
-		// register Kafka metrics to Flink accumulators
-		if(!Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"))) {
-			Map<MetricName, ? extends Metric> metrics = this.consumer.metrics();
-			if(metrics == null) {
-				// MapR's Kafka implementation returns null here.
-				LOG.info("Consumer implementation does not support metrics");
-			} else {
-				for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
-					String name = consumerId + "-consumer-" + metric.getKey().name();
-					DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue());
-					// best effort: we only add the accumulator if available.
-					if (kafkaAccumulator != null) {
-						getRuntimeContext().addAccumulator(name, kafkaAccumulator);
-					}
-				}
-			}
-		}
-
-		// check if we need to explicitly seek to a specific offset (restore case)
-		if(restoreToOffset != null) {
-			// we are in a recovery scenario
-			for(Map.Entry<KafkaTopicPartition, Long> info: restoreToOffset.entrySet()) {
-				// seek all offsets to the right position
-				this.consumer.seek(new TopicPartition(info.getKey().getTopic(), info.getKey().getPartition()), info.getValue() + 1);
-			}
-			this.partitionState = restoreInfoFromCheckpoint();
-		} else {
-			this.partitionState = new HashMap<>();
-		}
-	}
-
-
-
-	@Override
-	public void run(SourceContext<T> sourceContext) throws Exception {
-		if(consumer != null) {
-			consumerThread = new ConsumerThread<>(this, sourceContext);
-			consumerThread.setDaemon(true);
-			consumerThread.start();
-			// wait for the consumer to stop
-			while(consumerThread.isAlive()) {
-				if(consumerThreadException != null) {
-					throw new RuntimeException("ConsumerThread threw an exception: " + consumerThreadException.getMessage(), consumerThreadException);
-				}
-				try {
-					consumerThread.join(50);
-				} catch (InterruptedException ie) {
-					consumerThread.shutdown();
-				}
-			}
-			// check again for an exception
-			if(consumerThreadException != null) {
-				throw new RuntimeException("ConsumerThread threw an exception: " + consumerThreadException.getMessage(), consumerThreadException);
-			}
-		} else {
-			// this source never completes, so emit a Long.MAX_VALUE watermark
-			// to not block watermark forwarding
-			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
-
-			final Object waitLock = new Object();
-			this.waitThread = Thread.currentThread();
-			while (running) {
-				// wait until we are canceled
-				try {
-					//noinspection SynchronizationOnLocalVariableOrMethodParameter
-					synchronized (waitLock) {
-						waitLock.wait();
-					}
-				}
-				catch (InterruptedException e) {
-					// do nothing, check our "running" status
-				}
-			}
-		}
-		// close the context after the work was done. this can actually only
-		// happen when the fetcher decides to stop fetching
-		sourceContext.close();
-	}
-
-	@Override
-	public void cancel() {
-		// set ourselves as not running
-		running = false;
-		if(this.consumerThread != null) {
-			this.consumerThread.shutdown();
-		} else {
-			// the consumer thread is not running, so we have to interrupt our own thread
-			if(waitThread != null) {
-				waitThread.interrupt();
-			}
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		cancel();
-		super.close();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Checkpoint and restore
-	// ------------------------------------------------------------------------
-
-
-	@Override
-	protected void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
-		if(!running) {
-			LOG.warn("Unable to commit offsets on closed consumer");
-			return;
-		}
-		Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = convertToCommitMap(checkpointOffsets);
-		synchronized (this.consumer) {
-			this.consumer.commitSync(kafkaCheckpointOffsets);
-		}
-	}
-
-	public static Map<TopicPartition, OffsetAndMetadata> convertToCommitMap(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
-		Map<TopicPartition, OffsetAndMetadata> ret = new HashMap<>(checkpointOffsets.size());
-		for(Map.Entry<KafkaTopicPartition, Long> partitionOffset: checkpointOffsets.entrySet()) {
-			ret.put(new TopicPartition(partitionOffset.getKey().getTopic(), partitionOffset.getKey().getPartition()),
-					new OffsetAndMetadata(partitionOffset.getValue(), ""));
-		}
-		return ret;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Miscellaneous utilities 
-	// ------------------------------------------------------------------------
-
-
-	protected static void setDeserializer(Properties props) {
-		if (!props.contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
-			props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
-		} else {
-			LOG.warn("Overwriting the '{}' is not recommended", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-		}
-
-		if (!props.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
-			props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
-		} else {
-			LOG.warn("Overwriting the '{}' is not recommended", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-		}
-	}
-
 	/**
-	 * We use a separate thread for executing the KafkaConsumer.poll(timeout) call because Kafka is not
-	 * handling interrupts properly. On an interrupt (which happens automatically by Flink if the task
-	 * doesn't react to cancel() calls), the poll() method might never return.
-	 * On cancel, we'll wakeup the .poll() call and wait for it to return
+	 * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
+	 * 
+	 * @param props The Kafka properties to register the serializer in.
 	 */
-	private static class ConsumerThread<T> extends Thread {
-		private final FlinkKafkaConsumer09<T> flinkKafkaConsumer;
-		private final SourceContext<T> sourceContext;
-		private boolean running = true;
-
-		public ConsumerThread(FlinkKafkaConsumer09<T> flinkKafkaConsumer, SourceContext<T> sourceContext) {
-			this.flinkKafkaConsumer = flinkKafkaConsumer;
-			this.sourceContext = sourceContext;
-		}
+	private static void setDeserializer(Properties props) {
+		final String deSerName = ByteArrayDeserializer.class.getCanonicalName();
 
-		@Override
-		public void run() {
-			try {
-				long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT)));
-				pollLoop: while (running) {
-					ConsumerRecords<byte[], byte[]> records;
-					//noinspection SynchronizeOnNonFinalField
-					synchronized (flinkKafkaConsumer.consumer) {
-						try {
-							records = flinkKafkaConsumer.consumer.poll(pollTimeout);
-						} catch (WakeupException we) {
-							if (running) {
-								throw we;
-							}
-							// leave loop
-							continue;
-						}
-					}
-					// get the records for each topic partition
-					for (int i = 0; i < flinkKafkaConsumer.subscribedPartitions.size(); i++) {
-						TopicPartition partition = flinkKafkaConsumer.subscribedPartitions.get(i);
-						KafkaTopicPartition flinkPartition = flinkKafkaConsumer.subscribedPartitionsAsFlink.get(i);
-						List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition);
-						//noinspection ForLoopReplaceableByForEach
-						for (int j = 0; j < partitionRecords.size(); j++) {
-							ConsumerRecord<byte[], byte[]> record = partitionRecords.get(j);
-							T value = flinkKafkaConsumer.deserializer.deserialize(record.key(), record.value(), record.topic(), record.partition(),record.offset());
-							if(flinkKafkaConsumer.deserializer.isEndOfStream(value)) {
-								// end of stream signaled
-								running = false;
-								break pollLoop;
-							}
-							synchronized (sourceContext.getCheckpointLock()) {
-								flinkKafkaConsumer.processElement(sourceContext, flinkPartition, value, record.offset());
-							}
-						}
-					}
-				}
-			} catch(Throwable t) {
-				if(running) {
-					this.flinkKafkaConsumer.stopWithError(t);
-				} else {
-					LOG.debug("Stopped ConsumerThread threw exception", t);
-				}
-			} finally {
-				try {
-					flinkKafkaConsumer.consumer.close();
-				} catch(Throwable t) {
-					LOG.warn("Error while closing consumer", t);
-				}
-			}
-		}
+		Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+		Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
 
-		/**
-		 * Try to shutdown the thread
-		 */
-		public void shutdown() {
-			this.running = false;
-			this.flinkKafkaConsumer.consumer.wakeup();
+		if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
+			LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+		}
+		if (valDeSer != null && !valDeSer.equals(deSerName)) {
+			LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
 		}
-	}
 
-	private void stopWithError(Throwable t) {
-		this.consumerThreadException = t;
+		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
+		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
 	}
 }

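For reference, a minimal usage sketch of the reworked 0.9 consumer follows. It is not part of the commit: it assumes the List-of-topics/DeserializationSchema constructor overload (mirroring the 0.8 usage in the removed test above) and the KEY_POLL_TIMEOUT / KEY_DISABLE_METRICS keys defined in this file; the broker address, group id, and topic name are placeholders.

import java.util.Collections;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka09ConsumerExample {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		// placeholder connection settings - adjust to the actual cluster
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "example-group");

		// optional: tune the poll timeout and disable metric forwarding,
		// using the configuration keys declared on FlinkKafkaConsumer09 above
		props.setProperty(FlinkKafkaConsumer09.KEY_POLL_TIMEOUT, "250");
		props.setProperty(FlinkKafkaConsumer09.KEY_DISABLE_METRICS, "true");

		// the constructor queries the partitions for the given topics eagerly
		// and fails if none can be found
		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer09<>(
						Collections.singletonList("example-topic"),
						new SimpleStringSchema(),
						props));

		stream.print();
		env.execute("Kafka 0.9 consumer example");
	}
}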
