flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.
Date Wed, 16 Nov 2016 10:31:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15670081#comment-15670081 ]

ASF GitHub Bot commented on FLINK-5056:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2797#discussion_r88206960
  
    --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java ---
    @@ -160,7 +167,15 @@ public void postSubmit() throws Exception {
     				while (line != null) {
     					Matcher matcher = messageRegex.matcher(line);
     					if (matcher.matches()) {
    -						numRead++;
    +						uniqMessagesRead.add(line);
    +
    +						// check that in the committed files there are no duplicates
    +						if (!file.getPath().toString().endsWith(IN_PROGRESS_SUFFIX) && !file.getPath().toString().endsWith(PENDING_SUFFIX)) {
    +							if (!messagesInCommittedFiles.add(line)) {
    +								Assert.fail("Duplicate entry in committed bucket.");
    --- End diff ---
    
    this check isn't truly necessary, is it? We have the total number of messages in
    `readNumbers.size()` and the number of unique messages in `uniqMessagesRead.size()`.
    Since we compare both with `NUM_STRINGS`, the test can only succeed if no duplicates
    exist anyway.
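
A minimal, self-contained sketch of that counting argument (the class name, the sample
data, and `allMessagesRead` are illustrative stand-ins, not the actual test code): if the
total count and the distinct count both equal `NUM_STRINGS`, every message was read
exactly once.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Hypothetical illustration only: when the total number of messages read and the
    // number of distinct messages read both equal NUM_STRINGS, no duplicate can exist,
    // so an explicit per-file duplicate check adds no extra coverage.
    public class NoDuplicatesByCounting {
        static final int NUM_STRINGS = 3;

        public static void main(String[] args) {
            List<String> allMessagesRead = List.of("message 0", "message 1", "message 2");
            Set<String> uniqMessagesRead = new HashSet<>(allMessagesRead);

            // Both checks can only pass together when there are no duplicates.
            if (allMessagesRead.size() != NUM_STRINGS || uniqMessagesRead.size() != NUM_STRINGS) {
                throw new AssertionError("missing or duplicated messages");
            }
            System.out.println("total == distinct == NUM_STRINGS -> no duplicates");
        }
    }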


> BucketingSink deletes valid data when checkpoint notification is slow.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-5056
>                 URL: https://issues.apache.org/jira/browse/FLINK-5056
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.1.3
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> Currently, if the BucketingSink receives no data after a checkpoint and then a
> notification for a previous checkpoint arrives, it clears its state. This can
> lead to not committing valid data of intermediate checkpoints for which
> a notification has not arrived yet. A simple sequence that illustrates the
> problem:
> -> input data
> -> snapshot(0)
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> The last step clears the state of the sink without committing as final the data
> that arrived for checkpoint 1.
>  
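
A minimal sketch of the guard the sequence above calls for, not the actual BucketingSink
implementation (the class and all member names below are hypothetical): pending files are
tracked per checkpoint id, and a notification for checkpoint N only commits entries with
an id <= N, so files from later checkpoints (checkpoint 1 in the sequence) stay pending
until their own notification arrives.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Hypothetical sketch, not BucketingSink's real code: track pending files per
    // checkpoint id and, on a notification, only finalize checkpoints up to that id.
    public class PendingCommitSketch {
        private final Map<Long, List<String>> pendingFilesPerCheckpoint = new TreeMap<>();

        void snapshot(long checkpointId, List<String> pendingFiles) {
            pendingFilesPerCheckpoint.put(checkpointId, new ArrayList<>(pendingFiles));
        }

        void notifyCheckpointComplete(long checkpointId) {
            Iterator<Map.Entry<Long, List<String>>> it =
                    pendingFilesPerCheckpoint.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, List<String>> entry = it.next();
                if (entry.getKey() <= checkpointId) {
                    // commit (e.g. rename .pending -> final) only this checkpoint's files
                    entry.getValue().forEach(PendingCommitSketch::commitPendingFile);
                    it.remove();
                }
                // entries for later checkpoints keep their pending files until their
                // own notification arrives, instead of being cleared wholesale
            }
        }

        private static void commitPendingFile(String path) {
            System.out.println("committing " + path);
        }

        public static void main(String[] args) {
            PendingCommitSketch sink = new PendingCommitSketch();
            sink.snapshot(0, List.of("part-0-0.pending")); // input data + snapshot(0)
            sink.snapshot(1, List.of("part-0-1.pending")); // input data + snapshot(1)
            sink.notifyCheckpointComplete(0);              // commits only checkpoint 0
            System.out.println("still pending: " + sink.pendingFilesPerCheckpoint);
        }
    }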



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
