flink-commits mailing list archives

From: rmetz...@apache.org
Subject: flink git commit: [FLINK-2008] Fix broker failure test case
Date: Fri, 15 May 2015 07:52:10 GMT
Repository: flink
Updated Branches:
  refs/heads/master bd7d86793 -> 705ee8abf


[FLINK-2008] Fix broker failure test case

This closes #675


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/705ee8ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/705ee8ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/705ee8ab

Branch: refs/heads/master
Commit: 705ee8abf2b0ee97b90c827d106d1136621a31cc
Parents: bd7d867
Author: Robert Metzger <rmetzger@apache.org>
Authored: Wed May 13 09:34:37 2015 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Fri May 15 09:51:11 2015 +0200

----------------------------------------------------------------------
 .../api/persistent/PersistentKafkaSource.java   |   8 +-
 .../streaming/connectors/kafka/KafkaITCase.java | 220 +++++++++++++------
 tools/log4j-travis.properties                   |   2 +-
 3 files changed, 152 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/705ee8ab/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 50ee15a..b3e80f3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -162,18 +162,18 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 					LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
 					continue;
 				}
+				OUT out = deserializationSchema.deserialize(message.message());
+				if (LOG.isTraceEnabled()) {
+					LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
+				}
 				lastOffsets[message.partition()] = message.offset();
 
-				OUT out = deserializationSchema.deserialize(message.message());
 				if (deserializationSchema.isEndOfStream(out)) {
 					LOG.info("DeserializationSchema signaled end of stream for this source");
 					break;
 				}
 
 				collector.collect(out);
-				if (LOG.isTraceEnabled()) {
-					LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
-				}
 			}
 		} catch(Exception ie) {
 			// this exception is coming from Scala code.
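
The hunk above reorders the consume loop: a record is now deserialized (and optionally trace-logged) before its offset is written into lastOffsets, so a record that fails to deserialize, or that signals end-of-stream, no longer gets its offset recorded as consumed. A rough, self-contained sketch of that ordering follows; the types and the skip condition are illustrative stand-ins, not the actual connector API:

    // Stand-ins for the connector's deserialization schema and collector (illustrative only).
    interface Schema<T> { T deserialize(byte[] bytes); boolean isEndOfStream(T value); }
    interface Collector<T> { void collect(T value); }

    class OffsetTrackingLoop {
        /** Handles one record; returns false once the schema signals end of stream. */
        static <T> boolean process(byte[] raw, int partition, long offset,
                                   long[] lastOffsets, Schema<T> schema, Collector<T> out) {
            if (offset <= lastOffsets[partition]) {
                // mirrors the "Skipping message" branch; the exact condition in the
                // real source lives above the hunk and may differ
                return true;
            }
            // Deserialize first: if this throws, lastOffsets stays untouched and the
            // record can presumably be re-read after recovery instead of being
            // counted as consumed.
            T value = schema.deserialize(raw);
            lastOffsets[partition] = offset;
            if (schema.isEndOfStream(value)) {
                return false;
            }
            out.collect(value);
            return true;
        }
    }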

http://git-wip-us.apache.org/repos/asf/flink/blob/705ee8ab/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 246756c..4b87dbd 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -27,16 +27,25 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.UUID;
 
 import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
+import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
 import kafka.network.SocketServer;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -46,25 +55,23 @@ import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
 import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -100,8 +107,6 @@ public class KafkaITCase {
 	private static List<KafkaServer> brokers;
 	private static String brokerConnectionStrings = "";
 
-	private static boolean shutdownKafkaBroker;
-
 	private static ConsumerConfig standardCC;
 
 	private static ZkClient zkClient;
@@ -218,6 +223,7 @@ public class KafkaITCase {
 		// write a sequence from 0 to 99 to each of the three partitions.
 		writeSequence(env, topicName, 0, 99);
 
+
 		readSequence(env, standardCC, topicName, 0, 100, 300);
 
 		// check offsets
@@ -242,7 +248,7 @@ public class KafkaITCase {
 		LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
 	}
 
-	private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
+	private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
 		LOG.info("Reading sequence for verification until final count {}", finalCount);
 		DataStream<Tuple2<Integer, Integer>> source = env.addSource(
 				new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc)
@@ -252,7 +258,7 @@ public class KafkaITCase {
 		.map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
 			@Override
 			public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
-				Thread.sleep(75);
+				Thread.sleep(100);
 				return value;
 			}
 		}).setParallelism(3);
@@ -275,6 +281,8 @@ public class KafkaITCase {
 					for (int i = 0; i < values.length; i++) {
 						int v = values[i];
 						if (v != 3) {
+							LOG.warn("Test is going to fail");
+							printTopic(topicName, valuesCount, this.getRuntimeContext().getExecutionConfig());
 							throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i
+ " array=" + Arrays.toString(values));
 						}
 					}
@@ -340,27 +348,6 @@ public class KafkaITCase {
 		}
 	}
 
-	public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
-		try {
-			see.execute(name);
-		} catch (JobExecutionException good) {
-			Throwable t = good.getCause();
-			int limit = 0;
-			while (!(t instanceof SuccessException)) {
-				if(t == null) {
-					LOG.warn("Test failed with exception", good);
-					Assert.fail("Test failed with: " + good.getMessage());
-				}
-				
-				t = t.getCause();
-				if (limit++ == 20) {
-					LOG.warn("Test failed with exception", good);
-					Assert.fail("Test failed with: " + good.getMessage());
-				}
-			}
-		}
-	}
-
 
 	@Test
 	public void regularKafkaSourceTest() throws Exception {
@@ -800,6 +787,7 @@ public class KafkaITCase {
 	}
 
 	private static boolean leaderHasShutDown = false;
+	private static boolean shutdownKafkaBroker;
 
 	@Test(timeout=60000)
 	public void brokerFailureTest() throws Exception {
@@ -807,9 +795,47 @@ public class KafkaITCase {
 
 		createTestTopic(topic, 2, 2);
 
-	//	KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-	//	final String leaderToShutDown = kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+		// --------------------------- write data to topic ---------------------
+		LOG.info("Writing data to topic {}", topic);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+			boolean running = true;
+
+			@Override
+			public void run(Collector<String> collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = 0;
+				while (running) {
+					String msg = "kafka-" + cnt++;
+					collector.collect(msg);
+					LOG.info("sending message = "+msg);
+
+					if ((cnt - 1) % 20 == 0) {
+						LOG.debug("Sending message #{}", cnt - 1);
+					}
+					if(cnt == 200) {
+						LOG.info("Stopping producer after 200 msgs");
+						break;
+					}
+
+				}
+			}
+
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
+		});
+		stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
+				.setParallelism(1);
+
+		tryExecute(env, "broker failure test - writer");
+
+		// --------------------------- read and let broker fail ---------------------
 
+		LOG.info("Reading data from topic {} while letting a broker fail", topic);
 		PartitionMetadata firstPart = null;
 		do {
 			if(firstPart != null) {
@@ -822,6 +848,7 @@ public class KafkaITCase {
 		} while(firstPart.errorCode() != 0);
 
 		final String leaderToShutDown = firstPart.leader().get().connectionString();
+		LOG.info("Leader to shutdown {}", leaderToShutDown);
 
 		final Thread brokerShutdown = new Thread(new Runnable() {
 			@Override
@@ -836,11 +863,7 @@ public class KafkaITCase {
 				}
 
 				for (KafkaServer kafkaServer : brokers) {
-					if (leaderToShutDown.equals(
-							kafkaServer.config().advertisedHostName()
-									+ ":"
-									+ kafkaServer.config().advertisedPort()
-					)) {
+					if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
 						LOG.info("Killing Kafka Server {}", leaderToShutDown);
 						kafkaServer.shutdown();
 						leaderHasShutDown = true;
@@ -851,11 +874,8 @@ public class KafkaITCase {
 		});
 		brokerShutdown.start();
 
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
 		// add consuming topology:
-		DataStreamSource<String> consuming = env.addSource(
-				new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
+		DataStreamSource<String> consuming = env.addSource(new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
 		consuming.setParallelism(1);
 
 		consuming.addSink(new SinkFunction<String>() {
@@ -875,18 +895,24 @@ public class KafkaITCase {
 				if (start == -1) {
 					start = v;
 				}
-				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				int offset = v - start;
+				Assert.assertFalse("Received tuple with value " + offset + " twice", validator.get(offset));
 				if (v - start < 0 && LOG.isWarnEnabled()) {
 					LOG.warn("Not in order: {}", value);
 				}
 
-				validator.set(v - start);
+				validator.set(offset);
 				elCnt++;
 				if (elCnt == 20) {
+					LOG.info("Asking leading broker to shut down");
 					// shut down a Kafka broker
 					shutdownKafkaBroker = true;
 				}
-
+				if (shutdownKafkaBroker) {
+					// we become a bit slower because the shutdown takes some time and we have
+					// only a fixed number of elements to read
+					Thread.sleep(20);
+				}
 				if (leaderHasShutDown) { // it only makes sense to check once the shutdown is completed
 					if (elCnt >= stopAfterMessages) {
 						// check if everything in the bitset is set to true
@@ -899,44 +925,31 @@ public class KafkaITCase {
 				}
 			}
 		});
+		tryExecute(env, "broker failure test - reader");
 
-		// add producing topology
-		DataStream<String> stream = env.addSource(new SourceFunction<String>() {
-			boolean running = true;
-
-			@Override
-			public void run(Collector<String> collector) throws Exception {
-				LOG.info("Starting source.");
-				int cnt = 0;
-				while (running) {
-					String msg = "kafka-" + cnt++;
-					collector.collect(msg);
-					LOG.info("sending message = "+msg);
-
-					if ((cnt - 1) % 20 == 0) {
-						LOG.debug("Sending message #{}", cnt - 1);
-					}
+	}
 
-					try {
-						Thread.sleep(10);
-					} catch (InterruptedException ignored) {
-					}
+	public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
+		try {
+			see.execute(name);
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				if(t == null) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
 				}
-			}
 
-			@Override
-			public void cancel() {
-				LOG.info("Source got chancel()");
-				running = false;
+				t = t.getCause();
+				if (limit++ == 20) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
+				}
 			}
-		});
-		stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
-				.setParallelism(1);
-
-		tryExecute(env, "broker failure test");
+		}
 	}
 
-
 	private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
 		// create topic
 		Properties topicConfig = new Properties();
@@ -975,4 +988,65 @@ public class KafkaITCase {
 		private static final long serialVersionUID = 1L;
 	}
 
+
+	// ----------------------- Debugging utilities --------------------
+
+	/**
+	 * Reads a topic into a list, using only Kafka code.
+	 * @return the messages read from the topic (at most stopAfter of them)
+	 */
+	private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
+		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
+		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
+		// will see each message only once.
+		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
+		if(streams.size() != 1) {
+			throw new RuntimeException("Expected only one message stream but got "+streams.size());
+		}
+		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+		if(kafkaStreams == null) {
+			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+		}
+		if(kafkaStreams.size() != 1) {
+			throw new RuntimeException("Requested 1 stream from Kafka, but got "+kafkaStreams.size()+" streams");
+		}
+		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
+		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
+
+		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
+		int read = 0;
+		while(iteratorToRead.hasNext()) {
+			read++;
+			result.add(iteratorToRead.next());
+			if(read == stopAfter) {
+				LOG.info("Read "+read+" elements");
+				return result;
+			}
+		}
+		return result;
+	}
+
+	private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema deserializationSchema, int stopAfter) {
+		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
+		LOG.info("Printing contents of topic {} in consumer group {}", topicName, config.groupId());
+		for(MessageAndMetadata<byte[], byte[]> message: contents) {
+			Object out = deserializationSchema.deserialize(message.message());
+			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
+		}
+	}
+
+	private static void printTopic(String topicName, int elements, ExecutionConfig ec) {
+		// write the sequence to log for debugging purposes
+		Properties stdProps = standardCC.props().props();
+		Properties newProps = new Properties(stdProps);
+		newProps.setProperty("group.id", "topic-printer"+UUID.randomUUID().toString());
+		newProps.setProperty("auto.offset.reset", "smallest");
+		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
+
+		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
+		DeserializationSchema deserializer = new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), ec);
+		printTopic(topicName, printerConfig, deserializer, elements);
+	}
+
 }
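
tryExecute, relocated below the test in this hunk, is the pattern both the writer and reader jobs are run through: a JobExecutionException whose cause chain (followed up to 20 levels deep) contains a SuccessException is swallowed as a passing run, while a JobExecutionException without one fails the test. A rough, self-contained sketch of that unwrapping, with plain exceptions standing in for JobExecutionException and the Flink execution environment:

    // Illustrative stand-in for the test's marker exception.
    class SuccessException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    class TryExecuteSketch {
        static void tryExecute(Runnable job, String name) {
            try {
                job.run();
            } catch (RuntimeException wrapped) {
                // Walk the (bounded) cause chain looking for the success marker.
                Throwable t = wrapped.getCause();
                for (int depth = 0; depth < 20; depth++) {
                    if (t instanceof SuccessException) {
                        return; // the job signalled that all expected records arrived
                    }
                    if (t == null) {
                        break;
                    }
                    t = t.getCause();
                }
                throw new AssertionError(name + " failed", wrapped);
            }
        }

        public static void main(String[] args) {
            // A "job" whose sink throws SuccessException once it has seen enough elements.
            tryExecute(() -> { throw new RuntimeException(new SuccessException()); }, "reader");
            System.out.println("SuccessException was treated as success");
        }
    }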

http://git-wip-us.apache.org/repos/asf/flink/blob/705ee8ab/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 056cf11..69ddcde 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -35,4 +35,4 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
\ No newline at end of file

