flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.
Date Wed, 16 Nov 2016 10:31:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15670084#comment-15670084 ]

ASF GitHub Bot commented on FLINK-5056:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2797#discussion_r88208586
  
    --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -570,139 +563,165 @@ private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception {
     
     	/**
     	 * Gets the truncate() call using reflection.
    -	 *
     	 * <p>
    -	 * Note: This code comes from Flume
    +	 * <b>NOTE:</b> This code comes from Flume.
     	 */
     	private Method reflectTruncate(FileSystem fs) {
    -		Method m = null;
    -		if(fs != null) {
    -			Class<?> fsClass = fs.getClass();
    -			try {
    -				m = fsClass.getMethod("truncate", Path.class, long.class);
    -			} catch (NoSuchMethodException ex) {
    -				LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
    -					" and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
    -				return null;
    -			}
    +		if (this.refTruncate == null) {
    +			Method m = null;
    +			if (fs != null) {
    +				Class<?> fsClass = fs.getClass();
    +				try {
    +					m = fsClass.getMethod("truncate", Path.class, long.class);
    +				} catch (NoSuchMethodException ex) {
    +					LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
    +						" and prefix '{}' to specify how many bytes in a bucket are valid.",
    +						validLengthSuffix, validLengthPrefix);
    +					return null;
    +				}
     
    +				// verify that truncate actually works
    +				FSDataOutputStream outputStream;
    +				Path testPath = new Path(UUID.randomUUID().toString());
    +				try {
    +					outputStream = fs.create(testPath);
    +					outputStream.writeUTF("hello");
    +					outputStream.close();
    +				} catch (IOException e) {
    +					LOG.error("Could not create file for checking if truncate works.", e);
    +					throw new RuntimeException("Could not create file for checking if truncate works.", e);
    +				}
     
    -			// verify that truncate actually works
    -			FSDataOutputStream outputStream;
    -			Path testPath = new Path(UUID.randomUUID().toString());
    -			try {
    -				outputStream = fs.create(testPath);
    -				outputStream.writeUTF("hello");
    -				outputStream.close();
    -			} catch (IOException e) {
    -				LOG.error("Could not create file for checking if truncate works.", e);
    -				throw new RuntimeException("Could not create file for checking if truncate works.", e);
    +				try {
    +					m.invoke(fs, testPath, 2);
    +				} catch (IllegalAccessException | InvocationTargetException e) {
    +					LOG.debug("Truncate is not supported.", e);
    +					m = null;
    +				}
    +
    +				try {
    +					fs.delete(testPath, false);
    +				} catch (IOException e) {
    +					LOG.error("Could not delete truncate test file.", e);
    +					throw new RuntimeException("Could not delete truncate test file.", e);
    +				}
     			}
    +			this.refTruncate = m;
    --- End diff --
    
    I'm not so sure about this assignment; it seems very odd to both assign the method to a field
    and return it. The returned value will typically be assigned to the very same field. The contract
    should be cleaned up so that the method either returns the value or assigns it, not both.
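
    One way to read the suggestion above, as a minimal sketch and not the actual BucketingSink change: keep the reflective lookup free of side effects and let a single accessor own the cached field, so callers never assign it themselves. The names `TruncateLookup`, `FakeFs`, `getTruncate` and `lookup` are hypothetical, and the stand-in file system takes a `String` path instead of Hadoop's `Path` so the example runs without Hadoop on the classpath. The write-and-truncate probe from the diff is left out.

```java
import java.lang.reflect.Method;

/** Hypothetical stand-in for a file system that offers truncate(path, length). */
class FakeFs {
    public boolean truncate(String path, long length) {
        return true;
    }
}

/** Hypothetical holder that resolves the truncate() handle at most once. */
class TruncateLookup {
    private Method refTruncate; // cached handle; stays null if truncate is unavailable
    private boolean resolved;   // distinguishes "not looked up yet" from "not available"

    /** Returns the cached handle, resolving it on first use. Only this method writes the field. */
    Method getTruncate(Object fs) {
        if (fs == null) {
            return null; // nothing to inspect yet, so do not cache a result
        }
        if (!resolved) {
            refTruncate = lookup(fs);
            resolved = true;
        }
        return refTruncate;
    }

    /** Pure lookup: returns the method or null and touches no fields. */
    private static Method lookup(Object fs) {
        try {
            return fs.getClass().getMethod("truncate", String.class, long.class);
        } catch (NoSuchMethodException e) {
            return null;
        }
    }
}
```

    With this split, the accessor is the only place that touches the field, and the lookup itself can be tested in isolation.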


> BucketingSink deletes valid data when checkpoint notification is slow.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-5056
>                 URL: https://issues.apache.org/jira/browse/FLINK-5056
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.1.3
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> Currently, if BucketingSink receives no data after a checkpoint and then a notification
> about a previous checkpoint arrives, it clears its state. This can
> lead to not committing valid data about intermediate checkpoints for which
> a notification has not arrived yet. As a simple sequence that illustrates the
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> The last step clears the state of the sink without committing as final the data
> that arrived for checkpoint 1.
>  
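
The sequence in the issue description can be modeled with a small, self-contained sketch. This is a simplified illustration under stated assumptions, not the BucketingSink implementation: the class `PendingFilesModel` and all of its methods are hypothetical, and files are represented as plain strings. It keeps pending files per checkpoint ID and, on a notification, commits only the entries up to and including the notified checkpoint, so data belonging to a later checkpoint stays pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified model of per-checkpoint pending-file bookkeeping. */
class PendingFilesModel {
    private final List<String> current = new ArrayList<>();
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    /** Records data that arrived since the last snapshot. */
    void write(String file) {
        current.add(file);
    }

    /** Moves everything written so far under the given checkpoint ID. */
    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(current));
        current.clear();
    }

    /** Commits pending files for checkpoints up to and including the notified one only. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : done.values()) {
            committed.addAll(files);
        }
        done.clear(); // clearing the view removes just these entries from the backing map
    }

    List<String> committed() {
        return committed;
    }

    int pendingCheckpoints() {
        return pendingPerCheckpoint.size();
    }
}
```

In this model, replaying the sequence from the description (data, snapshot(0), data, snapshot(1), no data, notifyCheckpointComplete(0)) leaves the entry for checkpoint 1 pending until its own notification arrives, instead of clearing it.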



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
