flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7011) Instable Kafka testStartFromKafkaCommitOffsets failures on Travis
Date Fri, 30 Jun 2017 04:18:02 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069446#comment-16069446 ]

ASF GitHub Bot commented on FLINK-7011:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4190#discussion_r124961658
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
---
    @@ -279,49 +279,43 @@ public void runStartFromKafkaCommitOffsets() throws Exception {
     
     		final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
     
    -		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
    +		// read some records so that some offsets are committed to Kafka
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.getConfig().disableSysoutLogging();
    +		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +		env.setParallelism(parallelism);
    +		env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
     
    -		Long o1;
    -		Long o2;
    -		Long o3;
    -		int attempt = 0;
    -		// make sure that o1, o2, o3 are not all null before proceeding
    -		do {
    -			attempt++;
    -			LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
    -
    -			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    -			env.getConfig().disableSysoutLogging();
    -			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    -			env.setParallelism(parallelism);
    -			env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
    -
    -			env
    -				.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
    -				.map(new ThrottledMapper<String>(consumePause))
    -				.map(new MapFunction<String, Object>() {
    -					int count = 0;
    -					@Override
    -					public Object map(String value) throws Exception {
    -						count++;
    -						if (count == recordsToConsume) {
    -							throw new SuccessException();
    -						}
    -						return null;
    +		env
    +			.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
    +			.map(new ThrottledMapper<String>(consumePause))
    +			.map(new MapFunction<String, Object>() {
    +				int count = 0;
    +				@Override
    +				public Object map(String value) throws Exception {
    +					count++;
    +					if (count == recordsToConsume) {
    +						throw new SuccessException();
     					}
    -				})
    -				.addSink(new DiscardingSink<>());
    +					return null;
    +				}
    +			})
    +			.addSink(new DiscardingSink<>());
     
    -			tryExecute(env, "Read some records to commit offsets to Kafka");
    +		tryExecute(env, "Read some records to commit offsets to Kafka");
     
    +		// make sure that we indeed have some offsets committed to Kafka
    +		Long o1 = null;
    +		Long o2 = null;
    +		Long o3 = null;
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
    +		while (o1 == null && o2 == null && o3 == null) {
    +			Thread.sleep(100);
     			o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
     			o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
     			o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
    -		} while (o1 == null && o2 == null && o3 == null && attempt < 3);
    --- End diff --
    
    I understand the argument. Perhaps the root issue is that this test covers too much in a single test, hence the awkwardness in making it stable.
    I think it is sufficient to have 2 separate tests that replace this:
    (a) test that committed Kafka offsets are correct (there is already an ITCase for this)
    (b) test that committed offsets are correctly picked up and used (there is actually also a test for this already).
    
    Hence, I would conclude that perhaps this test can be removed.
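    The new code in the diff polls for committed offsets in a `while` loop with no upper bound, so a broken commit path would hang the test rather than fail it. A generic bounded-poll helper (a hypothetical sketch, not Flink API; the class and method names are invented here) would look like:

    ```java
    import java.util.function.Supplier;

    // Hypothetical helper, not part of Flink: poll a supplier until it yields a
    // non-null value or a deadline passes. This mirrors the offset wait in the
    // diff, but with a timeout so a broken commit path fails the test instead
    // of hanging it.
    final class PollUntil {

        static <T> T pollUntilNonNull(Supplier<T> supplier, long timeoutMillis, long intervalMillis)
                throws InterruptedException {
            final long deadline = System.currentTimeMillis() + timeoutMillis;
            while (System.currentTimeMillis() < deadline) {
                T value = supplier.get();
                if (value != null) {
                    return value;
                }
                Thread.sleep(intervalMillis);
            }
            throw new IllegalStateException("condition not met within " + timeoutMillis + " ms");
        }

        public static void main(String[] args) throws InterruptedException {
            // Simulate an offset that only becomes visible after a few polls.
            final int[] calls = {0};
            Long offset = pollUntilNonNull(() -> ++calls[0] >= 3 ? 42L : null, 5_000L, 10L);
            System.out.println("committed offset = " + offset);
        }
    }
    ```

    In the test, the supplier would wrap `kafkaOffsetHandler.getCommittedOffset(topicName, partition)`; the `Thread.sleep(100)` in the diff plays the role of the poll interval.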


> Instable Kafka testStartFromKafkaCommitOffsets failures on Travis
> -----------------------------------------------------------------
>
>                 Key: FLINK-7011
>                 URL: https://issues.apache.org/jira/browse/FLINK-7011
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Tests
>    Affects Versions: 1.3.1, 1.4.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> Example:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/246703474/log.txt?X-Amz-Expires=30&X-Amz-Date=20170627T065647Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170627/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=dbfc90cfc386fef0990325b54ff74ee4d441944687e7fdaa73ce7b0c2b2ec0ea
> In general, the test {{testStartFromKafkaCommitOffsets}} implementation is a bit of overkill. Before continuing with the test, it writes some records just for the sake of committing offsets to Kafka and waits for some offsets to be committed (which leads to the instability), whereas we could do that simply using the test base's {{OffsetHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
