flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl0u <...@git.apache.org>
Subject [GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.
Date Tue, 15 Nov 2016 17:06:05 GMT
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2797#discussion_r88063735
  
    --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception {
     
     	/**
     	 * Gets the truncate() call using reflection.
    -	 *
     	 * <p>
    -	 * Note: This code comes from Flume
    +	 * <b>NOTE:</b> This code comes from Flume.
     	 */
     	private Method reflectTruncate(FileSystem fs) {
    -		Method m = null;
    -		if(fs != null) {
    -			Class<?> fsClass = fs.getClass();
    -			try {
    -				m = fsClass.getMethod("truncate", Path.class, long.class);
    -			} catch (NoSuchMethodException ex) {
    -				LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
    -					" and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
    -				return null;
    -			}
    +		if (this.refTruncate == null) {
    +			Method m = null;
    +			if (fs != null) {
    +				Class<?> fsClass = fs.getClass();
    +				try {
    +					m = fsClass.getMethod("truncate", Path.class, long.class);
    +				} catch (NoSuchMethodException ex) {
    +					LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
    +						" and prefix '{}' to specify how many bytes in a bucket are valid.",
    +						validLengthSuffix, validLengthPrefix);
    +					return null;
    +				}
    +
    +				// verify that truncate actually works
    +				FSDataOutputStream outputStream;
    +				Path testPath = new Path(UUID.randomUUID().toString());
    +				try {
    +					outputStream = fs.create(testPath);
    +					outputStream.writeUTF("hello");
    +					outputStream.close();
    +				} catch (IOException e) {
    +					LOG.error("Could not create file for checking if truncate works.", e);
    +					throw new RuntimeException("Could not create file for checking if truncate works.", e);
    +				}
     
    +				try {
    +					m.invoke(fs, testPath, 2);
    +				} catch (IllegalAccessException | InvocationTargetException e) {
    +					LOG.debug("Truncate is not supported.", e);
    +					m = null;
    +				}
     
    -			// verify that truncate actually works
    -			FSDataOutputStream outputStream;
    -			Path testPath = new Path(UUID.randomUUID().toString());
    -			try {
    -				outputStream = fs.create(testPath);
    -				outputStream.writeUTF("hello");
    -				outputStream.close();
    -			} catch (IOException e) {
    -				LOG.error("Could not create file for checking if truncate works.", e);
    -				throw new RuntimeException("Could not create file for checking if truncate works.", e);
    +				try {
    +					fs.delete(testPath, false);
    +				} catch (IOException e) {
    +					LOG.error("Could not delete truncate test file.", e);
    +					throw new RuntimeException("Could not delete truncate test file.", e);
    +				}
     			}
    +			this.refTruncate = m;
    +		}
    +		return this.refTruncate;
    +	}
     
    +	private Path getPendingPathFor(Path path) {
    --- End diff --
    
    what will this save?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message