flink-commits mailing list archives

From rmetz...@apache.org
Subject [2/2] flink git commit: [FLINK-3102] Allow reading from multiple topics with one FlinkKafkaConsumer instance
Date Tue, 08 Dec 2015 17:22:57 GMT
[FLINK-3102] Allow reading from multiple topics with one FlinkKafkaConsumer instance

This closes #1437


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc8be1ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc8be1ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc8be1ca

Branch: refs/heads/master
Commit: fc8be1ca6bab85dca8be5c8c92a2ab16449ea928
Parents: 4dbb10f
Author: Robert Metzger <rmetzger@apache.org>
Authored: Thu Dec 3 16:32:55 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Dec 8 17:33:11 2015 +0100

----------------------------------------------------------------------
 .../relational/EmptyFieldsCountAccumulator.java |   2 +-
 .../connectors/kafka/FlinkKafkaConsumer.java    | 266 +++++++++----------
 .../connectors/kafka/FlinkKafkaConsumer081.java |   3 +-
 .../connectors/kafka/FlinkKafkaConsumer082.java |  20 +-
 .../connectors/kafka/FlinkKafkaProducer.java    |   4 +-
 .../connectors/kafka/api/KafkaSink.java         |  36 ---
 .../api/persistent/PersistentKafkaSource.java   |  57 ----
 .../connectors/kafka/internals/Fetcher.java     |  23 +-
 .../kafka/internals/KafkaTopicPartition.java    | 124 +++++++++
 .../internals/KafkaTopicPartitionLeader.java    | 129 +++++++++
 .../kafka/internals/LegacyFetcher.java          | 249 +++++++++--------
 .../kafka/internals/OffsetHandler.java          |   6 +-
 .../kafka/internals/ZookeeperOffsetHandler.java |  19 +-
 .../kafka/partitioner/FixedPartitioner.java     |   8 +-
 .../KafkaConsumerPartitionAssignmentTest.java   | 194 +++++++-------
 .../connectors/kafka/KafkaConsumerTest.java     |  45 ++--
 .../connectors/kafka/KafkaConsumerTestBase.java | 130 +++++++--
 .../streaming/connectors/kafka/KafkaITCase.java |   6 +
 .../connectors/kafka/KafkaProducerITCase.java   |   3 +-
 .../connectors/kafka/KafkaTestBase.java         |   4 +-
 .../KeyedDeserializationSchema.java             |   2 +-
 .../KeyedDeserializationSchemaWrapper.java      |   2 +-
 ...eInformationKeyValueSerializationSchema.java |   2 +-
 tools/maven/checkstyle.xml                      |   5 +
 24 files changed, 822 insertions(+), 517 deletions(-)
----------------------------------------------------------------------
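
With this change the version-specific consumers accept a list of topics; FlinkKafkaConsumer082 gains a FlinkKafkaConsumer082(List<String>, DeserializationSchema, Properties) constructor (see its diff below). A minimal usage sketch of the new constructor — topic names, broker/ZooKeeper addresses and the group id are placeholders, and SimpleStringSchema plus the job setup are assumed from the streaming API of this era:

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MultiTopicReadExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");  // Kafka seed brokers
		props.setProperty("zookeeper.connect", "localhost:2181");  // ZooKeeper, used for offset storage
		props.setProperty("group.id", "multi-topic-demo");

		// One consumer instance now subscribes to several topics at once.
		FlinkKafkaConsumer082<String> consumer = new FlinkKafkaConsumer082<>(
				Arrays.asList("topicA", "topicB", "topicC"),
				new SimpleStringSchema(),
				props);

		DataStream<String> stream = env.addSource(consumer);
		stream.print();
		env.execute("Read from multiple Kafka topics");
	}
}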


http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
index e7ac474..6813c62 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.java.DataSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index 2d1d91a..7ff5e29 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -33,8 +33,10 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
@@ -43,16 +45,12 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW
 import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -124,7 +122,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * reach the Kafka brokers or ZooKeeper.</p>
  */
 public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
-		implements CheckpointNotifier, CheckpointedAsynchronously<long[]>, ResultTypeQueryable<T> {
+		implements CheckpointNotifier, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T> {
 
 	/**
 	 * The offset store defines how acknowledged offsets are committed back to Kafka. Different
@@ -198,19 +196,17 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 
 	/** The type of fetcher to be used to pull data from Kafka */
 	private final FetcherType fetcherType;
-	
-	/** name of the topic consumed by this source */
-	private final String topic;
+
+	/** List of partitions (including topics and leaders) to consume  */
+	private final List<KafkaTopicPartitionLeader> partitionInfos;
 	
 	/** The properties to parametrize the Kafka consumer and ZooKeeper client */ 
 	private final Properties props;
-	
-	/** The ids of the partitions that are read by this consumer */
-	private final int[] partitions;
-	
+
 	/** The schema to convert between Kafka's byte messages, and Flink's objects */
 	private final KeyedDeserializationSchema<T> deserializer;
 
+
 	// ------  Runtime State  -------
 
 	/** Data for pending but uncommitted checkpoints */
@@ -222,18 +218,18 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	/** The committer that persists the committed offsets */
 	private transient OffsetHandler offsetHandler;
 	
-	/** The partitions actually handled by this consumer */
-	private transient List<TopicPartition> subscribedPartitions;
+	/** The partitions actually handled by this consumer at runtime */
+	private transient List<KafkaTopicPartitionLeader> subscribedPartitions;
 
 	/** The offsets of the last returned elements */
-	private transient long[] lastOffsets;
+	private transient HashMap<KafkaTopicPartition, Long> lastOffsets;
 
 	/** The latest offsets that have been committed to Kafka or ZooKeeper. These are never
 	 * newer than the last offsets (Flink's internal view is fresher) */
-	private transient long[] commitedOffsets;
+	private transient HashMap<KafkaTopicPartition, Long> committedOffsets;
 	
 	/** The offsets to restore to, if the consumer restores state from a checkpoint */
-	private transient long[] restoreToOffset;
+	private transient HashMap<KafkaTopicPartition, Long> restoreToOffset;
 	
 	private volatile boolean running = true;
 	
@@ -257,7 +253,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	 * @param fetcherType
 	 *           The type of fetcher to use (new high-level API, old low-level API).
 	 */
-	public FlinkKafkaConsumer(String topic, DeserializationSchema<T> deserializer, Properties props,
+	public FlinkKafkaConsumer(List<String> topic, DeserializationSchema<T> deserializer, Properties props,
 							OffsetStore offsetStore, FetcherType fetcherType) {
 		this(topic, new KeyedDeserializationSchemaWrapper<>(deserializer),
 				props, offsetStore, fetcherType);
@@ -269,8 +265,8 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	 * <p>To determine which kind of fetcher and offset handler to use, please refer to the docs
 	 * at the beginning of this class.</p>
 	 * 
-	 * @param topic 
-	 *           The Kafka topic to read from.
+	 * @param topics
+	 *           The Kafka topics to read from.
 	 * @param deserializer
 	 *           The deserializer to turn raw byte messages into Java/Scala objects.
 	 * @param props
@@ -280,7 +276,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	 * @param fetcherType
 	 *           The type of fetcher to use (new high-level API, old low-level API).
 	 */
-	public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props,
+	public FlinkKafkaConsumer(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props,
 								OffsetStore offsetStore, FetcherType fetcherType) {
 		this.offsetStore = checkNotNull(offsetStore);
 		this.fetcherType = checkNotNull(fetcherType);
@@ -294,7 +290,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 					"The Kafka offset handler cannot be used together with the old low-level fetcher.");
 		}
 		
-		this.topic = checkNotNull(topic, "topic");
+		checkNotNull(topics, "topics");
 		this.props = checkNotNull(props, "props");
 		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
 
@@ -303,25 +299,31 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 			validateZooKeeperConfig(props);
 		}
 		
-		// Connect to a broker to get the partitions
-		List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props);
+		// Connect to a broker to get the partitions for all topics
+		this.partitionInfos = getPartitionsForTopic(topics, props);
 
 		if (partitionInfos.size() == 0) {
-			throw new RuntimeException("Unable to retrieve any partitions for topic " + topic + "." +
+			throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics.toString() + "." +
 					"Please check previous log entries");
 		}
 
-		// get initial partitions list. The order of the partitions is important for consistent 
-		// partition id assignment in restart cases.
-		this.partitions = new int[partitionInfos.size()];
-		for (int i = 0; i < partitionInfos.size(); i++) {
-			partitions[i] = partitionInfos.get(i).partition();
-			
-			if (partitions[i] >= partitions.length) {
-				throw new RuntimeException("Kafka partition numbers are sparse");
+		if (LOG.isInfoEnabled()) {
+			Map<String, Integer> countPerTopic = new HashMap<>();
+			for (KafkaTopicPartitionLeader partition : partitionInfos) {
+				Integer count = countPerTopic.get(partition.getTopicPartition().getTopic());
+				if (count == null) {
+					count = 1;
+				} else {
+					count++;
+				}
+				countPerTopic.put(partition.getTopicPartition().getTopic(), count);
+			}
+			StringBuilder sb = new StringBuilder();
+			for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
+				sb.append(e.getKey()).append(" (").append(e.getValue()).append("), ");
 			}
+			LOG.info("Consumer is going to read the following topics (with number of partitions): {}", sb.toString());
 		}
-		LOG.info("Topic {} has {} partitions", topic, partitions.length);
 	}
 
 	// ------------------------------------------------------------------------
@@ -333,19 +335,19 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 		super.open(parameters);
 		
 		final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
-		final int thisComsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
+		final int thisConsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
 		
 		// pick which partitions we work on
-		subscribedPartitions = assignPartitions(this.partitions, this.topic, numConsumers, thisComsumerIndex);
+		subscribedPartitions = assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex);
 		
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Kafka consumer {} will read partitions {} out of partitions {}",
-					thisComsumerIndex, subscribedPartitions, Arrays.toString(partitions));
+					thisConsumerIndex, KafkaTopicPartitionLeader.toString(subscribedPartitions), this.partitionInfos.size());
 		}
 
 		// we leave the fetcher as null, if we have no partitions
 		if (subscribedPartitions.isEmpty()) {
-			LOG.info("Kafka consumer {} has no partitions (empty source)", thisComsumerIndex);
+			LOG.info("Kafka consumer {} has no partitions (empty source)", thisConsumerIndex);
 			return;
 		}
 		
@@ -354,12 +356,11 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 			case NEW_HIGH_LEVEL:
 				throw new UnsupportedOperationException("Currently unsupported");
 			case LEGACY_LOW_LEVEL:
-				fetcher = new LegacyFetcher(topic, props, getRuntimeContext().getTaskName());
+				fetcher = new LegacyFetcher(this.subscribedPartitions, props, getRuntimeContext().getTaskName());
 				break;
 			default:
 				throw new RuntimeException("Requested unknown fetcher " + fetcher);
 		}
-		fetcher.setPartitionsToRead(subscribedPartitions);
 
 		// offset handling
 		switch (offsetStore){
@@ -372,34 +373,29 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 				throw new RuntimeException("Requested unknown offset store " + offsetStore);
 		}
 		
-		// set up operator state
-		lastOffsets = new long[partitions.length];
-		commitedOffsets = new long[partitions.length];
-		
-		Arrays.fill(lastOffsets, OFFSET_NOT_SET);
-		Arrays.fill(commitedOffsets, OFFSET_NOT_SET);
-		
+		committedOffsets = new HashMap<>();
+
 		// seek to last known pos, from restore request
 		if (restoreToOffset != null) {
 			if (LOG.isInfoEnabled()) {
-				LOG.info("Consumer {} found offsets from previous checkpoint: {}",
-						thisComsumerIndex,  Arrays.toString(restoreToOffset));
+				LOG.info("Consumer {} is restored from previous checkpoint: {}",
+						thisConsumerIndex, KafkaTopicPartition.toString(restoreToOffset));
 			}
 			
-			for (int i = 0; i < restoreToOffset.length; i++) {
-				long restoredOffset = restoreToOffset[i];
-				if (restoredOffset != OFFSET_NOT_SET) {
-					// if this fails because we are not subscribed to the topic, then the
-					// partition assignment is not deterministic!
-					
-					// we set the offset +1 here, because seek() is accepting the next offset to read,
-					// but the restore offset is the last read offset
-					fetcher.seek(new TopicPartition(topic, i), restoredOffset + 1);
-					lastOffsets[i] = restoredOffset;
-				}
+			for (Map.Entry<KafkaTopicPartition, Long> restorePartition: restoreToOffset.entrySet()) {
+				// seek fetcher to restore position
+				// we set the offset +1 here, because seek() is accepting the next offset to read,
+				// but the restore offset is the last read offset
+				fetcher.seek(restorePartition.getKey(), restorePartition.getValue() + 1);
 			}
+			// initialize offsets with restored state
+			this.lastOffsets = restoreToOffset;
+			restoreToOffset = null;
 		}
 		else {
+			// start with empty offsets
+			lastOffsets = new HashMap<>();
+
 			// no restore request. Let the offset handler take care of the initial offset seeking
 			offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
 		}
@@ -409,7 +405,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	public void run(SourceContext<T> sourceContext) throws Exception {
 		if (fetcher != null) {
 			// For non-checkpointed sources, a thread which periodically commits the current offset into ZK.
-			PeriodicOffsetCommitter offsetCommitter = null;
+			PeriodicOffsetCommitter<T> offsetCommitter = null;
 
 			// check whether we need to start the periodic checkpoint committer
 			StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
@@ -418,7 +414,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 				// Note that the default configuration value in Kafka is 60 * 1000, so we use the
 				// same here.
 				long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
-				offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
+				offsetCommitter = new PeriodicOffsetCommitter<>(commitInterval, this);
 				offsetCommitter.setDaemon(true);
 				offsetCommitter.start();
 				LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
@@ -504,7 +500,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	// ------------------------------------------------------------------------
 
 	@Override
-	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+	public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
 		if (lastOffsets == null) {
 			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
 			return null;
@@ -516,10 +512,12 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
-					Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+					KafkaTopicPartition.toString(lastOffsets), checkpointId, checkpointTimestamp);
 		}
 
-		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
+		// the use of clone() is okay here, we just need a new map; the keys are not changed
+		//noinspection unchecked
+		HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) lastOffsets.clone();
 
 		// the map cannot be asynchronously updated, because only one checkpoint call can happen
 		// on this function at a time: either snapshotState() or notifyCheckpointComplete()
@@ -533,7 +531,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	}
 
 	@Override
-	public void restoreState(long[] restoredOffsets) {
+	public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
 		restoreToOffset = restoredOffsets;
 	}
 
@@ -554,7 +552,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 		}
 
 		try {
-			long[] checkpointOffsets;
+			HashMap<KafkaTopicPartition, Long> checkpointOffsets;
 	
 			// the map may be asynchronously updated when snapshotting state, so we synchronize
 			synchronized (pendingCheckpoints) {
@@ -563,39 +561,21 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
 					return;
 				}
-	
-				checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
+
+				//noinspection unchecked
+				checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
+
 				
 				// remove older checkpoints in map
 				for (int i = 0; i < posInMap; i++) {
 					pendingCheckpoints.remove(0);
 				}
 			}
-	
-			if (LOG.isInfoEnabled()) {
-				LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
-			}
-	
-			// build the map of (topic,partition) -> committed offset
-			Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
-			for (TopicPartition tp : subscribedPartitions) {
-				
-				int partition = tp.partition();
-				long offset = checkpointOffsets[partition];
-				long lastCommitted = commitedOffsets[partition];
-				
-				if (offset != OFFSET_NOT_SET) {
-					if (offset > lastCommitted) {
-						offsetsToCommit.put(tp, offset);
-						LOG.debug("Committing offset {} for partition {}", offset, partition);
-					}
-					else {
-						LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
-					}
-				}
+			if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
+				LOG.info("Checkpoint state was empty.");
+				return;
 			}
-			
-			offsetHandler.commit(offsetsToCommit);
+			commitOffsets(checkpointOffsets, this);
 		}
 		catch (Exception e) {
 			if (running) {
@@ -609,16 +589,15 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	//  Miscellaneous utilities 
 	// ------------------------------------------------------------------------
 
-	protected static List<TopicPartition> assignPartitions(int[] partitions, String topicName,
-															int numConsumers, int consumerIndex) {
+	protected static List<KafkaTopicPartitionLeader> assignPartitions(List<KafkaTopicPartitionLeader> partitions, int numConsumers, int consumerIndex) {
 		checkArgument(numConsumers > 0);
 		checkArgument(consumerIndex < numConsumers);
 		
-		List<TopicPartition> partitionsToSub = new ArrayList<>();
+		List<KafkaTopicPartitionLeader> partitionsToSub = new ArrayList<>();
 
-		for (int i = 0; i < partitions.length; i++) {
+		for (int i = 0; i < partitions.size(); i++) {
 			if (i % numConsumers == consumerIndex) {
-				partitionsToSub.add(new TopicPartition(topicName, partitions[i]));
+				partitionsToSub.add(partitions.get(i));
 			}
 		}
 		return partitionsToSub;
@@ -627,12 +606,12 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	/**
 	 * Thread to periodically commit the current read offset into Zookeeper.
 	 */
-	private static class PeriodicOffsetCommitter extends Thread {
+	private static class PeriodicOffsetCommitter<T> extends Thread {
 		private final long commitInterval;
-		private final FlinkKafkaConsumer consumer;
+		private final FlinkKafkaConsumer<T> consumer;
 		private volatile boolean running = true;
 
-		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
+		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer<T> consumer) {
 			this.commitInterval = commitInterval;
 			this.consumer = consumer;
 		}
@@ -640,31 +619,15 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 		@Override
 		public void run() {
 			try {
+
 				while (running) {
 					try {
 						Thread.sleep(commitInterval);
 						//  ------------  commit current offsets ----------------
 
 						// create copy of current offsets
-						long[] currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
-
-						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
-						for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
-							int partition = tp.partition();
-							long offset = currentOffsets[partition];
-							long lastCommitted = consumer.commitedOffsets[partition];
-
-							if (offset != OFFSET_NOT_SET) {
-								if (offset > lastCommitted) {
-									offsetsToCommit.put(tp, offset);
-									LOG.debug("Committing offset {} for partition {}", offset, partition);
-								} else {
-									LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
-								}
-							}
-						}
-
-						consumer.offsetHandler.commit(offsetsToCommit);
+						HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) consumer.lastOffsets.clone();
+						commitOffsets(currentOffsets, this.consumer);
 					} catch (InterruptedException e) {
 						if (running) {
 							// throw unexpected interruption
@@ -686,6 +649,40 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 		}
 
 	}
+
+	/**
+	 * Utility method to commit offsets.
+	 *
+	 * @param toCommit the offsets to commit
+	 * @param consumer consumer reference
+	 * @param <T> message type
+	 * @throws Exception
+	 */
+	private static <T> void commitOffsets(HashMap<KafkaTopicPartition, Long> toCommit, FlinkKafkaConsumer<T> consumer) throws Exception {
+		Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>();
+		for (KafkaTopicPartitionLeader tp : consumer.subscribedPartitions) {
+			long offset = toCommit.get(tp.getTopicPartition());
+			Long lastCommitted = consumer.committedOffsets.get(tp.getTopicPartition());
+			if (lastCommitted == null) {
+				lastCommitted = OFFSET_NOT_SET;
+			}
+			if (offset != OFFSET_NOT_SET) {
+				if (offset > lastCommitted) {
+					offsetsToCommit.put(tp.getTopicPartition(), offset);
+					consumer.committedOffsets.put(tp.getTopicPartition(), offset);
+					LOG.debug("Committing offset {} for partition {}", offset, tp.getTopicPartition());
+				} else {
+					LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, tp.getTopicPartition());
+				}
+			}
+		}
+
+		if (LOG.isDebugEnabled() && offsetsToCommit.size() > 0) {
+			LOG.debug("Committing offsets {} to offset store: {}", KafkaTopicPartition.toString(offsetsToCommit), consumer.offsetStore);
+		}
+
+		consumer.offsetHandler.commit(offsetsToCommit);
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Kafka / ZooKeeper communication utilities
@@ -694,19 +691,19 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	/**
 	 * Send request to Kafka to get partitions for topic.
 	 * 
-	 * @param topic The name of the topic.
+	 * @param topics The name of the topics.
 	 * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. 
 	 */
-	public static List<PartitionInfo> getPartitionsForTopic(final String topic, final Properties properties) {
+	public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(final List<String> topics, final Properties properties) {
 		String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
 		final int numRetries = Integer.valueOf(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, Integer.toString(DEFAULT_GET_PARTITIONS_RETRIES)));
 
 		checkNotNull(seedBrokersConfString, "Configuration property " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set");
 		String[] seedBrokers = seedBrokersConfString.split(",");
-		List<PartitionInfo> partitions = new ArrayList<>();
+		List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
 
 		Random rnd = new Random();
-		retryLoop: for(int retry = 0; retry < numRetries; retry++) {
+		retryLoop: for (int retry = 0; retry < numRetries; retry++) {
 			// we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
 			// parallel source instances start. Still, we try all available brokers.
 			int index = rnd.nextInt(seedBrokers.length);
@@ -725,7 +722,6 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 					final int bufferSize = Integer.valueOf(properties.getProperty("socket.receive.buffer.bytes", "65536"));
 					consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
 
-					List<String> topics = Collections.singletonList(topic);
 					TopicMetadataRequest req = new TopicMetadataRequest(topics);
 					kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
 
@@ -740,32 +736,24 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 								throw new RuntimeException("Requested partitions for unknown topic", ErrorMapping.exceptionFor(item.errorCode()));
 							}
 							// warn and try more brokers
-							LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions for " + topic,
-									ErrorMapping.exceptionFor(item.errorCode()));
+							LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " +
+									"for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage());
 							continue brokersLoop;
 						}
-						if (!item.topic().equals(topic)) {
+						if (!topics.contains(item.topic())) {
 							LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
 							continue brokersLoop;
 						}
 						for (PartitionMetadata part : item.partitionsMetadata()) {
 							Node leader = brokerToNode(part.leader());
-							Node[] replicas = new Node[part.replicas().size()];
-							for (int i = 0; i < part.replicas().size(); i++) {
-								replicas[i] = brokerToNode(part.replicas().get(i));
-							}
-
-							Node[] ISRs = new Node[part.isr().size()];
-							for (int i = 0; i < part.isr().size(); i++) {
-								ISRs[i] = brokerToNode(part.isr().get(i));
-							}
-							PartitionInfo pInfo = new PartitionInfo(topic, part.partitionId(), leader, replicas, ISRs);
+							KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
+							KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
 							partitions.add(pInfo);
 						}
 					}
 					break retryLoop; // leave the loop through the brokers
 				} catch (Exception e) {
-					LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topic, e);
+					LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString(), e);
 				} finally {
 					if (consumer != null) {
 						consumer.close();
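
The assignPartitions() change above keeps the deterministic round-robin distribution of partitions across parallel subtasks, now over KafkaTopicPartitionLeader objects instead of plain partition ids: element i goes to the subtask with index i % numConsumers. A self-contained sketch of that scheme, simplified to strings with illustrative partition names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundRobinAssignmentSketch {

	// Mirrors the i % numConsumers == consumerIndex rule used by assignPartitions().
	static List<String> assign(List<String> partitions, int numConsumers, int consumerIndex) {
		List<String> assigned = new ArrayList<>();
		for (int i = 0; i < partitions.size(); i++) {
			if (i % numConsumers == consumerIndex) {
				assigned.add(partitions.get(i));
			}
		}
		return assigned;
	}

	public static void main(String[] args) {
		List<String> partitions = Arrays.asList(
				"topicA-0", "topicA-1", "topicB-0", "topicB-1", "topicB-2");
		// With parallelism 2: subtask 0 gets indices 0, 2, 4; subtask 1 gets indices 1, 3.
		System.out.println("subtask 0: " + assign(partitions, 2, 0));
		System.out.println("subtask 1: " + assign(partitions, 2, 1));
	}
}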

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
index 21f24e6..abe33aa 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
+import java.util.Collections;
 import java.util.Properties;
 
 /**
@@ -52,6 +53,6 @@ public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer<T> {
 	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
 	 */
 	public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
-		super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+		super(Collections.singletonList(topic), valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
index ab4a88a..adc42de 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 
 /**
@@ -47,9 +49,12 @@ public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
 	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
 	 */
 	public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
-		super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+		super(Collections.singletonList(topic), valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
 	}
 
+
+	//----- key-value deserializer constructor
+
 	/**
 	 * Creates a new Kafka 0.8.2.x streaming source consumer.
 	 *
@@ -64,6 +69,17 @@ public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
 	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
 	 */
 	public FlinkKafkaConsumer082(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
-		super(topic, deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+		super(Collections.singletonList(topic), deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+	}
+
+	//----- topic list constructors
+
+
+	public FlinkKafkaConsumer082(List<String> topics, DeserializationSchema<T> valueDeserializer, Properties props) {
+		super(topics, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+	}
+
+	public FlinkKafkaConsumer082(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		super(topics, deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index a8d913b..7e01b54 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -198,13 +198,13 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
 
 		// set the producer configuration properties.
 
-		if(!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+		if (!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
 			this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
 		} else {
 			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
 		}
 
-		if(!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+		if (!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
 			this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
 		} else {
 			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
deleted file mode 100644
index afa2e42..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.api;
-
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-/**
- * Sink that emits its inputs to a Kafka topic.
- *
- * The KafkaSink has been relocated to org.apache.flink.streaming.connectors.kafka.KafkaSink.
- * This class will be removed in future releases of Flink.
- * 
- * @deprecated Please use the {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} instead.
- */
-@Deprecated
-public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
-	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		super(brokerList, topicId, serializationSchema);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
deleted file mode 100644
index 2efeb20..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.persistent;
-
-import kafka.consumer.ConsumerConfig;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-
-/**
- * Creates a Kafka consumer compatible with reading from Kafka 0.8.1+ consumers.
- *
- * This class is provided as a migration path from the old Flink kafka connectors to the new, updated implemntations.
- *
- * Please use FlinkKafkaConsumer081 and FlinkKafkaConsumer082.
- *
- * @param <T> The type of elements produced by this consumer.
- * 
- * @deprecated Due to Kafka protocol and architecture (offset handling) changes, please use the
- *             Kafka version specific consumers, like
- *             {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081}, 
- *             {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082}, etc.
- */
-@Deprecated
-public class PersistentKafkaSource<T> extends FlinkKafkaConsumer<T> {
-
-	private static final long serialVersionUID = -8450689820627198228L;
-
-	/**
-	 * Creates a new Kafka 0.8.2.x streaming source consumer.
-	 *
-	 * @param topic
-	 *           The name of the topic that should be consumed.
-	 * @param valueDeserializer
-	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-	 * @param consumerConfig
-	 *           The consumer config used to configure the Kafka consumer client, and the ZooKeeper client.
-	 */
-	public PersistentKafkaSource(String topic, DeserializationSchema<T> valueDeserializer, ConsumerConfig consumerConfig) {
-		super(topic, valueDeserializer, consumerConfig.props().props(), OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
index dabafa9..4f1a2a6 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
@@ -19,10 +19,9 @@ package org.apache.flink.streaming.connectors.kafka.internals;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.kafka.common.TopicPartition;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.HashMap;
 
 /**
  * A fetcher pulls data from Kafka, from a fixed set of partitions.
@@ -31,15 +30,8 @@ import java.util.List;
 public interface Fetcher {
 
 	/**
-	 * Set which partitions the fetcher should pull from.
-	 * 
-	 * @param partitions The list of partitions for a topic that the fetcher will pull from.
-	 */
-	void setPartitionsToRead(List<TopicPartition> partitions);
-
-	/**
 	 * Closes the fetcher. This will stop any operation in the
-	 * {@link #run(SourceFunction.SourceContext, DeserializationSchema, long[])} method and eventually
+	 * {@link #run(SourceFunction.SourceContext, KeyedDeserializationSchema, HashMap)} method and eventually
 	 * close underlying connections and release all resources.
 	 */
 	void close() throws IOException;
@@ -61,15 +53,14 @@ public interface Fetcher {
 	 *     }
 	 * }
 	 * }</pre>
-	 * 
+	 *
+	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
 	 * @param sourceContext The source context to emit elements to.
 	 * @param valueDeserializer The deserializer to decode the raw values with.
-	 * @param lastOffsets The array into which to store the offsets for which elements are emitted (operator state)
-	 * 
-	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
+	 * @param lastOffsets The map into which to store the offsets for which elements are emitted (operator state)
 	 */
 	<T> void run(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> valueDeserializer,
-					long[] lastOffsets) throws Exception;
+				HashMap<KafkaTopicPartition, Long> lastOffsets) throws Exception;
 	
 	/**
 	 * Set the next offset to read from for the given partition.
@@ -79,7 +70,7 @@ public interface Fetcher {
 	 * @param topicPartition The partition for which to seek the offset.
 	 * @param offsetToRead To offset to seek to.
 	 */
-	void seek(TopicPartition topicPartition, long offsetToRead);
+	void seek(KafkaTopicPartition topicPartition, long offsetToRead);
 
 	/**
 	 * Exit run loop with given error and release all resources.

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
new file mode 100644
index 0000000..f269aa3
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of a kafka topic and a partition.
+ * Used as an operator state for the Kafka consumer
+ */
+public class KafkaTopicPartition implements Serializable {
+
+	private static final long serialVersionUID = 722083576322742325L;
+
+	private final String topic;
+	private final int partition;
+	private final int cachedHash;
+
+	public KafkaTopicPartition(String topic, int partition) {
+		this.topic = checkNotNull(topic);
+		this.partition = partition;
+		this.cachedHash = 31 * topic.hashCode() + partition;
+	}
+
+	public String getTopic() {
+		return topic;
+	}
+
+	public int getPartition() {
+		return partition;
+	}
+
+	@Override
+	public String toString() {
+		return "KafkaTopicPartition{" +
+				"topic='" + topic + '\'' +
+				", partition=" + partition +
+				'}';
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof KafkaTopicPartition)) {
+			return false;
+		}
+
+		KafkaTopicPartition that = (KafkaTopicPartition) o;
+
+		if (partition != that.partition) {
+			return false;
+		}
+		return topic.equals(that.topic);
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+
+
+	// ------------------- Utilities -------------------------------------
+
+	/**
+	 * Returns a unique list of topics from the topic partition map
+	 *
+	 * @param topicPartitionMap A map of KafkaTopicPartition's
+	 * @return A unique list of topics from the input map
+	 */
+	public static List<String> getTopics(Map<KafkaTopicPartition, ?> topicPartitionMap) {
+		HashSet<String> uniqueTopics = new HashSet<>();
+		for (KafkaTopicPartition ktp: topicPartitionMap.keySet()) {
+			uniqueTopics.add(ktp.getTopic());
+		}
+		return new ArrayList<>(uniqueTopics);
+	}
+
+	public static String toString(Map<KafkaTopicPartition, Long> map) {
+		StringBuilder sb = new StringBuilder();
+		for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
+			KafkaTopicPartition ktp = p.getKey();
+			sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", ");
+		}
+		return sb.toString();
+	}
+
+	/**
+	 * Checks whether this partition is contained in the map with KafkaTopicPartitionLeaders
+	 *
+	 * @param map The map of KafkaTopicPartitionLeaders
+	 * @return true if the element is contained.
+	 */
+	public boolean isContained(Map<KafkaTopicPartitionLeader, ?> map) {
+		for(Map.Entry<KafkaTopicPartitionLeader, ?> entry : map.entrySet()) {
+			if(entry.getKey().getTopicPartition().equals(this)) {
+				return true;
+			}
+		}
+		return false;
+	}
+}
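
KafkaTopicPartition is now the key of the HashMap<KafkaTopicPartition, Long> operator state (see snapshotState()/restoreState() above), so its equals()/hashCode() contract on (topic, partition) is what makes offset lookups with a freshly constructed key work. A small sketch with made-up topics and offsets:

import java.util.HashMap;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class OffsetStateSketch {
	public static void main(String[] args) {
		HashMap<KafkaTopicPartition, Long> lastOffsets = new HashMap<>();
		lastOffsets.put(new KafkaTopicPartition("topicA", 0), 42L);
		lastOffsets.put(new KafkaTopicPartition("topicB", 3), 7L);

		// A new key with the same (topic, partition) finds the stored offset,
		// because equality ignores everything but topic and partition.
		Long offset = lastOffsets.get(new KafkaTopicPartition("topicA", 0)); // 42
		System.out.println(offset);

		// The static helper added in this commit renders the state for log messages.
		System.out.println(KafkaTopicPartition.toString(lastOffsets));
	}
}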

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
new file mode 100644
index 0000000..8dd9a52
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.kafka.common.Node;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializable Topic Partition info with leader Node information.
+ * This class is used at runtime.
+ */
+public class KafkaTopicPartitionLeader implements Serializable {
+
+	private static final long serialVersionUID = 9145855900303748582L;
+
+	private final int leaderId;
+	private final int leaderPort;
+	private final String leaderHost;
+	private final KafkaTopicPartition topicPartition;
+	private final int cachedHash;
+
+	public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) {
+		this.topicPartition = topicPartition;
+		if (leader == null) {
+			this.leaderId = -1;
+			this.leaderHost = null;
+			this.leaderPort = -1;
+		} else {
+			this.leaderId = leader.id();
+			this.leaderPort = leader.port();
+			this.leaderHost = leader.host();
+		}
+		int cachedHash = (leader == null) ? 14 : leader.hashCode();
+		this.cachedHash = 31 * cachedHash + topicPartition.hashCode();
+	}
+
+	public KafkaTopicPartition getTopicPartition() {
+		return topicPartition;
+	}
+
+	public Node getLeader() {
+		if (this.leaderId == -1) {
+			return null;
+		} else {
+			return new Node(leaderId, leaderHost, leaderPort);
+		}
+	}
+
+	public static Object toString(List<KafkaTopicPartitionLeader> partitions) {
+		StringBuilder sb = new StringBuilder();
+		for (KafkaTopicPartitionLeader p: partitions) {
+			sb.append(p.getTopicPartition().getTopic()).append(":").append(p.getTopicPartition().getPartition()).append(", ");
+		}
+		return sb.toString();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof KafkaTopicPartitionLeader)) {
+			return false;
+		}
+
+		KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
+
+		if (!topicPartition.equals(that.topicPartition)) {
+			return false;
+		}
+		return leaderId == that.leaderId && leaderPort == that.leaderPort && leaderHost.equals(that.leaderHost);
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+
+	@Override
+	public String toString() {
+		return "KafkaTopicPartitionLeader{" +
+				"leaderId=" + leaderId +
+				", leaderPort=" + leaderPort +
+				", leaderHost='" + leaderHost + '\'' +
+				", topic=" + topicPartition.getTopic() +
+				", partition=" + topicPartition.getPartition() +
+				'}';
+	}
+
+
+	/**
+	 * Replaces an existing KafkaTopicPartition ignoring the leader in the given map.
+	 *
+	 * @param newKey new topicpartition
+	 * @param newValue new offset
+	 * @param map map to do the search in
+	 * @return oldValue the old value (offset)
+	 */
+	public static Long replaceIgnoringLeader(KafkaTopicPartitionLeader newKey, Long newValue, Map<KafkaTopicPartitionLeader, Long> map) {
+		for(Map.Entry<KafkaTopicPartitionLeader, Long> entry: map.entrySet()) {
+			if(entry.getKey().getTopicPartition().equals(newKey.getTopicPartition())) {
+				Long oldValue = map.remove(entry.getKey());
+				if(map.put(newKey, newValue) != null) {
+					throw new IllegalStateException("Key was not removed before");
+				}
+				return oldValue;
+			}
+		}
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index 95683ce..4233c18 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -31,18 +31,17 @@ import kafka.message.MessageAndOffset;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.StringUtils;
 
+import org.apache.flink.util.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -53,16 +52,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * This fetcher uses Kafka's low-level API to pull data from a specific
- * set of partitions and offsets for a certain topic.
+ * set of topics and partitions.
  * 
  * <p>This code is in parts based on the tutorial code for the low-level Kafka consumer.</p>
  */
 public class LegacyFetcher implements Fetcher {
 	
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
+	private static final Logger LOG = LoggerFactory.getLogger(LegacyFetcher.class);
 
-	/** The topic from which this fetcher pulls data */
-	private final String topic;
 	
 	/** The properties that configure the Kafka connection */
 	private final Properties config;
@@ -74,7 +71,13 @@ public class LegacyFetcher implements Fetcher {
 	private final AtomicReference<Throwable> error;
 
 	/** The partitions that the fetcher should read, with their starting offsets */
-	private Map<TopicPartition, Long> partitionsToRead;
+	private Map<KafkaTopicPartitionLeader, Long> partitionsToRead;
+
+	/** The seek() method might receive KafkaTopicPartitions without leader information
+	 * (for example when restoring from a checkpoint).
+	 * If there are entries in this map, we'll fetch their leaders from Kafka before reading.
+	 */
+	private Map<KafkaTopicPartition, Long> partitionsToReadWithoutLeader;
 	
 	/** Reference to the thread that executed the run() method. */
 	private volatile Thread mainThread;
@@ -82,9 +85,13 @@ public class LegacyFetcher implements Fetcher {
 	/** Flag to shut the fetcher down */
 	private volatile boolean running = true;
 
-	public LegacyFetcher(String topic, Properties props, String taskName) {
+	public LegacyFetcher(List<KafkaTopicPartitionLeader> partitions, Properties props, String taskName) {
 		this.config = checkNotNull(props, "The config properties cannot be null");
-		this.topic = checkNotNull(topic, "The topic cannot be null");
+		// the topic name is now carried per partition by KafkaTopicPartitionLeader
+		this.partitionsToRead = new HashMap<>();
+		for (KafkaTopicPartitionLeader p: partitions) {
+			partitionsToRead.put(p, FlinkKafkaConsumer.OFFSET_NOT_SET);
+		}
 		this.taskName = taskName;
 		this.error = new AtomicReference<>();
 	}
@@ -94,23 +101,18 @@ public class LegacyFetcher implements Fetcher {
 	// ------------------------------------------------------------------------
 	
 	@Override
-	public void setPartitionsToRead(List<TopicPartition> partitions) {
-		partitionsToRead = new HashMap<>(partitions.size());
-		for (TopicPartition tp: partitions) {
-			partitionsToRead.put(tp, FlinkKafkaConsumer.OFFSET_NOT_SET);
-		}
-	}
-
-	@Override
-	public void seek(TopicPartition topicPartition, long offsetToRead) {
+	public void seek(KafkaTopicPartition topicPartition, long offsetToRead) {
 		if (partitionsToRead == null) {
 			throw new IllegalArgumentException("No partitions to read set");
 		}
-		if (!partitionsToRead.containsKey(topicPartition)) {
+		if (!topicPartition.isContained(partitionsToRead)) {
 			throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition
 					+ ") we are not going to read. Partitions to read " + partitionsToRead);
 		}
-		partitionsToRead.put(topicPartition, offsetToRead);
+		if (partitionsToReadWithoutLeader == null) {
+			partitionsToReadWithoutLeader = new HashMap<>();
+		}
+		partitionsToReadWithoutLeader.put(topicPartition, offsetToRead);
 	}
 	
 	@Override
@@ -124,7 +126,7 @@ public class LegacyFetcher implements Fetcher {
 	@Override
 	public <T> void run(SourceFunction.SourceContext<T> sourceContext,
 						KeyedDeserializationSchema<T> deserializer,
-						long[] lastOffsets) throws Exception {
+						HashMap<KafkaTopicPartition, Long> lastOffsets) throws Exception {
 		
 		if (partitionsToRead == null || partitionsToRead.size() == 0) {
 			throw new IllegalArgumentException("No partitions set");
@@ -135,54 +137,57 @@ public class LegacyFetcher implements Fetcher {
 		this.mainThread = Thread.currentThread();
 
 		LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher");
-		
-		// get lead broker for each partition
-		
-		// NOTE: The kafka client apparently locks itself in an infinite loop sometimes
-		// when it is interrupted, so we run it only in a separate thread.
-		// since it sometimes refuses to shut down, we resort to the admittedly harsh
-		// means of killing the thread after a timeout.
-		PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topic, config);
-		infoFetcher.start();
-		
-		KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
-		watchDog.start();
-		
-		final List<PartitionInfo> allPartitionsInTopic = infoFetcher.getPartitions();
-		
-		// brokers to fetch partitions from.
-		int fetchPartitionsCount = 0;
-		Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<>();
-		
-		for (PartitionInfo partitionInfo : allPartitionsInTopic) {
-			if (partitionInfo.leader() == null) {
-				throw new RuntimeException("Unable to consume partition " + partitionInfo.partition()
-						+ " from topic "+partitionInfo.topic()+" because it does not have a leader");
-			}
-			
-			for (Map.Entry<TopicPartition, Long> entry : partitionsToRead.entrySet()) {
-				final TopicPartition topicPartition = entry.getKey();
-				final long offset = entry.getValue();
-				
-				// check if that partition is for us
-				if (topicPartition.partition() == partitionInfo.partition()) {
-					List<FetchPartition> partitions = fetchBrokers.get(partitionInfo.leader());
-					if (partitions == null) {
-						partitions = new ArrayList<>();
-						fetchBrokers.put(partitionInfo.leader(), partitions);
+
+		// get lead broker if necessary
+		if (partitionsToReadWithoutLeader != null && partitionsToReadWithoutLeader.size() > 0) {
+			LOG.info("Refreshing leader information for partitions {}", KafkaTopicPartition.toString(partitionsToReadWithoutLeader));
+			// NOTE: The kafka client apparently locks itself in an infinite loop sometimes
+			// when it is interrupted, so we run it only in a separate thread.
+			// since it sometimes refuses to shut down, we resort to the admittedly harsh
+			// means of killing the thread after a timeout.
+			PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(KafkaTopicPartition.getTopics(partitionsToReadWithoutLeader), config);
+			infoFetcher.start();
+
+			KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
+			watchDog.start();
+
+			List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
+
+			// replace potentially outdated leader information in partitionsToRead with fresh data from topicPartitionWithLeader
+			for (Map.Entry<KafkaTopicPartition, Long> pt: partitionsToReadWithoutLeader.entrySet()) {
+				KafkaTopicPartitionLeader topicPartitionWithLeader = null;
+				// go through list
+				for (KafkaTopicPartitionLeader withLeader: topicPartitionWithLeaderList) {
+					if (withLeader.getTopicPartition().equals(pt.getKey())) {
+						topicPartitionWithLeader = withLeader;
+						break;
 					}
-					
-					partitions.add(new FetchPartition(topicPartition.partition(), offset));
-					fetchPartitionsCount++;
-					
 				}
-				// else this partition is not for us
+				if (topicPartitionWithLeader == null) {
+					throw new IllegalStateException("Unable to find topic/partition leader information");
+				}
+				Long removed = KafkaTopicPartitionLeader.replaceIgnoringLeader(topicPartitionWithLeader, pt.getValue(), partitionsToRead);
+				if (removed == null) {
+					throw new IllegalStateException("Seek request on unknown topic partition");
+				}
 			}
 		}
-		
-		if (partitionsToRead.size() != fetchPartitionsCount) {
-			throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only "
-					+ fetchPartitionsCount + " partition infos with lead brokers.");
+
+
+		// build a map for each broker with its partitions
+		Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<>();
+
+		for (Map.Entry<KafkaTopicPartitionLeader, Long> entry : partitionsToRead.entrySet()) {
+			final KafkaTopicPartitionLeader topicPartition = entry.getKey();
+			final long offset = entry.getValue();
+
+			List<FetchPartition> partitions = fetchBrokers.get(topicPartition.getLeader());
+			if (partitions == null) {
+				partitions = new ArrayList<>();
+				fetchBrokers.put(topicPartition.getLeader(), partitions);
+			}
+
+			partitions.add(new FetchPartition(topicPartition.getTopicPartition().getTopic(), topicPartition.getTopicPartition().getPartition(), offset));
 		}
 
 		// create SimpleConsumers for each broker
@@ -194,7 +199,7 @@ public class LegacyFetcher implements Fetcher {
 			
 			FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
 
-			SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config, topic,
+			SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config,
 					broker, partitions, sourceContext, deserializer, lastOffsets);
 
 			thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
@@ -274,21 +279,24 @@ public class LegacyFetcher implements Fetcher {
 	 * Representation of a partition to fetch.
 	 */
 	private static class FetchPartition {
+
+		final String topic;
 		
 		/** ID of the partition within the topic (0 indexed, as given by Kafka) */
-		int partition;
+		final int partition;
 		
 		/** Offset pointing at the next element to read from that partition. */
 		long nextOffsetToRead;
 
-		FetchPartition(int partition, long nextOffsetToRead) {
+		FetchPartition(String topic, int partition, long nextOffsetToRead) {
+			this.topic = topic;
 			this.partition = partition;
 			this.nextOffsetToRead = nextOffsetToRead;
 		}
 		
 		@Override
 		public String toString() {
-			return "FetchPartition {partition=" + partition + ", offset=" + nextOffsetToRead + '}';
+			return "FetchPartition {topic=" + topic +", partition=" + partition + ", offset=" + nextOffsetToRead + '}';
 		}
 	}
 
@@ -306,12 +314,12 @@ public class LegacyFetcher implements Fetcher {
 		
 		private final SourceFunction.SourceContext<T> sourceContext;
 		private final KeyedDeserializationSchema<T> deserializer;
-		private final long[] offsetsState;
+		private final HashMap<KafkaTopicPartition, Long> offsetsState;
 		
 		private final FetchPartition[] partitions;
 		
 		private final Node broker;
-		private final String topic;
+
 		private final Properties config;
 
 		private final LegacyFetcher owner;
@@ -323,15 +331,14 @@ public class LegacyFetcher implements Fetcher {
 
 		// exceptions are thrown locally
 		public SimpleConsumerThread(LegacyFetcher owner,
-									Properties config, String topic,
+									Properties config,
 									Node broker,
 									FetchPartition[] partitions,
 									SourceFunction.SourceContext<T> sourceContext,
 									KeyedDeserializationSchema<T> deserializer,
-									long[] offsetsState) {
+									HashMap<KafkaTopicPartition, Long> offsetsState) {
 			this.owner = owner;
 			this.config = config;
-			this.topic = topic;
 			this.broker = broker;
 			this.partitions = partitions;
 			this.sourceContext = checkNotNull(sourceContext);
@@ -341,6 +348,7 @@ public class LegacyFetcher implements Fetcher {
 
 		@Override
 		public void run() {
+			LOG.info("Starting to fetch from {}", Arrays.toString(this.partitions));
 			try {
 				// set up the config values
 				final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
@@ -368,14 +376,13 @@ public class LegacyFetcher implements Fetcher {
 						}
 					}
 					if (partitionsToGetOffsetsFor.size() > 0) {
-						getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
-						LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
-								topic, partitionsToGetOffsetsFor);
+						getLastOffset(consumer, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
+						LOG.info("No prior offsets found for some partitions. Fetched the following start offsets {}", partitionsToGetOffsetsFor);
 					}
 				}
 				
 				// Now, the actual work starts :-)
-				int OffsetOutOfRangeCount = 0;
+				int offsetOutOfRangeCount = 0;
 				while (running) {
 					FetchRequestBuilder frb = new FetchRequestBuilder();
 					frb.clientId(clientId);
@@ -383,38 +390,37 @@ public class LegacyFetcher implements Fetcher {
 					frb.minBytes(minBytes);
 					
 					for (FetchPartition fp : partitions) {
-						frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, fetchSize);
+						frb.addFetch(fp.topic, fp.partition, fp.nextOffsetToRead, fetchSize);
 					}
 					kafka.api.FetchRequest fetchRequest = frb.build();
 					LOG.debug("Issuing fetch request {}", fetchRequest);
 
-					FetchResponse fetchResponse;
-					fetchResponse = consumer.fetch(fetchRequest);
+					FetchResponse fetchResponse = consumer.fetch(fetchRequest);
 
 					if (fetchResponse.hasError()) {
 						String exception = "";
 						List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
 						for (FetchPartition fp : partitions) {
-							short code = fetchResponse.errorCode(topic, fp.partition);
+							short code = fetchResponse.errorCode(fp.topic, fp.partition);
 
-							if(code == ErrorMapping.OffsetOutOfRangeCode()) {
+							if (code == ErrorMapping.OffsetOutOfRangeCode()) {
 								// we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
 								// Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
 								partitionsToGetOffsetsFor.add(fp);
-							} else if(code != ErrorMapping.NoError()) {
+							} else if (code != ErrorMapping.NoError()) {
 								exception += "\nException for partition " + fp.partition + ": " +
 										StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
 							}
 						}
 						if (partitionsToGetOffsetsFor.size() > 0) {
 							// safeguard against an infinite loop.
-							if(OffsetOutOfRangeCount++ > 0) {
+							if (offsetOutOfRangeCount++ > 0) {
 								throw new RuntimeException("Found invalid offsets more than once in partitions "+partitionsToGetOffsetsFor.toString()+" " +
 										"Exceptions: "+exception);
 							}
 							// get valid offsets for these partitions and try again.
 							LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
-							getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
+							getLastOffset(consumer, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
 							LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
 							continue; // jump back to create a new fetch request. The offset has not been touched.
 						} else {
@@ -424,9 +430,10 @@ public class LegacyFetcher implements Fetcher {
 					}
 
 					int messagesInFetch = 0;
+					int deletedMessages = 0;
 					for (FetchPartition fp : partitions) {
-						final ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition);
-						final int partition = fp.partition;
+						final ByteBufferMessageSet messageSet = fetchResponse.messageSet(fp.topic, fp.partition);
+						final KafkaTopicPartition topicPartition = new KafkaTopicPartition(fp.topic, fp.partition);
 						
 						for (MessageAndOffset msg : messageSet) {
 							if (running) {
@@ -439,8 +446,19 @@ public class LegacyFetcher implements Fetcher {
 									continue;
 								}
 
+								final long offset = msg.offset();
+
 								// put value into byte array
 								ByteBuffer payload = msg.message().payload();
+								if (payload == null) {
+									// This message has no value (which means it has been deleted from the Kafka topic)
+									deletedMessages++;
+									// advance offset in state to avoid re-reading the message
+									synchronized (sourceContext.getCheckpointLock()) {
+										offsetsState.put(topicPartition, offset);
+									}
+									continue;
+								}
 								byte[] valueBytes = new byte[payload.remaining()];
 								payload.get(valueBytes);
 
@@ -454,12 +472,10 @@ public class LegacyFetcher implements Fetcher {
 									keyPayload.get(keyBytes);
 								}
 
-								final long offset = msg.offset();
-								final T value = deserializer.deserialize(keyBytes, valueBytes, offset);
-
+								final T value = deserializer.deserialize(keyBytes, valueBytes, fp.topic, offset);
 								synchronized (sourceContext.getCheckpointLock()) {
 									sourceContext.collect(value);
-									offsetsState[partition] = offset;
+									offsetsState.put(topicPartition, offset);
 								}
 								
 								// advance offset for the next request
@@ -471,7 +487,7 @@ public class LegacyFetcher implements Fetcher {
 							}
 						}
 					}
-					LOG.debug("This fetch contained {} messages", messagesInFetch);
+					LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages);
 				}
 			}
 			catch (Throwable t) {
@@ -510,15 +526,14 @@ public class LegacyFetcher implements Fetcher {
 		 * Request latest offsets for a set of partitions, via a Kafka consumer.
 		 *
 		 * @param consumer The consumer connected to lead broker
-		 * @param topic The topic name
 		 * @param partitions The list of partitions we need offsets for
 		 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
 		 */
-		private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
+		private static void getLastOffset(SimpleConsumer consumer, List<FetchPartition> partitions, long whichTime) {
 
 			Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
 			for (FetchPartition fp: partitions) {
-				TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
+				TopicAndPartition topicAndPartition = new TopicAndPartition(fp.topic, fp.partition);
 				requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
 			}
 
@@ -529,18 +544,17 @@ public class LegacyFetcher implements Fetcher {
 				String exception = "";
 				for (FetchPartition fp: partitions) {
 					short code;
-					if ( (code=response.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
+					if ( (code=response.errorCode(fp.topic, fp.partition)) != ErrorMapping.NoError()) {
 						exception += "\nException for partition "+fp.partition+": "+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
 					}
 				}
-				throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions
-						+ ". " + exception);
+				throw new RuntimeException("Unable to get last offset for partitions " + partitions + ". " + exception);
 			}
 
 			for (FetchPartition fp: partitions) {
 				// the resulting offset is the next offset we are going to read
 				// for not-yet-consumed partitions, it is 0.
-				fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
+				fp.nextOffsetToRead = response.offsets(fp.topic, fp.partition)[0];
 			}
 		}
 
@@ -554,41 +568,42 @@ public class LegacyFetcher implements Fetcher {
 			return timeType;
 		}
 	}
-	
+
+
 	private static class PartitionInfoFetcher extends Thread {
 
-		private final String topic;
+		private final List<String> topics;
 		private final Properties properties;
-		
-		private volatile List<PartitionInfo> result;
+
+		private volatile List<KafkaTopicPartitionLeader> result;
 		private volatile Throwable error;
 
-		
-		PartitionInfoFetcher(String topic, Properties properties) {
-			this.topic = topic;
+
+		PartitionInfoFetcher(List<String> topics, Properties properties) {
+			this.topics = topics;
 			this.properties = properties;
 		}
 
 		@Override
 		public void run() {
 			try {
-				result = FlinkKafkaConsumer.getPartitionsForTopic(topic, properties);
+				result = FlinkKafkaConsumer.getPartitionsForTopic(topics, properties);
 			}
 			catch (Throwable t) {
 				this.error = t;
 			}
 		}
-		
-		public List<PartitionInfo> getPartitions() throws Exception {
+
+		public List<KafkaTopicPartitionLeader> getPartitions() throws Exception {
 			try {
 				this.join();
 			}
 			catch (InterruptedException e) {
 				throw new Exception("Partition fetching was cancelled before completion");
 			}
-			
+
 			if (error != null) {
-				throw new Exception("Failed to fetch partitions for topic " + topic, error);
+				throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error);
 			}
 			if (result != null) {
 				return result;
@@ -598,14 +613,14 @@ public class LegacyFetcher implements Fetcher {
 	}
 
 	private static class KillerWatchDog extends Thread {
-		
+
 		private final Thread toKill;
 		private final long timeout;
 
 		private KillerWatchDog(Thread toKill, long timeout) {
 			super("KillerWatchDog");
 			setDaemon(true);
-			
+
 			this.toKill = toKill;
 			this.timeout = timeout;
 		}
@@ -615,7 +630,7 @@ public class LegacyFetcher implements Fetcher {
 		public void run() {
 			final long deadline = System.currentTimeMillis() + timeout;
 			long now;
-			
+
 			while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
 				try {
 					toKill.join(deadline - now);
@@ -624,7 +639,7 @@ public class LegacyFetcher implements Fetcher {
 					// ignore here, our job is important!
 				}
 			}
-			
+
 			// this is harsh, but this watchdog is a last resort
 			if (toKill.isAlive()) {
 				toKill.stop();

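To make the reworked LegacyFetcher life cycle easier to follow: the fetcher is constructed with leader-annotated partitions, seek() accepts leaderless KafkaTopicPartitions (as produced when restoring state), and run() refreshes stale leaders before grouping partitions per lead broker. The sketch below is a hypothetical driver under those assumptions; it is not how FlinkKafkaConsumer itself wires the pieces together, and the class, method and variable names are made up for illustration.

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.internals.*;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class FetcherDrivingSketch {

	public static <T> HashMap<KafkaTopicPartition, Long> drive(
			List<KafkaTopicPartitionLeader> partitionsWithLeaders,
			Map<KafkaTopicPartition, Long> restoredOffsets,
			Properties kafkaProps,
			SourceFunction.SourceContext<T> sourceContext,
			KeyedDeserializationSchema<T> deserializer) throws Exception {

		// the fetcher is created with leader-annotated partitions discovered at job start
		Fetcher fetcher = new LegacyFetcher(partitionsWithLeaders, kafkaProps, "sketch-task");

		// restored offsets carry no leader information; seek() parks them until run()
		// refreshes the leaders. The stored offset is the last read one, so seek to +1.
		for (Map.Entry<KafkaTopicPartition, Long> entry : restoredOffsets.entrySet()) {
			fetcher.seek(entry.getKey(), entry.getValue() + 1);
		}

		// run() resolves missing leaders, groups partitions per broker, and starts one
		// SimpleConsumerThread per lead broker; lastOffsets is updated under the checkpoint lock
		HashMap<KafkaTopicPartition, Long> lastOffsets = new HashMap<>();
		fetcher.run(sourceContext, deserializer, lastOffsets);
		return lastOffsets;
	}
}
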
http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
index 2a82561..fdd89c6 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
@@ -19,8 +19,6 @@
 package org.apache.flink.streaming.connectors.kafka.internals;
 
 
-import org.apache.kafka.common.TopicPartition;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -38,7 +36,7 @@ public interface OffsetHandler {
 	 *
 	 * @param offsetsToCommit The offset to commit, per partition.
 	 */
-	void commit(Map<TopicPartition, Long> offsetsToCommit) throws Exception;
+	void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception;
 
 	/**
 	 * Positions the given fetcher to the initial read offsets where the stream consumption
@@ -47,7 +45,7 @@ public interface OffsetHandler {
 	 * @param partitions The partitions for which to seeks the fetcher to the beginning.
 	 * @param fetcher The fetcher that will pull data from Kafka and must be positioned.
 	 */
-	void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) throws Exception;
+	void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) throws Exception;
 
 	/**
 	 * Closes the offset handler, releasing all resources.

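For readers skimming the interface change: OffsetHandler now works entirely in terms of KafkaTopicPartition / KafkaTopicPartitionLeader instead of Kafka's TopicPartition. Below is a throwaway in-memory implementation, written only to illustrate the reworked contract. It assumes it lives in the same internals package and that close() is declared as throwing IOException (which the retained java.io.IOException import suggests); the real implementation used by the consumer is the ZookeeperOffsetHandler shown next.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InMemoryOffsetHandler implements OffsetHandler {

	private final Map<KafkaTopicPartition, Long> committed = new HashMap<>();

	@Override
	public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) {
		// remember the last read offset per (topic, partition)
		committed.putAll(offsetsToCommit);
	}

	@Override
	public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) {
		for (KafkaTopicPartitionLeader partition : partitions) {
			Long lastRead = committed.get(partition.getTopicPartition());
			if (lastRead != null) {
				// the stored offset is the last read one; seek() expects the next offset to read
				fetcher.seek(partition.getTopicPartition(), lastRead + 1);
			}
		}
	}

	@Override
	public void close() throws IOException {
		committed.clear();
	}
}
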
http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index 42a5951..f9b8448 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -25,7 +25,6 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.zookeeper.data.Stat;
 
 import org.slf4j.Logger;
@@ -71,28 +70,28 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
 
 
 	@Override
-	public void commit(Map<TopicPartition, Long> offsetsToCommit) {
-		for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
-			TopicPartition tp = entry.getKey();
+	public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) {
+		for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToCommit.entrySet()) {
+			KafkaTopicPartition tp = entry.getKey();
 			long offset = entry.getValue();
 			
 			if (offset >= 0) {
-				setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset);
+				setOffsetInZooKeeper(zkClient, groupId, tp.getTopic(), tp.getPartition(), offset);
 			}
 		}
 	}
 
 	@Override
-	public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
-		for (TopicPartition tp : partitions) {
-			long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition());
+	public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) {
+		for (KafkaTopicPartitionLeader tp : partitions) {
+			long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition());
 
 			if (offset != OFFSET_NOT_SET) {
 				LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
-						tp.partition(), offset);
+						tp.getTopicPartition().getPartition(), offset);
 
 				// the offset in Zookeeper was the last read offset, seek is accepting the next-to-read-offset.
-				fetcher.seek(tp, offset + 1);
+				fetcher.seek(tp.getTopicPartition(), offset + 1);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
index 7ab7290..61735e9 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -59,12 +59,12 @@ public class FixedPartitioner extends KafkaPartitioner implements Serializable {
 	@Override
 	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
 		int p = 0;
-		for(int i = 0; i < parallelInstances; i++) {
-			if(i == parallelInstanceId) {
+		for (int i = 0; i < parallelInstances; i++) {
+			if (i == parallelInstanceId) {
 				targetPartition = partitions[p];
 				return;
 			}
-			if(++p == partitions.length) {
+			if (++p == partitions.length) {
 				p = 0;
 			}
 		}
@@ -72,7 +72,7 @@ public class FixedPartitioner extends KafkaPartitioner implements Serializable {
 
 	@Override
 	public int partition(Object element, int numPartitions) {
-		if(targetPartition == -1) {
+		if (targetPartition == -1) {
 			throw new RuntimeException("The partitioner has not been initialized properly");
 		}
 		return targetPartition;

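The whitespace-only changes above do not alter FixedPartitioner's behaviour: open() still walks the subtask indices round-robin over the available partitions, which boils down to partitions[parallelInstanceId % partitions.length]. A small sketch to make that mapping concrete (it assumes the class keeps its default no-argument constructor):

import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;

public class FixedPartitionerMappingSketch {
	public static void main(String[] args) {
		int[] partitions = {0, 1};   // two Kafka partitions
		int parallelInstances = 4;   // four producing subtasks

		for (int subtask = 0; subtask < parallelInstances; subtask++) {
			FixedPartitioner partitioner = new FixedPartitioner();
			partitioner.open(subtask, parallelInstances, partitions);
			// prints: subtasks 0 and 2 -> partition 0, subtasks 1 and 3 -> partition 1
			System.out.println("subtask " + subtask + " -> partition "
					+ partitioner.partition("ignored", partitions.length));
		}
	}
}
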
