flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1753) Add more tests for Kafka Connectors
Date Mon, 13 Apr 2015 10:17:12 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492232#comment-14492232 ]

ASF GitHub Bot commented on FLINK-1753:
---------------------------------------

Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/589#discussion_r28227590
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---
    @@ -524,13 +537,149 @@ public void cancel() {
     		}
     	}
     
    +	private static boolean leaderHasShutDown = false;
    +
    +	@Test
    +	public void brokerFailureTest() throws Exception {
    +		String topic = "brokerFailureTestTopic";
    +
    +		createTestTopic(topic, 2, 2);
     
    -	private void createTestTopic(String topic, int numberOfPartitions) {
     		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
    -		kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
    +		final String leaderToShutDown =
    +				kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
    +
    +		final Thread brokerShutdown = new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				shutdownKafkaBroker = false;
    +				while (!shutdownKafkaBroker) {
    +					try {
    +						Thread.sleep(10);
    +					} catch (InterruptedException e) {
    +						LOG.warn("Interruption", e);
    +					}
    +				}
    +
    +				for (KafkaServer kafkaServer : brokers) {
    +					if (leaderToShutDown.equals(
    +							kafkaServer.config().advertisedHostName()
    +									+ ":"
    +									+ kafkaServer.config().advertisedPort()
    +					)) {
    +						LOG.info("Killing Kafka Server {}", leaderToShutDown);
    +						kafkaServer.shutdown();
    +						leaderHasShutDown = true;
    +						break;
    +					}
    +				}
    +			}
    +		});
    +		brokerShutdown.start();
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
    +
    --- End diff --
    
    I would use `TestStreamEnvironment` instead.
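
For reference, the hand-off in the diff above (a thread polling a static flag with `Thread.sleep(10)` before shutting the leader down) could also be expressed with a `CountDownLatch`. The following is only a minimal, stdlib-only sketch under stated assumptions, not what the pull request does: `ShutdownSignalSketch`, `Stoppable`, and `startShutdownThread` are hypothetical names, and `Stoppable` merely stands in for a broker handle such as the `KafkaServer` used in the test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownSignalSketch {

    /** Hypothetical stand-in for a broker handle; this is not the Kafka API. */
    public interface Stoppable {
        void shutdown();
    }

    /**
     * Starts a thread that blocks until 'trigger' is counted down, then
     * stops 'target' and records completion in 'hasShutDown'.
     */
    public static Thread startShutdownThread(final CountDownLatch trigger,
                                             final Stoppable target,
                                             final AtomicBoolean hasShutDown) {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Blocks without polling until another thread calls countDown().
                    trigger.await();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and exit without shutting anything down.
                    Thread.currentThread().interrupt();
                    return;
                }
                target.shutdown();
                // AtomicBoolean makes the write visible to the test thread.
                hasShutDown.set(true);
            }
        });
        t.start();
        return t;
    }

    public static void main(String[] args) throws Exception {
        final CountDownLatch trigger = new CountDownLatch(1);
        final AtomicBoolean leaderHasShutDown = new AtomicBoolean(false);

        Thread t = startShutdownThread(trigger, new Stoppable() {
            @Override
            public void shutdown() {
                System.out.println("stopping hypothetical broker");
            }
        }, leaderHasShutDown);

        trigger.countDown(); // the point at which the test wants the broker to fail
        t.join();
        System.out.println("leader shut down: " + leaderHasShutDown.get());
    }
}
```

In a test shaped like the one in the diff, whatever code currently flips `shutdownKafkaBroker` would call `trigger.countDown()` instead, and the assertion on `leaderHasShutDown` would read the `AtomicBoolean`.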


> Add more tests for Kafka Connectors
> -----------------------------------
>
>                 Key: FLINK-1753
>                 URL: https://issues.apache.org/jira/browse/FLINK-1753
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Gábor Hermann
>
> The current {{KafkaITCase}} runs only a single test.
> We need to refactor that test so that it brings up a Kafka/Zookeeper server and then performs various tests:
> Tests to include:
> - A topology with non-string types MERGED IN 359b39c3
> - A topology with a custom Kafka Partitioning class MERGED IN 359b39c3
> - A topology testing the regular {{KafkaSource}}. MERGED IN 359b39c3
> - Kafka broker failure.
> - Flink TaskManager failure



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
