From: rmetzger@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Message-Id: <12d5d6030dae4289b273ebc4faa84cdb@git.apache.org>
Subject: flink git commit: [FLINK-2008] Fix broker failure test case
Date: Fri, 15 May 2015 07:52:10 +0000 (UTC)

Repository: flink
Updated Branches:
  refs/heads/master bd7d86793 -> 705ee8abf


[FLINK-2008] Fix broker failure test case

This closes #675


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/705ee8ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/705ee8ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/705ee8ab

Branch: refs/heads/master
Commit: 705ee8abf2b0ee97b90c827d106d1136621a31cc
Parents: bd7d867
Author: Robert Metzger
Authored: Wed May 13 09:34:37 2015 +0200
Committer: Robert Metzger
Committed: Fri May 15 09:51:11 2015 +0200

----------------------------------------------------------------------
 .../api/persistent/PersistentKafkaSource.java   |   8 +-
 .../streaming/connectors/kafka/KafkaITCase.java | 220 +++++++++++++------
 tools/log4j-travis.properties                   |   2 +-
 3 files changed, 152 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/705ee8ab/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 50ee15a..b3e80f3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -162,18 +162,18 @@ public class PersistentKafkaSource extends RichParallelSourceFunction
 				LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
 				continue;
 			}
+			OUT out = deserializationSchema.deserialize(message.message());
+			if (LOG.isTraceEnabled()) {
+				LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
+			}
 			lastOffsets[message.partition()] = message.offset();
-			OUT out = deserializationSchema.deserialize(message.message());
 			if (deserializationSchema.isEndOfStream(out)) {
 				LOG.info("DeserializationSchema signaled end of stream for this source");
 				break;
 			}
 			collector.collect(out);
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
-			}
 		}
 	} catch(Exception ie) {
 		// this exception is coming from Scala code.
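
The hunk above moves the deserialization call in front of the offset bookkeeping, so an offset appears to be recorded in lastOffsets only after its message has been deserialized successfully. The following stand-alone sketch illustrates just that ordering; it is not the Flink source itself, and Record, Schema and Collector are hypothetical stand-ins for the Kafka and Flink types.

import java.util.Iterator;

// Minimal sketch of the consume-loop ordering after the fix:
// deserialize first, then remember the offset, then check for
// end-of-stream and emit. All types here are illustrative stand-ins.
public class OffsetOrderingSketch {

    interface Schema<T> {
        T deserialize(byte[] bytes);
        boolean isEndOfStream(T element);
    }

    interface Collector<T> {
        void collect(T element);
    }

    static final class Record {
        final int partition;
        final long offset;
        final byte[] payload;

        Record(int partition, long offset, byte[] payload) {
            this.partition = partition;
            this.offset = offset;
            this.payload = payload;
        }
    }

    static <T> void consume(Iterator<Record> messages, Schema<T> schema,
                            Collector<T> out, long[] lastOffsets) {
        while (messages.hasNext()) {
            Record msg = messages.next();
            if (msg.offset <= lastOffsets[msg.partition]) {
                continue; // already seen, e.g. replayed after a restart
            }
            // 1) deserialize first; if this throws, the offset is NOT marked as processed
            T element = schema.deserialize(msg.payload);
            // 2) only now remember the offset for this partition
            lastOffsets[msg.partition] = msg.offset;
            if (schema.isEndOfStream(element)) {
                break;
            }
            out.collect(element);
        }
    }
}
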
partition {}", message.offset(), message.partition()); continue; } + OUT out = deserializationSchema.deserialize(message.message()); + if (LOG.isTraceEnabled()) { + LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition()); + } lastOffsets[message.partition()] = message.offset(); - OUT out = deserializationSchema.deserialize(message.message()); if (deserializationSchema.isEndOfStream(out)) { LOG.info("DeserializationSchema signaled end of stream for this source"); break; } collector.collect(out); - if (LOG.isTraceEnabled()) { - LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition()); - } } } catch(Exception ie) { // this exception is coming from Scala code. http://git-wip-us.apache.org/repos/asf/flink/blob/705ee8ab/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 246756c..4b87dbd 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -27,16 +27,25 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Random; +import java.util.UUID; import kafka.admin.AdminUtils; import kafka.api.PartitionMetadata; +import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; import kafka.network.SocketServer; import org.I0Itec.zkclient.ZkClient; import org.apache.curator.test.TestingServer; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; @@ -46,25 +55,23 @@ import org.apache.flink.runtime.net.NetUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.WindowMapFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.helper.Count; import org.apache.flink.streaming.connectors.kafka.api.KafkaSink; import org.apache.flink.streaming.connectors.kafka.api.KafkaSource; import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource; import 
 import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -100,8 +107,6 @@ public class KafkaITCase {
 	private static List brokers;
 	private static String brokerConnectionStrings = "";
 
-	private static boolean shutdownKafkaBroker;
-
 	private static ConsumerConfig standardCC;
 
 	private static ZkClient zkClient;
@@ -218,6 +223,7 @@ public class KafkaITCase {
 		// write a sequence from 0 to 99 to each of the three partitions.
 		writeSequence(env, topicName, 0, 99);
 
+		readSequence(env, standardCC, topicName, 0, 100, 300);
 
 		// check offsets
@@ -242,7 +248,7 @@ public class KafkaITCase {
 		LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
 	}
 
-	private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
+	private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
 		LOG.info("Reading sequence for verification until final count {}", finalCount);
 		DataStream> source = env.addSource(
 				new PersistentKafkaSource>(topicName, new Utils.TypeInformationSerializationSchema>(new Tuple2(1,1), env.getConfig()), cc)
@@ -252,7 +258,7 @@
 				.map(new MapFunction, Tuple2>() {
 					@Override
 					public Tuple2 map(Tuple2 value) throws Exception {
-						Thread.sleep(75);
+						Thread.sleep(100);
 						return value;
 					}
 				}).setParallelism(3);
@@ -275,6 +281,8 @@
 				for (int i = 0; i < values.length; i++) {
 					int v = values[i];
 					if (v != 3) {
+						LOG.warn("Test is going to fail");
+						printTopic(topicName, valuesCount, this.getRuntimeContext().getExecutionConfig());
 						throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
 					}
 				}
@@ -340,27 +348,6 @@
 		}
 	}
 
-	public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
-		try {
-			see.execute(name);
-		} catch (JobExecutionException good) {
-			Throwable t = good.getCause();
-			int limit = 0;
-			while (!(t instanceof SuccessException)) {
-				if(t == null) {
-					LOG.warn("Test failed with exception", good);
-					Assert.fail("Test failed with: " + good.getMessage());
-				}
-
-				t = t.getCause();
-				if (limit++ == 20) {
-					LOG.warn("Test failed with exception", good);
-					Assert.fail("Test failed with: " + good.getMessage());
-				}
-			}
-		}
-	}
-
 	@Test
 	public void regularKafkaSourceTest() throws Exception {
@@ -800,6 +787,7 @@ public class KafkaITCase {
 	}
 
 	private static boolean leaderHasShutDown = false;
+	private static boolean shutdownKafkaBroker;
 
 	@Test(timeout=60000)
 	public void brokerFailureTest() throws Exception {
@@ -807,9 +795,47 @@
 		createTestTopic(topic, 2, 2);
 
-		// KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-		// final String leaderToShutDown = kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+		// --------------------------- write data to topic ---------------------
+		LOG.info("Writing data to topic {}", topic);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		DataStream stream = env.addSource(new SourceFunction() {
+			boolean running = true;
+
+			@Override
+			public void run(Collector collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = 0;
+				while (running) {
+					String msg = "kafka-" + cnt++;
+					collector.collect(msg);
+					LOG.info("sending message = "+msg);
+
+					if ((cnt - 1) % 20 == 0) {
+						LOG.debug("Sending message #{}", cnt - 1);
+					}
+					if(cnt == 200) {
+						LOG.info("Stopping to produce after 200 msgs");
+						break;
+					}
+
+				}
+			}
+
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
+		});
+		stream.addSink(new KafkaSink(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
+				.setParallelism(1);
+
+		tryExecute(env, "broker failure test - writer");
+
+		// --------------------------- read and let broker fail ---------------------
+		LOG.info("Reading data from topic {} and let a broker fail", topic);
 		PartitionMetadata firstPart = null;
 		do {
 			if(firstPart != null) {
@@ -822,6 +848,7 @@
 		} while(firstPart.errorCode() != 0);
 
 		final String leaderToShutDown = firstPart.leader().get().connectionString();
+		LOG.info("Leader to shutdown {}", leaderToShutDown);
 
 		final Thread brokerShutdown = new Thread(new Runnable() {
 			@Override
@@ -836,11 +863,7 @@
 				}
 				for (KafkaServer kafkaServer : brokers) {
-					if (leaderToShutDown.equals(
-							kafkaServer.config().advertisedHostName()
-									+ ":"
-									+ kafkaServer.config().advertisedPort()
-					)) {
+					if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
 						LOG.info("Killing Kafka Server {}", leaderToShutDown);
 						kafkaServer.shutdown();
 						leaderHasShutDown = true;
@@ -851,11 +874,8 @@
 		});
 		brokerShutdown.start();
 
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
 		// add consuming topology:
-		DataStreamSource consuming = env.addSource(
-				new PersistentKafkaSource(topic, new JavaDefaultStringSchema(), standardCC));
+		DataStreamSource consuming = env.addSource(new PersistentKafkaSource(topic, new JavaDefaultStringSchema(), standardCC));
 		consuming.setParallelism(1);
 		consuming.addSink(new SinkFunction() {
@@ -875,18 +895,24 @@
 				if (start == -1) {
 					start = v;
 				}
-				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				int offset = v - start;
+				Assert.assertFalse("Received tuple with value " + offset + " twice", validator.get(offset));
 				if (v - start < 0 && LOG.isWarnEnabled()) {
 					LOG.warn("Not in order: {}", value);
 				}
-				validator.set(v - start);
+				validator.set(offset);
 				elCnt++;
 				if (elCnt == 20) {
+					LOG.info("Asking leading broker to shut down");
 					// shut down a Kafka broker
 					shutdownKafkaBroker = true;
 				}
-
+				if (shutdownKafkaBroker) {
+					// we become a bit slower because the shutdown takes some time and we have
+					// only a fixed number of elements to read
+					Thread.sleep(20);
+				}
 				if (leaderHasShutDown) { // it only makes sence to check once the shutdown is completed
 					if (elCnt >= stopAfterMessages) {
 						// check if everything in the bitset is set to true
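
The consuming sink above keeps a BitSet "validator" of which elements of the written sequence have arrived: the first value defines the base of the sequence, a duplicate bit fails the test immediately, and the success check only fires once the leader has shut down and all expected bits are set. Below is a small, self-contained sketch of just that bookkeeping; the class and method names are hypothetical and it deliberately leaves out the broker-shutdown pacing and the SuccessException signalling used by the real test.

import java.util.BitSet;

// Minimal sketch (not the Flink test itself) of the sequence validation
// used by the broker-failure reader: map each received value to a bit,
// reject duplicates, and report completion once every bit is set.
public class SequenceValidatorSketch {

    private final BitSet validator;
    private final int expectedCount;
    private int start = -1;

    SequenceValidatorSketch(int expectedCount) {
        this.expectedCount = expectedCount;
        this.validator = new BitSet(expectedCount);
    }

    /** Returns true once every expected element has been seen exactly once. */
    boolean receive(int value) {
        if (start == -1) {
            start = value; // the first element defines the base of the sequence
        }
        int offset = value - start;
        if (validator.get(offset)) {
            throw new IllegalStateException("Received value " + value + " twice");
        }
        validator.set(offset);
        // complete when bits 0 .. expectedCount-1 are all set
        return validator.nextClearBit(0) >= expectedCount;
    }

    public static void main(String[] args) {
        SequenceValidatorSketch v = new SequenceValidatorSketch(5);
        for (int i = 10; i < 15; i++) {
            System.out.println("value " + i + " -> complete: " + v.receive(i));
        }
    }
}
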
@@ -899,44 +925,31 @@
 				}
 			}
 		});
+		tryExecute(env, "broker failure test - reader");
 
-		// add producing topology
-		DataStream stream = env.addSource(new SourceFunction() {
-			boolean running = true;
-
-			@Override
-			public void run(Collector collector) throws Exception {
-				LOG.info("Starting source.");
-				int cnt = 0;
-				while (running) {
-					String msg = "kafka-" + cnt++;
-					collector.collect(msg);
-					LOG.info("sending message = "+msg);
-
-					if ((cnt - 1) % 20 == 0) {
-						LOG.debug("Sending message #{}", cnt - 1);
-					}
+	}
 
-					try {
-						Thread.sleep(10);
-					} catch (InterruptedException ignored) {
-					}
+	public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
+		try {
+			see.execute(name);
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				if(t == null) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
 				}
-			}
 
-			@Override
-			public void cancel() {
-				LOG.info("Source got chancel()");
-				running = false;
+				t = t.getCause();
+				if (limit++ == 20) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
+				}
 			}
-		});
-		stream.addSink(new KafkaSink(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
-				.setParallelism(1);
-
-		tryExecute(env, "broker failure test");
+		}
 	}
 
-
 	private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
 		// create topic
 		Properties topicConfig = new Properties();
@@ -975,4 +988,65 @@
 		private static final long serialVersionUID = 1L;
 	}
+
+	// ----------------------- Debugging utilities --------------------
+
+	/**
+	 * Read topic to list, only using Kafka code.
+	 * @return
+	 */
+	private static List> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
+		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
+		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
+		// will see each message only once.
+		Map topicCountMap = Collections.singletonMap(topicName, 1);
+		Map>> streams = consumerConnector.createMessageStreams(topicCountMap);
+		if(streams.size() != 1) {
+			throw new RuntimeException("Expected only one message stream but got "+streams.size());
+		}
+		List> kafkaStreams = streams.get(topicName);
+		if(kafkaStreams == null) {
+			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+		}
+		if(kafkaStreams.size() != 1) {
+			throw new RuntimeException("Requested 1 stream from Kafka, but got "+kafkaStreams.size()+" streams");
+		}
+		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
+		ConsumerIterator iteratorToRead = kafkaStreams.get(0).iterator();
+
+		List> result = new ArrayList>();
+		int read = 0;
+		while(iteratorToRead.hasNext()) {
+			read++;
+			result.add(iteratorToRead.next());
+			if(read == stopAfter) {
+				LOG.info("Read "+read+" elements");
+				return result;
+			}
+		}
+		return result;
+	}
+
+	private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema deserializationSchema, int stopAfter){
+		List> contents = readTopicToList(topicName, config, stopAfter);
+		LOG.info("Printing contents of topic {} in consumer group {}", topicName, config.groupId());
+		for(MessageAndMetadata message: contents) {
+			Object out = deserializationSchema.deserialize(message.message());
+			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
+		}
+	}
+
+	private static void printTopic(String topicName, int elements, ExecutionConfig ec) {
+		// write the sequence to log for debugging purposes
+		Properties stdProps = standardCC.props().props();
+		Properties newProps = new Properties(stdProps);
+		newProps.setProperty("group.id", "topic-printer"+UUID.randomUUID().toString());
+		newProps.setProperty("auto.offset.reset", "smallest");
+		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
+
+		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
+		DeserializationSchema deserializer = new Utils.TypeInformationSerializationSchema>(new Tuple2(1,1), ec);
+		printTopic(topicName, printerConfig, deserializer, elements);
+	}
+
 }


http://git-wip-us.apache.org/repos/asf/flink/blob/705ee8ab/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 056cf11..69ddcde 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -35,4 +35,4 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
\ No newline at end of file
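
The debugging helpers added to KafkaITCase above dump a topic's contents to the log before a failing assertion. The key detail is the throwaway consumer configuration: a fresh, random group.id plus auto.offset.reset=smallest, so the dump always starts at the beginning of the topic and never disturbs the committed offsets of the consumer group under test. A minimal sketch of that configuration follows; the ZooKeeper address is a placeholder and the class is illustrative, not part of the patch.

import java.util.Properties;
import java.util.UUID;

import kafka.consumer.ConsumerConfig;

// Minimal sketch of the one-off consumer configuration used for debugging:
// a unique group.id per dump and reading from the earliest available offset.
public class DebugConsumerConfigSketch {

    static ConsumerConfig debugConfig(String zookeeperConnect) {
        Properties props = new Properties();
        // unique group id: repeated dumps never share committed offsets
        props.setProperty("group.id", "topic-printer-" + UUID.randomUUID());
        // start from the earliest available offset instead of the latest
        props.setProperty("auto.offset.reset", "smallest");
        props.setProperty("zookeeper.connect", zookeeperConnect);
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        ConsumerConfig cfg = debugConfig("localhost:2181"); // placeholder address
        System.out.println("debug consumer group.id = " + cfg.groupId());
    }
}
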