flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1753) Add more tests for Kafka Connectors
Date Mon, 13 Apr 2015 10:14:12 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492229#comment-14492229
] 

ASF GitHub Bot commented on FLINK-1753:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/589#discussion_r28227428
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.apache.flink.runtime.net.NetUtils;
    +import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
    +import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import kafka.api.PartitionMetadata;
    +import kafka.server.KafkaConfig;
    +import kafka.server.KafkaServer;
    +
    +public class KafkaTopicUtilsTest {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtilsTest.class);
    +	private static final int NUMBER_OF_BROKERS = 2;
    +	private static final String TOPIC = "myTopic";
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	@Test
    +	public void test() {
    +		int zkPort;
    +		String kafkaHost;
    +		String zookeeperConnectionString;
    +
    +		File tmpZkDir;
    +		List<File> tmpKafkaDirs;
    +		Map<String, KafkaServer> kafkaServers = null;
    +		TestingServer zookeeper = null;
    +
    +		try {
    +			tmpZkDir = tempFolder.newFolder();
    +
    +			tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_BROKERS);
    +			for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
    +				tmpKafkaDirs.add(tempFolder.newFolder());
    +			}
    +
    +			zkPort = NetUtils.getAvailablePort();
    +			kafkaHost = InetAddress.getLocalHost().getHostName();
    +			zookeeperConnectionString = "localhost:" + zkPort;
    +
    +			// init zookeeper
    +			zookeeper = new TestingServer(zkPort, tmpZkDir);
    +
    +			// init kafka kafkaServers
    +			kafkaServers = new HashMap<String, KafkaServer>();
    +
    +			for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
    +				KafkaServer kafkaServer = getKafkaServer(kafkaHost, zookeeperConnectionString, i,
tmpKafkaDirs.get(i));
    +				kafkaServers.put(kafkaServer.config().advertisedHostName() + ":" + kafkaServer.config().advertisedPort(),
kafkaServer);
    +			}
    +
    +			// create Kafka topic
    +			final KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
    +			kafkaTopicUtils.createTopic(TOPIC, 1, 2);
    +
    +			// check whether topic exists
    +			assertTrue(kafkaTopicUtils.topicExists(TOPIC));
    +
    +			// check number of partitions
    +			assertEquals(1, kafkaTopicUtils.getNumberOfPartitions(TOPIC));
    +
    +			// get partition metadata without error
    +			PartitionMetadata partitionMetadata = kafkaTopicUtils.waitAndGetPartitionMetadata(TOPIC,
0);
    +			assertEquals(0, partitionMetadata.errorCode());
    +
    +			// get broker list
    +			assertEquals(new HashSet<String>(kafkaServers.keySet()), kafkaTopicUtils.getBrokerAddresses(TOPIC));
    +		} catch (IOException e) {
    +			e.printStackTrace();
    +		} catch (Exception e) {
    --- End diff --
    
    I would fail the test on these exceptions.


> Add more tests for Kafka Connectors
> -----------------------------------
>
>                 Key: FLINK-1753
>                 URL: https://issues.apache.org/jira/browse/FLINK-1753
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Gábor Hermann
>
> The current {{KafkaITCase}} is only doing a single test.
> We need to refactor that test so that it brings up a Kafka/Zookeeper server and than
performs various tests:
> Tests to include:
> - A topology with non-string types MERGED IN 359b39c3
> - A topology with a custom Kafka Partitioning class MERGED IN 359b39c3
> - A topology testing the regular {{KafkaSource}}. MERGED IN 359b39c3
> - Kafka broker failure.
> - Flink TaskManager failure



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message