Return-Path: X-Original-To: apmail-flink-issues-archive@minotaur.apache.org Delivered-To: apmail-flink-issues-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E82B6179DA for ; Mon, 13 Apr 2015 10:17:33 +0000 (UTC) Received: (qmail 85964 invoked by uid 500); 13 Apr 2015 10:17:33 -0000 Delivered-To: apmail-flink-issues-archive@flink.apache.org Received: (qmail 85923 invoked by uid 500); 13 Apr 2015 10:17:33 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 85914 invoked by uid 99); 13 Apr 2015 10:17:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Apr 2015 10:17:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 13 Apr 2015 10:17:32 +0000 Received: (qmail 85748 invoked by uid 99); 13 Apr 2015 10:17:12 -0000 Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Apr 2015 10:17:12 +0000 Date: Mon, 13 Apr 2015 10:17:12 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.incubator.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-1753) Add more tests for Kafka Connectors MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 X-Virus-Checked: Checked by ClamAV on apache.org [ https://issues.apache.org/jira/browse/FLINK-1753?page=3Dcom.atlassian= .jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=3D1449= 2232#comment-14492232 ]=20 ASF GitHub Bot commented on FLINK-1753: --------------------------------------- Github user mbalassi commented on a diff in the pull request: https://github.com/apache/flink/pull/589#discussion_r28227590 =20 --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/= test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java --- @@ -524,13 +537,149 @@ public void cancel() { =09=09} =09} =20 +=09private static boolean leaderHasShutDown =3D false; + +=09@Test +=09public void brokerFailureTest() throws Exception { +=09=09String topic =3D "brokerFailureTestTopic"; + +=09=09createTestTopic(topic, 2, 2); =20 -=09private void createTestTopic(String topic, int numberOfPartitions) = { =09=09KafkaTopicUtils kafkaTopicUtils =3D new KafkaTopicUtils(zookeepe= rConnectionString); -=09=09kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1); +=09=09final String leaderToShutDown =3D +=09=09=09=09kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).lead= er().get().connectionString(); + +=09=09final Thread brokerShutdown =3D new Thread(new Runnable() { +=09=09=09@Override +=09=09=09public void run() { +=09=09=09=09shutdownKafkaBroker =3D false; +=09=09=09=09while (!shutdownKafkaBroker) { +=09=09=09=09=09try { +=09=09=09=09=09=09Thread.sleep(10); +=09=09=09=09=09} catch (InterruptedException e) { +=09=09=09=09=09=09LOG.warn("Interruption", e); +=09=09=09=09=09} +=09=09=09=09} + +=09=09=09=09for (KafkaServer kafkaServer : brokers) { +=09=09=09=09=09if (leaderToShutDown.equals( +=09=09=09=09=09=09=09kafkaServer.config().advertisedHostName() +=09=09=09=09=09=09=09=09=09+ ":" +=09=09=09=09=09=09=09=09=09+ kafkaServer.config().advertisedPort() +=09=09=09=09=09)) { +=09=09=09=09=09=09LOG.info("Killing Kafka Server {}", leaderToShutDown= ); +=09=09=09=09=09=09kafkaServer.shutdown(); +=09=09=09=09=09=09leaderHasShutDown =3D true; +=09=09=09=09=09=09break; +=09=09=09=09=09} +=09=09=09=09} +=09=09=09} +=09=09}); +=09=09brokerShutdown.start(); + +=09=09final StreamExecutionEnvironment env =3D StreamExecutionEnvironm= ent.createLocalEnvironment(1); + --- End diff -- =20 I would use `TestSreamEnvironment` instead. > Add more tests for Kafka Connectors > ----------------------------------- > > Key: FLINK-1753 > URL: https://issues.apache.org/jira/browse/FLINK-1753 > Project: Flink > Issue Type: Improvement > Components: Streaming > Affects Versions: 0.9 > Reporter: Robert Metzger > Assignee: G=C3=A1bor Hermann > > The current {{KafkaITCase}} is only doing a single test. > We need to refactor that test so that it brings up a Kafka/Zookeeper serv= er and than performs various tests: > Tests to include: > - A topology with non-string types MERGED IN 359b39c3 > - A topology with a custom Kafka Partitioning class MERGED IN 359b39c3 > - A topology testing the regular {{KafkaSource}}. MERGED IN 359b39c3 > - Kafka broker failure. > - Flink TaskManager failure -- This message was sent by Atlassian JIRA (v6.3.4#6332)