Date: Mon, 13 Apr 2015 10:09:12 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.incubator.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-1753) Add more tests for Kafka Connectors

    [ https://issues.apache.org/jira/browse/FLINK-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492224#comment-14492224 ]

ASF GitHub Bot commented on FLINK-1753:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/589#discussion_r28227177

    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java ---
    @@ -65,36 +75,145 @@ public void createTopic(String topicName, int numOfPartitions, int replicationFa
     				LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
     			}
     		} else {
    +			LOG.info("Connecting zookeeper");
    +
    +			initZkClient();
     			AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
    +			closeZkClient();
    +		}
    +	}
    +
    +	public String getBrokerList(String topicName) {
    +		return getBrokerAddressList(getBrokerAddresses(topicName));
    +	}
    +
    +	public String getBrokerList(String topicName, int partitionId) {
    +		return getBrokerAddressList(getBrokerAddresses(topicName, partitionId));
    +	}
    +
    +	public Set<String> getBrokerAddresses(String topicName) {
    +		int numOfPartitions = getNumberOfPartitions(topicName);
    +
    +		HashSet<String> brokers = new HashSet<String>();
    +		for (int i = 0; i < numOfPartitions; i++) {
    +			brokers.addAll(getBrokerAddresses(topicName, i));
     		}
    +		return brokers;
    +	}
    +
    +	public Set<String> getBrokerAddresses(String topicName, int partitionId) {
    +		PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId);
    +		Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr());
    +
    +		HashSet<String> addresses = new HashSet<String>();
    +		for (Broker broker : inSyncReplicas) {
    +			addresses.add(broker.connectionString());
    +		}
    +		return addresses;
    +	}
    +
    +	private static String getBrokerAddressList(Set<String> brokerAddresses) {
    +		StringBuilder brokerAddressList = new StringBuilder("");
    +		for (String broker : brokerAddresses) {
    +			brokerAddressList.append(broker);
    +			brokerAddressList.append(',');
    +		}
    +		brokerAddressList.deleteCharAt(brokerAddressList.length() - 1);
    +
    +		return brokerAddressList.toString();
    +	}

     	public int getNumberOfPartitions(String topicName) {
    -		Seq<PartitionMetadata> partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata();
    +		Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata();
     		return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
     	}

    -	public String getLeaderBrokerAddressForTopic(String topicName) {
    -		TopicMetadata topicInfo = getTopicInfo(topicName);
    +	public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
    +		PartitionMetadata partitionMetadata;
    +		while (true) {
    +			try {
    +				partitionMetadata = getPartitionMetadata(topicName, partitionId);
    +				return partitionMetadata;
    +			} catch (LeaderNotAvailableException e) {
    +				// try fetching metadata again
    --- End diff --

    I would suggest to LOG.debug the exception


> Add more tests for Kafka Connectors
> -----------------------------------
>
>                 Key: FLINK-1753
>                 URL: https://issues.apache.org/jira/browse/FLINK-1753
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Gábor Hermann
>
> The current {{KafkaITCase}} is only doing a single test.
> We need to refactor that test so that it brings up a Kafka/Zookeeper server and then performs various tests.
> Tests to include:
> - A topology with non-string types MERGED IN 359b39c3
> - A topology with a custom Kafka Partitioning class MERGED IN 359b39c3
> - A topology testing the regular {{KafkaSource}}. MERGED IN 359b39c3
> - Kafka broker failure.
> - Flink TaskManager failure



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
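
[Editor's note] A minimal sketch of what the review comment above asks for: the retry loop from waitAndGetPartitionMetadata(...) with the swallowed LeaderNotAvailableException logged at debug level instead of being dropped silently. The getPartitionMetadata(...) helper is stubbed, and the class name, log message, and Kafka/SLF4J import paths are assumptions based on the quoted diff, not the merged Flink code.

    import kafka.api.PartitionMetadata;
    import kafka.common.LeaderNotAvailableException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class KafkaTopicUtilsSketch {

    	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtilsSketch.class);

    	public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
    		while (true) {
    			try {
    				return getPartitionMetadata(topicName, partitionId);
    			} catch (LeaderNotAvailableException e) {
    				// Leader election may still be in progress; log the exception at debug
    				// level (as suggested in the review) before retrying.
    				LOG.debug("Leader not yet available for topic '{}' partition {}, retrying.",
    						topicName, partitionId, e);
    			}
    		}
    	}

    	// The real helper lives in KafkaTopicUtils and is not part of the quoted hunk;
    	// it is stubbed here only so the sketch compiles on its own.
    	private PartitionMetadata getPartitionMetadata(String topicName, int partitionId) {
    		throw new UnsupportedOperationException("see KafkaTopicUtils in the pull request");
    	}
    }

Logging at debug keeps the normal retry path quiet while still leaving a trace when metadata lookups stall, which is useful when debugging the broker-failure tests this issue calls for.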