flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] akalash commented on a change in pull request #17187: [FLINK-11250][runtime] Added method init for RecordWriter for initialization resources(OutputFlusher) outside of constructor
Date Thu, 16 Sep 2021 14:39:20 GMT

akalash commented on a change in pull request #17187:
URL: https://github.com/apache/flink/pull/17187#discussion_r710184536



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1318,58 +1320,94 @@ public void testThreadInvariants() throws Throwable {
         }
     }
 
-    /**
-     * This test ensures that {@link RecordWriter} is correctly closed even if we fail to construct
-     * {@link OperatorChain}, for example because of user class deserialization error.
-     */
     @Test
-    public void testRecordWriterClosedOnStreamOperatorFactoryDeserializationError()
+    public void testRecordWriterClosedOnTransitDeployingStateError() throws Exception {
+        testRecordWriterClosedOnTransitStateError(ExecutionState.DEPLOYING);
+    }
+
+    @Test
+    public void testRecordWriterClosedOnTransitInitializingStateError() throws Exception {
+        testRecordWriterClosedOnTransitStateError(ExecutionState.INITIALIZING);
+    }
+
+    @Test
+    public void testRecordWriterClosedOnTransitRunningStateError() throws Exception {
+        testRecordWriterClosedOnTransitStateError(ExecutionState.RUNNING);
+    }
+
+    private void testRecordWriterClosedOnTransitStateError(ExecutionState executionState)
+            throws Exception {
+        // Throw the exception when the state updating to the expected one.
+        NoOpTaskManagerActions taskManagerActions =
+                new NoOpTaskManagerActions() {
+                    @Override
+                    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+                        if (taskExecutionState.getExecutionState() == executionState) {
+                            throw new ExpectedTestException();
+                        }
+                    }
+                };
+
+        testRecordWriterClosedOnError(
+                env ->
+                        taskBuilderWithConfiguredRecordWriter(env)
+                                .setTaskManagerActions(taskManagerActions)
+                                .build());
+    }
+
+    private void testRecordWriterClosedOnError(
+            FunctionWithException<NettyShuffleEnvironment, Task, Exception> taskProvider)
             throws Exception {
+        try (NettyShuffleEnvironment shuffleEnvironment =
+                new NettyShuffleEnvironmentBuilder().build()) {
+            Task task = taskProvider.apply(shuffleEnvironment);
+
+            task.startTaskThread();
+            task.getExecutingThread().join();
+
+            assertEquals(ExecutionState.FAILED, task.getExecutionState());
+            for (Thread thread : Thread.getAllStackTraces().keySet()) {
+                assertThat(
+                        thread.getName(),
+                        CoreMatchers.is(not(containsString(DEFAULT_OUTPUT_FLUSH_THREAD_NAME))));
+            }
+        }
+    }
+
+    private TestTaskBuilder taskBuilderWithConfiguredRecordWriter(
+            NettyShuffleEnvironment shuffleEnvironment) {
         Configuration taskConfiguration = new Configuration();
+        outputEdgeConfiguration(taskConfiguration);
+
+        ResultPartitionDeploymentDescriptor descriptor =
+                new ResultPartitionDeploymentDescriptor(
+                        PartitionDescriptorBuilder.newBuilder().build(),
+                        NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
+                        1,
+                        false);
+        return new TestTaskBuilder(shuffleEnvironment)
+                .setInvokable(NoOpStreamTask.class)
+                .setTaskConfig(taskConfiguration)
+                .setResultPartitions(singletonList(descriptor));
+    }
+
+    /**
+     * Make sure that there is some output edge in the config so that some RecordWriter is created.
+     */
+    private void outputEdgeConfiguration(Configuration taskConfiguration) {
         StreamConfig streamConfig = new StreamConfig(taskConfiguration);
         streamConfig.setStreamOperatorFactory(new UnusedOperatorFactory());
 
-        // Make sure that there is some output edge in the config so that some RecordWriter is
-        // created
         StreamConfigChainer cfg =
                 new StreamConfigChainer(new OperatorID(42, 42), streamConfig, this, 1);
+        cfg.setBufferTimeout(1);

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message