flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [01/12] flink git commit: [FLINK-5332] [core] Synchronize FileSystem::initOutPathLocalFS() to prevent lost files when called concurrently.
Date Wed, 14 Dec 2016 12:10:08 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6cfc841b5 -> e4c767a37


[FLINK-5332] [core] Synchronize FileSystem::initOutPathLocalFS() to prevent lost files when
called concurrently.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f3ad58b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f3ad58b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f3ad58b

Branch: refs/heads/master
Commit: 2f3ad58b7b73463aa1827baef0eb2e9d87fdb882
Parents: 790153c
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Dec 13 19:12:12 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Dec 14 12:43:32 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    | 268 ++++++++++---------
 .../core/fs/local/LocalDataOutputStream.java    |  24 +-
 .../flink/core/fs/InitOutputPathTest.java       | 265 ++++++++++++++++++
 .../flink/core/testutils/OneShotLatch.java      |   5 +
 .../apache/flink/test/util/TestBaseUtils.java   |  50 ++--
 5 files changed, 436 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 3dced6f..e6313aa 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -44,6 +44,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -86,6 +87,12 @@ public abstract class FileSystem {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
 
+	/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
+	 * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible
to races */
+	private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
+
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the beginning
of the task's
 	 * main thread.
@@ -537,23 +544,37 @@ public abstract class FileSystem {
 	/**
 	 * Initializes output directories on local file systems according to the given write mode.
 	 *
-	 * WriteMode.NO_OVERWRITE &amp; parallel output:
-	 *  - A directory is created if the output path does not exist.
-	 *  - An existing directory is reused, files contained in the directory are NOT deleted.
-	 *  - An existing file raises an exception.
+	 * <ul>
+	 *   <li>WriteMode.NO_OVERWRITE &amp; parallel output:
+	 *     <ul>
+	 *       <li>A directory is created if the output path does not exist.</li>
+	 *       <li>An existing directory is reused, files contained in the directory are
NOT deleted.</li>
+	 *       <li>An existing file raises an exception.</li>
+	 *     </ul>
+	 *   </li>
 	 *
-	 * WriteMode.NO_OVERWRITE &amp; NONE parallel output:
-	 *  - An existing file or directory raises an exception.
+	 *   <li>WriteMode.NO_OVERWRITE &amp; NONE parallel output:
+	 *     <ul>
+	 *       <li>An existing file or directory raises an exception.</li>
+	 *     </ul>
+	 *   </li>
 	 *
-	 * WriteMode.OVERWRITE &amp; parallel output:
-	 *  - A directory is created if the output path does not exist.
-	 *  - An existing directory is reused, files contained in the directory are NOT deleted.
-	 *  - An existing file is deleted and replaced by a new directory.
-	 *
-	 * WriteMode.OVERWRITE &amp; NONE parallel output:
-	 *  - An existing file or directory (and all its content) is deleted
+	 *   <li>WriteMode.OVERWRITE &amp; parallel output:
+	 *     <ul>
+	 *       <li>A directory is created if the output path does not exist.</li>
+	 *       <li>An existing directory is reused, files contained in the directory are
NOT deleted.</li>
+	 *       <li>An existing file is deleted and replaced by a new directory.</li>
+	 *     </ul>
+	 *   </li>
 	 *
-	 * Files contained in an existing directory are not deleted, because multiple instances
of a
+	 *   <li>WriteMode.OVERWRITE &amp; NONE parallel output:
+	 *     <ul>
+	 *       <li>An existing file or directory (and all its content) is deleted</li>
+	 *     </ul>
+	 *   </li>
+	 * </ul>
+	 * 
+	 * <p>Files contained in an existing directory are not deleted, because multiple instances
of a
 	 * DataSinkTask might call this function at the same time and hence might perform concurrent
 	 * delete operations on the file system (possibly deleting output files of concurrently
running tasks).
 	 * Since concurrent DataSinkTasks are not aware of each other, coordination of delete and
create
@@ -561,48 +582,58 @@ public abstract class FileSystem {
 	 *
 	 * @param outPath Output path that should be prepared.
 	 * @param writeMode Write mode to consider.
-	 * @param createDirectory True, to initialize a directory at the given path, false otherwise.
+	 * @param createDirectory True, to initialize a directory at the given path, false to prepare
space for a file.
+	 *    
 	 * @return True, if the path was successfully prepared, false otherwise.
-	 * @throws IOException
+	 * @throws IOException Thrown, if any of the file system access operations failed.
 	 */
 	public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory)
throws IOException {
-		if (this.isDistributedFS()) {
+		if (isDistributedFS()) {
 			return false;
 		}
 
-		// NOTE: we sometimes see this code block fail due to a races when changes to the file
system take small time fractions before being
-		//       visible to other threads. for example:
-		// - the check whether the directory exists returns false
-		// - the call to create the directory fails (some concurrent thread is creating the directory,
locked)
-		// - the call to check whether the directory exists does not yet see the new directory
(change is not committed)
+		// NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
+		// concurrently work in this method (multiple output formats writing locally) might end
+		// up deleting each other's directories and leave non-retrievable files, without necessarily
+		// causing an exception. That results in very subtle issues, like output files looking
as if
+		// they are not getting created.
 
-		// try for 30 seconds
-		final long now = System.currentTimeMillis();
-		final long deadline = now + 30000;
+		// we acquire the lock interruptibly here, to make sure that concurrent threads waiting
+		// here can cancel faster
+		try {
+			OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
+		}
+		catch (InterruptedException e) {
+			// restore the interruption state
+			Thread.currentThread().interrupt();
 
-		Exception lastError = null;
+			// leave the method - we don't have the lock anyways 
+			throw new IOException("The thread was interrupted while trying to initialize the output
directory");
+		}
 
-		do {
-			FileStatus status = null;
+		try {
+			FileStatus status;
 			try {
 				status = getFileStatus(outPath);
 			}
 			catch (FileNotFoundException e) {
 				// okay, the file is not there
+				status = null;
 			}
 
 			// check if path exists
 			if (status != null) {
 				// path exists, check write mode
 				switch (writeMode) {
+
 				case NO_OVERWRITE:
 					if (status.isDir() && createDirectory) {
 						return true;
 					} else {
 						// file may not be overwritten
-						throw new IOException("File or directory already exists. Existing files and directories
are not overwritten in " +
-								WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
-								" mode to overwrite existing files and directories.");
+						throw new IOException("File or directory already exists. Existing files and directories
" +
+								"are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " + 
+								WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
 					}
 
 				case OVERWRITE:
@@ -612,31 +643,27 @@ public abstract class FileSystem {
 							return true;
 						} else {
 							// we will write in a single file, delete directory
-							// (there is also no other thread trying to delete the directory, since there is only
one writer).
 							try {
-								this.delete(outPath, true);
+								delete(outPath, true);
 							}
 							catch (IOException e) {
-								// due to races in some file systems, it may spuriously occur that a deleted the
file looks
-								// as if it still exists and is gone a millisecond later, once the change is committed
-								// we ignore the exception, possibly fall through the loop later
-								lastError = e;
+								throw new IOException("Could not remove existing directory '" + outPath + 
+										"' to allow overwrite by result file", e);
 							}
 						}
 					}
 					else {
 						// delete file
 						try {
-							this.delete(outPath, false);
+							delete(outPath, false);
 						}
 						catch (IOException e) {
-							// Some other thread might already have deleted the file.
-							// If - for some other reason - the file could not be deleted,
-							// the error will be handled later.
-							lastError = e;
+							throw new IOException("Could not remove existing file '" + outPath +
+									"' to allow overwrite by result file/directory", e);
 						}
 					}
 					break;
+
 				default:
 					throw new IllegalArgumentException("Invalid write mode: " + writeMode);
 				}
@@ -644,54 +671,26 @@ public abstract class FileSystem {
 
 			if (createDirectory) {
 				// Output directory needs to be created
-
-				try {
-					if (!this.exists(outPath)) {
-						this.mkdirs(outPath);
-					}
-				}
-				catch (IOException e) {
-					// Some other thread might already have created the directory concurrently.
-					lastError = e;
+				if (!exists(outPath)) {
+					mkdirs(outPath);
 				}
 
 				// double check that the output directory exists
 				try {
-					FileStatus check = getFileStatus(outPath);
-					if (check != null) {
-						if (check.isDir()) {
-							return true;
-						}
-						else {
-							lastError = new IOException("FileSystem should create an output directory, but the
path points to a file instead.");
-						}
-					}
-					// fall through the loop
+					return getFileStatus(outPath).isDir();
 				}
 				catch (FileNotFoundException e) {
-					// fall though the loop
+					return false;
 				}
-
 			}
 			else {
-				// check that the output path does not exist and an output file can be created by the
output format.
-				return !this.exists(outPath);
-			}
-
-			// small delay to allow changes to make progress
-			try {
-				Thread.sleep(10);
-			}
-			catch (InterruptedException e) {
-				throw new IOException("Thread was interrupted");
+				// check that the output path does not exist and an output file
+				// can be created by the output format.
+				return !exists(outPath);
 			}
 		}
-		while (System.currentTimeMillis() < deadline);
-
-		if (lastError != null) {
-			throw new IOException("File system failed to prepare output path " + outPath + " with
write mode " + writeMode.name(), lastError);
-		} else {
-			return false;
+		finally {
+			OUTPUT_DIRECTORY_INIT_LOCK.unlock();
 		}
 	}
 
@@ -716,58 +715,87 @@ public abstract class FileSystem {
 	 * @param outPath Output path that should be prepared.
 	 * @param writeMode Write mode to consider.
 	 * @param createDirectory True, to initialize a directory at the given path, false otherwise.
+	 *    
 	 * @return True, if the path was successfully prepared, false otherwise.
-	 * @throws IOException
+	 * 
+	 * @throws IOException Thrown, if any of the file system access operations failed.
 	 */
 	public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory)
throws IOException {
-		if (!this.isDistributedFS()) {
+		if (!isDistributedFS()) {
 			return false;
 		}
 
-		// check if path exists
-		if (this.exists(outPath)) {
-			// path exists, check write mode
-			switch(writeMode) {
-			case NO_OVERWRITE:
-				// file or directory may not be overwritten
-				throw new IOException("File or directory already exists. Existing files and directories
are not overwritten in " +
-						WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
-							" mode to overwrite existing files and directories.");
-			case OVERWRITE:
-				// output path exists. We delete it and all contained files in case of a directory.
-				try {
-					this.delete(outPath, true);
-				} catch(IOException ioe) {
-					// Some other thread might already have deleted the path.
-					// If - for some other reason - the path could not be deleted,
-					// this will be handled later.
-				}
-				break;
-			default:
-				throw new IllegalArgumentException("Invalid write mode: "+writeMode);
-			}
+		// NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
+		// concurrently work in this method (multiple output formats writing locally) might end
+		// up deleting each other's directories and leave non-retrievable files, without necessarily
+		// causing an exception. That results in very subtle issues, like output files looking
as if
+		// they are not getting created.
+
+		// we acquire the lock interruptibly here, to make sure that concurrent threads waiting
+		// here can cancel faster
+		try {
+			OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
 		}
+		catch (InterruptedException e) {
+			// restore the interruption state
+			Thread.currentThread().interrupt();
 
-		if (createDirectory) {
-			// Output directory needs to be created
-			try {
-				if (!this.exists(outPath)) {
-					this.mkdirs(outPath);
+			// leave the method - we don't have the lock anyways 
+			throw new IOException("The thread was interrupted while trying to initialize the output
directory");
+		}
+
+		try {
+			// check if path exists
+			if (exists(outPath)) {
+				// path exists, check write mode
+				switch(writeMode) {
+	
+				case NO_OVERWRITE:
+					// file or directory may not be overwritten
+					throw new IOException("File or directory already exists. Existing files and directories
are not overwritten in " +
+							WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
+								" mode to overwrite existing files and directories.");
+	
+				case OVERWRITE:
+					// output path exists. We delete it and all contained files in case of a directory.
+					try {
+						delete(outPath, true);
+					} catch (IOException e) {
+						// Some other thread might already have deleted the path.
+						// If - for some other reason - the path could not be deleted,
+						// this will be handled later.
+					}
+					break;
+	
+				default:
+					throw new IllegalArgumentException("Invalid write mode: "+writeMode);
 				}
-			} catch(IOException ioe) {
-				// Some other thread might already have created the directory.
-				// If - for some other reason - the directory could not be created  
-				// and the path does not exist, this will be handled later.
 			}
-
-			// double check that the output directory exists
-			return this.exists(outPath) && this.getFileStatus(outPath).isDir();
-		} else {
-
-			// check that the output path does not exist and an output file can be created by the
output format.
-			return !this.exists(outPath);
+	
+			if (createDirectory) {
+				// Output directory needs to be created
+				try {
+					if (!exists(outPath)) {
+						mkdirs(outPath);
+					}
+				} catch (IOException ioe) {
+					// Some other thread might already have created the directory.
+					// If - for some other reason - the directory could not be created  
+					// and the path does not exist, this will be handled later.
+				}
+	
+				// double check that the output directory exists
+				return exists(outPath) && getFileStatus(outPath).isDir();
+			}
+			else {
+				// single file case: check that the output path does not exist and
+				// an output file can be created by the output format.
+				return !exists(outPath);
+			}
+		}
+		finally {
+			OUTPUT_DIRECTORY_INIT_LOCK.unlock();
 		}
-
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
index c3b793d..5cc011b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
@@ -19,7 +19,6 @@
 package org.apache.flink.core.fs.local;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
@@ -33,12 +32,8 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 @Internal
 public class LocalDataOutputStream extends FSDataOutputStream {
 
-	private static final int MAX_OPEN_TRIES = 3;
-	
-	/**
-	 * The file output stream used to write data.
-	 */
-	private FileOutputStream fos;
+	/** The file output stream used to write data.*/
+	private final FileOutputStream fos;
 
 	/**
 	 * Constructs a new <code>LocalDataOutputStream</code> object from a given {@link
File} object.
@@ -49,20 +44,7 @@ public class LocalDataOutputStream extends FSDataOutputStream {
 	 *         thrown if the data output stream cannot be created
 	 */
 	public LocalDataOutputStream(final File file) throws IOException {
-		// we allow multiple tries to create the file, to increase resilience against spurious
I/O failures
-		
-		FileNotFoundException lastException = null;
-		
-		for (int attempt = 0; attempt < MAX_OPEN_TRIES; attempt++) {
-			try {
-				this.fos = new FileOutputStream(file);
-				return;
-			}
-			catch (FileNotFoundException e) {
-				lastException = e;
-			}
-		}
-		throw lastException;
+		this.fos = new FileOutputStream(file);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
new file mode 100644
index 0000000..c332324
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.powermock.api.mockito.PowerMockito.*;
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(LocalFileSystem.class)
+public class InitOutputPathTest {
+
+	@Rule
+	public final TemporaryFolder tempDir = new TemporaryFolder();
+
+	/**
+	 * This test validates that this test case makes sense - that the error can be produced
+	 * in the absence of synchronization, if the threads make progress in a certain way,
+	 * here enforced by latches.
+	 */
+	@Test
+	public void testErrorOccursUnSynchronized() throws Exception {
+		// deactivate the lock to produce the original un-synchronized state
+		Field lock = FileSystem.class.getDeclaredField("OUTPUT_DIRECTORY_INIT_LOCK");
+		lock.setAccessible(true);
+		lock.set(null, new NoOpLock());
+
+		try {
+			// in the original un-synchronized state, we can force the race to occur by using
+			// the proper latch order to control the process of the concurrent threads
+			runTest(true);
+			fail("should fail with an exception");
+		}
+		catch (FileNotFoundException e) {
+			// expected
+		}
+		finally {
+			// reset the proper value
+			lock.set(null, new ReentrantLock(true));
+		}
+	}
+
+	@Test
+	public void testProperSynchronized() throws Exception {
+		// in the synchronized variant, we cannot use the "await latches" because not 
+		// both threads can make process interleaved (due to the synchronization)
+		// the test uses sleeps (rather than latches) to produce the same interleaving.
+		// while that is not guaranteed to produce the pathological interleaving,
+		// it helps to provoke it very often. together with validating that this order
+		// is in fact pathological (see testErrorOccursUnSynchronized()), this gives
+		// a rather confident guard
+		runTest(false);
+	}
+
+	private void runTest(final boolean useAwaits) throws Exception {
+		final File tempFile = tempDir.newFile();
+		final Path path1 = new Path(tempFile.getAbsolutePath(), "1");
+		final Path path2 = new Path(tempFile.getAbsolutePath(), "2");
+
+		final OneShotLatch deleteAwaitLatch1 = new OneShotLatch();
+		final OneShotLatch deleteAwaitLatch2 = new OneShotLatch();
+		final OneShotLatch mkdirsAwaitLatch1 = new OneShotLatch();
+		final OneShotLatch mkdirsAwaitLatch2 = new OneShotLatch();
+
+		final OneShotLatch deleteTriggerLatch1 = new OneShotLatch();
+		final OneShotLatch deletetriggerLatch2 = new OneShotLatch();
+		final OneShotLatch mkdirsTriggerLatch1 = new OneShotLatch();
+		final OneShotLatch mkdirsTriggerLatch2 = new OneShotLatch();
+
+		final OneShotLatch createAwaitLatch = new OneShotLatch();
+		final OneShotLatch createTriggerLatch = new OneShotLatch();
+
+		// this "new LocalDataOutputStream()" is in the end called by the async threads
+		whenNew(LocalDataOutputStream.class).withAnyArguments().thenAnswer(new Answer<LocalDataOutputStream>()
{
+
+			@Override
+			public LocalDataOutputStream answer(InvocationOnMock invocation) throws Throwable {
+				createAwaitLatch.trigger();
+				createTriggerLatch.await();
+
+				final File file = (File) invocation.getArguments()[0];
+				return new LocalDataOutputStream(file);
+			}
+		});
+
+		final LocalFileSystem fs1 = new SyncedFileSystem(
+				deleteAwaitLatch1, mkdirsAwaitLatch1, 
+				deleteTriggerLatch1, mkdirsTriggerLatch1);
+
+		final LocalFileSystem fs2 = new SyncedFileSystem(
+				deleteAwaitLatch2, mkdirsAwaitLatch2,
+				deletetriggerLatch2, mkdirsTriggerLatch2);
+
+		// start the concurrent file creators
+		FileCreator thread1 = new FileCreator(fs1, path1);
+		FileCreator thread2 = new FileCreator(fs2, path2);
+		thread1.start();
+		thread2.start();
+
+		// wait until they both decide to delete the directory
+		if (useAwaits) {
+			deleteAwaitLatch1.await();
+			deleteAwaitLatch2.await();
+		} else {
+			Thread.sleep(5);
+		}
+
+		// now send off #1 to delete the directory (it will pass the 'mkdirs' fast) and wait to
create the file
+		mkdirsTriggerLatch1.trigger();
+		deleteTriggerLatch1.trigger();
+
+		if (useAwaits) {
+			createAwaitLatch.await();
+		} else {
+			// this needs a bit more sleep time, because here mockito is working
+			Thread.sleep(100);
+		}
+
+		// now send off #2 to delete the directory - it waits at 'mkdirs'
+		deletetriggerLatch2.trigger();
+		if (useAwaits) {
+			mkdirsAwaitLatch2.await();
+		} else {
+			Thread.sleep(5);
+		}
+
+		// let #1 try to create the file and see if it succeeded
+		createTriggerLatch.trigger();
+		if (useAwaits) {
+			thread1.sync();
+		} else {
+			Thread.sleep(5);
+		}
+
+		// now let #1 finish up
+		mkdirsTriggerLatch2.trigger();
+
+		thread1.sync();
+		thread2.sync();
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class FileCreator extends CheckedThread {
+
+		private final FileSystem fs;
+		private final Path path;
+
+		FileCreator(FileSystem fs, Path path) {
+			this.fs = fs;
+			this.path = path;
+		}
+
+		@Override
+		public void go() throws Exception {
+			fs.initOutPathLocalFS(path.getParent(), WriteMode.OVERWRITE, true);
+			try (FSDataOutputStream out = fs.create(path, true)) {
+				out.write(11);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class SyncedFileSystem extends LocalFileSystem {
+
+		private final OneShotLatch deleteTriggerLatch;
+		private final OneShotLatch mkdirsTriggerLatch;
+
+		private final OneShotLatch deleteAwaitLatch;
+		private final OneShotLatch mkdirsAwaitLatch;
+
+		SyncedFileSystem(
+				OneShotLatch deleteTriggerLatch,
+				OneShotLatch mkdirsTriggerLatch,
+				OneShotLatch deleteAwaitLatch,
+				OneShotLatch mkdirsAwaitLatch) {
+
+			this.deleteTriggerLatch = deleteTriggerLatch;
+			this.mkdirsTriggerLatch = mkdirsTriggerLatch;
+			this.deleteAwaitLatch = deleteAwaitLatch;
+			this.mkdirsAwaitLatch = mkdirsAwaitLatch;
+		}
+
+		@Override
+		public boolean delete(Path f, boolean recursive) throws IOException {
+			deleteTriggerLatch.trigger();
+			try {
+				deleteAwaitLatch.await();
+			}
+			catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				throw new IOException("interrupted");
+			}
+
+			return super.delete(f, recursive);
+		}
+
+		@Override
+		public boolean mkdirs(Path f) throws IOException {
+			mkdirsTriggerLatch.trigger();
+			try {
+				mkdirsAwaitLatch.await();
+			}
+			catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				throw new IOException("interrupted");
+			}
+
+			return super.mkdirs(f);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class NoOpLock extends ReentrantLock {
+
+		@Override
+		public void lock() {}
+
+		@Override
+		public void lockInterruptibly() {}
+
+		@Override
+		public void unlock() {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index b3d86e5..d2eeb04 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -116,4 +116,9 @@ public final class OneShotLatch {
 			triggered = false;
 		}
 	}
+
+	@Override
+	public String toString() {
+		return "Latch " + (triggered ? "TRIGGERED" : "PENDING");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 251c465..8431226 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -309,43 +309,23 @@ public class TestBaseUtils extends TestLogger {
 			String resultPath,
 			String[] excludePrefixes) throws Exception {
 
-		// because of some strange I/O inconsistency effects on CI infrastructure, we need
-		// to retry this a few times
-		final int numAttempts = 5;
-		int attempt = 0;
-		while (true) {
-			try {
-				ArrayList<String> list = new ArrayList<>();
-				readAllResultLines(list, resultPath, excludePrefixes, false);
-
-				String[] result = list.toArray(new String[list.size()]);
-				Arrays.sort(result);
-
-				String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
-				Arrays.sort(expected);
-
-				if (expected.length != result.length || !Arrays.deepEquals(expected, result)) {
-					String msg = String.format(
-							"Different elements in arrays: expected %d elements and received %d\n" +
-							"files: %s\n expected: %s\n received: %s",
-							expected.length, result.length, 
-							Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), 
-							Arrays.toString(expected), Arrays.toString(result));
-					fail(msg);
-				}
+		ArrayList<String> list = new ArrayList<>();
+		readAllResultLines(list, resultPath, excludePrefixes, false);
 
-				break;
-			}
-			catch (AssertionError e) {
-				if (++attempt > numAttempts) {
-					throw e;
-				}
+		String[] result = list.toArray(new String[list.size()]);
+		Arrays.sort(result);
 
-				// else wait, then fall through the loop and try again
-				// on normal setups, this should change nothing, but it seems to help the
-				// Travis CI container infrastructure
-				Thread.sleep(100);
-			}
+		String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
+		Arrays.sort(expected);
+
+		if (expected.length != result.length || !Arrays.deepEquals(expected, result)) {
+			String msg = String.format(
+					"Different elements in arrays: expected %d elements and received %d\n" +
+					"files: %s\n expected: %s\n received: %s",
+					expected.length, result.length, 
+					Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), 
+					Arrays.toString(expected), Arrays.toString(result));
+			fail(msg);
 		}
 	}
 


Mime
View raw message