flink-commits mailing list archives

From rmetz...@apache.org
Subject [2/2] flink git commit: [FLINK-1753] [streaming] Added test for Kafka connector with tuple type
Date Thu, 02 Apr 2015 08:13:54 GMT
[FLINK-1753] [streaming] Added test for Kafka connector with tuple type

This closes #557


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/359b39c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/359b39c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/359b39c3

Branch: refs/heads/master
Commit: 359b39c383347b14c1a3e382d59717ac1be1b222
Parents: 7cf9586
Author: Gábor Hermann <reckoner42@gmail.com>
Authored: Wed Apr 1 14:51:04 2015 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Thu Apr 2 09:21:23 2015 +0200

----------------------------------------------------------------------
 .../connectors/kafka/api/KafkaSink.java         |   9 +-
 .../connectors/kafka/api/KafkaSource.java       |  20 +
 .../kafka/api/config/PartitionerWrapper.java    |   2 +-
 .../streaming/connectors/kafka/KafkaITCase.java | 428 ++++++++++++++++---
 4 files changed, 406 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index f1dbc8c..0bbf9a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -34,7 +34,6 @@ import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import kafka.serializer.DefaultEncoder;
-import kafka.serializer.StringEncoder;
 
 /**
  * Sink that emits its inputs to a Kafka topic.
@@ -123,7 +122,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 		props.put("request.required.acks", "1");
 
 		props.put("serializer.class", DefaultEncoder.class.getCanonicalName());
-		props.put("key.serializer.class", StringEncoder.class.getCanonicalName());
+
+		// this will not be used as the key will not be serialized
+		props.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
 
 		if (partitioner != null) {
 			props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
@@ -152,7 +153,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	@Override
 	public void invoke(IN next) {
 		byte[] serialized = schema.serialize(next);
-		producer.send(new KeyedMessage<IN, byte[]>(topicId, next, serialized));
+
+		// Sending message without serializable key.
+		producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
 	}
 
 	@Override

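Note on the KafkaSink change above: a key serializer is still configured, but the key actually sent is now null, and the original record is passed only as the partition key so that a custom partitioner can still inspect it. A minimal standalone sketch of that producer call, not part of the patch, assuming the Kafka 0.8 javaapi producer this connector builds on (broker address, topic, and payload are made-up values):

	import java.util.Properties;

	import kafka.javaapi.producer.Producer;
	import kafka.producer.KeyedMessage;
	import kafka.producer.ProducerConfig;
	import kafka.serializer.DefaultEncoder;

	public class NullKeySendSketch {
		public static void main(String[] args) {
			Properties props = new Properties();
			props.put("metadata.broker.list", "localhost:9092"); // hypothetical broker
			props.put("request.required.acks", "1");
			props.put("serializer.class", DefaultEncoder.class.getCanonicalName());
			// configured but never exercised, because the key sent below is always null
			props.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());

			Producer<Object, byte[]> producer = new Producer<Object, byte[]>(new ProducerConfig(props));

			Object record = "kafka-0";              // the element handed to the sink
			byte[] payload = "kafka-0".getBytes();  // schema.serialize(record) in the real sink
			// key = null (nothing to serialize), partKey = record (visible to a partitioner), message = payload
			producer.send(new KeyedMessage<Object, byte[]>("myTopic", null, record, payload));
			producer.close();
		}
	}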
http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 4a6da3b..a0805c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -66,6 +66,26 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 
 	/**
 	 * Creates a KafkaSource that consumes a topic.
+	 *
+	 * @param zookeeperAddress
+	 *            Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
+	 * @param groupId
+	 * 			   ID of the consumer group.
+	 * @param deserializationSchema
+	 *            User defined deserialization schema.
+	 * @param zookeeperSyncTimeMillis
+	 *            Synchronization time with zookeeper.
+	 */
+	public KafkaSource(String zookeeperAddress,
+					String topicId, String groupId,
+					DeserializationSchema<OUT> deserializationSchema,
+					long zookeeperSyncTimeMillis) {
+		this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
+	}
+	/**
+	 * Creates a KafkaSource that consumes a topic.
 	 * 
 	 * @param zookeeperAddress
 	 *            Address of the Zookeeper host (with port number).

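The new KafkaSource constructor is a convenience overload that exposes zookeeperSyncTimeMillis while forwarding null for the existing constructor's last argument. A hypothetical usage sketch, mirroring how the added regularKafkaSourceTest wires it up (connection string, topic, and group id are placeholder values; TupleSerializationSchema is the schema defined in the test further down):

	import org.apache.flink.api.java.tuple.Tuple2;
	import org.apache.flink.streaming.api.datastream.DataStreamSource;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
	import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;

	public class KafkaSourceOverloadSketch {
		public static void main(String[] args) throws Exception {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
			// consume Tuple2 records with a 5000 ms Zookeeper sync time
			DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
					new KafkaSource<Tuple2<Long, String>>("localhost:2181", "myTopic", "myFlinkGroup",
							new TupleSerializationSchema(), 5000));
			consuming.print();
			env.execute();
		}
	}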
http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
index f9dd21f..7ae17df 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
@@ -32,7 +32,7 @@ import kafka.utils.VerifiableProperties;
  *
 * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
  *
- * The serialziable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
+ * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
  */
 public class PartitionerWrapper implements Partitioner {
 	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";

http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 95609f9..9344722 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -24,7 +27,9 @@ import java.net.UnknownHostException;
 import java.util.BitSet;
 import java.util.Properties;
 
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -33,14 +38,19 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
+import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -61,74 +71,390 @@ public class KafkaITCase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
 
-	private final String TOPIC = "myTopic";
+	private static int zkPort;
+	private static int kafkaPort;
+	private static String kafkaHost;
+
+	private static String zookeeperConnectionString;
 
-	private int zkPort;
-	private int kafkaPort;
-	private String kafkaHost;
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+	public static File tmpZkDir;
+	public static File tmpKafkaDir;
 
-	private String zookeeperConnectionString;
+	private static TestingServer zookeeper;
+	private static KafkaServer broker1;
 
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-	public File tmpZkDir;
-	public File tmpKafkaDir;
 
-	@Before
-	public void prepare() throws IOException {
+	@BeforeClass
+	public static void prepare() throws IOException {
+		LOG.info("Starting KafkaITCase.prepare()");
 		tmpZkDir = tempFolder.newFolder();
 		tmpKafkaDir = tempFolder.newFolder();
 		kafkaHost = InetAddress.getLocalHost().getHostName();
 		zkPort = NetUtils.getAvailablePort();
 		kafkaPort = NetUtils.getAvailablePort();
 		zookeeperConnectionString = "localhost:" + zkPort;
-	}
 
-	@Test
-	public void test() {
-		LOG.info("Starting KafkaITCase.test()");
-		TestingServer zookeeper = null;
-		KafkaServer broker1 = null;
+		zookeeper = null;
+		broker1 = null;
+
 		try {
+			LOG.info("Starting Zookeeper");
 			zookeeper = getZookeeper();
+			LOG.info("Starting KafkaServer");
 			broker1 = getKafkaServer(0);
-			LOG.info("ZK and KafkaServer started. Creating test topic:");
-			createTestTopic();
-
-			LOG.info("Starting Kafka Topology in Flink:");
-			startKafkaTopology();
-
-			LOG.info("Test succeeded.");
+			LOG.info("ZK and KafkaServer started.");
 		} catch (Throwable t) {
 			LOG.warn("Test failed with exception", t);
 			Assert.fail("Test failed with: " + t.getMessage());
-		} finally {
-			LOG.info("Shutting down all services");
-			if (broker1 != null) {
-				broker1.shutdown();
+		}
+	}
+
+	@AfterClass
+	public static void shutDownServices() {
+		LOG.info("Shutting down all services");
+		if (broker1 != null) {
+			broker1.shutdown();
+		}
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+			} catch (IOException e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+		}
+	}
+
+	@Test
+	public void regularKafkaSourceTest() throws Exception {
+		LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
+
+		String topic = "regularKafkaSourceTestTopic";
+		createTestTopic(topic, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+				new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new TupleSerializationSchema(), 5000));
+		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+			int elCnt = 0;
+			int start = -1;
+			BitSet validator = new BitSet(101);
+
+			@Override
+			public void invoke(Tuple2<Long, String> value) throws Exception {
+				LOG.info("Got " + value);
+				String[] sp = value.f1.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				assertEquals(value.f0 - 1000, (long) v);
+
+				if (start == -1) {
+					start = v;
+				}
+				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				validator.set(v - start);
+				elCnt++;
+				if (elCnt == 100) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != 100) {
+						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				}
 			}
-			if (zookeeper != null) {
-				try {
-					zookeeper.stop();
-				} catch (IOException e) {
-					LOG.warn("ZK.stop() failed", e);
+		});
+
+		// add producing topology
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+
+			@Override
+			public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = 0;
+				while (running) {
+					collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+					try {
+						Thread.sleep(100);
+					} catch (InterruptedException ignored) {
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
+		});
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+
+		try {
+			env.setParallelism(1);
+			env.execute();
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				t = t.getCause();
+				if (limit++ == 20) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
 				}
 			}
 		}
 
+		LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
 	}
 
-	private void createTestTopic() {
-		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-		kafkaTopicUtils.createTopic(TOPIC, 1, 1);
+	@Test
+	public void tupleTestTopology() throws Exception {
+		LOG.info("Starting KafkaITCase.tupleTestTopology()");
+
+		String topic = "tupleTestTopic";
+		createTestTopic(topic, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+				new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
+		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+			int elCnt = 0;
+			int start = -1;
+			BitSet validator = new BitSet(101);
+
+			@Override
+			public void invoke(Tuple2<Long, String> value) throws Exception {
+				LOG.info("Got " + value);
+				String[] sp = value.f1.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				assertEquals(value.f0 - 1000, (long) v);
+
+				if (start == -1) {
+					start = v;
+				}
+				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				validator.set(v - start);
+				elCnt++;
+				if (elCnt == 100) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != 100) {
+						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				}
+			}
+		});
+
+		// add producing topology
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+
+			@Override
+			public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = 0;
+				while (running) {
+					collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+					try {
+						Thread.sleep(100);
+					} catch (InterruptedException ignored) {
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
+		});
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+
+		try {
+			env.setParallelism(1);
+			env.execute();
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				t = t.getCause();
+				if (limit++ == 20) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
+				}
+			}
+		}
+
+		LOG.info("Finished KafkaITCase.tupleTestTopology()");
 	}
 
-	private void startKafkaTopology() throws Exception {
+	private static boolean partitionerHasBeenCalled = false;
+
+	@Test
+	public void customPartitioningTestTopology() throws Exception {
+		LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
+
+		String topic = "customPartitioningTestTopic";
+		
+		createTestTopic(topic, 3);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+				new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
+		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+			int start = -1;
+			BitSet validator = new BitSet(101);
+
+			boolean gotPartition1 = false;
+			boolean gotPartition2 = false;
+			boolean gotPartition3 = false;
+
+			@Override
+			public void invoke(Tuple2<Long, String> value) throws Exception {
+				LOG.info("Got " + value);
+				String[] sp = value.f1.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				assertEquals(value.f0 - 1000, (long) v);
+
+				switch (v) {
+					case 9:
+						gotPartition1 = true;
+						break;
+					case 19:
+						gotPartition2 = true;
+						break;
+					case 99:
+						gotPartition3 = true;
+						break;
+				}
+
+				if (start == -1) {
+					start = v;
+				}
+				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				validator.set(v - start);
+
+				if (gotPartition1 && gotPartition2 && gotPartition3) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != 100) {
+						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				}
+			}
+		});
+
+		// add producing topology
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+
+			@Override
+			public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = 0;
+				while (running) {
+					collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+					try {
+						Thread.sleep(100);
+					} catch (InterruptedException ignored) {
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
+		});
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), new CustomPartitioner()));
+
+		try {
+			env.setParallelism(1);
+			env.execute();
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				t = t.getCause();
+				if (limit++ == 20) {
+					throw good;
+				}
+			}
+
+			assertTrue(partitionerHasBeenCalled);
+		}
+
+		LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
+	}
+
+	/**
+	 * This is for a topic with 3 partitions and Tuple2<Long, String>
+	 */
+	private static class CustomPartitioner implements SerializableKafkaPartitioner {
+
+		@Override
+		public int partition(Object key, int numPartitions) {
+			partitionerHasBeenCalled = true;
+
+			Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
+			if (tuple.f0 < 10) {
+				return 0;
+			} else if (tuple.f0 < 20) {
+				return 1;
+			} else {
+				return 2;
+			}
+		}
+	}
+
+	private static class TupleSerializationSchema implements DeserializationSchema<Tuple2<Long, String>>, SerializationSchema<Tuple2<Long, String>, byte[]> {
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public Tuple2<Long, String> deserialize(byte[] message) {
+			Object deserializedObject = SerializationUtils.deserialize(message);
+			return (Tuple2<Long, String>) deserializedObject;
+		}
+
+		@Override
+		public byte[] serialize(Tuple2<Long, String> element) {
+			return SerializationUtils.serialize(element);
+		}
+
+		@Override
+		public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
+			return false;
+		}
+
+	}
+
+	@Test
+	public void simpleTestTopology() throws Exception {
+		String topic = "simpleTestTopic";
+
+		createTestTopic(topic, 1);
+
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		// add consuming topology:
 		DataStreamSource<String> consuming = env.addSource(
-				new PersistentKafkaSource<String>(zookeeperConnectionString, TOPIC, new JavaDefaultStringSchema(), 5000, 100, Offset.FROM_BEGINNING));
+				new PersistentKafkaSource<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 100, Offset.FROM_BEGINNING));
 		consuming.addSink(new SinkFunction<String>() {
 			int elCnt = 0;
 			int start = -1;
@@ -176,11 +502,11 @@ public class KafkaITCase {
 
 			@Override
 			public void cancel() {
-				LOG.info("Source got chancel()");
+				LOG.info("Source got cancel()");
 				running = false;
 			}
 		});
-		stream.addSink(new KafkaSink<String>(zookeeperConnectionString, TOPIC, new JavaDefaultStringSchema()));
+		stream.addSink(new KafkaSink<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema()));
 
 		try {
 			env.setParallelism(1);
@@ -191,21 +517,28 @@ public class KafkaITCase {
 			while (!(t instanceof SuccessException)) {
 				t = t.getCause();
 				if (limit++ == 20) {
-					throw good;
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
 				}
 			}
 		}
 	}
 
 
-	private TestingServer getZookeeper() throws Exception {
+	private void createTestTopic(String topic, int numberOfPartitions) {
+		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
+		kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
+	}
+
+
+	private static TestingServer getZookeeper() throws Exception {
 		return new TestingServer(zkPort, tmpZkDir);
 	}
 
 	/**
 	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
 	 */
-	private KafkaServer getKafkaServer(int brokerId) throws UnknownHostException {
+	private static KafkaServer getKafkaServer(int brokerId) throws UnknownHostException {
 		Properties kafkaProperties = new Properties();
 		// properties have to be Strings
 		kafkaProperties.put("advertised.host.name", kafkaHost);
@@ -220,13 +553,12 @@ public class KafkaITCase {
 		return server;
 	}
 
-	public class LocalSystemTime implements Time {
+	public static class LocalSystemTime implements Time {
 
 		@Override
 		public long milliseconds() {
 			return System.currentTimeMillis();
 		}
-
 		public long nanoseconds() {
 			return System.nanoTime();
 		}
@@ -243,9 +575,7 @@ public class KafkaITCase {
 	}
 
 	public static class SuccessException extends Exception {
-
 		private static final long serialVersionUID = 1L;
-
 	}
 
 }


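For completeness, a condensed sketch of the user-facing custom partitioning that customPartitioningTestTopology exercises: implement SerializableKafkaPartitioner and hand it to the four-argument KafkaSink constructor. Not part of the patch; the partitioner body mirrors the test's CustomPartitioner, the connection string and topic are placeholders, and the topic would need three partitions as in the test:

	import org.apache.flink.api.java.tuple.Tuple2;
	import org.apache.flink.streaming.api.datastream.DataStream;
	import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
	import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;

	public class CustomPartitioningSketch {

		// routes tuples to one of three partitions by their Long field, as in the test's CustomPartitioner
		private static class RangePartitioner implements SerializableKafkaPartitioner {
			@Override
			public int partition(Object key, int numPartitions) {
				Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
				if (tuple.f0 < 10) {
					return 0;
				} else if (tuple.f0 < 20) {
					return 1;
				} else {
					return 2;
				}
			}
		}

		public static void attachSink(DataStream<Tuple2<Long, String>> stream) {
			// TupleSerializationSchema is the schema defined in the test above
			stream.addSink(new KafkaSink<Tuple2<Long, String>>("localhost:2181",
					"customPartitioningTestTopic", new TupleSerializationSchema(), new RangePartitioner()));
		}
	}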