Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 24182200BDC for ; Wed, 14 Dec 2016 13:10:11 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 22C66160B19; Wed, 14 Dec 2016 12:10:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A50C0160B13 for ; Wed, 14 Dec 2016 13:10:09 +0100 (CET) Received: (qmail 69996 invoked by uid 500); 14 Dec 2016 12:10:08 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 69983 invoked by uid 99); 14 Dec 2016 12:10:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Dec 2016 12:10:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7E588F16A3; Wed, 14 Dec 2016 12:10:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Wed, 14 Dec 2016 12:10:08 -0000 Message-Id: <9135ed50ed924571b3e16e7186f5ae40@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/12] flink git commit: [FLINK-5332] [core] Synchronize FileSystem::initOutPathLocalFS() to prevent lost files when called concurrently. archived-at: Wed, 14 Dec 2016 12:10:11 -0000 Repository: flink Updated Branches: refs/heads/master 6cfc841b5 -> e4c767a37 [FLINK-5332] [core] Synchronize FileSystem::initOutPathLocalFS() to prevent lost files when called concurrently. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f3ad58b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f3ad58b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f3ad58b Branch: refs/heads/master Commit: 2f3ad58b7b73463aa1827baef0eb2e9d87fdb882 Parents: 790153c Author: Stephan Ewen Authored: Tue Dec 13 19:12:12 2016 +0100 Committer: Stephan Ewen Committed: Wed Dec 14 12:43:32 2016 +0100 ---------------------------------------------------------------------- .../org/apache/flink/core/fs/FileSystem.java | 268 ++++++++++--------- .../core/fs/local/LocalDataOutputStream.java | 24 +- .../flink/core/fs/InitOutputPathTest.java | 265 ++++++++++++++++++ .../flink/core/testutils/OneShotLatch.java | 5 + .../apache/flink/test/util/TestBaseUtils.java | 50 ++-- 5 files changed, 436 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 3dced6f..e6313aa 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -44,6 +44,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -86,6 +87,12 @@ public abstract class FileSystem { private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class); + /** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and + * {@link #initOutPathDistFS(Path, WriteMode, 
boolean)} which are otherwise susceptible to races */ + private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true); + + // ------------------------------------------------------------------------ + /** * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the beginning of the task's * main thread. @@ -537,23 +544,37 @@ public abstract class FileSystem { /** * Initializes output directories on local file systems according to the given write mode. * - * WriteMode.NO_OVERWRITE & parallel output: - * - A directory is created if the output path does not exist. - * - An existing directory is reused, files contained in the directory are NOT deleted. - * - An existing file raises an exception. + *
<ul> + * <li>WriteMode.NO_OVERWRITE & parallel output: + * <ul> + * <li>A directory is created if the output path does not exist.</li> + * <li>An existing directory is reused, files contained in the directory are NOT deleted.</li> + * <li>An existing file raises an exception.</li> + * </ul> + * </li> * - * WriteMode.NO_OVERWRITE & NONE parallel output: - * - An existing file or directory raises an exception. + * <li>WriteMode.NO_OVERWRITE & NONE parallel output: + * <ul> + * <li>An existing file or directory raises an exception.</li> + * </ul> + * </li> * - * WriteMode.OVERWRITE & parallel output: - * - A directory is created if the output path does not exist. - * - An existing directory is reused, files contained in the directory are NOT deleted. - * - An existing file is deleted and replaced by a new directory. - * - * WriteMode.OVERWRITE & NONE parallel output: - * - An existing file or directory (and all its content) is deleted + * <li>WriteMode.OVERWRITE & parallel output: + * <ul> + * <li>A directory is created if the output path does not exist.</li> + * <li>An existing directory is reused, files contained in the directory are NOT deleted.</li> + * <li>An existing file is deleted and replaced by a new directory.</li> + * </ul> + * </li> * - * Files contained in an existing directory are not deleted, because multiple instances of a + * <li>WriteMode.OVERWRITE & NONE parallel output: + * <ul> + * <li>An existing file or directory (and all its content) is deleted</li> + * </ul> + * </li> + * </ul> + * + * <p>
Files contained in an existing directory are not deleted, because multiple instances of a * DataSinkTask might call this function at the same time and hence might perform concurrent * delete operations on the file system (possibly deleting output files of concurrently running tasks). * Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create @@ -561,48 +582,58 @@ public abstract class FileSystem { * * @param outPath Output path that should be prepared. * @param writeMode Write mode to consider. - * @param createDirectory True, to initialize a directory at the given path, false otherwise. + * @param createDirectory True, to initialize a directory at the given path, false to prepare space for a file. + * * @return True, if the path was successfully prepared, false otherwise. - * @throws IOException + * @throws IOException Thrown, if any of the file system access operations failed. */ public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException { - if (this.isDistributedFS()) { + if (isDistributedFS()) { return false; } - // NOTE: we sometimes see this code block fail due to a races when changes to the file system take small time fractions before being - // visible to other threads. for example: - // - the check whether the directory exists returns false - // - the call to create the directory fails (some concurrent thread is creating the directory, locked) - // - the call to check whether the directory exists does not yet see the new directory (change is not committed) + // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that + // concurrently work in this method (multiple output formats writing locally) might end + // up deleting each other's directories and leave non-retrievable files, without necessarily + // causing an exception. That results in very subtle issues, like output files looking as if + // they are not getting created. 
- // try for 30 seconds - final long now = System.currentTimeMillis(); - final long deadline = now + 30000; + // we acquire the lock interruptibly here, to make sure that concurrent threads waiting + // here can cancel faster + try { + OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly(); + } + catch (InterruptedException e) { + // restore the interruption state + Thread.currentThread().interrupt(); - Exception lastError = null; + // leave the method - we don't have the lock anyways + throw new IOException("The thread was interrupted while trying to initialize the output directory"); + } - do { - FileStatus status = null; + try { + FileStatus status; try { status = getFileStatus(outPath); } catch (FileNotFoundException e) { // okay, the file is not there + status = null; } // check if path exists if (status != null) { // path exists, check write mode switch (writeMode) { + case NO_OVERWRITE: if (status.isDir() && createDirectory) { return true; } else { // file may not be overwritten - throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + - WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + - " mode to overwrite existing files and directories."); + throw new IOException("File or directory already exists. Existing files and directories " + + "are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " + + WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories."); } case OVERWRITE: @@ -612,31 +643,27 @@ public abstract class FileSystem { return true; } else { // we will write in a single file, delete directory - // (there is also no other thread trying to delete the directory, since there is only one writer). 
try { - this.delete(outPath, true); + delete(outPath, true); } catch (IOException e) { - // due to races in some file systems, it may spuriously occur that a deleted the file looks - // as if it still exists and is gone a millisecond later, once the change is committed - // we ignore the exception, possibly fall through the loop later - lastError = e; + throw new IOException("Could not remove existing directory '" + outPath + + "' to allow overwrite by result file", e); } } } else { // delete file try { - this.delete(outPath, false); + delete(outPath, false); } catch (IOException e) { - // Some other thread might already have deleted the file. - // If - for some other reason - the file could not be deleted, - // the error will be handled later. - lastError = e; + throw new IOException("Could not remove existing file '" + outPath + + "' to allow overwrite by result file/directory", e); } } break; + default: throw new IllegalArgumentException("Invalid write mode: " + writeMode); } @@ -644,54 +671,26 @@ public abstract class FileSystem { if (createDirectory) { // Output directory needs to be created - - try { - if (!this.exists(outPath)) { - this.mkdirs(outPath); - } - } - catch (IOException e) { - // Some other thread might already have created the directory concurrently. - lastError = e; + if (!exists(outPath)) { + mkdirs(outPath); } // double check that the output directory exists try { - FileStatus check = getFileStatus(outPath); - if (check != null) { - if (check.isDir()) { - return true; - } - else { - lastError = new IOException("FileSystem should create an output directory, but the path points to a file instead."); - } - } - // fall through the loop + return getFileStatus(outPath).isDir(); } catch (FileNotFoundException e) { - // fall though the loop + return false; } - } else { - // check that the output path does not exist and an output file can be created by the output format. 
- return !this.exists(outPath); - } - - // small delay to allow changes to make progress - try { - Thread.sleep(10); - } - catch (InterruptedException e) { - throw new IOException("Thread was interrupted"); + // check that the output path does not exist and an output file + // can be created by the output format. + return !exists(outPath); } } - while (System.currentTimeMillis() < deadline); - - if (lastError != null) { - throw new IOException("File system failed to prepare output path " + outPath + " with write mode " + writeMode.name(), lastError); - } else { - return false; + finally { + OUTPUT_DIRECTORY_INIT_LOCK.unlock(); } } @@ -716,58 +715,87 @@ public abstract class FileSystem { * @param outPath Output path that should be prepared. * @param writeMode Write mode to consider. * @param createDirectory True, to initialize a directory at the given path, false otherwise. + * * @return True, if the path was successfully prepared, false otherwise. - * @throws IOException + * + * @throws IOException Thrown, if any of the file system access operations failed. */ public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException { - if (!this.isDistributedFS()) { + if (!isDistributedFS()) { return false; } - // check if path exists - if (this.exists(outPath)) { - // path exists, check write mode - switch(writeMode) { - case NO_OVERWRITE: - // file or directory may not be overwritten - throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + - WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + - " mode to overwrite existing files and directories."); - case OVERWRITE: - // output path exists. We delete it and all contained files in case of a directory. - try { - this.delete(outPath, true); - } catch(IOException ioe) { - // Some other thread might already have deleted the path. 
- // If - for some other reason - the path could not be deleted, - // this will be handled later. - } - break; - default: - throw new IllegalArgumentException("Invalid write mode: "+writeMode); - } + // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that + // concurrently work in this method (multiple output formats writing locally) might end + // up deleting each other's directories and leave non-retrievable files, without necessarily + // causing an exception. That results in very subtle issues, like output files looking as if + // they are not getting created. + + // we acquire the lock interruptibly here, to make sure that concurrent threads waiting + // here can cancel faster + try { + OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly(); } + catch (InterruptedException e) { + // restore the interruption state + Thread.currentThread().interrupt(); - if (createDirectory) { - // Output directory needs to be created - try { - if (!this.exists(outPath)) { - this.mkdirs(outPath); + // leave the method - we don't have the lock anyways + throw new IOException("The thread was interrupted while trying to initialize the output directory"); + } + + try { + // check if path exists + if (exists(outPath)) { + // path exists, check write mode + switch(writeMode) { + + case NO_OVERWRITE: + // file or directory may not be overwritten + throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + + WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + + " mode to overwrite existing files and directories."); + + case OVERWRITE: + // output path exists. We delete it and all contained files in case of a directory. + try { + delete(outPath, true); + } catch (IOException e) { + // Some other thread might already have deleted the path. + // If - for some other reason - the path could not be deleted, + // this will be handled later. 
+ } + break; + + default: + throw new IllegalArgumentException("Invalid write mode: "+writeMode); } - } catch(IOException ioe) { - // Some other thread might already have created the directory. - // If - for some other reason - the directory could not be created - // and the path does not exist, this will be handled later. } - - // double check that the output directory exists - return this.exists(outPath) && this.getFileStatus(outPath).isDir(); - } else { - - // check that the output path does not exist and an output file can be created by the output format. - return !this.exists(outPath); + + if (createDirectory) { + // Output directory needs to be created + try { + if (!exists(outPath)) { + mkdirs(outPath); + } + } catch (IOException ioe) { + // Some other thread might already have created the directory. + // If - for some other reason - the directory could not be created + // and the path does not exist, this will be handled later. + } + + // double check that the output directory exists + return exists(outPath) && getFileStatus(outPath).isDir(); + } + else { + // single file case: check that the output path does not exist and + // an output file can be created by the output format. 
+ return !exists(outPath); + } + } + finally { + OUTPUT_DIRECTORY_INIT_LOCK.unlock(); } - } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java index c3b793d..5cc011b 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java @@ -19,7 +19,6 @@ package org.apache.flink.core.fs.local; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; @@ -33,12 +32,8 @@ import org.apache.flink.core.fs.FSDataOutputStream; @Internal public class LocalDataOutputStream extends FSDataOutputStream { - private static final int MAX_OPEN_TRIES = 3; - - /** - * The file output stream used to write data. - */ - private FileOutputStream fos; + /** The file output stream used to write data.*/ + private final FileOutputStream fos; /** * Constructs a new LocalDataOutputStream object from a given {@link File} object. 
@@ -49,20 +44,7 @@ public class LocalDataOutputStream extends FSDataOutputStream { * thrown if the data output stream cannot be created */ public LocalDataOutputStream(final File file) throws IOException { - // we allow multiple tries to create the file, to increase resilience against spurious I/O failures - - FileNotFoundException lastException = null; - - for (int attempt = 0; attempt < MAX_OPEN_TRIES; attempt++) { - try { - this.fos = new FileOutputStream(file); - return; - } - catch (FileNotFoundException e) { - lastException = e; - } - } - throw lastException; + this.fos = new FileOutputStream(file); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java new file mode 100644 index 0000000..c332324 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.core.fs.local.LocalDataOutputStream; +import org.apache.flink.core.fs.local.LocalFileSystem; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.core.testutils.OneShotLatch; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.concurrent.locks.ReentrantLock; + +import static org.powermock.api.mockito.PowerMockito.*; +import static org.junit.Assert.*; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(LocalFileSystem.class) +public class InitOutputPathTest { + + @Rule + public final TemporaryFolder tempDir = new TemporaryFolder(); + + /** + * This test validates that this test case makes sense - that the error can be produced + * in the absence of synchronization, if the threads make progress in a certain way, + * here enforced by latches. 
+ */ + @Test + public void testErrorOccursUnSynchronized() throws Exception { + // deactivate the lock to produce the original un-synchronized state + Field lock = FileSystem.class.getDeclaredField("OUTPUT_DIRECTORY_INIT_LOCK"); + lock.setAccessible(true); + lock.set(null, new NoOpLock()); + + try { + // in the original un-synchronized state, we can force the race to occur by using + // the proper latch order to control the process of the concurrent threads + runTest(true); + fail("should fail with an exception"); + } + catch (FileNotFoundException e) { + // expected + } + finally { + // reset the proper value + lock.set(null, new ReentrantLock(true)); + } + } + + @Test + public void testProperSynchronized() throws Exception { + // in the synchronized variant, we cannot use the "await latches" because not + // both threads can make process interleaved (due to the synchronization) + // the test uses sleeps (rather than latches) to produce the same interleaving. + // while that is not guaranteed to produce the pathological interleaving, + // it helps to provoke it very often. 
together with validating that this order + // is in fact pathological (see testErrorOccursUnSynchronized()), this gives + // a rather confident guard + runTest(false); + } + + private void runTest(final boolean useAwaits) throws Exception { + final File tempFile = tempDir.newFile(); + final Path path1 = new Path(tempFile.getAbsolutePath(), "1"); + final Path path2 = new Path(tempFile.getAbsolutePath(), "2"); + + final OneShotLatch deleteAwaitLatch1 = new OneShotLatch(); + final OneShotLatch deleteAwaitLatch2 = new OneShotLatch(); + final OneShotLatch mkdirsAwaitLatch1 = new OneShotLatch(); + final OneShotLatch mkdirsAwaitLatch2 = new OneShotLatch(); + + final OneShotLatch deleteTriggerLatch1 = new OneShotLatch(); + final OneShotLatch deletetriggerLatch2 = new OneShotLatch(); + final OneShotLatch mkdirsTriggerLatch1 = new OneShotLatch(); + final OneShotLatch mkdirsTriggerLatch2 = new OneShotLatch(); + + final OneShotLatch createAwaitLatch = new OneShotLatch(); + final OneShotLatch createTriggerLatch = new OneShotLatch(); + + // this "new LocalDataOutputStream()" is in the end called by the async threads + whenNew(LocalDataOutputStream.class).withAnyArguments().thenAnswer(new Answer() { + + @Override + public LocalDataOutputStream answer(InvocationOnMock invocation) throws Throwable { + createAwaitLatch.trigger(); + createTriggerLatch.await(); + + final File file = (File) invocation.getArguments()[0]; + return new LocalDataOutputStream(file); + } + }); + + final LocalFileSystem fs1 = new SyncedFileSystem( + deleteAwaitLatch1, mkdirsAwaitLatch1, + deleteTriggerLatch1, mkdirsTriggerLatch1); + + final LocalFileSystem fs2 = new SyncedFileSystem( + deleteAwaitLatch2, mkdirsAwaitLatch2, + deletetriggerLatch2, mkdirsTriggerLatch2); + + // start the concurrent file creators + FileCreator thread1 = new FileCreator(fs1, path1); + FileCreator thread2 = new FileCreator(fs2, path2); + thread1.start(); + thread2.start(); + + // wait until they both decide to delete the directory + 
if (useAwaits) { + deleteAwaitLatch1.await(); + deleteAwaitLatch2.await(); + } else { + Thread.sleep(5); + } + + // now send off #1 to delete the directory (it will pass the 'mkdirs' fast) and wait to create the file + mkdirsTriggerLatch1.trigger(); + deleteTriggerLatch1.trigger(); + + if (useAwaits) { + createAwaitLatch.await(); + } else { + // this needs a bit more sleep time, because here mockito is working + Thread.sleep(100); + } + + // now send off #2 to delete the directory - it waits at 'mkdirs' + deletetriggerLatch2.trigger(); + if (useAwaits) { + mkdirsAwaitLatch2.await(); + } else { + Thread.sleep(5); + } + + // let #1 try to create the file and see if it succeeded + createTriggerLatch.trigger(); + if (useAwaits) { + thread1.sync(); + } else { + Thread.sleep(5); + } + + // now let #1 finish up + mkdirsTriggerLatch2.trigger(); + + thread1.sync(); + thread2.sync(); + } + + // ------------------------------------------------------------------------ + + private static class FileCreator extends CheckedThread { + + private final FileSystem fs; + private final Path path; + + FileCreator(FileSystem fs, Path path) { + this.fs = fs; + this.path = path; + } + + @Override + public void go() throws Exception { + fs.initOutPathLocalFS(path.getParent(), WriteMode.OVERWRITE, true); + try (FSDataOutputStream out = fs.create(path, true)) { + out.write(11); + } + } + } + + // ------------------------------------------------------------------------ + + private static class SyncedFileSystem extends LocalFileSystem { + + private final OneShotLatch deleteTriggerLatch; + private final OneShotLatch mkdirsTriggerLatch; + + private final OneShotLatch deleteAwaitLatch; + private final OneShotLatch mkdirsAwaitLatch; + + SyncedFileSystem( + OneShotLatch deleteTriggerLatch, + OneShotLatch mkdirsTriggerLatch, + OneShotLatch deleteAwaitLatch, + OneShotLatch mkdirsAwaitLatch) { + + this.deleteTriggerLatch = deleteTriggerLatch; + this.mkdirsTriggerLatch = mkdirsTriggerLatch; + 
this.deleteAwaitLatch = deleteAwaitLatch; + this.mkdirsAwaitLatch = mkdirsAwaitLatch; + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + deleteTriggerLatch.trigger(); + try { + deleteAwaitLatch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("interrupted"); + } + + return super.delete(f, recursive); + } + + @Override + public boolean mkdirs(Path f) throws IOException { + mkdirsTriggerLatch.trigger(); + try { + mkdirsAwaitLatch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("interrupted"); + } + + return super.mkdirs(f); + } + } + + // ------------------------------------------------------------------------ + + @SuppressWarnings("serial") + private static final class NoOpLock extends ReentrantLock { + + @Override + public void lock() {} + + @Override + public void lockInterruptibly() {} + + @Override + public void unlock() {} + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java index b3d86e5..d2eeb04 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java @@ -116,4 +116,9 @@ public final class OneShotLatch { triggered = false; } } + + @Override + public String toString() { + return "Latch " + (triggered ? 
"TRIGGERED" : "PENDING"); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/2f3ad58b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 251c465..8431226 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -309,43 +309,23 @@ public class TestBaseUtils extends TestLogger { String resultPath, String[] excludePrefixes) throws Exception { - // because of some strange I/O inconsistency effects on CI infrastructure, we need - // to retry this a few times - final int numAttempts = 5; - int attempt = 0; - while (true) { - try { - ArrayList list = new ArrayList<>(); - readAllResultLines(list, resultPath, excludePrefixes, false); - - String[] result = list.toArray(new String[list.size()]); - Arrays.sort(result); - - String[] expected = expectedResultStr.isEmpty() ? 
new String[0] : expectedResultStr.split("\n"); - Arrays.sort(expected); - - if (expected.length != result.length || !Arrays.deepEquals(expected, result)) { - String msg = String.format( - "Different elements in arrays: expected %d elements and received %d\n" + - "files: %s\n expected: %s\n received: %s", - expected.length, result.length, - Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), - Arrays.toString(expected), Arrays.toString(result)); - fail(msg); - } + ArrayList list = new ArrayList<>(); + readAllResultLines(list, resultPath, excludePrefixes, false); - break; - } - catch (AssertionError e) { - if (++attempt > numAttempts) { - throw e; - } + String[] result = list.toArray(new String[list.size()]); + Arrays.sort(result); - // else wait, then fall through the loop and try again - // on normal setups, this should change nothing, but it seems to help the - // Travis CI container infrastructure - Thread.sleep(100); - } + String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n"); + Arrays.sort(expected); + + if (expected.length != result.length || !Arrays.deepEquals(expected, result)) { + String msg = String.format( + "Different elements in arrays: expected %d elements and received %d\n" + + "files: %s\n expected: %s\n received: %s", + expected.length, result.length, + Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), + Arrays.toString(expected), Arrays.toString(result)); + fail(msg); } }