flink-commits mailing list archives

From rmetz...@apache.org
Subject [5/9] flink git commit: [FLINK-3058] Add support for Kafka 0.9.0.0
Date Wed, 20 Jan 2016 19:31:50 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
new file mode 100644
index 0000000..1bd01a2
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -0,0 +1,1371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.server.KafkaServer;
+
+import org.apache.commons.collections.map.LinkedMap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
+import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
+import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.RetryRule;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.util.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.Assert;
+
+import org.junit.Rule;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+@SuppressWarnings("serial")
+public abstract class KafkaConsumerTestBase extends KafkaTestBase {
+	
+	@Rule
+	public RetryRule retryRule = new RetryRule();
+	
+
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	//
+	//  The tests here are not activated with an @Test annotation; they need
+	//  to be invoked from the extending classes. That way, each subclass can
+	//  select which tests to run.
+	// ------------------------------------------------------------------------
+
+
+	/**
+	 * Test that ensures the KafkaConsumer fails properly if the topic doesn't exist
+	 * and a wrong broker was specified.
+	 *
+	 * @throws Exception
+	 */
+	public void runFailOnNoBrokerTest() throws Exception {
+		try {
+			Properties properties = new Properties();
+
+			StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			see.getConfig().disableSysoutLogging();
+			see.setNumberOfExecutionRetries(0);
+			see.setParallelism(1);
+
+			// use wrong ports for the consumers
+			properties.setProperty("bootstrap.servers", "localhost:80");
+			properties.setProperty("zookeeper.connect", "localhost:80");
+			properties.setProperty("group.id", "test");
+			FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties);
+			DataStream<String> stream = see.addSource(source);
+			stream.print();
+			see.execute("No broker test");
+		} catch (RuntimeException re) {
+			Assert.assertTrue("Wrong RuntimeException thrown: " + StringUtils.stringifyException(re),
+					re.getClass().equals(RuntimeException.class) &&
+					re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
+		}
+	}
+
+	/**
+	 * Test that validates that checkpointing and checkpoint notification works properly
+	 */
+	public void runCheckpointingTest() throws Exception {
+		createTestTopic("testCheckpointing", 1, 1);
+
+		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("testCheckpointing", new SimpleStringSchema(), standardProps);
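+		// "pendingCheckpoints" is private, so reflection is used here to observe
+		// how many checkpoints the consumer is still tracking internally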
+		Field pendingCheckpointsField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
+		pendingCheckpointsField.setAccessible(true);
+		LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
+
+		Assert.assertEquals(0, pendingCheckpoints.size());
+		source.setRuntimeContext(new MockRuntimeContext(1, 0));
+
+		final HashMap<KafkaTopicPartition, Long> initialOffsets = new HashMap<>();
+		initialOffsets.put(new KafkaTopicPartition("testCheckpointing", 0), 1337L);
+
+		// first restore
+		source.restoreState(initialOffsets);
+
+		// then open
+		source.open(new Configuration());
+		HashMap<KafkaTopicPartition, Long> state1 = source.snapshotState(1, 15);
+
+		assertEquals(initialOffsets, state1);
+
+		HashMap<KafkaTopicPartition, Long> state2 = source.snapshotState(2, 30);
+		Assert.assertEquals(initialOffsets, state2);
+
+		Assert.assertEquals(2, pendingCheckpoints.size());
+
+		source.notifyCheckpointComplete(1);
+		Assert.assertEquals(1, pendingCheckpoints.size());
+
+		source.notifyCheckpointComplete(2);
+		Assert.assertEquals(0, pendingCheckpoints.size());
+
+		source.notifyCheckpointComplete(666); // invalid checkpoint
+		Assert.assertEquals(0, pendingCheckpoints.size());
+
+		// create 500 snapshots
+		for (int i = 100; i < 600; i++) {
+			source.snapshotState(i, 15 * i);
+		}
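+		// only the most recent MAX_NUM_PENDING_CHECKPOINTS snapshots are retained;
+		// older pending checkpoints are dropped to bound the map's size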
+		Assert.assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+
+		// commit only the second-to-last checkpoint
+		source.notifyCheckpointComplete(598);
+		Assert.assertEquals(1, pendingCheckpoints.size());
+
+		// access invalid checkpoint
+		source.notifyCheckpointComplete(590);
+
+		// and the last
+		source.notifyCheckpointComplete(599);
+		Assert.assertEquals(0, pendingCheckpoints.size());
+
+		source.close();
+
+		deleteTestTopic("testCheckpointing");
+	}
+
+
+
+	/**
+	 * Ensure Kafka is working on both producer and consumer side.
+	 * This executes a job that contains two Flink pipelines.
+	 *
+	 * <pre>
+	 * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
+	 * </pre>
+	 * 
+	 * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
+	 * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that
+	 * cause the test to fail.
+	 *
+	 * This test also ensures that FLINK-3156 doesn't happen again:
+	 *
+	 * The following situation caused an NPE in the FlinkKafkaConsumer:
+	 *
+	 * topic-1 <-- elements are only produced into topic-1.
+	 * topic-2
+	 *
+	 * Therefore, this test also consumes from an empty topic.
+	 *
+	 */
+	@RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
+	public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
+		final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
+		final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString();
+
+		final int parallelism = 3;
+		final int elementsPerPartition = 100;
+		final int totalElements = parallelism * elementsPerPartition;
+
+		createTestTopic(topic, parallelism, 2);
+		createTestTopic(additionalEmptyTopic, parallelism, 1); // create an empty topic which will remain empty all the time
+
+		final StreamExecutionEnvironment env =
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(500);
+		env.setNumberOfExecutionRetries(0); // fail immediately
+		env.getConfig().disableSysoutLogging();
+
+		TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
+
+		TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
+				new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
+
+		TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
+				new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
+
+		// ----------- add producer dataflow ----------
+
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
+
+			private boolean running = true;
+
+			@Override
+			public void run(SourceContext<Tuple2<Long, String>> ctx) throws InterruptedException {
+				int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
+				int limit = cnt + elementsPerPartition;
+
+
+				while (running && cnt < limit) {
+					ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt));
+					cnt++;
+					// we delay data generation a bit so that we are sure that some checkpoints are
+					// triggered (for FLINK-3156)
+					Thread.sleep(50);
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+		stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<Tuple2<Long, String>>(sinkSchema), FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings), null));
+
+		// ----------- add consumer dataflow ----------
+
+		List<String> topics = new ArrayList<>();
+		topics.add(topic);
+		topics.add(additionalEmptyTopic);
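+		// consume the populated topic and the empty one together to cover the FLINK-3156 scenario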
+		FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, standardProps);
+
+		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
+
+		consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
+
+			private int elCnt = 0;
+			private BitSet validator = new BitSet(totalElements);
+
+			@Override
+			public void invoke(Tuple2<Long, String> value) throws Exception {
+				String[] sp = value.f1.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				assertEquals(value.f0 - 1000, (long) v);
+
+				assertFalse("Received tuple twice", validator.get(v));
+				validator.set(v);
+				elCnt++;
+
+				if (elCnt == totalElements) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != totalElements) {
+						fail("The bitset was not set to 1 on all elements. Next clear:"
+								+ nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				}
+			}
+
+			@Override
+			public void close() throws Exception {
+				super.close();
+			}
+		}).setParallelism(1);
+
+		try {
+			tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology");
+		}
+		catch (ProgramInvocationException | JobExecutionException e) {
+			// look for NotLeaderForPartitionException
+			Throwable cause = e.getCause();
+
+			// search for a nested NotLeaderForPartitionException (bounded depth guards against cause cycles)
+			int depth = 0;
+			while (cause != null && depth++ < 20) {
+				if (cause instanceof kafka.common.NotLeaderForPartitionException) {
+					throw (Exception) cause;
+				}
+				cause = cause.getCause();
+			}
+			throw e;
+		}
+
+		deleteTestTopic(topic);
+	}
+
+
+	/**
+	 * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and
+	 * Flink sources.
+	 */
+	public void runOneToOneExactlyOnceTest() throws Exception {
+
+		final String topic = "oneToOneTopic";
+		final int parallelism = 5;
+		final int numElementsPerPartition = 1000;
+		final int totalElements = parallelism * numElementsPerPartition;
+		final int failAfterElements = numElementsPerPartition / 3;
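+		// fail after roughly a third of one partition's elements, so the job recovers mid-stream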
+
+		createTestTopic(topic, parallelism, 1);
+
+		DataGenerators.generateRandomizedIntegerSequence(
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				kafkaServer,
+				topic, parallelism, numElementsPerPartition, true);
+
+		// run the topology that fails and recovers
+
+		DeserializationSchema<Integer> schema =
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.enableCheckpointing(500);
+		env.setParallelism(parallelism);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
+
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+
+		env
+				.addSource(kafkaSource)
+				.map(new PartitionValidatingMapper(parallelism, 1))
+				.map(new FailingIdentityMapper<Integer>(failAfterElements))
+				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+		FailingIdentityMapper.failedBefore = false;
+		tryExecute(env, "One-to-one exactly once test");
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so
+	 * one Flink source will read multiple Kafka partitions.
+	 */
+	public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
+		final String topic = "oneToManyTopic";
+		final int numPartitions = 5;
+		final int numElementsPerPartition = 1000;
+		final int totalElements = numPartitions * numElementsPerPartition;
+		final int failAfterElements = numElementsPerPartition / 3;
+
+		final int parallelism = 2;
+
+		createTestTopic(topic, numPartitions, 1);
+
+		DataGenerators.generateRandomizedIntegerSequence(
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				kafkaServer,
+				topic, numPartitions, numElementsPerPartition, true);
+
+		// run the topology that fails and recovers
+
+		DeserializationSchema<Integer> schema =
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.enableCheckpointing(500);
+		env.setParallelism(parallelism);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
+
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+
+		env
+				.addSource(kafkaSource)
+				.map(new PartitionValidatingMapper(numPartitions, 3))
+				.map(new FailingIdentityMapper<Integer>(failAfterElements))
+				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+		FailingIdentityMapper.failedBefore = false;
+		tryExecute(env, "One-source-multi-partitions exactly once test");
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Tests the proper consumption when having more Flink sources than Kafka partitions, which means
+	 * that some Flink sources will read no partitions.
+	 */
+	public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
+		final String topic = "manyToOneTopic";
+		final int numPartitions = 5;
+		final int numElementsPerPartition = 1000;
+		final int totalElements = numPartitions * numElementsPerPartition;
+		final int failAfterElements = numElementsPerPartition / 3;
+
+		final int parallelism = 8;
+
+		createTestTopic(topic, numPartitions, 1);
+
+		DataGenerators.generateRandomizedIntegerSequence(
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				kafkaServer,
+				topic, numPartitions, numElementsPerPartition, true);
+
+		// run the topology that fails and recovers
+
+		DeserializationSchema<Integer> schema =
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.enableCheckpointing(500);
+		env.setParallelism(parallelism);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
+		env.setBufferTimeout(0);
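+		// a buffer timeout of zero flushes every record immediately instead of buffering it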
+
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+
+		env
+			.addSource(kafkaSource)
+			.map(new PartitionValidatingMapper(numPartitions, 1))
+			.map(new FailingIdentityMapper<Integer>(failAfterElements))
+			.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+		FailingIdentityMapper.failedBefore = false;
+		tryExecute(env, "multi-source-one-partitions exactly once test");
+
+
+		deleteTestTopic(topic);
+	}
+	
+	
+	/**
+	 * Tests that the source can be properly canceled when reading full partitions. 
+	 */
+	public void runCancelingOnFullInputTest() throws Exception {
+		final String topic = "cancelingOnFullTopic";
+
+		final int parallelism = 3;
+		createTestTopic(topic, parallelism, 1);
+
+		// launch a producer thread
+		DataGenerators.InfiniteStringsGenerator generator =
+				new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
+		generator.start();
+
+		// launch a consumer asynchronously
+
+		final AtomicReference<Throwable> jobError = new AtomicReference<>();
+
+		final Runnable jobRunner = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env.setParallelism(parallelism);
+					env.enableCheckpointing(100);
+					env.getConfig().disableSysoutLogging();
+
+					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+
+					env.addSource(source).addSink(new DiscardingSink<String>());
+
+					env.execute();
+				}
+				catch (Throwable t) {
+					jobError.set(t);
+				}
+			}
+		};
+
+		Thread runnerThread = new Thread(jobRunner, "program runner thread");
+		runnerThread.start();
+
+		// wait a bit before canceling
+		Thread.sleep(8000);
+
+		Throwable failureCause = jobError.get();
+		if (failureCause != null) {
+			failureCause.printStackTrace();
+			Assert.fail("Test failed prematurely with: " + failureCause.getMessage());
+		}
+
+		// cancel
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+		// wait for the program to be done and validate that we failed with the right exception
+		runnerThread.join();
+
+		failureCause = jobError.get();
+		assertNotNull("program did not fail properly due to canceling", failureCause);
+		assertTrue(failureCause.getMessage().contains("Job was cancelled"));
+
+		if (generator.isAlive()) {
+			generator.shutdown();
+			generator.join();
+		}
+		else {
+			Throwable t = generator.getError();
+			if (t != null) {
+				t.printStackTrace();
+				fail("Generator failed: " + t.getMessage());
+			} else {
+				fail("Generator failed with no exception");
+			}
+		}
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Tests that the source can be properly canceled when reading empty partitions. 
+	 */
+	public void runCancelingOnEmptyInputTest() throws Exception {
+		final String topic = "cancelingOnEmptyInputTopic";
+
+		final int parallelism = 3;
+		createTestTopic(topic, parallelism, 1);
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+
+		final Runnable jobRunner = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env.setParallelism(parallelism);
+					env.enableCheckpointing(100);
+					env.getConfig().disableSysoutLogging();
+
+					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+
+					env.addSource(source).addSink(new DiscardingSink<String>());
+
+					env.execute();
+				}
+				catch (Throwable t) {
+					LOG.error("Job Runner failed with exception", t);
+					error.set(t);
+				}
+			}
+		};
+
+		Thread runnerThread = new Thread(jobRunner, "program runner thread");
+		runnerThread.start();
+
+		// wait a bit before canceling
+		Thread.sleep(8000);
+
+		Throwable failureCause = error.get();
+		if (failureCause != null) {
+			failureCause.printStackTrace();
+			Assert.fail("Test failed prematurely with: " + failureCause.getMessage());
+		}
+		// cancel
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+		// wait for the program to be done and validate that we failed with the right exception
+		runnerThread.join();
+
+		failureCause = error.get();
+		assertNotNull("program did not fail properly due to canceling", failureCause);
+		assertTrue(failureCause.getMessage().contains("Job was cancelled"));
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Tests that the job fails properly on deployment when there are not enough slots available.
+	 */
+	public void runFailOnDeployTest() throws Exception {
+		final String topic = "failOnDeployTopic";
+
+		createTestTopic(topic, 2, 1);
+
+		DeserializationSchema<Integer> schema =
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(12); // needs to be more than the mini cluster has slots
+		env.getConfig().disableSysoutLogging();
+
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+
+		env
+				.addSource(kafkaSource)
+				.addSink(new DiscardingSink<Integer>());
+
+		try {
+			env.execute();
+			fail("this test should fail with an exception");
+		}
+		catch (ProgramInvocationException e) {
+
+			// validate that we failed due to a NoResourceAvailableException
+			Throwable cause = e.getCause();
+			int depth = 0;
+			boolean foundResourceException = false;
+
+			while (cause != null && depth++ < 20) {
+				if (cause instanceof NoResourceAvailableException) {
+					foundResourceException = true;
+					break;
+				}
+				cause = cause.getCause();
+			}
+
+			assertTrue("Wrong exception", foundResourceException);
+		}
+
+		deleteTestTopic(topic);
+	}
+
+	public void runConsumeMultipleTopics() throws Exception {
+		final int NUM_TOPICS = 5;
+		final int NUM_ELEMENTS = 20;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+
+		// create topics with content
+		final List<String> topics = new ArrayList<>();
+		for (int i = 0; i < NUM_TOPICS; i++) {
+			final String topic = "topic-" + i;
+			topics.add(topic);
+			// create topic
+			createTestTopic(topic, i + 1 /*partitions*/, 1);
+
+			// write something
+			writeSequence(env, topic, NUM_ELEMENTS, i + 1);
+		}
+
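+		// the schema emits (f0, f1, topic) tuples so the sink below can count elements per topic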
+		KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> readSchema = new Tuple2WithTopicDeserializationSchema(env.getConfig());
+		DataStreamSource<Tuple3<Integer, Integer, String>> stream = env.addSource(kafkaServer.getConsumer(topics, readSchema, standardProps));
+
+		stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
+			Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
+			@Override
+			public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> out) throws Exception {
+				Integer count = countPerTopic.get(value.f2);
+				if (count == null) {
+					count = 1;
+				} else {
+					count++;
+				}
+				countPerTopic.put(value.f2, count);
+
+				// check map:
+				for (Map.Entry<String, Integer> el: countPerTopic.entrySet()) {
+					if (el.getValue() < NUM_ELEMENTS) {
+						return; // not enough yet
+					}
+					if (el.getValue() > NUM_ELEMENTS) {
+						throw new RuntimeException("There is a failure in the test. I've read " +
+								el.getValue() + " from topic " + el.getKey());
+					}
+				}
+				// we've seen messages from all topics
+				throw new SuccessException();
+			}
+		}).setParallelism(1);
+
+		tryExecute(env, "Count elements from the topics");
+
+
+		// delete all topics again
+		for (int i = 0; i < NUM_TOPICS; i++) {
+			final String topic = "topic-" + i;
+			deleteTestTopic(topic);
+		}
+	}
+
+	private static class Tuple2WithTopicDeserializationSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> {
+
+		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+
+		public Tuple2WithTopicDeserializationSchema(ExecutionConfig ec) {
+			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+			Tuple2<Integer, Integer> t2 = ts.deserialize(new ByteArrayInputView(message));
+			return new Tuple3<>(t2.f0, t2.f1, topic);
+		}
+
+		@Override
+		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+		}
+	}
+
+	/**
+	 * Tests Flink's Kafka integration with very big records (up to 30 MB);
+	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
+	 *
+	 */
+	public void runBigRecordTestTopology() throws Exception {
+
+		final String topic = "bigRecordTestTopic";
+		final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
+
+		createTestTopic(topic, parallelism, 1);
+
+		final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
+
+		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
+				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
+
+		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
+				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setNumberOfExecutionRetries(0);
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(100);
+		env.setParallelism(parallelism);
+
+		// add consuming topology:
+		Properties consumerProps = new Properties();
+		consumerProps.putAll(standardProps);
+		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
+		consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
+		consumerProps.setProperty("queued.max.message.chunks", "1");
+
+		FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps);
+		DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
+
+		consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
+
+			private int elCnt = 0;
+
+			@Override
+			public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+				elCnt++;
+				if (value.f0 == -1) {
+					// we should have seen 11 elements now.
+					if (elCnt == 11) {
+						throw new SuccessException();
+					} else {
+						throw new RuntimeException("There have been "+elCnt+" elements");
+					}
+				}
+				if (elCnt > 10) {
+					throw new RuntimeException("More than 10 elements seen: "+elCnt);
+				}
+			}
+		});
+
+		// add producing topology
+		Properties producerProps = new Properties();
+		producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30));
+		producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
+
+		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
+
+			private boolean running;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				running = true;
+			}
+
+			@Override
+			public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
+				Random rnd = new Random();
+				long cnt = 0;
+				int fifteenMb = 1024 * 1024 * 15;
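+				// records are 15-30 MB each, far above Kafka's default maximum message size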
+
+				while (running) {
+					byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
+					ctx.collect(new Tuple2<>(cnt++, wl));
+
+					Thread.sleep(100);
+
+					if (cnt == 10) {
+						// signal end
+						ctx.collect(new Tuple2<>(-1L, new byte[]{1}));
+						break;
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
+		stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null));
+
+		tryExecute(env, "big topology test");
+		deleteTestTopic(topic);
+	}
+
+	
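+	/**
+	 * Tests that exactly-once semantics survive the shutdown of the topic's partition leader broker.
+	 */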
+	public void runBrokerFailureTest() throws Exception {
+		final String topic = "brokerFailureTestTopic";
+
+		final int parallelism = 2;
+		final int numElementsPerPartition = 1000;
+		final int totalElements = parallelism * numElementsPerPartition;
+		final int failAfterElements = numElementsPerPartition / 3;
+
+
+		createTestTopic(topic, parallelism, 2);
+
+		DataGenerators.generateRandomizedIntegerSequence(
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				kafkaServer,
+				topic, parallelism, numElementsPerPartition, true);
+
+		// find leader to shut down
+		int leaderId = kafkaServer.getLeaderToShutDown(topic);
+
+		LOG.info("Leader to shutdown {}", leaderId);
+
+
+		// run the topology that fails and recovers
+
+		DeserializationSchema<Integer> schema =
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(500);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
+
+
+		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+
+		env
+				.addSource(kafkaSource)
+				.map(new PartitionValidatingMapper(parallelism, 1))
+				.map(new BrokerKillingMapper<Integer>(leaderId, failAfterElements))
+				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+		BrokerKillingMapper.killedLeaderBefore = false;
+		tryExecute(env, "One-to-one exactly once test");
+
+		// start a new broker:
+		kafkaServer.restartBroker(leaderId);
+	}
+
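+	/**
+	 * Tests writing and reading key/value records, where every second key is null.
+	 */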
+	public void runKeyValueTest() throws Exception {
+		final String topic = "keyvaluetest";
+		createTestTopic(topic, 1, 1);
+		final int ELEMENT_COUNT = 5000;
+
+		// ----------- Write some data into Kafka -------------------
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.setNumberOfExecutionRetries(0);
+		env.getConfig().disableSysoutLogging();
+
+		DataStream<Tuple2<Long, PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<Long, PojoValue>>() {
+			@Override
+			public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception {
+				Random rnd = new Random(1337);
+				for (long i = 0; i < ELEMENT_COUNT; i++) {
+					PojoValue pojo = new PojoValue();
+					pojo.when = new Date(rnd.nextLong());
+					pojo.lon = rnd.nextLong();
+					pojo.lat = i;
+					// make every second key null to ensure proper "null" serialization
+					Long key = (i % 2 == 0) ? null : i;
+					ctx.collect(new Tuple2<>(key, pojo));
+				}
+			}
+			@Override
+			public void cancel() {
+			}
+		});
+
+		KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
+		kvStream.addSink(kafkaServer.getProducer(topic, schema, FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings), null));
+		env.execute("Write KV to Kafka");
+
+		// ----------- Read the data again -------------------
+
+		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.setNumberOfExecutionRetries(0);
+		env.getConfig().disableSysoutLogging();
+
+
+		KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
+
+		DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, standardProps));
+		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
+			long counter = 0;
+			@Override
+			public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception {
+				// the elements should be in order.
+				Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter );
+				if (value.f1.lat % 2 == 0) {
+					assertNull("key was not null", value.f0);
+				} else {
+					Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter);
+				}
+				counter++;
+				if (counter == ELEMENT_COUNT) {
+					// we got the right number of elements
+					throw new SuccessException();
+				}
+			}
+		});
+
+		tryExecute(env, "Read KV from Kafka");
+
+		deleteTestTopic(topic);
+	}
+
+	public static class PojoValue {
+		public Date when;
+		public long lon;
+		public long lat;
+		public PojoValue() {}
+	}
+
+
+	/**
+	 * Tests the handling of delete messages (records with null values, i.e. Kafka
+	 * tombstones) and the producer's accumulator metrics.
+	 * @throws Exception
+	 */
+	public void runAllDeletesTest() throws Exception {
+		final String topic = "alldeletestest";
+		createTestTopic(topic, 1, 1);
+		final int ELEMENT_COUNT = 300;
+
+		// ----------- Write some data into Kafka -------------------
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.setNumberOfExecutionRetries(0);
+		env.getConfig().disableSysoutLogging();
+
+		DataStream<Tuple2<byte[], PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<byte[], PojoValue>>() {
+			@Override
+			public void run(SourceContext<Tuple2<byte[], PojoValue>> ctx) throws Exception {
+				Random rnd = new Random(1337);
+				for (long i = 0; i < ELEMENT_COUNT; i++) {
+					final byte[] key = new byte[200];
+					rnd.nextBytes(key);
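+					// a tuple with a null value is written as a Kafka delete marker (tombstone) for the key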
+					ctx.collect(new Tuple2<>(key, (PojoValue) null));
+				}
+			}
+			@Override
+			public void cancel() {
+			}
+		});
+
+		TypeInformationKeyValueSerializationSchema<byte[], PojoValue> schema = new TypeInformationKeyValueSerializationSchema<>(byte[].class, PojoValue.class, env.getConfig());
+
+		kvStream.addSink(kafkaServer.getProducer(topic, schema, FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings), null));
+
+		JobExecutionResult result = env.execute("Write deletes to Kafka");
+
+		Map<String, Object> accuResults = result.getAllAccumulatorResults();
+		// there are 37 accumulator results in Kafka 0.9
+		// and 34 in Kafka 0.8
+		Assert.assertTrue("Not enough accumulators from Kafka Producer: " + accuResults.size(), accuResults.size() > 33);
+
+		// ----------- Read the data again -------------------
+
+		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.setNumberOfExecutionRetries(0);
+		env.getConfig().disableSysoutLogging();
+
+		DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+
+		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() {
+			long counter = 0;
+			@Override
+			public void flatMap(Tuple2<byte[], PojoValue> value, Collector<Object> out) throws Exception {
+				// ensure that deleted messages are passed as nulls
+				assertNull(value.f1);
+				counter++;
+				if (counter == ELEMENT_COUNT) {
+					// we got the right number of elements
+					throw new SuccessException();
+				}
+			}
+		});
+
+		tryExecute(env, "Read deletes from Kafka");
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test that ensures that DeserializationSchema.isEndOfStream() is properly evaluated
+	 * and that the metrics for the consumer are properly reported.
+	 *
+	 * @throws Exception
+	 */
+	public void runMetricsAndEndOfStreamTest() throws Exception {
+		final String topic = "testEndOfStream";
+		createTestTopic(topic, 1, 1);
+		final int ELEMENT_COUNT = 300;
+
+		// write some data
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.setNumberOfExecutionRetries(0);
+		env.getConfig().disableSysoutLogging();
+
+		writeSequence(env, topic, ELEMENT_COUNT, 1);
+
+		// read using custom schema
+		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env1.setParallelism(1);
+		env1.setNumberOfExecutionRetries(0);
+		env1.getConfig().disableSysoutLogging();
+
+		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), standardProps));
+		fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() {
+			@Override
+			public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
+				// noop ;)
+			}
+		});
+
+		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");
+
+		Map<String, Object> accuResults = result.getAllAccumulatorResults();
+		// kafka 0.9 consumer: 39 results
+		if (kafkaServer.getVersion().equals("0.9")) {
+			Assert.assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		}
+
+		deleteTestTopic(topic);
+	}
+
+	public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {
+		final int finalCount;
+		int count = 0;
+		TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+		TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());
+
+		public FixedNumberDeserializationSchema(int finalCount) {
+			this.finalCount = finalCount;
+		}
+
+		@Override
+		public Tuple2<Integer, Integer> deserialize(byte[] message) throws IOException {
+			return ser.deserialize(new ByteArrayInputView(message));
+		}
+
+		@Override
+		public boolean isEndOfStream(Tuple2<Integer, Integer> nextElement) {
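+			// note: the counter is local to each deserializer instance, so this
+			// end-of-stream check assumes a source parallelism of 1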
+			return ++count >= finalCount;
+		}
+
+		@Override
+		public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
+			return ti;
+		}
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Reading writing test data sets
+	// ------------------------------------------------------------------------
+
+	protected void readSequence(StreamExecutionEnvironment env, Properties cc,
+								final int sourceParallelism,
+								final String topicName,
+								final int valuesCount, final int startFrom) throws Exception {
+		env.getCheckpointConfig().setCheckpointTimeout(5000); // set timeout for checkpoints to 5 seconds
+
+		final int finalCount = valuesCount * sourceParallelism;
+
+		final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
+				new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
+
+		// create the consumer
+		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
+
+		DataStream<Tuple2<Integer, Integer>> source = env
+				.addSource(consumer).setParallelism(sourceParallelism)
+				.map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
+
+		// verify data
+		source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
+
+			private int[] values = new int[valuesCount];
+			private int count = 0;
+
+			@Override
+			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+				values[value.f1 - startFrom]++;
+				count++;
+				LOG.info("Received message {}, total {} messages", value, count);
+
+				// verify if we've seen everything
+				if (count == finalCount) {
+					for (int i = 0; i < values.length; i++) {
+						int v = values[i];
+						if (v != sourceParallelism) {
+							printTopic(topicName, valuesCount, deser);
+							throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+						}
+					}
+					// test has passed
+					throw new SuccessException();
+				}
+			}
+
+		}).setParallelism(1);
+
+		tryExecute(env, "Read data from Kafka");
+
+		LOG.info("Successfully read sequence for verification");
+	}
+
+	protected void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception {
+
+		LOG.info("\n===================================\n== Writing sequence of "+numElements+" into "+topicName+" with p="+parallelism+"\n===================================");
+		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+			private boolean running = true;
+
+			@Override
+			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+				int cnt = 0;
+				int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+				while (running && cnt < numElements) {
+					ctx.collect(new Tuple2<>(partition, cnt));
+					cnt++;
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		}).setParallelism(parallelism);
+		
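+		// Tuple2Partitioner routes each record to the Kafka partition given by its
+		// first field, so subtask i writes exactly the sequence for partition i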
+		stream.addSink(kafkaServer.getProducer(topicName,
+				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())),
+				FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings),
+				new Tuple2Partitioner(parallelism))).setParallelism(parallelism);
+
+		env.execute("Write sequence");
+
+		LOG.info("Finished writing sequence");
+	}
+
+	// ------------------------------------------------------------------------
+	//  Debugging utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Read topic to list, only using Kafka code.
+	 */
+	private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
+		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
+		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
+		// will see each message only once.
+		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
+		if (streams.size() != 1) {
+			throw new RuntimeException("Expected only one message stream but got " + streams.size());
+		}
+		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+		if (kafkaStreams == null) {
+			throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
+		}
+		if (kafkaStreams.size() != 1) {
+			throw new RuntimeException("Requested 1 stream from Kafka, but got " + kafkaStreams.size() + " streams");
+		}
+		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
+		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
+
+		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
+		int read = 0;
+		while (iteratorToRead.hasNext()) {
+			read++;
+			result.add(iteratorToRead.next());
+			if (read == stopAfter) {
+				LOG.info("Read " + read + " elements");
+				return result;
+			}
+		}
+		return result;
+	}
+
+	private static void printTopic(String topicName, ConsumerConfig config,
+								DeserializationSchema<?> deserializationSchema,
+								int stopAfter) throws IOException {
+
+		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
+		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
+
+		for (MessageAndMetadata<byte[], byte[]> message: contents) {
+			Object out = deserializationSchema.deserialize(message.message());
+			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
+		}
+	}
+
+	private static void printTopic(String topicName, int elements, DeserializationSchema<?> deserializer)
+			throws IOException {
+		// write the sequence to log for debugging purposes
+		Properties stdProps = standardCC.props().props();
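+		// stdProps serve only as defaults; the properties set below take precedence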
+		Properties newProps = new Properties(stdProps);
+		newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
+		newProps.setProperty("auto.offset.reset", "smallest");
+		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
+
+		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
+		printTopic(topicName, printerConfig, deserializer, elements);
+	}
+
+
+	public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
+			implements Checkpointed<Integer>, CheckpointNotifier {
+
+		private static final long serialVersionUID = 6334389850158707313L;
+
+		public static volatile boolean killedLeaderBefore;
+		public static volatile boolean hasBeenCheckpointedBeforeFailure;
+		
+		private final int shutdownBrokerId;
+		private final int failCount;
+		private int numElementsTotal;
+
+		private boolean failer;
+		private boolean hasBeenCheckpointed;
+
+
+		public BrokerKillingMapper(int shutdownBrokerId, int failCount) {
+			this.shutdownBrokerId = shutdownBrokerId;
+			this.failCount = failCount;
+		}
+
+		@Override
+		public void open(Configuration parameters) {
+			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+		}
+
+		@Override
+		public T map(T value) throws Exception {
+			numElementsTotal++;
+			
+			if (!killedLeaderBefore) {
+				Thread.sleep(10);
+				
+				if (failer && numElementsTotal >= failCount) {
+					// shut down a Kafka broker
+					KafkaServer toShutDown = null;
+					for (KafkaServer server : kafkaServer.getBrokers()) {
+
+						if (kafkaServer.getBrokerId(server) == shutdownBrokerId) {
+							toShutDown = server;
+							break;
+						}
+					}
+	
+					if (toShutDown == null) {
+						StringBuilder listOfBrokers = new StringBuilder();
+						for (KafkaServer server : kafkaServer.getBrokers()) {
+							listOfBrokers.append(kafkaServer.getBrokerId(server));
+							listOfBrokers.append(" ; ");
+						}
+						
+						throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
+								+ " ; available brokers: " + listOfBrokers.toString());
+					}
+					else {
+						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+						killedLeaderBefore = true;
+						toShutDown.shutdown();
+					}
+				}
+			}
+			return value;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			hasBeenCheckpointed = true;
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return numElementsTotal;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			this.numElementsTotal = state;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
new file mode 100644
index 0000000..228f3ac
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.test.util.SuccessException;
+
+
+import java.io.Serializable;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public abstract class KafkaProducerTestBase extends KafkaTestBase {
+
+
+	/**
+	 * 
+	 * <pre>
+	 *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
+	 *            /                  |                                       \
+	 *           /                   |                                        \
+	 * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
+	 *           \                   |                                        /
+	 *            \                  |                                       /
+	 *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
+	 * </pre>
+	 * 
+	 * The mapper validates that the values come consistently from the correct Kafka partition.
+	 * 
+	 * The final sink validates that there are no duplicates and that all partitions are present.
+	 */
+	public void runCustomPartitioningTest() {
+		try {
+			LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
+
+			final String topic = "customPartitioningTestTopic";
+			final int parallelism = 3;
+			
+			createTestTopic(topic, parallelism, 1);
+
+			TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
+
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env.setNumberOfExecutionRetries(0);
+			env.getConfig().disableSysoutLogging();
+
+			TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
+					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+
+			TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
+					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+
+			// ------ producing topology ---------
+			
+			// the source runs with parallelism (DOP) 1 to make sure it generates no duplicates
+			DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+
+				private boolean running = true;
+
+				@Override
+				public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
+					long cnt = 0;
+					while (running) {
+						ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
+						cnt++;
+					}
+				}
+
+				@Override
+				public void cancel() {
+					running = false;
+				}
+			})
+			.setParallelism(1);
+			
+			// the sink spreads records across the topic's partitions via the custom partitioner
+			stream.addSink(kafkaServer.getProducer(topic,
+					new KeyedSerializationSchemaWrapper<>(serSchema),
+					FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings),
+					new CustomPartitioner(parallelism)))
+			.setParallelism(parallelism);
+
+			// ------ consuming topology ---------
+			
+			FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, standardProps);
+			
+			env.addSource(source).setParallelism(parallelism)
+
+					// mapper that validates the partitioning and maps each value to its partition
+					.map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
+						
+						private int ourPartition = -1;
+						@Override
+						public Integer map(Tuple2<Long, String> value) {
+							int partition = value.f0.intValue() % parallelism;
+							if (ourPartition != -1) {
+								assertEquals("inconsistent partitioning", ourPartition, partition);
+							} else {
+								ourPartition = partition;
+							}
+							return partition;
+						}
+					}).setParallelism(parallelism)
+					
+					.addSink(new SinkFunction<Integer>() {
+						
+						private int[] valuesPerPartition = new int[parallelism];
+						
+						@Override
+						public void invoke(Integer value) throws Exception {
+							valuesPerPartition[value]++;
+							
+							boolean missing = false;
+							for (int i : valuesPerPartition) {
+								if (i < 100) {
+									missing = true;
+									break;
+								}
+							}
+							if (!missing) {
+								throw new SuccessException();
+							}
+						}
+					}).setParallelism(1);
+			
+			tryExecute(env, "custom partitioning test");
+
+			deleteTestTopic(topic);
+			
+			LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable {
+
+		private final int expectedPartitions;
+
+		public CustomPartitioner(int expectedPartitions) {
+			this.expectedPartitions = expectedPartitions;
+		}
+
+		@Override
+		public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+			assertEquals(expectedPartitions, numPartitions);
+
+			return (int) (next.f0 % numPartitions);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
new file mode 100644
index 0000000..73cd2f9
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.consumer.ConsumerConfig;
+
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The base for the Kafka tests. It brings up:
+ * <ul>
+ *     <li>A ZooKeeper mini cluster</li>
+ *     <li>Three Kafka Brokers (mini clusters)</li>
+ *     <li>A Flink mini cluster</li>
+ * </ul>
+ * 
+ * <p>Code in this test is based on the following GitHub repository:
+ * <a href="https://github.com/sakserv/hadoop-mini-clusters">
+ *   https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
+ * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
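+ *
+ * <p>A minimal sketch of how a concrete test can build on this base class
+ * (class, method, and topic names here are illustrative, not part of this commit):
+ * <pre>
+ * public class MyKafkaITCase extends KafkaTestBase {
+ *
+ *     public void runSimpleTest() throws Exception {
+ *         createTestTopic("myTestTopic", 1, 1);
+ *
+ *         StreamExecutionEnvironment env =
+ *                 StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ *         // ... build a topology that throws SuccessException once all data is seen ...
+ *
+ *         tryExecutePropagateExceptions(env, "my simple test");
+ *         deleteTestTopic("myTestTopic");
+ *     }
+ * }
+ * </pre>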
+ */
+@SuppressWarnings("serial")
+public abstract class KafkaTestBase extends TestLogger {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
+	
+	protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
+
+	protected static String brokerConnectionStrings;
+
+	protected static ConsumerConfig standardCC;
+	protected static Properties standardProps;
+	
+	protected static ForkableFlinkMiniCluster flink;
+
+	protected static int flinkPort;
+
+	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+	protected static KafkaTestEnvironment kafkaServer;
+
+	// ------------------------------------------------------------------------
+	//  Setup and teardown of the mini clusters
+	// ------------------------------------------------------------------------
+	
+	@BeforeClass
+	public static void prepare() throws IOException, ClassNotFoundException {
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Starting KafkaITCase ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		// dynamically load the implementation for the test
+		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
+		kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+
+		LOG.info("Starting KafkaITCase.prepare() for Kafka {}", kafkaServer.getVersion());
+
+		kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS);
+
+		standardProps = kafkaServer.getStandardProperties();
+		standardCC = kafkaServer.getStandardConsumerConfig();
+		brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
+
+		// also start a reusable Flink mini cluster
+		Configuration flinkConfig = new Configuration();
+		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+		flinkConfig.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "0 s");
+
+		flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+		flink.start();
+
+		flinkPort = flink.getLeaderRPCPort();
+	}
+
+	@AfterClass
+	public static void shutDownServices() {
+
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Shut down KafkaITCase ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		flinkPort = -1;
+		if (flink != null) {
+			flink.shutdown();
+		}
+
+		kafkaServer.shutdown();
+
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    KafkaITCase finished"); 
+		LOG.info("-------------------------------------------------------------------------");
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Execution utilities
+	// ------------------------------------------------------------------------
+
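+	/**
+	 * Executes the given topology and treats a {@link SuccessException} found in the
+	 * cause chain (searched up to 20 levels deep) as successful termination rather
+	 * than a failure; any other exception is re-thrown.
+	 */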
+	protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
+		try {
+			see.execute(name);
+		}
+		catch (ProgramInvocationException | JobExecutionException root) {
+			Throwable cause = root.getCause();
+
+			// search for nested SuccessExceptions
+			int depth = 0;
+			while (!(cause instanceof SuccessException)) {
+				if (cause == null || depth++ == 20) {
+					throw root;
+				}
+				else {
+					cause = cause.getCause();
+				}
+			}
+		}
+	}
+
+	protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
+		kafkaServer.createTestTopic(topic, numberOfPartitions, replicationFactor);
+	}
+	
+	protected static void deleteTestTopic(String topic) {
+		kafkaServer.deleteTestTopic(topic);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
new file mode 100644
index 0000000..40be8a1
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.server.KafkaServer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Abstract class providing a Kafka test environment.
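+ *
+ * <p>A version-specific implementation is loaded reflectively by {@link KafkaTestBase}
+ * under the class name <i>KafkaTestEnvironmentImpl</i>. Tests mostly interact with it
+ * through the factory methods, for example (topic name is illustrative):
+ * <pre>
+ *   FlinkKafkaConsumerBase&lt;String&gt; consumer =
+ *       kafkaServer.getConsumer("myTopic", new SimpleStringSchema(), standardProps);
+ * </pre>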
+ */
+public abstract class KafkaTestEnvironment {
+
+	protected static final String KAFKA_HOST = "localhost";
+
+	public abstract void prepare(int numKafkaServers);
+
+	public abstract void shutdown();
+
+	public abstract void deleteTestTopic(String topic);
+
+	public abstract void createTestTopic(String topic, int numberOfPartitions, int replicationFactor);
+
+	public abstract ConsumerConfig getStandardConsumerConfig();
+
+	public abstract Properties getStandardProperties();
+
+	public abstract String getBrokerConnectionString();
+
+	public abstract String getVersion();
+
+	public abstract List<KafkaServer> getBrokers();
+
+	// -- consumer / producer instances:
+	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) {
+		return getConsumer(topics, new KeyedDeserializationSchemaWrapper<T>(deserializationSchema), props);
+	}
+
+	public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, KeyedDeserializationSchema<T> readSchema, Properties props) {
+		return getConsumer(Collections.singletonList(topic), readSchema, props);
+	}
+
+	public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
+		return getConsumer(Collections.singletonList(topic), deserializationSchema, props);
+	}
+
+	public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
+
+	public abstract <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner);
+
+	// -- leader failure simulation
+
+	public abstract void restartBroker(int leaderId) throws Exception;
+
+	public abstract int getLeaderToShutDown(String topic) throws Exception;
+
+	public abstract int getBrokerId(KafkaServer server);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
new file mode 100644
index 0000000..5dab05a
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+
+public class TestFixedPartitioner {
+
+
+	/**
+	 * Tests the case of more Flink sink subtasks than Kafka partitions:
+	 * <pre>
+	 * 			1	---------------->	1
+	 * 			2   --------------/
+	 * 			3   -------------/
+	 * 			4	------------/
+	 * </pre>
+	 */
+	@Test
+	public void testMoreFlinkThanBrokers() {
+		FixedPartitioner<String> part = new FixedPartitioner<>();
+
+		int[] partitions = new int[]{0};
+
+		part.open(0, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+
+		part.open(1, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc2", null, null, partitions.length));
+
+		part.open(2, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length));
+		Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); // verify that the assignment is stable
+
+		part.open(3, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc4", null, null, partitions.length));
+	}
+
+	/**
+	 * Tests the case of fewer Flink sink subtasks than Kafka partitions; the
+	 * remaining partitions receive no data:
+	 * <pre>
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2	---------------->	2
+	 * 									3
+	 * 									4
+	 * 									5
+	 *
+	 * </pre>
+	 */
+	@Test
+	public void testFewerPartitions() {
+		FixedPartitioner<String> part = new FixedPartitioner<>();
+
+		int[] partitions = new int[]{0, 1, 2, 3, 4};
+		part.open(0, 2, partitions);
+		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+
+		part.open(1, 2, partitions);
+		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
+		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
+	}
+
+	/**
+	 * Tests the mixed case, where the sink subtasks wrap around onto the available partitions:
+	 * <pre>
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	------------>--->	1
+	 * 			2	-----------/----> 	2
+	 * 			3	----------/
+	 * </pre>
+	 */
+	@Test
+	public void testMixedCase() {
+		FixedPartitioner<String> part = new FixedPartitioner<>();
+		int[] partitions = new int[]{0,1};
+
+		part.open(0, 3, partitions);
+		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+
+		part.open(1, 3, partitions);
+		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
+
+		part.open(2, 3, partitions);
+		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
new file mode 100644
index 0000000..cc237b6
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import java.util.Random;
+
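+/**
+ * Test utilities that generate data streams and write them into Kafka topics.
+ *
+ * <p>For example, a test can fill a topic with a scrambled integer sequence like this
+ * (environment, server, and topic are whatever the calling test has set up):
+ * <pre>
+ *   DataGenerators.generateRandomizedIntegerSequence(
+ *           env, kafkaServer, "myTopic", numPartitions, numElements, true);
+ * </pre>
+ */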
+@SuppressWarnings("serial")
+public class DataGenerators {
+	
+	public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
+													   KafkaTestEnvironment testServer, String topic,
+													   int numPartitions,
+													   final int from, final int to) throws Exception {
+
+		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+		env.setParallelism(numPartitions);
+		env.getConfig().disableSysoutLogging();
+		env.setNumberOfExecutionRetries(0);
+		
+		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(
+				new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+					private volatile boolean running = true;
+
+					@Override
+					public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+						int cnt = from;
+						int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+						while (running && cnt <= to) {
+							ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
+							cnt++;
+						}
+					}
+
+					@Override
+					public void cancel() {
+						running = false;
+					}
+				});
+
+		stream.addSink(testServer.getProducer(topic,
+				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())),
+				FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
+				new Tuple2Partitioner(numPartitions)
+		));
+
+		env.execute("Data generator (Int, Int) stream to topic " + topic);
+	}
+
+	// ------------------------------------------------------------------------
+	
+	public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
+														 KafkaTestEnvironment testServer, String topic,
+														 final int numPartitions,
+														 final int numElements,
+														 final boolean randomizeOrder) throws Exception {
+		env.setParallelism(numPartitions);
+		env.getConfig().disableSysoutLogging();
+		env.setNumberOfExecutionRetries(0);
+
+		DataStream<Integer> stream = env.addSource(
+				new RichParallelSourceFunction<Integer>() {
+
+					private volatile boolean running = true;
+
+					@Override
+					public void run(SourceContext<Integer> ctx) {
+						// create a sequence
+						int[] elements = new int[numElements];
+						for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
+								i < numElements;
+								i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+							
+							elements[i] = val;
+						}
+
+						// scramble the sequence
+						if (randomizeOrder) {
+							Random rnd = new Random();
+							for (int i = 0; i < elements.length; i++) {
+								int otherPos = rnd.nextInt(elements.length);
+								
+								int tmp = elements[i];
+								elements[i] = elements[otherPos];
+								elements[otherPos] = tmp;
+							}
+						}
+
+						// emit the sequence
+						int pos = 0;
+						while (running && pos < elements.length) {
+							ctx.collect(elements[pos++]);
+						}
+					}
+
+					@Override
+					public void cancel() {
+						running = false;
+					}
+				});
+
+		stream
+				.rebalance()
+				.addSink(testServer.getProducer(topic,
+						new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
+						FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
+						new KafkaPartitioner<Integer>() {
+							@Override
+							public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+								return next % numPartitions;
+							}
+						}));
+
+		env.execute("Scrambled int sequence generator");
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	public static class InfiniteStringsGenerator extends Thread {
+
+		private final KafkaTestEnvironment server;
+		
+		private final String topic;
+		
+		private volatile Throwable error;
+		
+		private volatile boolean running = true;
+
+		
+		public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
+			this.server = server;
+			this.topic = topic;
+		}
+
+		@Override
+		public void run() {
+			// we manually feed data into the Kafka sink
+			FlinkKafkaProducerBase<String> producer = null;
+			try {
+				producer = server.getProducer(topic,
+						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+						FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString()), new FixedPartitioner<String>());
+				producer.setRuntimeContext(new MockRuntimeContext(1, 0));
+				producer.open(new Configuration());
+				
+				final StringBuilder bld = new StringBuilder();
+				final Random rnd = new Random();
+				
+				while (running) {
+					bld.setLength(0);
+					
+					int len = rnd.nextInt(100) + 1;
+					for (int i = 0; i < len; i++) {
+						bld.append((char) (rnd.nextInt(20) + 'a'));
+					}
+					
+					String next = bld.toString();
+					producer.invoke(next);
+				}
+			}
+			catch (Throwable t) {
+				this.error = t;
+			}
+			finally {
+				if (producer != null) {
+					try {
+						producer.close();
+					}
+					catch (Throwable t) {
+						// ignore
+					}
+				}
+			}
+		}
+		
+		public void shutdown() {
+			this.running = false;
+			this.interrupt();
+		}
+		
+		public Throwable getError() {
+			return this.error;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
new file mode 100644
index 0000000..987e6c5
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * Sink function that discards data.
+ * @param <T> The type of the records the sink consumes.
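+ *
+ * <p>Useful when a test only cares about the side effects of upstream operators,
+ * e.g. {@code stream.addSink(new DiscardingSink<String>())}.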
+ */
+public class DiscardingSink<T> implements SinkFunction<T> {
+
+	private static final long serialVersionUID = 2777597566520109843L;
+
+	@Override
+	public void invoke(T value) {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
new file mode 100644
index 0000000..5a8ffaa
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
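+/**
+ * Identity mapper that, on the first execution attempt only, throws an artificial
+ * exception in the subtask with index 0 once that subtask has processed
+ * {@code failCount} elements. The static flags record whether the failure has
+ * happened and whether a checkpoint completed before it, e.g. for exactly-once tests:
+ * <pre>
+ *   stream.map(new FailingIdentityMapper&lt;Integer&gt;(3000));
+ * </pre>
+ */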
+public class FailingIdentityMapper<T> extends RichMapFunction<T, T> implements
+		Checkpointed<Integer>, CheckpointNotifier, Runnable {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
+	
+	private static final long serialVersionUID = 6334389850158707313L;
+	
+	public static volatile boolean failedBefore;
+	public static volatile boolean hasBeenCheckpointedBeforeFailure;
+
+	private final int failCount;
+	private int numElementsTotal;
+	private int numElementsThisTime;
+	
+	private boolean failer;
+	private boolean hasBeenCheckpointed;
+	
+	private Thread printer;
+	private volatile boolean printerRunning = true;
+
+	public FailingIdentityMapper(int failCount) {
+		this.failCount = failCount;
+	}
+
+	@Override
+	public void open(Configuration parameters) {
+		failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+		printer = new Thread(this, "FailingIdentityMapper Status Printer");
+		printer.start();
+	}
+
+	@Override
+	public T map(T value) throws Exception {
+		numElementsTotal++;
+		numElementsThisTime++;
+		
+		if (!failedBefore) {
+			Thread.sleep(10);
+			
+			if (failer && numElementsTotal >= failCount) {
+				hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+				failedBefore = true;
+				throw new Exception("Artificial Test Failure");
+			}
+		}
+		return value;
+	}
+
+	@Override
+	public void close() throws Exception {
+		printerRunning = false;
+		if (printer != null) {
+			printer.interrupt();
+			printer = null;
+		}
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) {
+		this.hasBeenCheckpointed = true;
+	}
+
+	@Override
+	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+		return numElementsTotal;
+	}
+
+	@Override
+	public void restoreState(Integer state) {
+		numElementsTotal = state;
+	}
+
+	@Override
+	public void run() {
+		while (printerRunning) {
+			try {
+				Thread.sleep(5000);
+			}
+			catch (InterruptedException e) {
+				// ignore
+			}
+			LOG.info("============================> Failing mapper  {}: count={}, totalCount={}",
+					getRuntimeContext().getIndexOfThisSubtask(),
+					numElementsThisTime, numElementsTotal);
+		}
+	}
+}

