flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-2979] Fix RollingSink truncate for Hadoop 2.7
Date Fri, 06 Nov 2015 18:48:13 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.10 63d92c7aa -> 7f312d1e5


[FLINK-2979] Fix RollingSink truncate for Hadoop 2.7

The problem was that truncate is asynchronous and the RollingSink did
not take this into account.

Now there is a loop after the truncate call that waits until the file is
actually truncated, or until a configurable timeout expires.

This also switches the Hadoop 2.6 Travis build to Hadoop 2.7.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f312d1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f312d1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f312d1e

Branch: refs/heads/release-0.10
Commit: 7f312d1e596c5a7c8ee437822acb780657a68bea
Parents: 63d92c7
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Nov 6 16:15:06 2015 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Nov 6 19:47:55 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    | 94 +++++++++++++++++---
 1 file changed, 82 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f312d1e/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index c705767..2112b28 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -17,9 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.fs;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -32,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +38,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -175,6 +177,13 @@ public class RollingSink<T> extends RichSinkFunction<T> implements
InputTypeConf
 	private final String DEFAULT_PART_REFIX = "part";
 
 	/**
+	 * The default timeout for asynchronous operations such as recoverLease and truncate. In
+	 * milliseconds.
+	 */
+	private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
+
+
+	/**
 	 * The base {@code Path} that stored all rolling bucket directories.
 	 */
 	private final String basePath;
@@ -223,6 +232,17 @@ public class RollingSink<T> extends RichSinkFunction<T> implements
InputTypeConf
 	private String partPrefix = DEFAULT_PART_REFIX;
 
 	/**
+	 * The timeout for asynchronous operations such as recoverLease and truncate. In
+	 * milliseconds.
+	 */
+	private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal fields (not configurable by user)
+	// --------------------------------------------------------------------------------------------
+
+
+	/**
 	 * The part file that we are currently writing to.
 	 */
 	private transient Path currentPartPath;
@@ -232,10 +252,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements
InputTypeConf
 	 */
 	private transient Path currentBucketDirectory;
 
-	// --------------------------------------------------------------------------------------------
-	//  Internal fields (not configurable by user)
-	// --------------------------------------------------------------------------------------------
-
 	/**
 	 * The {@code FSDataOutputStream} for the current part file.
 	 */
@@ -587,7 +603,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements
InputTypeConf
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		synchronized (bucketState.pendingFilesPerCheckpoint) {
 			Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
-			Set<Long> checkpointsToRemove = Sets.newHashSet();
+			Set<Long> checkpointsToRemove = new HashSet<>();
 			for (Long pastCheckpointId : pastCheckpointIds) {
 				if (pastCheckpointId <= checkpointId) {
 					LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
@@ -628,7 +644,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements
InputTypeConf
 		synchronized (bucketState.pendingFilesPerCheckpoint) {
 			bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles);
 		}
-		bucketState.pendingFiles = Lists.newArrayList();
+		bucketState.pendingFiles = new ArrayList<>();
 		return bucketState;
 	}
 
@@ -675,7 +691,51 @@ public class RollingSink<T> extends RichSinkFunction<T> implements
InputTypeConf
 				// truncate it or write a ".valid-length" file to specify up to which point it is valid
 				if (refTruncate != null) {
 					LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
-					refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
+					// some-one else might still hold the lease from a previous try, we are
+					// recovering, after all ...
+					if (fs instanceof DistributedFileSystem) {
+						DistributedFileSystem dfs = (DistributedFileSystem) fs;
+						LOG.debug("Trying to recover file lease {}", partPath);
+						dfs.recoverLease(partPath);
+						boolean isclosed= dfs.isFileClosed(partPath);
+						StopWatch sw = new StopWatch();
+						sw.start();
+						while(!isclosed) {
+							if(sw.getTime() > asyncTimeout) {
+								break;
+							}
+							try {
+								Thread.sleep(500);
+							} catch (InterruptedException e1) {
+								// ignore it
+							}
+							isclosed = dfs.isFileClosed(partPath);
+						}
+					}
+					Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
+					if (!truncated) {
+						LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
+
+						// we must wait for the asynchronous truncate operation to complete
+						StopWatch sw = new StopWatch();
+						sw.start();
+						long newLen = fs.getFileStatus(partPath).getLen();
+						while(newLen != bucketState.currentFileValidLength) {
+							if(sw.getTime() > asyncTimeout) {
+								break;
+							}
+							try {
+								Thread.sleep(500);
+							} catch (InterruptedException e1) {
+								// ignore it
+							}
+							newLen = fs.getFileStatus(partPath).getLen();
+						}
+						if (newLen != bucketState.currentFileValidLength) {
+							throw new RuntimeException("Truncate did not truncate to right length. Should be "
+ bucketState.currentFileValidLength + " is " + newLen + ".");
+						}
+					}
+
 				} else {
 					LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
 					Path validLengthFilePath = new Path(partPath.getParent(), validLengthPrefix + partPath.getName()).suffix(validLengthSuffix);
@@ -864,6 +924,16 @@ public class RollingSink<T> extends RichSinkFunction<T> implements
InputTypeConf
 		return this;
 	}
 
+	/**
+	 * Sets the default timeout for asynchronous operations such as recoverLease and truncate.
+	 *
+	 * @param timeout The timeout, in milliseconds.
+	 */
+	public RollingSink<T> setAsyncTimeout(long timeout) {
+		this.asyncTimeout = timeout;
+		return this;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Internal Classes
 	// --------------------------------------------------------------------------------------------
@@ -888,13 +958,13 @@ public class RollingSink<T> extends RichSinkFunction<T>
implements InputTypeConf
 		/**
 		 * Pending files that accumulated since the last checkpoint.
 		 */
-		List<String> pendingFiles = Lists.newArrayList();
+		List<String> pendingFiles = new ArrayList<>();
 
 		/**
 		 * When doing a checkpoint we move the pending files since the last checkpoint to this
map
 		 * with the id of the checkpoint. When we get the checkpoint-complete notification we move
 		 * pending files of completed checkpoints to their final location.
 		 */
-		final Map<Long, List<String>> pendingFilesPerCheckpoint = Maps.newHashMap();
+		final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
 	}
 }


Mime
View raw message