flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [4/5] flink git commit: [FLINK-8941][serialization] make sure we use unique spilling files
Date Wed, 28 Mar 2018 06:12:27 GMT
[FLINK-8941][serialization] make sure we use unique spilling files

Although the spilling files were chosen with random names of 20 bytes, it could
rarely happen that these collide. In that case, have another try (at most 10) at
selecting a unique file name.

This closes #5709.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1bb9567f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1bb9567f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1bb9567f

Branch: refs/heads/release-1.5
Commit: 1bb9567f8ce53a7c4e03a2abc74989da7f1490bf
Parents: 7a060d4
Author: Nico Kruber <nico@data-artisans.com>
Authored: Fri Mar 16 11:51:29 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Mar 28 08:12:05 2018 +0200

----------------------------------------------------------------------
 .../SpillingAdaptiveSpanningRecordDeserializer.java | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1bb9567f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index fded258..41ee03d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -36,6 +36,7 @@ import java.io.UTFDataFormatException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
+import java.util.Arrays;
 import java.util.Random;
 
 /**
@@ -627,10 +628,19 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends
IOReadableWrit
 				throw new IllegalStateException("Spilling file already exists.");
 			}
 
-			String directory = tempDirs[rnd.nextInt(tempDirs.length)];
-			spillFile = new File(directory, randomString(rnd) + ".inputchannel");
+			// try to find a unique file name for the spilling channel
+			int maxAttempts = 10;
+			for (int attempt = 0; attempt < maxAttempts; attempt++) {
+				String directory = tempDirs[rnd.nextInt(tempDirs.length)];
+				spillFile = new File(directory, randomString(rnd) + ".inputchannel");
+				if (spillFile.createNewFile()) {
+					return new RandomAccessFile(spillFile, "rw").getChannel();
+				}
+			}
 
-			return new RandomAccessFile(spillFile, "rw").getChannel();
+			throw new IOException(
+				"Could not find a unique file channel name in '" + Arrays.toString(tempDirs) +
+					"' for spilling large records during deserialization.");
 		}
 
 		private static String randomString(Random random) {


Mime
View raw message