flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "vinoyang (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-10603) Reduce kafka test duration
Date Sun, 28 Oct 2018 11:48:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16666370#comment-16666370
] 

vinoyang edited comment on FLINK-10603 at 10/28/18 11:47 AM:
-------------------------------------------------------------

FlinkKafkaProducer011ITCase#testScaleUpAfterScalingDown, In my local test, this method always
takes about 55s to 1min. I added some time to measure in the method (see comment): 
{code:java}
public void testScaleUpAfterScalingDown() throws Exception {
   long start = System.currentTimeMillis();
   String topic = "scale-down-before-first-checkpoint";

   final int parallelism1 = 4;
   final int parallelism2 = 2;
   final int parallelism3 = 3;
   final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3));

   long step1 = System.currentTimeMillis();
   System.out.println("step1 : " + (step1 - start));    //0

   List<OperatorStateHandle> operatorSubtaskState = repartitionAndExecute(
      topic,
      Collections.emptyList(),
      parallelism1,
      maxParallelism,
      IntStream.range(0, parallelism1).boxed().iterator());

   long step2 = System.currentTimeMillis();
   System.out.println("step2 : " + (step2 - step1));    //32490

   operatorSubtaskState = repartitionAndExecute(
      topic,
      operatorSubtaskState,
      parallelism2,
      maxParallelism,
      IntStream.range(parallelism1,  parallelism1 + parallelism2).boxed().iterator());

   long step3 = System.currentTimeMillis();
   System.out.println("step3 : " + (step3 - step2));    //8939

   operatorSubtaskState = repartitionAndExecute(
      topic,
      operatorSubtaskState,
      parallelism3,
      maxParallelism,
      IntStream.range(parallelism1 + parallelism2,  parallelism1 + parallelism2 + parallelism3).boxed().iterator());

   long step4 = System.currentTimeMillis();
   System.out.println("step4 : " + (step4 - step3));    //6495

   // After each previous repartitionAndExecute call, we are left with some lingering transactions,
that would
   // not allow us to read all committed messages from the topic. Thus we initialize operators
from
   // OperatorSubtaskState once more, but without any new data. This should terminate all
ongoing transactions.

   operatorSubtaskState = repartitionAndExecute(
      topic,
      operatorSubtaskState,
      1,
      maxParallelism,
      Collections.emptyIterator());

   long step5 = System.currentTimeMillis();
   System.out.println("step5 : " + (step5 - step4));    //5394

   assertExactlyOnceForTopic(
      createProperties(),
      topic,
      0,
      IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()),
      10_000L);

   long step6 = System.currentTimeMillis();
   System.out.println("step6 : " + (step6 - step5));    //2039

   deleteTestTopic(topic);

   long end = System.currentTimeMillis();
   System.out.println("step7 : " + (end - step6));    //27
   System.out.println("total : " + (end - start));    //55384
} {code}
FlinkKafkaProducer011ITCase's total test time is about 5 minutes.

cc [~pnowojski] [~Zentol] [~aljoscha]


was (Author: yanghua):
FlinkKafkaProducer011ITCase#testScaleUpAfterScalingDown, In my local test, this method always
takes about 55s to 1min. I added some time to measure in the method (see comment): 
{code:java}
public void testScaleUpAfterScalingDown() throws Exception {
   long start = System.currentTimeMillis();
   String topic = "scale-down-before-first-checkpoint";

   final int parallelism1 = 4;
   final int parallelism2 = 2;
   final int parallelism3 = 3;
   final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3));

   long step1 = System.currentTimeMillis();
   System.out.println("step1 : " + (step1 - start));    //0

   List<OperatorStateHandle> operatorSubtaskState = repartitionAndExecute(
      topic,
      Collections.emptyList(),
      parallelism1,
      maxParallelism,
      IntStream.range(0, parallelism1).boxed().iterator());

   long step2 = System.currentTimeMillis();
   System.out.println("step2 : " + (step2 - step1));    //32490

   operatorSubtaskState = repartitionAndExecute(
      topic,
      operatorSubtaskState,
      parallelism2,
      maxParallelism,
      IntStream.range(parallelism1,  parallelism1 + parallelism2).boxed().iterator());

   long step3 = System.currentTimeMillis();
   System.out.println("step3 : " + (step3 - step2));    //8939

   operatorSubtaskState = repartitionAndExecute(
      topic,
      operatorSubtaskState,
      parallelism3,
      maxParallelism,
      IntStream.range(parallelism1 + parallelism2,  parallelism1 + parallelism2 + parallelism3).boxed().iterator());

   long step4 = System.currentTimeMillis();
   System.out.println("step4 : " + (step4 - step3));    //6495

   // After each previous repartitionAndExecute call, we are left with some lingering transactions,
that would
   // not allow us to read all committed messages from the topic. Thus we initialize operators
from
   // OperatorSubtaskState once more, but without any new data. This should terminate all
ongoing transactions.

   operatorSubtaskState = repartitionAndExecute(
      topic,
      operatorSubtaskState,
      1,
      maxParallelism,
      Collections.emptyIterator());

   long step5 = System.currentTimeMillis();
   System.out.println("step5 : " + (step5 - step4));    //5394

   assertExactlyOnceForTopic(
      createProperties(),
      topic,
      0,
      IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()),
      10_000L);

   long step6 = System.currentTimeMillis();
   System.out.println("step6 : " + (step6 - step5));    //2039

   deleteTestTopic(topic);

   long end = System.currentTimeMillis();
   System.out.println("step7 : " + (end - step6));    //27
   System.out.println("total : " + (end - start));    //55384
}

FlinkKafkaProducer011ITCase's total test time is about 5 minutes.
{code}
 

cc [~pnowojski] [~Zentol] [~aljoscha]

> Reduce kafka test duration
> --------------------------
>
>                 Key: FLINK-10603
>                 URL: https://issues.apache.org/jira/browse/FLINK-10603
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kafka Connector, Tests
>    Affects Versions: 1.7.0
>            Reporter: Chesnay Schepler
>            Assignee: vinoyang
>            Priority: Major
>              Labels: pull-request-available
>
> The tests for the modern kafka connector take more than 10 minutes which is simply unacceptable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message