flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] AHeise commented on a change in pull request #17050: [FLINK-23351][tests] Harden FileReadingWatermarkITCase by counting wa…
Date Tue, 31 Aug 2021 10:08:53 GMT

AHeise commented on a change in pull request #17050:
URL: https://github.com/apache/flink/pull/17050#discussion_r699178562



##########
File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java
##########
@@ -17,130 +17,109 @@
 
 package org.apache.flink.test.streaming.api;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests that watermarks are emitted while file is being read, particularly the last split.
  *
  * @see <a href="https://issues.apache.org/jira/browse/FLINK-19109">FLINK-19109</a>
  */
-public class FileReadingWatermarkITCase {
-    private static final String NUM_WATERMARKS_ACC_NAME = "numWatermarks";
-    private static final String RUNTIME_ACC_NAME = "runtime";
-    private static final int FILE_SIZE_LINES = 5_000_000;
-    private static final int WATERMARK_INTERVAL_MILLIS = 10;
-    private static final int MIN_EXPECTED_WATERMARKS = 5;
+public class FileReadingWatermarkITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(FileReadingWatermarkITCase.class);
+
+    private static final int WATERMARK_INTERVAL_MILLIS = 1_000;
+    private static final int EXPECTED_WATERMARKS = 5;
 
     @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+    private SharedReference<CountDownLatch> latch;
+
+    @Before
+    public void setUp() {
+        latch = sharedObjects.add(new CountDownLatch(EXPECTED_WATERMARKS));
+    }

Review comment:
       Here is a bigger rewrite of the test that removes a few extra classes/fields and works
around deprecations. I can't attach it to the whole file, though. Ping me offline if you
want more detailed feedback.
   
   ```suggestion
       // JUnit rule that hands out SharedReference handles; the test below uses it to pass
       // the CountDownLatch into the sink function.
       @Rule public final SharedObjects sharedObjects = SharedObjects.create();
   
       /**
        * Adds an infinite split that causes the input of {@link
        * org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator} to
        * instantly go idle while data is still being processed.
        *
        * <p>Before FLINK-19109, watermarks would not be emitted at this point.
        *
        * <p>The job is submitted asynchronously. The test then blocks until the sink has counted
        * the latch down {@code EXPECTED_WATERMARKS} times, and finally cancels the job, because
        * the infinite input would otherwise keep it running after the test has returned.
        *
        * @throws Exception if job submission, waiting on the latch, or cancellation fails
        */
       @Test
       public void testWatermarkEmissionWithChaining() throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
           env.getConfig().setAutoWatermarkInterval(WATERMARK_INTERVAL_MILLIS);

           SharedReference<CountDownLatch> latch =
                   sharedObjects.add(new CountDownLatch(EXPECTED_WATERMARKS));
           checkState(env.isChainingEnabled());
           env.createInput(new InfiniteIntegerInputFormat(true))
                   .assignTimestampsAndWatermarks(
                           WatermarkStrategy.<Integer>forMonotonousTimestamps()
                                   .withTimestampAssigner(context -> getExtractorAssigner()))
                   .addSink(getWatermarkCounter(latch));

           // Fully qualified to avoid touching the file's import block.
           final org.apache.flink.core.execution.JobClient jobClient = env.executeAsync();
           try {
               latch.get().await();
           } finally {
               // The input never ends, so the job must be cancelled explicitly; otherwise it
               // (and the local cluster running it) outlives this test.
               jobClient.cancel().get();
           }
       }
   
       /**
        * Creates a timestamp assigner that ignores the element and hands out strictly increasing
        * timestamps, starting at 1 and growing by one per call.
        *
        * @return a new, stateful assigner with its own counter
        */
       private static TimestampAssigner<Integer> getExtractorAssigner() {
           return new TimestampAssigner<Integer>() {

               /** Timestamp that the next call will return. */
               private long nextTimestamp = 1;

               @Override
               public long extractTimestamp(Integer element, long recordTimestamp) {
                   final long assigned = nextTimestamp;
                   nextTimestamp = assigned + 1;
                   return assigned;
               }
           };
       }
   
       /**
        * Creates a sink that counts the given latch down once for every watermark it receives.
        *
        * <p>Each record is held for one second before being logged; presumably this keeps records
        * "in processing" long enough for periodic watermarks to be emitted in between (confirm
        * against {@code WATERMARK_INTERVAL_MILLIS}).
        *
        * @param latch shared handle to the latch that the test thread is waiting on
        * @return a sink that logs records and counts watermarks
        */
       private static SinkFunction<Integer> getWatermarkCounter(
               final SharedReference<CountDownLatch> latch) {
           return new RichSinkFunction<Integer>() {

               @Override
               public void invoke(Integer value, SinkFunction.Context context) {
                   try {
                       Thread.sleep(1000);
                       LOG.info("Sink received record");
                   } catch (InterruptedException e) {
                       // Restore the interrupt flag before rethrowing so that callers further up
                       // the stack can still observe the interruption.
                       Thread.currentThread().interrupt();
                       throw new RuntimeException(e);
                   }
               }

               @Override
               public void writeWatermark(Watermark watermark) {
                   LOG.info("Sink received watermark {}", watermark);
                   latch.get().countDown();
               }
           };
       }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message