flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [08/10] flink git commit: [FLINK-2386] [kafka] Move Kafka connectors to 'org.apache.flink.streaming.connectors.kafka'
Date Wed, 26 Aug 2015 19:13:58 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
deleted file mode 100644
index 4a97624..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests that the partition assignment is deterministic and stable.
- */
-public class KafkaConsumerPartitionAssignmentTest {
-
-	@Test
-	public void testPartitionsEqualConsumers() {
-		try {
-			int[] partitions = {4, 52, 17, 1};
-			
-			for (int i = 0; i < partitions.length; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", partitions.length, i);
-				
-				assertNotNull(parts);
-				assertEquals(1, parts.size());
-				assertTrue(contains(partitions, parts.get(0).partition()));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMultiplePartitionsPerConsumers() {
-		try {
-			final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-
-			final Set<Integer> allPartitions = new HashSet<Integer>();
-			for (int i : partitions) {
-				allPartitions.add(i);
-			}
-			
-			final int numConsumers = 3;
-			final int minPartitionsPerConsumer = partitions.length / numConsumers;
-			final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1;
-			
-			for (int i = 0; i < numConsumers; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", numConsumers, i);
-
-				assertNotNull(parts);
-				assertTrue(parts.size() >= minPartitionsPerConsumer);
-				assertTrue(parts.size() <= maxPartitionsPerConsumer);
-
-				for (TopicPartition p : parts) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p.partition()));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPartitionsFewerThanConsumers() {
-		try {
-			final int[] partitions = {4, 52, 17, 1};
-
-			final Set<Integer> allPartitions = new HashSet<Integer>();
-			for (int i : partitions) {
-				allPartitions.add(i);
-			}
-
-			final int numConsumers = 2 * partitions.length + 3;
-			
-			for (int i = 0; i < numConsumers; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", numConsumers, i);
-
-				assertNotNull(parts);
-				assertTrue(parts.size() <= 1);
-				
-				for (TopicPartition p : parts) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p.partition()));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testAssignEmptyPartitions() {
-		try {
-			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2);
-			assertNotNull(parts1);
-			assertTrue(parts1.isEmpty());
-
-			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0);
-			assertNotNull(parts2);
-			assertTrue(parts2.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testGrowingPartitionsRemainsStable() {
-		try {
-			final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-			final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7);
-
-			final Set<Integer> allNewPartitions = new HashSet<Integer>();
-			final Set<Integer> allInitialPartitions = new HashSet<Integer>();
-			for (int i : newPartitions) {
-				allNewPartitions.add(i);
-			}
-			for (int i : initialPartitions) {
-				allInitialPartitions.add(i);
-			}
-
-			final int numConsumers = 3;
-			final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers;
-			final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1;
-			final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers;
-			final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1;
-			
-			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 0);
-			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 1);
-			List<TopicPartition> parts3 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 2);
-
-			assertNotNull(parts1);
-			assertNotNull(parts2);
-			assertNotNull(parts3);
-			
-			assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
-
-			for (TopicPartition p : parts1) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts2) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts3) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
-			}
-			
-			// all partitions must have been assigned
-			assertTrue(allInitialPartitions.isEmpty());
-			
-			// grow the set of partitions and distribute anew
-			
-			List<TopicPartition> parts1new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 0);
-			List<TopicPartition> parts2new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 1);
-			List<TopicPartition> parts3new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 2);
-
-			// new partitions must include all old partitions
-			
-			assertTrue(parts1new.size() > parts1.size());
-			assertTrue(parts2new.size() > parts2.size());
-			assertTrue(parts3new.size() > parts3.size());
-			
-			assertTrue(parts1new.containsAll(parts1));
-			assertTrue(parts2new.containsAll(parts2));
-			assertTrue(parts3new.containsAll(parts3));
-
-			assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
-
-			for (TopicPartition p : parts1new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts2new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts3new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allNewPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	private static boolean contains(int[] array, int value) {
-		for (int i : array) {
-			if (i == value) {
-				return true;
-			}
-		}
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
deleted file mode 100644
index 2e0fd54..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Properties;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class KafkaConsumerTest {
-
-	@Test
-	public void testValidateZooKeeperConfig() {
-		try {
-			// empty
-			Properties emptyProperties = new Properties();
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(emptyProperties);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no connect string (only group string)
-			Properties noConnect = new Properties();
-			noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(noConnect);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no group string (only connect string)
-			Properties noGroup = new Properties();
-			noGroup.put("zookeeper.connect", "localhost:47574");
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(noGroup);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSnapshot() {
-		try {
-			Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets");
-			Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running");
-			Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
-			
-			offsetsField.setAccessible(true);
-			runningField.setAccessible(true);
-			mapField.setAccessible(true);
-
-			FlinkKafkaConsumer<?> consumer = mock(FlinkKafkaConsumer.class);
-			when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
-			
-			long[] testOffsets = new long[] { 43, 6146, 133, 16, 162, 616 };
-			LinkedMap map = new LinkedMap();
-			
-			offsetsField.set(consumer, testOffsets);
-			runningField.set(consumer, true);
-			mapField.set(consumer, map);
-			
-			assertTrue(map.isEmpty());
-
-			// make multiple checkpoints
-			for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
-				long[] checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
-				assertArrayEquals(testOffsets, checkpoint);
-				
-				// change the offsets, make sure the snapshot did not change
-				long[] checkpointCopy = Arrays.copyOf(checkpoint, checkpoint.length);
-				
-				for (int i = 0; i < testOffsets.length; i++) {
-					testOffsets[i] += 1L;
-				}
-				
-				assertArrayEquals(checkpointCopy, checkpoint);
-				
-				assertTrue(map.size() > 0);
-				assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	@Ignore("Kafka consumer internally makes an infinite loop")
-	public void testCreateSourceWithoutCluster() {
-		try {
-			Properties props = new Properties();
-			props.setProperty("zookeeper.connect", "localhost:56794");
-			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
-			props.setProperty("group.id", "non-existent-group");
-
-			new FlinkKafkaConsumer<String>("no op topic", new JavaDefaultStringSchema(), props,
-					FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
-					FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
deleted file mode 100644
index 2b92ebf..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
+++ /dev/null
@@ -1,1137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.connectors.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.testutils.DataGenerators;
-import org.apache.flink.streaming.connectors.testutils.DiscardingSink;
-import org.apache.flink.streaming.connectors.testutils.FailingIdentityMapper;
-import org.apache.flink.streaming.connectors.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.connectors.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.connectors.testutils.PartitionValidatingMapper;
-import org.apache.flink.streaming.connectors.testutils.SuccessException;
-import org.apache.flink.streaming.connectors.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.testutils.Tuple2Partitioner;
-import org.apache.flink.streaming.connectors.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.apache.flink.util.Collector;
-
-import org.junit.Assert;
-
-import scala.collection.Seq;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-@SuppressWarnings("serial")
-public abstract class KafkaConsumerTestBase extends KafkaTestBase {
-
-
-	// ------------------------------------------------------------------------
-	//  Required methods by the abstract test base
-	// ------------------------------------------------------------------------
-
-	protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
-			String topic, DeserializationSchema<T> deserializationSchema, Properties props);
-
-	// ------------------------------------------------------------------------
-	//  Suite of Tests
-	//
-	//  The tests here are all not activated (by an @Test tag), but need
-	//  to be invoked from the extending classes. That way, the classes can
-	//  select which tests to run.
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Test that validates that checkpointing and checkpoint notification works properly
-	 */
-	public void runCheckpointingTest() {
-		try {
-			createTestTopic("testCheckpointing", 1, 1);
-
-			FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
-			Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
-			pendingCheckpointsField.setAccessible(true);
-			LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
-
-			Assert.assertEquals(0, pendingCheckpoints.size());
-			source.setRuntimeContext(new MockRuntimeContext(1, 0));
-
-			final long[] initialOffsets = new long[] { 1337 };
-
-			// first restore
-			source.restoreState(initialOffsets);
-
-			// then open
-			source.open(new Configuration());
-			long[] state1 = source.snapshotState(1, 15);
-
-			assertArrayEquals(initialOffsets, state1);
-
-			long[] state2 = source.snapshotState(2, 30);
-			Assert.assertArrayEquals(initialOffsets, state2);
-			Assert.assertEquals(2, pendingCheckpoints.size());
-
-			source.commitCheckpoint(1);
-			Assert.assertEquals(1, pendingCheckpoints.size());
-
-			source.commitCheckpoint(2);
-			Assert.assertEquals(0, pendingCheckpoints.size());
-
-			source.commitCheckpoint(666); // invalid checkpoint
-			Assert.assertEquals(0, pendingCheckpoints.size());
-
-			// create 500 snapshots
-			for (int i = 100; i < 600; i++) {
-				source.snapshotState(i, 15 * i);
-			}
-			Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
-
-			// commit only the second last
-			source.commitCheckpoint(598);
-			Assert.assertEquals(1, pendingCheckpoints.size());
-
-			// access invalid checkpoint
-			source.commitCheckpoint(590);
-
-			// and the last
-			source.commitCheckpoint(599);
-			Assert.assertEquals(0, pendingCheckpoints.size());
-
-			source.close();
-
-			deleteTestTopic("testCheckpointing");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
-	 *
-	 * This test is only applicable if Teh Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
-	 */
-	public void runOffsetInZookeeperValidationTest() {
-		try {
-			LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
-
-			final String topicName = "testOffsetHacking";
-			final int parallelism = 3;
-			
-			createTestTopic(topicName, parallelism, 1);
-
-			StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env1.getConfig().disableSysoutLogging();
-			env1.enableCheckpointing(50);
-			env1.setNumberOfExecutionRetries(0);
-			env1.setParallelism(parallelism);
-
-			StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env2.getConfig().disableSysoutLogging();
-			env2.enableCheckpointing(50);
-			env2.setNumberOfExecutionRetries(0);
-			env2.setParallelism(parallelism);
-
-			StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env3.getConfig().disableSysoutLogging();
-			env3.enableCheckpointing(50);
-			env3.setNumberOfExecutionRetries(0);
-			env3.setParallelism(parallelism);
-
-			// write a sequence from 0 to 99 to each of the 3 partitions.
-			writeSequence(env1, topicName, 100, parallelism);
-
-			readSequence(env2, standardProps, parallelism, topicName, 100, 0);
-
-			ZkClient zkClient = createZookeeperClient();
-			
-			long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
-			long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
-			long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
-
-			LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
-
-			assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-			assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-			assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-
-			LOG.info("Manipulating offsets");
-
-			// set the offset to 50 for the three partitions
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 0, 49);
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 1, 49);
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 2, 49);
-
-			zkClient.close();
-			
-			// create new env
-			readSequence(env3, standardProps, parallelism, topicName, 50, 50);
-
-			deleteTestTopic(topicName);
-			
-			LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Ensure Kafka is working on both producer and consumer side.
-	 * This executes a job that contains two Flink pipelines.
-	 *
-	 * <pre>
-	 * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
-	 * </pre>
-	 */
-	public void runSimpleConcurrentProducerConsumerTopology() {
-		try {
-			LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
-
-			final String topic = "concurrentProducerConsumerTopic";
-			final int parallelism = 3;
-			final int elementsPerPartition = 100;
-			final int totalElements = parallelism * elementsPerPartition;
-
-			createTestTopic(topic, parallelism, 2);
-
-			final StreamExecutionEnvironment env =
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(0);
-			env.getConfig().disableSysoutLogging();
-
-			TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
-					new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringType, env.getConfig());
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
-					new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringType, env.getConfig());
-
-			// ----------- add producer dataflow ----------
-
-			DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
-
-				private boolean running = true;
-
-				@Override
-				public void run(SourceContext<Tuple2<Long, String>> ctx) {
-					int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
-					int limit = cnt + elementsPerPartition;
-
-
-					while (running && cnt < limit) {
-						ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
-						cnt++;
-					}
-				}
-
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			});
-			stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, sinkSchema));
-
-			// ----------- add consumer dataflow ----------
-
-			FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
-
-			DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
-
-			consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
-
-				private int elCnt = 0;
-				private BitSet validator = new BitSet(totalElements);
-
-				@Override
-				public void invoke(Tuple2<Long, String> value) throws Exception {
-					String[] sp = value.f1.split("-");
-					int v = Integer.parseInt(sp[1]);
-
-					assertEquals(value.f0 - 1000, (long) v);
-
-					assertFalse("Received tuple twice", validator.get(v));
-					validator.set(v);
-					elCnt++;
-
-					if (elCnt == totalElements) {
-						// check if everything in the bitset is set to true
-						int nc;
-						if ((nc = validator.nextClearBit(0)) != totalElements) {
-							fail("The bitset was not set to 1 on all elements. Next clear:"
-									+ nc + " Set: " + validator);
-						}
-						throw new SuccessException();
-					}
-				}
-
-				@Override
-				public void close() throws Exception {
-					super.close();
-				}
-			}).setParallelism(1);
-
-			tryExecute(env, "runSimpleConcurrentProducerConsumerTopology");
-
-			LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
-
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and
-	 * Flink sources.
-	 */
-	public void runOneToOneExactlyOnceTest() {
-		try {
-			LOG.info("Starting runOneToOneExactlyOnceTest()");
-
-			final String topic = "oneToOneTopic";
-			final int parallelism = 5;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = parallelism * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
-			
-			createTestTopic(topic, parallelism, 1);
-			
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings, 
-					topic, parallelism, numElementsPerPartition, true);
-			
-			// run the topology that fails and recovers
-
-			DeserializationSchema<Integer> schema = 
-					new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-			
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.enableCheckpointing(500);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
-			
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-			
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(parallelism, 1))
-					.map(new FailingIdentityMapper<Integer>(failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-			FailingIdentityMapper.failedBefore = false;
-			tryExecute(env, "One-to-one exactly once test");
-
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-			
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so
-	 * one Flink source will read multiple Kafka partitions.
-	 */
-	public void runOneSourceMultiplePartitionsExactlyOnceTest() {
-		try {
-			LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
-
-			final String topic = "oneToManyTopic";
-			final int numPartitions = 5;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = numPartitions * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
-			
-			final int parallelism = 2;
-
-			createTestTopic(topic, numPartitions, 1);
-
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings,
-					topic, numPartitions, numElementsPerPartition, true);
-
-			// run the topology that fails and recovers
-
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.enableCheckpointing(500);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
-
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(numPartitions, 3))
-					.map(new FailingIdentityMapper<Integer>(failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-			FailingIdentityMapper.failedBefore = false;
-			tryExecute(env, "One-source-multi-partitions exactly once test");
-
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-			
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests the proper consumption when having more Flink sources than Kafka partitions, which means
-	 * that some Flink sources will read no partitions.
-	 */
-	public void runMultipleSourcesOnePartitionExactlyOnceTest() {
-		try {
-			LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
-
-			final String topic = "manyToOneTopic";
-			final int numPartitions = 5;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = numPartitions * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
-
-			final int parallelism = 8;
-
-			createTestTopic(topic, numPartitions, 1);
-
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings,
-					topic, numPartitions, numElementsPerPartition, true);
-
-			// run the topology that fails and recovers
-			
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.enableCheckpointing(500);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
-			env.setBufferTimeout(0);
-
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-			
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(numPartitions, 1))
-					.map(new FailingIdentityMapper<Integer>(failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-			
-			FailingIdentityMapper.failedBefore = false;
-			tryExecute(env, "multi-source-one-partitions exactly once test");
-
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-			
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	
-	/**
-	 * Tests that the source can be properly canceled when reading full partitions. 
-	 */
-	public void runCancelingOnFullInputTest() {
-		try {
-			final String topic = "cancelingOnFullTopic";
-
-			final int parallelism = 3;
-			createTestTopic(topic, parallelism, 1);
-
-			// launch a producer thread
-			DataGenerators.InfiniteStringsGenerator generator =
-					new DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
-			generator.start();
-
-			// launch a consumer asynchronously
-
-			final AtomicReference<Throwable> jobError = new AtomicReference<Throwable>();
-
-			final Runnable jobRunner = new Runnable() {
-				@Override
-				public void run() {
-					try {
-						final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-						env.setParallelism(parallelism);
-						env.enableCheckpointing(100);
-						env.getConfig().disableSysoutLogging();
-
-						FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
-
-						env.addSource(source).addSink(new DiscardingSink<String>());
-
-						env.execute();
-					}
-					catch (Throwable t) {
-						jobError.set(t);
-					}
-				}
-			};
-
-			Thread runnerThread = new Thread(jobRunner, "program runner thread");
-			runnerThread.start();
-
-			// wait a bit before canceling
-			Thread.sleep(2000);
-
-			// cancel
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManager());
-
-			// wait for the program to be done and validate that we failed with the right exception
-			runnerThread.join();
-
-			Throwable failueCause = jobError.get();
-			assertNotNull("program did not fail properly due to canceling", failueCause);
-			assertTrue(failueCause.getMessage().contains("Job was cancelled"));
-
-			if (generator.isAlive()) {
-				generator.shutdown();
-				generator.join();
-			}
-			else {
-				Throwable t = generator.getError();
-				if (t != null) {
-					t.printStackTrace();
-					fail("Generator failed: " + t.getMessage());
-				} else {
-					fail("Generator failed with no exception");
-				}
-			}
-
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests that the source can be properly canceled when reading empty partitions. 
-	 */
-	public void runCancelingOnEmptyInputTest() {
-		try {
-			final String topic = "cancelingOnEmptyInputTopic";
-
-			final int parallelism = 3;
-			createTestTopic(topic, parallelism, 1);
-
-			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
-			final Runnable jobRunner = new Runnable() {
-				@Override
-				public void run() {
-					try {
-						final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-						env.setParallelism(parallelism);
-						env.enableCheckpointing(100);
-						env.getConfig().disableSysoutLogging();
-
-						FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
-
-						env.addSource(source).addSink(new DiscardingSink<String>());
-
-						env.execute();
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
-
-			Thread runnerThread = new Thread(jobRunner, "program runner thread");
-			runnerThread.start();
-
-			// wait a bit before canceling
-			Thread.sleep(2000);
-
-			// cancel
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManager());
-
-			// wait for the program to be done and validate that we failed with the right exception
-			runnerThread.join();
-
-			Throwable failueCause = error.get();
-			assertNotNull("program did not fail properly due to canceling", failueCause);
-			assertTrue(failueCause.getMessage().contains("Job was cancelled"));
-
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests that the source can be properly canceled when reading full partitions. 
-	 */
-	public void runFailOnDeployTest() {
-		try {
-			final String topic = "failOnDeployTopic";
-			
-			createTestTopic(topic, 2, 1);
-
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setParallelism(12); // needs to be more that the mini cluster has slots
-			env.getConfig().disableSysoutLogging();
-
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-			
-			env
-					.addSource(kafkaSource)
-					.addSink(new DiscardingSink<Integer>());
-			
-			try {
-				env.execute();
-				fail("this test should fail with an exception");
-			}
-			catch (ProgramInvocationException e) {
-				
-				// validate that we failed due to a NoResourceAvailableException
-				Throwable cause = e.getCause();
-				int depth = 0;
-				boolean foundResourceException = false;
-				
-				while (cause != null && depth++ < 20) {
-					if (cause instanceof NoResourceAvailableException) {
-						foundResourceException = true;
-						break;
-					}
-					cause = cause.getCause();
-				}
-				
-				assertTrue("Wrong exception", foundResourceException);
-			}
-
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Test Flink's Kafka integration also with very big records (30MB)
-	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
-	 */
-	public void runBigRecordTestTopology() {
-		try {
-			LOG.info("Starting runBigRecordTestTopology()");
-
-			final String topic = "bigRecordTestTopic";
-			final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
-			
-			createTestTopic(topic, parallelism, 1);
-
-			final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
-
-			final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
-					new TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(longBytesInfo, new ExecutionConfig());
-
-			final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
-					new TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(longBytesInfo, new ExecutionConfig());
-
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setNumberOfExecutionRetries(0);
-			env.getConfig().disableSysoutLogging();
-			env.enableCheckpointing(100);
-			env.setParallelism(parallelism);
-
-			// add consuming topology:
-			Properties consumerProps = new Properties();
-			consumerProps.putAll(standardProps);
-			consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
-			consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
-			consumerProps.setProperty("queued.max.message.chunks", "1");
-
-			FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
-			DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
-
-			consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
-
-				private int elCnt = 0;
-
-				@Override
-				public void invoke(Tuple2<Long, byte[]> value) throws Exception {
-					elCnt++;
-					if (value.f0 == -1) {
-						// we should have seen 11 elements now.
-						if(elCnt == 11) {
-							throw new SuccessException();
-						} else {
-							throw new RuntimeException("There have been "+elCnt+" elements");
-						}
-					}
-					if(elCnt > 10) {
-						throw new RuntimeException("More than 10 elements seen: "+elCnt);
-					}
-				}
-			});
-
-			// add producing topology
-			Properties producerProps = new Properties();
-			producerProps.setProperty("max.message.size", Integer.toString(1024 * 1024 * 30));
-			
-			DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
-
-				private boolean running;
-
-				@Override
-				public void open(Configuration parameters) throws Exception {
-					super.open(parameters);
-					running = true;
-				}
-
-				@Override
-				public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
-					Random rnd = new Random();
-					long cnt = 0;
-					int fifteenMb = 1024 * 1024 * 15;
-
-					while (running) {
-						byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
-						ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
-
-						Thread.sleep(100);
-
-						if (cnt == 10) {
-							// signal end
-							ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
-							break;
-						}
-					}
-				}
-
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			});
-
-			stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
-					producerProps, deserSchema));
-
-			tryExecute(env, "big topology test");
-
-			deleteTestTopic(topic);
-			
-			LOG.info("Finished runBigRecordTestTopology()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	
-	public void runBrokerFailureTest() {
-		try {
-			LOG.info("starting runBrokerFailureTest()");
-			
-			final String topic = "brokerFailureTestTopic";
-
-			final int parallelism = 2;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = parallelism * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
-			
-
-			createTestTopic(topic, parallelism, 2);
-
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings,
-					topic, parallelism, numElementsPerPartition, true);
-
-			// find leader to shut down
-			ZkClient zkClient = createZookeeperClient();
-			PartitionMetadata firstPart = null;
-			do {
-				if (firstPart != null) {
-					LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
-					// not the first try. Sleep a bit
-					Thread.sleep(150);
-				}
-
-				Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
-				firstPart = partitionMetadata.head();
-			}
-			while (firstPart.errorCode() != 0);
-			zkClient.close();
-
-			final String leaderToShutDown = firstPart.leader().get().connectionString();
-			LOG.info("Leader to shutdown {}", leaderToShutDown);
-			
-			
-			// run the topology that fails and recovers
-
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setParallelism(parallelism);
-			env.enableCheckpointing(500);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
-			
-
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(parallelism, 1))
-					.map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-			BrokerKillingMapper.killedLeaderBefore = false;
-			tryExecute(env, "One-to-one exactly once test");
-
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					BrokerKillingMapper.hasBeenCheckpointedBeforeFailure);
-
-			LOG.info("finished runBrokerFailureTest()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Reading writing test data sets
-	// ------------------------------------------------------------------------
-
-	private void readSequence(StreamExecutionEnvironment env, Properties cc,
-								final int sourceParallelism,
-								final String topicName,
-								final int valuesCount, final int startFrom) throws Exception {
-
-		final int finalCount = valuesCount * sourceParallelism;
-
-		final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
-				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(intIntTupleType, env.getConfig());
-
-		// create the consumer
-		FlinkKafkaConsumer<Tuple2<Integer, Integer>> consumer = getConsumer(topicName, deser, cc);
-
-		DataStream<Tuple2<Integer, Integer>> source = env
-				.addSource(consumer).setParallelism(sourceParallelism)
-				.map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
-
-		// verify data
-		source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
-
-			private int[] values = new int[valuesCount];
-			private int count = 0;
-
-			@Override
-			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
-				values[value.f1 - startFrom]++;
-				count++;
-
-				// verify if we've seen everything
-				if (count == finalCount) {
-					for (int i = 0; i < values.length; i++) {
-						int v = values[i];
-						if (v != sourceParallelism) {
-							printTopic(topicName, valuesCount, deser);
-							throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
-						}
-					}
-					// test has passed
-					throw new SuccessException();
-				}
-			}
-
-		}).setParallelism(1);
-
-		tryExecute(env, "Read data from Kafka");
-
-		LOG.info("Successfully read sequence for verification");
-	}
-
-	private static void writeSequence(StreamExecutionEnvironment env, String topicName,
-									  final int numElements, int parallelism) throws Exception {
-
-		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
-			private boolean running = true;
-
-			@Override
-			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-				int cnt = 0;
-				int partition = getRuntimeContext().getIndexOfThisSubtask();
-
-				while (running && cnt < numElements) {
-					ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
-					cnt++;
-				}
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-		}).setParallelism(parallelism);
-		
-		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
-				topicName,
-				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
-				new Tuple2Partitioner(parallelism)
-		)).setParallelism(parallelism);
-
-		env.execute("Write sequence");
-
-		LOG.info("Finished writing sequence");
-	}
-
-	// ------------------------------------------------------------------------
-	//  Debugging utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Read topic to list, only using Kafka code.
-	 */
-	private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
-		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
-		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
-		// will see each message only once.
-		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
-		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
-		if(streams.size() != 1) {
-			throw new RuntimeException("Expected only one message stream but got "+streams.size());
-		}
-		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
-		if(kafkaStreams == null) {
-			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
-		}
-		if(kafkaStreams.size() != 1) {
-			throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
-		}
-		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
-		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
-
-		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
-		int read = 0;
-		while(iteratorToRead.hasNext()) {
-			read++;
-			result.add(iteratorToRead.next());
-			if(read == stopAfter) {
-				LOG.info("Read "+read+" elements");
-				return result;
-			}
-		}
-		return result;
-	}
-
-	private static void printTopic(String topicName, ConsumerConfig config,
-								   DeserializationSchema<?> deserializationSchema,
-								   int stopAfter) {
-
-		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
-		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
-
-		for (MessageAndMetadata<byte[], byte[]> message: contents) {
-			Object out = deserializationSchema.deserialize(message.message());
-			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
-		}
-	}
-
-	private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer) {
-		// write the sequence to log for debugging purposes
-		Properties stdProps = standardCC.props().props();
-		Properties newProps = new Properties(stdProps);
-		newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
-		newProps.setProperty("auto.offset.reset", "smallest");
-		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
-
-		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
-		printTopic(topicName, printerConfig, deserializer, elements);
-	}
-
-
-	public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
-			implements Checkpointed<Integer>, CheckpointCommitter {
-
-		private static final long serialVersionUID = 6334389850158707313L;
-
-		public static volatile boolean killedLeaderBefore;
-		public static volatile boolean hasBeenCheckpointedBeforeFailure;
-		
-		private final String leaderToShutDown;
-		private final int failCount;
-		private int numElementsTotal;
-
-		private boolean failer;
-		private boolean hasBeenCheckpointed;
-
-
-		public BrokerKillingMapper(String leaderToShutDown, int failCount) {
-			this.leaderToShutDown = leaderToShutDown;
-			this.failCount = failCount;
-		}
-
-		@Override
-		public void open(Configuration parameters) {
-			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
-		}
-
-		@Override
-		public T map(T value) throws Exception {
-			numElementsTotal++;
-			
-			if (!killedLeaderBefore) {
-				Thread.sleep(10);
-				
-				if (failer && numElementsTotal >= failCount) {
-					// shut down a Kafka broker
-					KafkaServer toShutDown = null;
-					for (KafkaServer kafkaServer : brokers) {
-						if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
-							toShutDown = kafkaServer;
-							break;
-						}
-					}
-	
-					if (toShutDown == null) {
-						throw new Exception("Cannot find broker to shut down");
-					}
-					else {
-						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-						killedLeaderBefore = true;
-						toShutDown.shutdown();
-					}
-				}
-			}
-			return value;
-		}
-
-		@Override
-		public void commitCheckpoint(long checkpointId) {
-			hasBeenCheckpointed = true;
-		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return numElementsTotal;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			this.numElementsTotal = state;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaITCase.java
deleted file mode 100644
index 54ce4ae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import org.junit.Test;
-
-import java.util.Properties;
-
-
-public class KafkaITCase extends KafkaConsumerTestBase {
-	
-	@Override
-	protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
-		return new FlinkKafkaConsumer081<T>(topic, deserializationSchema, props);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Suite of Tests
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testCheckpointing() {
-		runCheckpointingTest();
-	}
-
-	@Test
-	public void testOffsetInZookeeper() {
-		runOffsetInZookeeperValidationTest();
-	}
-	
-	@Test
-	public void testConcurrentProducerConsumerTopology() {
-		runSimpleConcurrentProducerConsumerTopology();
-	}
-
-	// --- canceling / failures ---
-	
-	@Test
-	public void testCancelingEmptyTopic() {
-		runCancelingOnEmptyInputTest();
-	}
-
-	@Test
-	public void testCancelingFullTopic() {
-		runCancelingOnFullInputTest();
-	}
-
-	@Test
-	public void testFailOnDeploy() {
-		runFailOnDeployTest();
-	}
-
-	// --- source to partition mappings and exactly once ---
-	
-	@Test
-	public void testOneToOneSources() {
-		runOneToOneExactlyOnceTest();
-	}
-
-	@Test
-	public void testOneSourceMultiplePartitions() {
-		runOneSourceMultiplePartitionsExactlyOnceTest();
-	}
-
-	@Test
-	public void testMultipleSourcesOnePartition() {
-		runMultipleSourcesOnePartitionExactlyOnceTest();
-	}
-
-	// --- broker failure ---
-
-	@Test
-	public void testBrokerFailure() {
-		runBrokerFailureTest();
-	}
-
-	// --- special executions ---
-	
-	@Test
-	public void testBigRecordJob() {
-		runBigRecordTestTopology();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
deleted file mode 100644
index b910b54..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import kafka.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaLocalSystemTime implements Time {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
-	@Override
-	public long milliseconds() {
-		return System.currentTimeMillis();
-	}
-
-	@Override
-	public long nanoseconds() {
-		return System.nanoTime();
-	}
-
-	@Override
-	public void sleep(long ms) {
-		try {
-			Thread.sleep(ms);
-		} catch (InterruptedException e) {
-			LOG.warn("Interruption", e);
-		}
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
deleted file mode 100644
index 2f14fef..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.testutils.SuccessException;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("serial")
-public class KafkaProducerITCase extends KafkaTestBase {
-
-
-	/**
-	 * 
-	 * <pre>
-	 *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
-	 *            /                  |                                       \
-	 *           /                   |                                        \
-	 * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
-	 *           \                   |                                        /
-	 *            \                  |                                       /
-	 *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
-	 * </pre>
-	 * 
-	 * The mapper validates that the values come consistently from the correct Kafka partition.
-	 * 
-	 * The final sink validates that there are no duplicates and that all partitions are present.
-	 */
-	@Test
-	public void testCustomPartitioning() {
-		try {
-			LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
-
-			final String topic = "customPartitioningTestTopic";
-			final int parallelism = 3;
-			
-			createTestTopic(topic, parallelism, 1);
-
-			TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setNumberOfExecutionRetries(0);
-			env.getConfig().disableSysoutLogging();
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
-					new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig());
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
-					new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig());
-
-			// ------ producing topology ---------
-			
-			// source has DOP 1 to make sure it generates no duplicates
-			DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
-
-				private boolean running = true;
-
-				@Override
-				public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
-					long cnt = 0;
-					while (running) {
-						ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
-						cnt++;
-					}
-				}
-
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			})
-			.setParallelism(1);
-			
-			// sink partitions into 
-			stream.addSink(new KafkaSink<Tuple2<Long, String>>(
-					brokerConnectionStrings, topic,serSchema, new CustomPartitioner(parallelism)))
-			.setParallelism(parallelism);
-
-			// ------ consuming topology ---------
-			
-			FlinkKafkaConsumer<Tuple2<Long, String>> source = 
-					new FlinkKafkaConsumer<Tuple2<Long, String>>(topic, deserSchema, standardProps,
-							FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
-							FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
-			
-			env.addSource(source).setParallelism(parallelism)
-
-					// mapper that validates partitioning and maps to partition
-					.map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
-						
-						private int ourPartition = -1;
-						@Override
-						public Integer map(Tuple2<Long, String> value) {
-							int partition = value.f0.intValue() % parallelism;
-							if (ourPartition != -1) {
-								assertEquals("inconsistent partitioning", ourPartition, partition);
-							} else {
-								ourPartition = partition;
-							}
-							return partition;
-						}
-					}).setParallelism(parallelism)
-					
-					.addSink(new SinkFunction<Integer>() {
-						
-						private int[] valuesPerPartition = new int[parallelism];
-						
-						@Override
-						public void invoke(Integer value) throws Exception {
-							valuesPerPartition[value]++;
-							
-							boolean missing = false;
-							for (int i : valuesPerPartition) {
-								if (i < 100) {
-									missing = true;
-									break;
-								}
-							}
-							if (!missing) {
-								throw new SuccessException();
-							}
-						}
-					}).setParallelism(1);
-			
-			tryExecute(env, "custom partitioning test");
-
-			deleteTestTopic(topic);
-			
-			LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	
-	// ------------------------------------------------------------------------
-
-	public static class CustomPartitioner implements SerializableKafkaPartitioner {
-
-		private final int expectedPartitions;
-
-		public CustomPartitioner(int expectedPartitions) {
-			this.expectedPartitions = expectedPartitions;
-		}
-
-		@Override
-		public int partition(Object key, int numPartitions) {
-			@SuppressWarnings("unchecked")
-			Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
-			
-			assertEquals(expectedPartitions, numPartitions);
-			
-			return (int) (tuple.f0 % numPartitions);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
deleted file mode 100644
index e177497..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import kafka.admin.AdminUtils;
-import kafka.consumer.ConsumerConfig;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.net.NetUtils;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.internals.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.testutils.SuccessException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * The base for the Kafka tests. It brings up:
- * <ul>
- *     <li>A ZooKeeper mini cluster</li>
- *     <li>Three Kafka Brokers (mini clusters)</li>
- *     <li>A Flink mini cluster</li>
- * </ul>
- * 
- * <p>Code in this test is based on the following GitHub repository:
- * <a href="https://github.com/sakserv/hadoop-mini-clusters">
- *   https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
- * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
- */
-@SuppressWarnings("serial")
-public abstract class KafkaTestBase {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
-	
-	protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
-
-	protected static String zookeeperConnectionString;
-
-	protected static File tmpZkDir;
-
-	protected static File tmpKafkaParent;
-
-	protected static TestingServer zookeeper;
-	protected static List<KafkaServer> brokers;
-	protected static String brokerConnectionStrings = "";
-
-	protected static ConsumerConfig standardCC;
-	protected static Properties standardProps;
-	
-	protected static ForkableFlinkMiniCluster flink;
-
-	protected static int flinkPort;
-	
-	
-	
-	// ------------------------------------------------------------------------
-	//  Setup and teardown of the mini clusters
-	// ------------------------------------------------------------------------
-	
-	@BeforeClass
-	public static void prepare() throws IOException {
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    Starting KafkaITCase ");
-		LOG.info("-------------------------------------------------------------------------");
-		
-		LOG.info("Starting KafkaITCase.prepare()");
-		
-		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-		
-		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
-		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
-
-		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
-		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
-
-		List<File> tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
-		for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
-			File tmpDir = new File(tmpKafkaParent, "server-" + i);
-			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
-			tmpKafkaDirs.add(tmpDir);
-		}
-
-		String kafkaHost = "localhost";
-		int zkPort = NetUtils.getAvailablePort();
-		zookeeperConnectionString = "localhost:" + zkPort;
-
-		zookeeper = null;
-		brokers = null;
-
-		try {
-			LOG.info("Starting Zookeeper");
-			zookeeper = new TestingServer(zkPort, tmpZkDir);
-			
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
-			
-			for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
-				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i), kafkaHost, zookeeperConnectionString));
-				SocketServer socketServer = brokers.get(i).socketServer();
-				
-				String host = socketServer.host() == null ? "localhost" : socketServer.host();
-				brokerConnectionStrings += host+":"+socketServer.port()+",";
-			}
-
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
-		}
-
-		standardProps = new Properties();
-
-		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
-		standardProps.setProperty("bootstrap.servers", brokerConnectionStrings);
-		standardProps.setProperty("group.id", "flink-tests");
-		standardProps.setProperty("auto.commit.enable", "false");
-		standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
-		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
-		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-		
-		Properties consumerConfigProps = new Properties();
-		consumerConfigProps.putAll(standardProps);
-		consumerConfigProps.setProperty("auto.offset.reset", "smallest");
-		standardCC = new ConsumerConfig(consumerConfigProps);
-		
-		// start also a re-usable Flink mini cluster
-		
-		Configuration flinkConfig = new Configuration();
-		flinkConfig.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
-		flinkConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
-
-		flink = new ForkableFlinkMiniCluster(flinkConfig, false, StreamingMode.STREAMING);
-		flinkPort = flink.getJobManagerRPCPort();
-	}
-
-	@AfterClass
-	public static void shutDownServices() {
-
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    Shut down KafkaITCase ");
-		LOG.info("-------------------------------------------------------------------------");
-
-		flinkPort = -1;
-		flink.shutdown();
-		
-		for (KafkaServer broker : brokers) {
-			if (broker != null) {
-				broker.shutdown();
-			}
-		}
-		brokers.clear();
-		
-		if (zookeeper != null) {
-			try {
-				zookeeper.stop();
-			}
-			catch (Exception e) {
-				LOG.warn("ZK.stop() failed", e);
-			}
-			zookeeper = null;
-		}
-		
-		// clean up the temp spaces
-		
-		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
-			try {
-				FileUtils.deleteDirectory(tmpKafkaParent);
-			}
-			catch (Exception e) {
-				// ignore
-			}
-		}
-		if (tmpZkDir != null && tmpZkDir.exists()) {
-			try {
-				FileUtils.deleteDirectory(tmpZkDir);
-			}
-			catch (Exception e) {
-				// ignore
-			}
-		}
-
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    KafkaITCase finished"); 
-		LOG.info("-------------------------------------------------------------------------");
-	}
-
-	/**
-	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-	 */
-	private static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
-												String kafkaHost,
-												String zookeeperConnectionString) throws Exception {
-		Properties kafkaProperties = new Properties();
-
-		int kafkaPort = NetUtils.getAvailablePort();
-
-		// properties have to be Strings
-		kafkaProperties.put("advertised.host.name", kafkaHost);
-		kafkaProperties.put("port", Integer.toString(kafkaPort));
-		kafkaProperties.put("broker.id", Integer.toString(brokerId));
-		kafkaProperties.put("log.dir", tmpFolder.toString());
-		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
-		kafkaProperties.put("message.max.bytes", "" + (50 * 1024 * 1024));
-		kafkaProperties.put("replica.fetch.max.bytes", "" + (50 * 1024 * 1024));
-		KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
-		KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
-		server.startup();
-		return server;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Execution utilities
-	// ------------------------------------------------------------------------
-	
-	protected ZkClient createZookeeperClient() {
-		return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-	}
-	
-	protected static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
-		try {
-			see.execute(name);
-		} catch (Exception root) {
-			Throwable cause = root.getCause();
-
-			// search for nested SuccessExceptions
-			int depth = 0;
-			while (!(cause instanceof SuccessException)) {
-				if (cause == null || depth++ == 20) {
-					root.printStackTrace();
-					fail("Test failed: " + root.getMessage());
-				} else {
-					cause = cause.getCause();
-				}
-			}
-		}
-	}
-
-	protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
-		
-		// create topic with one client
-		Properties topicConfig = new Properties();
-		LOG.info("Creating topic {}", topic);
-
-		ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-		
-		AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig);
-		creator.close();
-		
-		// validate that the topic has been created
-		final long deadline = System.currentTimeMillis() + 30000;
-		do {
-			List<PartitionInfo> partitions = FlinkKafkaConsumer.getPartitionsForTopic(topic, standardProps);
-			if (partitions != null && partitions.size() > 0) {
-				return;
-			}
-		}
-		while (System.currentTimeMillis() < deadline);
-		fail ("Test topic could not be created");
-	}
-	
-	protected static void deleteTestTopic(String topic) {
-		LOG.info("Deleting topic {}", topic);
-
-		ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-
-		AdminUtils.deleteTopic(zk, topic);
-		
-		zk.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
deleted file mode 100644
index c412136..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.internals;
-
-import kafka.admin.AdminUtils;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.flink.streaming.connectors.KafkaTestBase;
-
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
-	
-	@Test
-	public void runOffsetManipulationinZooKeeperTest() {
-		try {
-			final String topicName = "ZookeeperOffsetHandlerTest-Topic";
-			final String groupId = "ZookeeperOffsetHandlerTest-Group";
-			
-			final long offset = (long) (Math.random() * Long.MAX_VALUE);
-
-			ZkClient zkClient = createZookeeperClient();
-			AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties());
-				
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topicName, 0, offset);
-	
-			long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topicName, 0);
-
-			zkClient.close();
-			
-			assertEquals(offset, fetchedOffset);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}


Mime
View raw message