flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit
Date Mon, 30 Oct 2017 11:30:03 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224744#comment-16224744 ]

ASF GitHub Bot commented on FLINK-7784:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4910#discussion_r147677068
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---
    @@ -35,60 +42,101 @@
     import java.io.IOException;
     import java.nio.charset.Charset;
     import java.nio.file.Files;
    +import java.time.Clock;
     import java.util.ArrayList;
     import java.util.Arrays;
     import java.util.Collections;
     import java.util.List;
     import java.util.UUID;
    +import java.util.concurrent.atomic.AtomicBoolean;
     
     import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
    +import static org.hamcrest.CoreMatchers.containsString;
    +import static org.hamcrest.CoreMatchers.equalTo;
    +import static org.hamcrest.CoreMatchers.hasItem;
     import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertThat;
     import static org.junit.Assert.assertTrue;
     import static org.junit.Assert.fail;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.when;
     
     /**
      * Tests for {@link TwoPhaseCommitSinkFunction}.
      */
     public class TwoPhaseCommitSinkFunctionTest {
    -	TestContext context;
    +
    +	@Rule
    +	public TemporaryFolder folder = new TemporaryFolder();
    +
    +	private FileBasedSinkFunction sinkFunction;
    +
    +	private OneInputStreamOperatorTestHarness<String, Object> harness;
    +
    +	private AtomicBoolean throwException = new AtomicBoolean();
    +
    +	private File targetDirectory;
    +
    +	private File tmpDirectory;
    +
    +	@Mock
    +	private Clock mockClock;
    +
    +	@Mock
    +	private Logger mockLogger;
    --- End diff --
    
    The test asserts that the log message contains the substring `This is close to or even exceeding the transaction timeout`.
All of your changes to the code would still pass the test, except for the 2nd case, because
there is an assertion on the elapsed time. Mockito is used here so that the argument passed to
`.warn` can be captured. IMO this is not evil, as no behavior is mocked and actual effects
are tested.
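
To illustrate the idea of asserting on a captured `.warn` argument, here is a minimal, dependency-free sketch. It is not the test from the pull request: it swaps Mockito for a hand-written recording logger, and `WarnLogger`, `RecordingLogger`, `TimeoutWarner`, `WarnCaptureSketch`, the 0.8 warning ratio, and the message wording outside the quoted substring are all hypothetical names and values chosen for illustration. Only `java.time.Clock` and the substring itself come from the diff and the comment above.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal logging interface; stands in for the real Logger. */
interface WarnLogger {
    void warn(String message);
}

/** Records every message passed to warn() so a test can inspect it afterwards. */
final class RecordingLogger implements WarnLogger {
    final List<String> warnings = new ArrayList<>();

    @Override
    public void warn(String message) {
        warnings.add(message);
    }
}

/** Hypothetical stand-in for the code under test: warns when a transaction is old. */
final class TimeoutWarner {
    private final Clock clock;
    private final WarnLogger logger;
    private final long timeoutMillis;
    private final double warningRatio;

    TimeoutWarner(Clock clock, WarnLogger logger, long timeoutMillis, double warningRatio) {
        this.clock = clock;
        this.logger = logger;
        this.timeoutMillis = timeoutMillis;
        this.warningRatio = warningRatio;
    }

    /** Logs a warning if the elapsed time exceeds warningRatio * timeoutMillis. */
    void check(long transactionStartMillis) {
        long elapsed = clock.millis() - transactionStartMillis;
        if (elapsed > timeoutMillis * warningRatio) {
            // Message wording is an assumption, except for the substring quoted in the review.
            logger.warn("Transaction has been open for " + elapsed + " ms. "
                + "This is close to or even exceeding the transaction timeout of "
                + timeoutMillis + " ms.");
        }
    }
}

final class WarnCaptureSketch {
    /** Runs the check against a fixed clock and returns the recorded warnings. */
    static List<String> run(long nowMillis, long startMillis) {
        Clock fixed = Clock.fixed(Instant.ofEpochMilli(nowMillis), ZoneOffset.UTC);
        RecordingLogger logger = new RecordingLogger();
        new TimeoutWarner(fixed, logger, 1000L, 0.8).check(startMillis);
        return logger.warnings;
    }

    public static void main(String[] args) {
        List<String> warnings = run(1502L, 0L);
        if (warnings.size() != 1) {
            throw new AssertionError("expected exactly one warning");
        }
        String message = warnings.get(0);
        // Assert on the stable substring and on the elapsed time, as the review describes.
        if (!message.contains("This is close to or even exceeding the transaction timeout")) {
            throw new AssertionError("substring missing: " + message);
        }
        if (!message.contains("1502 ms")) {
            throw new AssertionError("elapsed time missing: " + message);
        }
    }
}
```

As in the review comment, no behavior of the code under test is replaced here: the logger only records what it receives, and the fixed clock makes the elapsed time deterministic so it can be asserted on.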



> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> ------------------------------------------------------------
>
>                 Key: FLINK-7784
>                 URL: https://issues.apache.org/jira/browse/FLINK-7784
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Aljoscha Krettek
>            Assignee: Gary Yao
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails (either when
> committing via the completed checkpoint notification or when trying to commit after
> restoring from a failure). This means that the job will go into an infinite recovery loop
> because the sink will keep failing.
> In some cases it might be better to ignore those failures and keep on processing, and
> this should be the default. We can provide an option that allows failing the sink on failing
> commits.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
