flink-commits mailing list archives

From tzuli...@apache.org
Subject flink git commit: [FLINK-4723] [kafka] Unify committed offsets to Kafka to be next record to process
Date Tue, 18 Oct 2016 03:45:50 GMT
Repository: flink
Updated Branches:
  refs/heads/master 069de27df -> f46ca3918


[FLINK-4723] [kafka] Unify committed offsets to Kafka to be next record to process

This closes #2580


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f46ca391
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f46ca391
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f46ca391

Branch: refs/heads/master
Commit: f46ca39188dce1764ee6615eb6697588fdc04a2a
Parents: 069de27
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Sun Oct 2 16:54:57 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Oct 18 11:44:30 2016 +0800

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010FetcherTest.java   |   8 +-
 .../connectors/kafka/Kafka010ITCase.java        |  18 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  33 +-
 .../kafka/internals/Kafka08Fetcher.java         |  15 +-
 .../internals/PeriodicOffsetCommitter.java      |   6 +-
 .../kafka/internals/ZookeeperOffsetHandler.java |  19 +-
 .../connectors/kafka/Kafka08ITCase.java         | 178 ++--------
 .../kafka/KafkaTestEnvironmentImpl.java         |  31 ++
 .../kafka/internal/Kafka09Fetcher.java          |   2 +-
 .../connectors/kafka/Kafka09FetcherTest.java    |  10 +-
 .../connectors/kafka/Kafka09ITCase.java         |  18 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  32 +-
 .../kafka/FlinkKafkaConsumerBase.java           |  35 +-
 .../kafka/internals/AbstractFetcher.java        |   9 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  56 ++--
 .../connectors/kafka/KafkaConsumerTestBase.java | 322 ++++++++++++++++++-
 .../connectors/kafka/KafkaTestEnvironment.java  |  10 +
 .../AbstractFetcherTimestampsTest.java          |   2 +-
 18 files changed, 559 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
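
For orientation before the per-connector diffs: Flink's internally checkpointed offset is the offset of the last record processed, while the offset committed to Kafka or ZooKeeper is that value plus one, i.e. the "next record to process". A minimal sketch of the arithmetic this commit standardizes (illustration only, not code from this commit; the class and helper names are hypothetical):

public final class OffsetConventionSketch {

	// Flink's checkpoint state holds the offset of the last processed record;
	// the committed offset is the next record to process.
	static long toCommittedOffset(long lastProcessedOffset) {
		return lastProcessedOffset + 1;
	}

	// Restoring from an externally committed offset converts back to internal form.
	static long toInternalOffset(long committedOffset) {
		return committedOffset - 1;
	}

	public static void main(String[] args) {
		long lastProcessed = 49L;                          // records 0..49 processed
		long committed = toCommittedOffset(lastProcessed); // 50, as asserted by the new tests
		long restored = toInternalOffset(committed);       // 49, so consumption resumes at record 50
		System.out.println("committed=" + committed + ", restored=" + restored);
	}
}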


http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 8f0b170..76e3950 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -143,7 +143,7 @@ public class Kafka010FetcherTest {
             @Override
             public void run() {
                 try {
-                    fetcher.commitSpecificOffsetsToKafka(testCommitData);
+                    fetcher.commitInternalOffsetsToKafka(testCommitData);
                 } catch (Throwable t) {
                     commitError.set(t);
                 }
@@ -255,7 +255,7 @@ public class Kafka010FetcherTest {
 
         // ----- trigger the first offset commit -----
 
-        fetcher.commitSpecificOffsetsToKafka(testCommitData1);
+        fetcher.commitInternalOffsetsToKafka(testCommitData1);
         Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
         for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -272,7 +272,7 @@ public class Kafka010FetcherTest {
 
         // ----- trigger the second offset commit -----
 
-        fetcher.commitSpecificOffsetsToKafka(testCommitData2);
+        fetcher.commitInternalOffsetsToKafka(testCommitData2);
         Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
         for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -297,4 +297,4 @@ public class Kafka010FetcherTest {
             throw new Exception("Exception in the fetcher", caughtError);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 28bf6d5..77407ff 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -131,6 +131,24 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		runEndOfStreamTest();
 	}
 
+	// --- offset committing ---
+
+	@Test(timeout = 60000)
+	public void testCommitOffsetsToKafka() throws Exception {
+		runCommitOffsetsToKafka();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromKafkaCommitOffsets() throws Exception {
+		runStartFromKafkaCommitOffsets();
+	}
+
+	// TODO: This test will not pass until FLINK-4727 is resolved
+//	@Test(timeout = 60000)
+//	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+//		runAutoOffsetRetrievalAndCommitToKafka();
+//	}
+
 	/**
 	 * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 78fc1c6..7d12cde 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import kafka.admin.AdminUtils;
 import kafka.common.KafkaException;
-import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
@@ -35,6 +34,9 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.slf4j.Logger;
@@ -118,6 +120,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public KafkaOffsetHandler createOffsetHandler(Properties props) {
+		return new KafkaOffsetHandlerImpl(props);
+	}
+
+	@Override
 	public void restartBroker(int leaderId) throws Exception {
 		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
 	}
@@ -213,11 +220,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
 		standardProps.setProperty("group.id", "flink-tests");
-		standardProps.setProperty("auto.commit.enable", "false");
+		standardProps.setProperty("enable.auto.commit", "false");
 		standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
 		standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
 		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value)
-		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+		standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
 	}
 
 	@Override
@@ -381,4 +388,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
 	}
 
+	private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+		private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+		public KafkaOffsetHandlerImpl(Properties props) {
+			offsetClient = new KafkaConsumer<>(props);
+		}
+
+		@Override
+		public Long getCommittedOffset(String topicName, int partition) {
+			OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+			return (committed != null) ? committed.offset() : null;
+		}
+
+		@Override
+		public void close() {
+			offsetClient.close();
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index aee3acc..5861058 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -140,11 +140,13 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 					}
 				}
 
-				Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getOffsets(partitionsWithNoOffset);
+				Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
 				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-					Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
-					if (offset != null) {
-						partition.setOffset(offset);
+					Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition());
+					if (zkOffset != null) {
+						// the offset in ZK represents the "next record to process", so we need to subtract it by 1
+						// to correctly represent our internally checkpointed offsets
+						partition.setOffset(zkOffset - 1);
 					}
 				}
 			}
@@ -324,10 +326,11 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
 		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
 		if (zkHandler != null) {
-			zkHandler.writeOffsets(offsets);
+			// the ZK handler takes care of incrementing the offsets by 1 before committing
+			zkHandler.prepareAndCommitOffsets(offsets);
 		}
 
 		// Set committed offsets in topic partition state

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
index 6aaeca9..27d90f2 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
@@ -62,12 +62,12 @@ public class PeriodicOffsetCommitter extends Thread {
 				Thread.sleep(commitInterval);
 
 				// create copy a deep copy of the current offsets
-				HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>(partitionStates.length);
+				HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length);
 				for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
-					currentOffsets.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
+					offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
 				}
 				
-				offsetHandler.writeOffsets(currentOffsets);
+				offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
 			}
 		}
 		catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index a1a81ed..8f2ef09 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -77,29 +77,30 @@ public class ZookeeperOffsetHandler {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Writes given set of offsets for Kafka partitions to ZooKeeper.
+	 * Commits offsets for Kafka partitions to ZooKeeper. The given offsets to this method should be the offsets of
+	 * the last processed records; this method will take care of incrementing the offsets by 1 before committing them so
+	 * that the committed offsets to Zookeeper represent the next record to process.
 	 * 
-	 * @param offsetsToWrite The offsets for the partitions to write.
+	 * @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit.
 	 * @throws Exception The method forwards exceptions.
 	 */
-	public void writeOffsets(Map<KafkaTopicPartition, Long> offsetsToWrite) throws Exception {
-		for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToWrite.entrySet()) {
+	public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception {
+		for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
 			KafkaTopicPartition tp = entry.getKey();
-			long offset = entry.getValue();
 
-			if (offset >= 0) {
-				setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset);
+			Long lastProcessedOffset = entry.getValue();
+			if (lastProcessedOffset != null && lastProcessedOffset >= 0) {
+				setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
 			}
 		}
 	}
 
 	/**
-	 * 
 	 * @param partitions The partitions to read offsets for.
 	 * @return The mapping from partition to offset.
 	 * @throws Exception This method forwards exceptions.
 	 */
-	public Map<KafkaTopicPartition, Long> getOffsets(List<KafkaTopicPartition> partitions) throws Exception {
+	public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
 		Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size());
 		for (KafkaTopicPartition tp : partitions) {
 			Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
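
A short round-trip sketch through the renamed handler methods above (illustration only; obtaining the ZookeeperOffsetHandler instance is out of scope here, and the wrapper class and method names are hypothetical): committing the internal offset of the last processed record stores that value plus one in ZooKeeper, and Kafka08Fetcher subtracts one again when restoring.

import java.util.Collections;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

class ZkOffsetRoundTripSketch {

	static void roundTrip(ZookeeperOffsetHandler zkHandler) throws Exception {
		KafkaTopicPartition tp = new KafkaTopicPartition("testTopic", 0);

		// internal checkpointed offset: last processed record is 49
		zkHandler.prepareAndCommitOffsets(Collections.singletonMap(tp, 49L));

		// ZooKeeper now stores 50 (the next record to process); reading it back returns 50
		Map<KafkaTopicPartition, Long> committed =
				zkHandler.getCommittedOffsets(Collections.singletonList(tp));

		long restoredInternal = committed.get(tp) - 1; // 49, as Kafka08Fetcher does on startup
	}
}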

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 1c69d78..fabb0fe 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -135,31 +135,21 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		runBrokerFailureTest();
 	}
 
-	// --- special executions ---
+	// --- offset committing ---
 
 	@Test(timeout = 60000)
-	public void testBigRecordJob() throws Exception {
-		runBigRecordTestTopology();
+	public void testCommitOffsetsToZookeeper() throws Exception {
+		runCommitOffsetsToKafka();
 	}
 
 	@Test(timeout = 60000)
-	public void testMultipleTopics() throws Exception {
-		runProduceConsumeMultipleTopics();
-	}
-
-	@Test(timeout = 60000)
-	public void testAllDeletes() throws Exception {
-		runAllDeletesTest();
-	}
-
-	@Test(timeout=60000)
-	public void testEndOfStream() throws Exception {
-		runEndOfStreamTest();
+	public void testStartFromZookeeperCommitOffsets() throws Exception {
+		runStartFromKafkaCommitOffsets();
 	}
 
 	@Test(timeout = 60000)
-	public void testMetrics() throws Throwable {
-		runMetricsTest();
+	public void testAutoOffsetRetrievalAndCommitToZookeeper() throws Exception {
+		runAutoOffsetRetrievalAndCommitToKafka();
 	}
 
 	@Test
@@ -186,59 +176,6 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
-	/**
-	 * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
-	 *
-	 * This test is only applicable if the Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
-	 */
-	@Test(timeout = 60000)
-	public void testOffsetInZookeeper() throws Exception {
-		final int parallelism = 3;
-
-		// write a sequence from 0 to 99 to each of the 3 partitions.
-		final String topicName = writeSequence("testOffsetInZK", 100, parallelism, 1);
-
-		StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env1.getConfig().disableSysoutLogging();
-		env1.enableCheckpointing(50);
-		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env1.setParallelism(parallelism);
-
-		StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env2.getConfig().disableSysoutLogging();
-		env2.enableCheckpointing(50);
-		env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env2.setParallelism(parallelism);
-
-		readSequence(env1, standardProps, parallelism, topicName, 100, 0);
-
-		CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-
-		Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0);
-		Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1);
-		Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2);
-
-		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
-
-		assertTrue(o1 == null || (o1 >= 0 && o1 <= 100));
-		assertTrue(o2 == null || (o2 >= 0 && o2 <= 100));
-		assertTrue(o3 == null || (o3 >= 0 && o3 <= 100));
-
-		LOG.info("Manipulating offsets");
-
-		// set the offset to 50 for the three partitions
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0, 49);
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1, 49);
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2, 49);
-
-		curatorClient.close();
-
-		// create new env
-		readSequence(env2, standardProps, parallelism, topicName, 50, 50);
-
-		deleteTestTopic(topicName);
-	}
 
 	@Test(timeout = 60000)
 	public void testOffsetAutocommitTest() throws Exception {
@@ -275,94 +212,37 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 		// ensure that the offset has been committed
 		boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) ||
-				(o2 != null && o2 > 0 && o2 <= 100) ||
-				(o3 != null && o3 > 0 && o3 <= 100);
+			(o2 != null && o2 > 0 && o2 <= 100) ||
+			(o3 != null && o3 > 0 && o3 <= 100);
 		assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet);
 
 		deleteTestTopic(topicName);
 	}
 
-	/**
-	 * This test ensures that when the consumers retrieve some start offset from kafka (earliest, latest), that this offset
-	 * is committed to Zookeeper, even if some partitions are not read
-	 *
-	 * Test:
-	 * - Create 3 topics
-	 * - write 50 messages into each.
-	 * - Start three consumers with auto.offset.reset='latest' and wait until they committed into ZK.
-	 * - Check if the offsets in ZK are set to 50 for the three partitions
-	 *
-	 * See FLINK-3440 as well
-	 */
-	@Test(timeout = 60000)
-	public void testKafkaOffsetRetrievalToZookeeper() throws Exception {
-		final int parallelism = 3;
+	// --- special executions ---
 
-		// write a sequence from 0 to 49 to each of the 3 partitions.
-		final String topicName =  writeSequence("testKafkaOffsetToZk", 50, parallelism, 1);
+	@Test(timeout = 60000)
+	public void testBigRecordJob() throws Exception {
+		runBigRecordTestTopology();
+	}
 
-		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env2.getConfig().disableSysoutLogging();
-		env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env2.setParallelism(parallelism);
-		env2.enableCheckpointing(200);
+	@Test(timeout = 60000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
 
-		Properties readProps = new Properties();
-		readProps.putAll(standardProps);
-		readProps.setProperty("auto.offset.reset", "latest");
-
-		DataStream<String> stream = env2.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps));
-		stream.addSink(new DiscardingSink<String>());
-
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-		final Thread runner = new Thread("runner") {
-			@Override
-			public void run() {
-				try {
-					env2.execute();
-				}
-				catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
-						errorRef.set(t);
-					}
-				}
-			}
-		};
-		runner.start();
-
-		final CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-		final Long l49 = 49L;
-				
-		final long deadline = 30000 + System.currentTimeMillis();
-		do {
-			Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
-			Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
-			Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
-
-			if (l49.equals(o1) && l49.equals(o2) && l49.equals(o3)) {
-				break;
-			}
-			
-			Thread.sleep(100);
-		}
-		while (System.currentTimeMillis() < deadline);
-		
-		// cancel the job
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-		
-		final Throwable t = errorRef.get();
-		if (t != null) {
-			throw new RuntimeException("Job failed with an exception", t);
-		}
+	@Test(timeout = 60000)
+	public void testAllDeletes() throws Exception {
+		runAllDeletesTest();
+	}
 
-		// check if offsets are correctly in ZK
-		Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
-		Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
-		Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
-		Assert.assertEquals(Long.valueOf(49L), o1);
-		Assert.assertEquals(Long.valueOf(49L), o2);
-		Assert.assertEquals(Long.valueOf(49L), o3);
+	@Test(timeout=60000)
+	public void testEndOfStream() throws Exception {
+		runEndOfStreamTest();
+	}
 
-		curatorFramework.close();
+	@Test(timeout = 60000)
+	public void testMetrics() throws Throwable {
+		runMetricsTest();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index a0d5002..567d22d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -111,6 +112,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public KafkaOffsetHandler createOffsetHandler(Properties props) {
+		return new KafkaOffsetHandlerImpl(props);
+	}
+
+	@Override
 	public void restartBroker(int leaderId) throws Exception {
 		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
 	}
@@ -351,4 +357,29 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
 	}
 
+	private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+		private final CuratorFramework offsetClient;
+		private final String groupId;
+
+		public KafkaOffsetHandlerImpl(Properties props) {
+			offsetClient = createCuratorClient();
+			groupId = props.getProperty("group.id");
+		}
+
+		@Override
+		public Long getCommittedOffset(String topicName, int partition) {
+			try {
+				return ZookeeperOffsetHandler.getOffsetFromZooKeeper(offsetClient, groupId, topicName, partition);
+			} catch (Exception e) {
+				throw new RuntimeException("Exception when getting offsets from Zookeeper", e);
+			}
+		}
+
+		@Override
+		public void close() {
+			offsetClient.close();
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 37e40fc..3a3d3de 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -300,7 +300,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 	}
 
 	@Override
-	public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
 		KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
 		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 5a638b2..c5cf0cc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -143,7 +143,7 @@ public class Kafka09FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitSpecificOffsetsToKafka(testCommitData);
+					fetcher.commitInternalOffsetsToKafka(testCommitData);
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -259,7 +259,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitSpecificOffsetsToKafka(testCommitData1);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1);
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -270,13 +270,13 @@ public class Kafka09FetcherTest {
 			}
 			else if (partition.topic().equals("another")) {
 				assertEquals(99, partition.partition());
-				assertEquals(18L, entry.getValue().offset());
+				assertEquals(17L, entry.getValue().offset());
 			}
 		}
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitSpecificOffsetsToKafka(testCommitData2);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2);
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -287,7 +287,7 @@ public class Kafka09FetcherTest {
 			}
 			else if (partition.topic().equals("another")) {
 				assertEquals(99, partition.partition());
-				assertEquals(28L, entry.getValue().offset());
+				assertEquals(27L, entry.getValue().offset());
 			}
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index b9ec18a..3d347dc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -109,4 +109,22 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 	public void testMetrics() throws Throwable {
 		runMetricsTest();
 	}
+
+	// --- offset committing ---
+
+	@Test(timeout = 60000)
+	public void testCommitOffsetsToKafka() throws Exception {
+		runCommitOffsetsToKafka();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromKafkaCommitOffsets() throws Exception {
+		runStartFromKafkaCommitOffsets();
+	}
+
+	// TODO: This test will not pass until FLINK-4727 is resolved
+//	@Test(timeout = 60000)
+//	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+//		runAutoOffsetRetrievalAndCommitToKafka();
+//	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9d8fa9a..223dacb 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,6 +36,9 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,6 +106,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public KafkaOffsetHandler createOffsetHandler(Properties props) {
+		return new KafkaOffsetHandlerImpl(props);
+	}
+
+	@Override
 	public void restartBroker(int leaderId) throws Exception {
 		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
 	}
@@ -203,11 +211,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
 		standardProps.setProperty("group.id", "flink-tests");
-		standardProps.setProperty("auto.commit.enable", "false");
+		standardProps.setProperty("enable.auto.commit", "false");
 		standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
 		standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
 		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
-		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+		standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
 
 	}
 
@@ -396,4 +404,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		return prop;
 	}
 
+	private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+		private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+		public KafkaOffsetHandlerImpl(Properties props) {
+			offsetClient = new KafkaConsumer<>(props);
+		}
+
+		@Override
+		public Long getCommittedOffset(String topicName, int partition) {
+			OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+			return (committed != null) ? committed.offset() : null;
+		}
+
+		@Override
+		public void close() {
+			offsetClient.close();
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 05028e6..7d6bd76 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -103,8 +103,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	//  runtime state (used individually by each parallel subtask) 
 	// ------------------------------------------------------------------------
 	
-	/** Data for pending but uncommitted checkpoints */
-	private final LinkedMap pendingCheckpoints = new LinkedMap();
+	/** Data for pending but uncommitted offsets */
+	private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
 
 	/** The fetcher implements the connections to the Kafka brokers */
 	private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
@@ -347,12 +347,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				if (restoreToOffset != null) {
 					// the map cannot be asynchronously updated, because only one checkpoint call can happen
 					// on this function at a time: either snapshotState() or notifyCheckpointComplete()
-					pendingCheckpoints.put(checkpointId, restoreToOffset);
-
-					// truncate the map, to prevent infinite growth
-					while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
-						pendingCheckpoints.remove(0);
-					}
+					pendingOffsetsToCommit.put(checkpointId, restoreToOffset);
 
 					for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
 						listState.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
@@ -367,17 +362,17 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 				// the map cannot be asynchronously updated, because only one checkpoint call can happen
 				// on this function at a time: either snapshotState() or notifyCheckpointComplete()
-				pendingCheckpoints.put(checkpointId, currentOffsets);
-
-				// truncate the map, to prevent infinite growth
-				while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
-					pendingCheckpoints.remove(0);
-				}
+				pendingOffsetsToCommit.put(checkpointId, currentOffsets);
 
 				for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
 					listState.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
 				}
 			}
+
+			// truncate the map of pending offsets to commit, to prevent infinite growth
+			while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+				pendingOffsetsToCommit.remove(0);
+			}
 		}
 	}
 
@@ -400,26 +395,26 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		}
 
 		try {
-			final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+			final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
 			if (posInMap == -1) {
 				LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
 				return;
 			}
 
 			@SuppressWarnings("unchecked")
-			HashMap<KafkaTopicPartition, Long> checkpointOffsets = 
-					(HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
+			HashMap<KafkaTopicPartition, Long> offsets =
+					(HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
 
 			// remove older checkpoints in map
 			for (int i = 0; i < posInMap; i++) {
-				pendingCheckpoints.remove(0);
+				pendingOffsetsToCommit.remove(0);
 			}
 
-			if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
+			if (offsets == null || offsets.size() == 0) {
 				LOG.debug("Checkpoint state was empty.");
 				return;
 			}
-			fetcher.commitSpecificOffsetsToKafka(checkpointOffsets);
+			fetcher.commitInternalOffsetsToKafka(offsets);
 		}
 		catch (Exception e) {
 			if (running) {
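
A simplified sketch of the pendingOffsetsToCommit bookkeeping shown above (illustration only: plain java.util types and String keys stand in for KafkaTopicPartition and the commons-collections LinkedMap, and all names here are made up). Each snapshot stores its offsets keyed by checkpoint id, the map is capped at MAX_NUM_PENDING_CHECKPOINTS, and a completed checkpoint commits exactly its own offsets while dropping all older entries:

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class PendingOffsetsToCommitSketch {

	static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

	// checkpoint id -> offsets snapshotted for that checkpoint, in checkpoint order
	private final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();

	// called from snapshotState(): remember the offsets, bound the map size
	void onSnapshot(long checkpointId, Map<String, Long> currentOffsets) {
		pendingOffsetsToCommit.put(checkpointId, currentOffsets);
		while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
			Long oldest = pendingOffsetsToCommit.keySet().iterator().next();
			pendingOffsetsToCommit.remove(oldest);
		}
	}

	// called from notifyCheckpointComplete(): commit the acked checkpoint's offsets, drop older ones
	void onCheckpointComplete(long checkpointId) throws Exception {
		if (!pendingOffsetsToCommit.containsKey(checkpointId)) {
			return; // confirmation for an unknown checkpoint id
		}
		Map<String, Long> offsets = null;
		Iterator<Map.Entry<Long, Map<String, Long>>> it = pendingOffsetsToCommit.entrySet().iterator();
		while (it.hasNext()) {
			Map.Entry<Long, Map<String, Long>> entry = it.next();
			boolean acked = entry.getKey() == checkpointId;
			if (acked) {
				offsets = entry.getValue();
			}
			it.remove(); // removes the acked entry and every older one; newer entries stay
			if (acked) {
				break;
			}
		}
		if (offsets != null && !offsets.isEmpty()) {
			commitInternalOffsetsToKafka(offsets); // stand-in for the fetcher call
		}
	}

	void commitInternalOffsetsToKafka(Map<String, Long> offsets) throws Exception {
		// the version-specific fetcher adds +1 to each offset before committing
	}
}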

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 7ee3079..eb01b78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -159,12 +159,15 @@ public abstract class AbstractFetcher<T, KPH> {
 
 	/**
 	 * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
-	 * older Kafka versions).
+	 * older Kafka versions). The given offsets are the internal checkpointed offsets, representing
+	 * the last processed record of each partition. Version-specific implementations of this method
+	 * need to hold the contract that the given offsets must be incremented by 1 before
+	 * committing them, so that committed offsets to Kafka represent "the next record to process".
 	 * 
-	 * @param offsets The offsets to commit to Kafka.
+	 * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
 	 * @throws Exception This method forwards exceptions.
 	 */
-	public abstract void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
+	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
 	
 	// ------------------------------------------------------------------------
 	//  snapshot and restore the state
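
Per the contract above, a new-consumer (0.9/0.10) style implementation would add one to each internal offset before handing it to the Kafka client. A hedged sketch, not the actual Kafka09Fetcher code (the class and field names are made up, and it commits synchronously only to keep the sketch small; only the kafka-clients calls are real API):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class OffsetCommitSketch {

	private final KafkaConsumer<byte[], byte[]> consumer;

	OffsetCommitSketch(KafkaConsumer<byte[], byte[]> consumer) {
		this.consumer = consumer;
	}

	// internalOffsets: topic partition -> offset of the last processed record
	void commitInternalOffsetsToKafka(Map<TopicPartition, Long> internalOffsets) {
		Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>(internalOffsets.size());
		for (Map.Entry<TopicPartition, Long> entry : internalOffsets.entrySet()) {
			// committed offset = next record to process = last processed + 1
			toCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1));
		}
		consumer.commitSync(toCommit);
	}
}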

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 6d2dc70..97220c2 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -161,9 +161,17 @@ public class FlinkKafkaConsumerBaseTest {
 		assertFalse(listState.get().iterator().hasNext());
 	}
 
+	/**
+	 * Tests that on snapshots, states and offsets to commit to Kafka are correct
+	 */
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testSnapshotState() throws Exception {
+
+		// --------------------------------------------------------------------
+		//   prepare fake states
+		// --------------------------------------------------------------------
+
 		final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
 		state1.put(new KafkaTopicPartition("abc", 13), 16768L);
 		state1.put(new KafkaTopicPartition("def", 7), 987654321L);
@@ -176,13 +184,15 @@ public class FlinkKafkaConsumerBaseTest {
 		state3.put(new KafkaTopicPartition("abc", 13), 16780L);
 		state3.put(new KafkaTopicPartition("def", 7), 987654377L);
 
+		// --------------------------------------------------------------------
+		
 		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
 		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
-
-		final LinkedMap pendingCheckpoints = new LinkedMap();
-
-		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingCheckpoints, true);
-		assertEquals(0, pendingCheckpoints.size());
+			
+		final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+	
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
+		assertEquals(0, pendingOffsetsToCommit.size());
 
 		OperatorStateStore backend = mock(OperatorStateStore.class);
 
@@ -207,8 +217,8 @@ public class FlinkKafkaConsumerBaseTest {
 		}
 
 		assertEquals(state1, snapshot1);
-		assertEquals(1, pendingCheckpoints.size());
-		assertEquals(state1, pendingCheckpoints.get(138L));
+		assertEquals(1, pendingOffsetsToCommit.size());
+		assertEquals(state1, pendingOffsetsToCommit.get(138L));
 
 		// checkpoint 2
 		consumer.prepareSnapshot(140L, 140L);
@@ -221,13 +231,13 @@ public class FlinkKafkaConsumerBaseTest {
 		}
 
 		assertEquals(state2, snapshot2);
-		assertEquals(2, pendingCheckpoints.size());
-		assertEquals(state2, pendingCheckpoints.get(140L));
-
+		assertEquals(2, pendingOffsetsToCommit.size());
+		assertEquals(state2, pendingOffsetsToCommit.get(140L));
+		
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
-		assertEquals(1, pendingCheckpoints.size());
-		assertTrue(pendingCheckpoints.containsKey(140L));
+		assertEquals(1, pendingOffsetsToCommit.size());
+		assertTrue(pendingOffsetsToCommit.containsKey(140L));
 
 		// checkpoint 3
 		consumer.prepareSnapshot(141L, 141L);
@@ -240,16 +250,16 @@ public class FlinkKafkaConsumerBaseTest {
 		}
 
 		assertEquals(state3, snapshot3);
-		assertEquals(2, pendingCheckpoints.size());
-		assertEquals(state3, pendingCheckpoints.get(141L));
-
+		assertEquals(2, pendingOffsetsToCommit.size());
+		assertEquals(state3, pendingOffsetsToCommit.get(141L));
+		
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
-		assertEquals(0, pendingCheckpoints.size());
+		assertEquals(0, pendingOffsetsToCommit.size());
 
 
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
-		assertEquals(0, pendingCheckpoints.size());
+		assertEquals(0, pendingOffsetsToCommit.size());
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
@@ -260,24 +270,24 @@ public class FlinkKafkaConsumerBaseTest {
 			consumer.prepareSnapshot(i, i);
 			listState.clear();
 		}
-		assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+		assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingOffsetsToCommit.size());
 
 		// commit only the second last
 		consumer.notifyCheckpointComplete(598);
-		assertEquals(1, pendingCheckpoints.size());
+		assertEquals(1, pendingOffsetsToCommit.size());
 
 		// access invalid checkpoint
 		consumer.notifyCheckpointComplete(590);
 
 		// and the last
 		consumer.notifyCheckpointComplete(599);
-		assertEquals(0, pendingCheckpoints.size());
+		assertEquals(0, pendingOffsetsToCommit.size());
 	}
 
 	// ------------------------------------------------------------------------
 
 	private static <T> FlinkKafkaConsumerBase<T> getConsumer(
-			AbstractFetcher<T, ?> fetcher, LinkedMap pendingCheckpoints, boolean running) throws Exception
+			AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
 	{
 		FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
 
@@ -285,9 +295,9 @@ public class FlinkKafkaConsumerBaseTest {
 		fetcherField.setAccessible(true);
 		fetcherField.set(consumer, fetcher);
 
-		Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
+		Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit");
 		mapField.setAccessible(true);
-		mapField.set(consumer, pendingCheckpoints);
+		mapField.set(consumer, pendingOffsetsToCommit);
 
 		Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
 		runningField.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 0810a3e..7b06cfd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -29,6 +29,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -97,7 +98,6 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.Date;
@@ -201,6 +201,249 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}
 		}
 	}
+
+	/**
+	 * Ensures that the committed offsets to Kafka are the offsets of "the next record to process"
+	 */
+	public void runCommitOffsetsToKafka() throws Exception {
+		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(200);
+
+		DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps));
+		stream.addSink(new DiscardingSink<String>());
+
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		final Thread runner = new Thread("runner") {
+			@Override
+			public void run() {
+				try {
+					env.execute();
+				}
+				catch (Throwable t) {
+					if (!(t.getCause() instanceof JobCancellationException)) {
+						errorRef.set(t);
+					}
+				}
+			}
+		};
+		runner.start();
+
+		final Long l50 = 50L; // the final committed offset in Kafka should be 50
+		final long deadline = 30000 + System.currentTimeMillis();
+
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+		do {
+			Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+			Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+			Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+
+			if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
+				break;
+			}
+
+			Thread.sleep(100);
+		}
+		while (System.currentTimeMillis() < deadline);
+
+		// cancel the job
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+		final Throwable t = errorRef.get();
+		if (t != null) {
+			throw new RuntimeException("Job failed with an exception", t);
+		}
+
+		// final check to see if offsets are correctly in Kafka
+		Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+		Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+		Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+		Assert.assertEquals(Long.valueOf(50L), o1);
+		Assert.assertEquals(Long.valueOf(50L), o2);
+		Assert.assertEquals(Long.valueOf(50L), o3);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
+
+	/**
+	 * This test first writes a total of 300 records to a test topic, reads the first 150 so that some offsets are
+	 * committed to Kafka, and then startup the consumer again to read the remaining records starting from the committed offsets.
+	 * The test ensures that whatever offsets were committed to Kafka, the consumer correctly picks them up
+	 * and starts at the correct position.
+	 */
+	public void runStartFromKafkaCommitOffsets() throws Exception {
+		final int parallelism = 3;
+		final int recordsInEachPartition = 300;
+
+		final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+		Long o1;
+		Long o2;
+		Long o3;
+		int attempt = 0;
+		// make sure that o1, o2, o3 are not all null before proceeding
+		do {
+			attempt++;
+			LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
+
+			final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env.getConfig().disableSysoutLogging();
+			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+			env.setParallelism(parallelism);
+			env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
+
+			env
+				.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
+				.map(new ThrottledMapper<String>(50))
+				.map(new MapFunction<String, Object>() {
+					int count = 0;
+					@Override
+					public Object map(String value) throws Exception {
+						count++;
+						if (count == 150) {
+							throw new SuccessException();
+						}
+						return null;
+					}
+				})
+				.addSink(new DiscardingSink<>());
+
+			tryExecute(env, "Read some records to commit offsets to Kafka");
+
+			o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+			o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+			o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+		} while (o1 == null && o2 == null && o3 == null && attempt < 3);
+
+		if (o1 == null && o2 == null && o3 == null) {
+			throw new RuntimeException("No offsets have been committed after 3 attempts");
+		}
+
+		LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3);
+
+		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env2.getConfig().disableSysoutLogging();
+		env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env2.setParallelism(parallelism);
+
+		// whatever offsets were committed for each partition, the consumer should pick
+		// them up and start from the correct position so that the remaining records are all read
+		HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
+		partitionsToValuesCountAndStartOffset.put(0, new Tuple2<>(
+			(o1 != null) ? (int) (recordsInEachPartition - o1) : recordsInEachPartition,
+			(o1 != null) ? o1.intValue() : 0
+		));
+		partitionsToValuesCountAndStartOffset.put(1, new Tuple2<>(
+			(o2 != null) ? (int) (recordsInEachPartition - o2) : recordsInEachPartition,
+			(o2 != null) ? o2.intValue() : 0
+		));
+		partitionsToValuesCountAndStartOffset.put(2, new Tuple2<>(
+			(o3 != null) ? (int) (recordsInEachPartition - o3) : recordsInEachPartition,
+			(o3 != null) ? o3.intValue() : 0
+		));
+
+		readSequence(env2, standardProps, topicName, partitionsToValuesCountAndStartOffset);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
+
+	/**
+	 * This test ensures that when the consumers retrieve a start offset from Kafka (earliest, latest), this offset
+	 * is committed to Kafka, even if some partitions are not read.
+	 *
+	 * Test:
+	 * - create a topic with 3 partitions
+	 * - write 50 messages into each partition
+	 * - start three consumers with auto.offset.reset='latest' and wait until they have committed offsets to Kafka
+	 * - check that the committed offsets in Kafka are set to 50 for all three partitions
+	 *
+	 * See FLINK-3440 as well
+	 */
+	public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+		final int parallelism = 3;
+		final int recordsInEachPartition = 50;
+
+		final String topicName = writeSequence("testAutoOffsetRetrievalAndCommitToKafkaTopic", recordsInEachPartition, parallelism, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(200);
+
+		Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "latest"); // set to reset to latest, so that partitions are initially not read
+
+		DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps));
+		stream.addSink(new DiscardingSink<String>());
+
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		final Thread runner = new Thread("runner") {
+			@Override
+			public void run() {
+				try {
+					env.execute();
+				}
+				catch (Throwable t) {
+					if (!(t.getCause() instanceof JobCancellationException)) {
+						errorRef.set(t);
+					}
+				}
+			}
+		};
+		runner.start();
+
+		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+
+		final Long l50 = 50L; // the final committed offset in Kafka should be 50
+		final long deadline = 30000 + System.currentTimeMillis();
+		do {
+			Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+			Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+			Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+
+			if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
+				break;
+			}
+
+			Thread.sleep(100);
+		}
+		while (System.currentTimeMillis() < deadline);
+
+		// cancel the job
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+		final Throwable t = errorRef.get();
+		if (t != null) {
+			throw new RuntimeException("Job failed with an exception", t);
+		}
+
+		// final check to see if offsets are correctly in Kafka
+		Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+		Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+		Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+		Assert.assertEquals(Long.valueOf(50L), o1);
+		Assert.assertEquals(Long.valueOf(50L), o2);
+		Assert.assertEquals(Long.valueOf(50L), o3);
+
+		kafkaOffsetHandler.close();
+		deleteTestTopic(topicName);
+	}
 	
 	/**
 	 * Ensure Kafka is working on both producer and consumer side.
@@ -1346,48 +1589,80 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	//  Reading writing test data sets
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Runs a job using the provided environment to read a sequence of records from a single Kafka topic.
+	 * The expected starting offset and total value count to read can be specified individually for each partition.
+	 * The job is considered successful only if the records read from every partition match the specified start offset and value count.
+	 */
 	protected void readSequence(StreamExecutionEnvironment env, Properties cc,
-								final int sourceParallelism,
 								final String topicName,
-								final int valuesCount, final int startFrom) throws Exception {
+								final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception {
+		final int sourceParallelism = partitionsToValuesCountAndStartOffset.keySet().size();
 
-		final int finalCount = valuesCount * sourceParallelism;
+		int finalCountTmp = 0;
+		for (Map.Entry<Integer, Tuple2<Integer, Integer>> valuesCountAndStartOffset : partitionsToValuesCountAndStartOffset.entrySet()) {
+			finalCountTmp += valuesCountAndStartOffset.getValue().f0;
+		}
+		final int finalCount = finalCountTmp;
 
 		final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
 
 		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
-				new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
+			new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
 
 		// create the consumer
 		cc.putAll(secureProps);
 		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
 
 		DataStream<Tuple2<Integer, Integer>> source = env
-				.addSource(consumer).setParallelism(sourceParallelism)
-				.map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
+			.addSource(consumer).setParallelism(sourceParallelism)
+			.map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
 
 		// verify data
 		source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
 
-			private int[] values = new int[valuesCount];
+			private HashMap<Integer, BitSet> partitionsToValueCheck;
 			private int count = 0;
 
 			@Override
+			public void open(Configuration parameters) throws Exception {
+				partitionsToValueCheck = new HashMap<>();
+				for (Integer partition : partitionsToValuesCountAndStartOffset.keySet()) {
+					partitionsToValueCheck.put(partition, new BitSet());
+				}
+			}
+
+			@Override
 			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
-				values[value.f1 - startFrom]++;
+				int partition = value.f0;
+				int val = value.f1;
+
+				BitSet bitSet = partitionsToValueCheck.get(partition);
+				if (bitSet == null) {
+					throw new RuntimeException("Got a record from an unknown partition");
+				} else {
+					bitSet.set(val - partitionsToValuesCountAndStartOffset.get(partition).f1);
+				}
+
 				count++;
+
 				LOG.info("Received message {}, total {} messages", value, count);
 
 				// verify if we've seen everything
 				if (count == finalCount) {
-					for (int i = 0; i < values.length; i++) {
-						int v = values[i];
-						if (v != sourceParallelism) {
-							printTopic(topicName, valuesCount, deser);
-							throw new RuntimeException("Expected v to be " + sourceParallelism + 
-									", but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+					for (Map.Entry<Integer, BitSet> partitionsToValueCheck : this.partitionsToValueCheck.entrySet()) {
+						BitSet check = partitionsToValueCheck.getValue();
+						int expectedValueCount = partitionsToValuesCountAndStartOffset.get(partitionsToValueCheck.getKey()).f0;
+
+						if (check.cardinality() != expectedValueCount) {
+							throw new RuntimeException("Expected cardinality to be " + expectedValueCount +
+								", but was " + check.cardinality());
+						} else if (check.nextClearBit(0) != expectedValueCount) {
+							throw new RuntimeException("Expected next clear bit to be " + expectedValueCount +
+								", but was " + check.cardinality());
 						}
 					}
+
 					// test has passed
 					throw new SuccessException();
 				}
@@ -1400,6 +1675,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		LOG.info("Successfully read sequence for verification");
 	}
 
+	/**
+	 * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, Properties, String, Map)} that
+	 * expects all partitions of a single Kafka topic to be read from the same start offset, with the same value count per partition.
+	 */
+	protected void readSequence(StreamExecutionEnvironment env, Properties cc,
+								final int sourceParallelism,
+								final String topicName,
+								final int valuesCount, final int startFrom) throws Exception {
+		HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
+		for (int i = 0; i < sourceParallelism; i++) {
+			partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom));
+		}
+		readSequence(env, cc, topicName, partitionsToValuesCountAndStartOffset);
+	}
+
 	protected String writeSequence(
 			String baseTopicName,
 			final int numElements,
@@ -1430,7 +1720,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			LOG.info("Writing attempt #1");
 			
 			// -------- Write the Sequence --------
-			
+
 			createTestTopic(topicName, parallelism, replicationFactor);
 
 			StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);

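Both of the commit-verification tests above poll the committed offsets until every partition reports the expected value or a 30 second deadline passes. A rough sketch of that polling loop as a reusable helper is shown below; the helper and its name are illustrative only and not part of this commit, and it is assumed to live inside KafkaConsumerTestBase.

	// Illustrative helper (not part of this commit): poll the committed offset of every
	// partition until all of them equal the expected value, or give up once the timeout expires.
	private static boolean waitUntilCommittedOffsetsReach(
			KafkaTestEnvironment.KafkaOffsetHandler offsetHandler,
			String topicName,
			int numPartitions,
			long expectedOffset,
			long timeoutMillis) throws InterruptedException {

		final long deadline = System.currentTimeMillis() + timeoutMillis;
		do {
			boolean allMatch = true;
			for (int partition = 0; partition < numPartitions; partition++) {
				Long committed = offsetHandler.getCommittedOffset(topicName, partition);
				if (committed == null || committed != expectedOffset) {
					allMatch = false;
					break;
				}
			}
			if (allMatch) {
				return true;
			}
			Thread.sleep(100);
		} while (System.currentTimeMillis() < deadline);

		return false;
	}

With such a helper, the busy-wait blocks in both tests would reduce to a single call followed by the final assertions.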
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index ded1fde..806d342 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.Properties;
 
 /**
@@ -83,6 +85,14 @@ public abstract class KafkaTestEnvironment {
 														KeyedSerializationSchema<T> serSchema, Properties props,
 														KafkaPartitioner<T> partitioner);
 
+	// -- offset handlers
+
+	public interface KafkaOffsetHandler {
+		Long getCommittedOffset(String topicName, int partition);
+		void close();
+	}
+
+	public abstract KafkaOffsetHandler createOffsetHandler(Properties props);
 
 	// -- leader failure simulation
 

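The concrete KafkaOffsetHandler implementations live in each connector version's KafkaTestEnvironmentImpl and are not reproduced in this excerpt. As a rough sketch only, an implementation backed by the new consumer API (Kafka 0.9+) could look roughly like the following; the class name and the byte[] deserializer choice are assumptions, and the class is assumed to sit in the same package as KafkaTestEnvironment.

	import java.util.Properties;

	import org.apache.kafka.clients.consumer.KafkaConsumer;
	import org.apache.kafka.clients.consumer.OffsetAndMetadata;
	import org.apache.kafka.common.TopicPartition;
	import org.apache.kafka.common.serialization.ByteArrayDeserializer;

	// Hypothetical sketch of a KafkaOffsetHandler backed by the new consumer API (0.9+).
	// The actual implementations are in each version's KafkaTestEnvironmentImpl.
	public class NewConsumerOffsetHandler implements KafkaTestEnvironment.KafkaOffsetHandler {

		private final KafkaConsumer<byte[], byte[]> offsetClient;

		public NewConsumerOffsetHandler(Properties props) {
			Properties consumerProps = new Properties();
			consumerProps.putAll(props);
			consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
			consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
			this.offsetClient = new KafkaConsumer<>(consumerProps);
		}

		@Override
		public Long getCommittedOffset(String topicName, int partition) {
			// committed() returns null if no offset has been committed for the partition yet
			OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
			return (committed != null) ? committed.offset() : null;
		}

		@Override
		public void close() {
			offsetClient.close();
		}
	}

A 0.8 implementation would instead read the committed offsets from ZooKeeper, e.g. via the paths used by ZookeeperOffsetHandler.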
http://git-wip-us.apache.org/repos/asf/flink/blob/f46ca391/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 582311f..7db6ba4 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -225,7 +225,7 @@ public class AbstractFetcherTimestampsTest {
 		}
 
 		@Override
-		public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
 			throw new UnsupportedOperationException();
 		}
 	}


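For context, commitInternalOffsetsToKafka receives Flink's internally checkpointed offsets, i.e. the last records already processed per partition, and under this commit's unified semantics must commit the offset of the next record to process. A minimal sketch of that translation, assuming the 0.9+ consumer classes and Flink's KafkaTopicPartition, is shown below; the class and method names are illustrative, not copied from the actual fetchers.

	import java.util.HashMap;
	import java.util.Map;

	import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
	import org.apache.kafka.clients.consumer.OffsetAndMetadata;
	import org.apache.kafka.common.TopicPartition;

	public final class OffsetCommitAdjustment {

		// Translates Flink's internally checkpointed offsets (last processed record per
		// partition) into the offsets committed to Kafka (next record to process).
		public static Map<TopicPartition, OffsetAndMetadata> toKafkaCommitOffsets(
				Map<KafkaTopicPartition, Long> internalOffsets) {

			Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(internalOffsets.size());
			for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) {
				long lastProcessedOffset = entry.getValue();
				// Kafka interprets a committed offset as the next record a consumer will read,
				// so the last processed offset is incremented by one before committing.
				offsetsToCommit.put(
						new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()),
						new OffsetAndMetadata(lastProcessedOffset + 1));
			}
			return offsetsToCommit;
		}

		private OffsetCommitAdjustment() {}
	}

This matches the tests above, which write 50 records (offsets 0-49) into each partition and expect a committed offset of 50.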