From: trohrmann@apache.org
To: commits@flink.apache.org
Date: Wed, 17 May 2017 06:20:01 -0000
Message-Id: <35edd3d2a9024f6ebdb91c73031f1ce6@git.apache.org>
Subject: [5/5] flink git commit: [FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file creation and deletion

[FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file creation and deletion

This commit introduces a BlobServer#readWriteLock in order to synchronize file
creation and deletion operations in BlobServerConnection and BlobServer. This
prevents multiple put and delete operations from interfering with each other
and with concurrent get operations. The get operations only acquire the shared
read lock, so that concurrent reads can still proceed in parallel.

Add Get and Delete operation tests

This closes #3888.
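[Editor's note] The core of the change is a double-checked download under a ReentrantReadWriteLock: GET requests hold the read lock while streaming a blob, and fall back to the write lock only when the local file is missing and has to be fetched from the HA blob store. The following stand-alone sketch illustrates the pattern; the class name and the Downloader callback are illustrative stand-ins, not actual Flink classes, with the callback playing the role of BlobStore#get:

import java.io.File;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Minimal sketch (not Flink code) of the GET-path locking pattern:
 * readers share the read lock while streaming; a missing file is fetched
 * under the write lock, with a second existence check because another
 * connection may have fetched it first.
 */
public class GuardedBlobCache {

	/** Stand-in for BlobStore#get(key, file); an assumption, not Flink API. */
	public interface Downloader {
		void download(File target) throws IOException;
	}

	private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

	public void serve(File blobFile, Downloader downloader) throws IOException {
		final Lock readLock = readWriteLock.readLock();
		final Lock writeLock = readWriteLock.writeLock();

		readLock.lock();
		try {
			if (!blobFile.exists()) {
				// ReentrantReadWriteLock cannot upgrade read -> write,
				// so the read lock must be released first.
				readLock.unlock();
				writeLock.lock();
				try {
					// re-check: another thread may have won the race and
					// downloaded the file while we waited for the write lock
					if (!blobFile.exists()) {
						downloader.download(blobFile);
					}
				} finally {
					// downgrade: re-acquire the read lock while still holding
					// the write lock, so the read lock is held on every exit
					// path, including exceptions thrown by download()
					readLock.lock();
					writeLock.unlock();
				}

				if (!blobFile.exists()) {
					throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath());
				}
			}

			// ... stream blobFile to the client under the read lock ...
		} finally {
			readLock.unlock();
		}
	}
}

ReentrantReadWriteLock supports downgrading (taking the read lock while holding the write lock) but not upgrading, which is why the read lock has to be dropped before the write lock is taken; the price of that gap is the second exists() check. The sketch tightens the idiom slightly relative to the diff below, which releases the write lock before re-acquiring the read lock.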
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ad489d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ad489d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ad489d8

Branch: refs/heads/master
Commit: 7ad489d87281b74c53d3b1a0dd97e56b7a8ef303
Parents: 88b0f2a
Author: Till Rohrmann
Authored: Wed May 10 17:38:49 2017 +0200
Committer: Till Rohrmann
Committed: Wed May 17 08:19:05 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobClient.java   |   2 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  29 ++-
 .../runtime/blob/BlobServerConnection.java      | 240 +++++++++++++++----
 .../runtime/blob/BlobServerDeleteTest.java      |  73 +++++-
 .../flink/runtime/blob/BlobServerGetTest.java   | 115 ++++++++-
 .../flink/runtime/blob/BlobServerPutTest.java   | 109 ++++++++-
 .../src/test/resources/log4j-test.properties    |   2 +-
 7 files changed, 509 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 49e54a1..fab3c5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -537,7 +537,7 @@ public final class BlobClient implements Closeable {
 			throw new IOException("Server side error: " + cause.getMessage(), cause);
 		}
 		else {
-			throw new IOException("Unrecognized response");
+			throw new IOException("Unrecognized response: " + response + '.');
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 937eab0..5ad4b6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -44,6 +44,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -85,6 +87,9 @@ public class BlobServer extends Thread implements BlobService {
 	/** The maximum number of concurrent connections */
 	private final int maxConnections;
 
+	/** Lock guarding concurrent file accesses */
+	private final ReadWriteLock readWriteLock;
+
 	/**
 	 * Shutdown hook thread to ensure deletion of the storage directory (or null if
 	 * the configured high availability mode does not equal {@link HighAvailabilityMode#NONE})
@@ -104,6 +109,7 @@ public class BlobServer extends Thread implements BlobService {
 	public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
 		this.blobServiceConfiguration = checkNotNull(config);
 		this.blobStore = checkNotNull(blobStore);
+		this.readWriteLock = new ReentrantReadWriteLock();
 
 		// configure and create the storage directory
 		String storageDirectory = config.getString(BlobServerOptions.STORAGE_DIRECTORY);
@@ -235,6 +241,13 @@ public class BlobServer extends Thread implements BlobService {
 		return blobStore;
 	}
 
+	/**
+	 * Returns the lock used to guard file accesses
+	 */
+	public ReadWriteLock getReadWriteLock() {
+		return readWriteLock;
+	}
+
 	@Override
 	public void run() {
 		try {
@@ -395,13 +408,19 @@ public class BlobServer extends Thread implements BlobService {
 	public void delete(BlobKey key) throws IOException {
 		final File localFile = BlobUtils.getStorageLocation(storageDir, key);
 
-		if (localFile.exists()) {
-			if (!localFile.delete()) {
-				LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+		readWriteLock.writeLock().lock();
+
+		try {
+			if (localFile.exists()) {
+				if (!localFile.delete()) {
+					LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+				}
 			}
-		}
 
-		blobStore.delete(key);
+			blobStore.delete(key);
+		} finally {
+			readWriteLock.writeLock().unlock();
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 13a90c6..a76dbd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.blob;
 
-import com.google.common.io.Files;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
@@ -33,7 +32,11 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
 import java.security.MessageDigest;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
@@ -67,6 +70,12 @@ class BlobServerConnection extends Thread {
 	/** The HA blob store. */
 	private final BlobStore blobStore;
 
+	/** Write lock to synchronize file accesses */
+	private final Lock writeLock;
+
+	/** Read lock to synchronize file accesses */
+	private final Lock readLock;
+
 	/**
 	 * Creates a new BLOB connection for a client request
 	 *
 	 * @param clientSocket The socket to read/write data.
 	 * @param blobServer The BLOB server.
 	 */
 	BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
-		super("BLOB connection for " + clientSocket.getRemoteSocketAddress().toString());
+		super("BLOB connection for " + clientSocket.getRemoteSocketAddress());
 
 		setDaemon(true);
 
 		if (blobServer == null) {
@@ -84,6 +93,11 @@ class BlobServerConnection extends Thread {
 		this.clientSocket = clientSocket;
 		this.blobServer = blobServer;
 		this.blobStore = blobServer.getBlobStore();
+
+		ReadWriteLock readWriteLock = blobServer.getReadWriteLock();
+
+		this.writeLock = readWriteLock.writeLock();
+		this.readLock = readWriteLock.readLock();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -178,8 +192,13 @@ class BlobServerConnection extends Thread {
 		 */
 		File blobFile;
 
+		int contentAddressable = -1;
+		JobID jobId = null;
+		String key = null;
+		BlobKey blobKey = null;
+
 		try {
-			final int contentAddressable = inputStream.read();
+			contentAddressable = inputStream.read();
 
 			if (contentAddressable < 0) {
 				throw new EOFException("Premature end of GET request");
@@ -189,37 +208,18 @@ class BlobServerConnection extends Thread {
 				byte[] jidBytes = new byte[JobID.SIZE];
 				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
 
-				JobID jobID = JobID.fromByteArray(jidBytes);
-				String key = readKey(buf, inputStream);
-				blobFile = this.blobServer.getStorageLocation(jobID, key);
-
-				if (!blobFile.exists()) {
-					blobStore.get(jobID, key, blobFile);
-				}
+				jobId = JobID.fromByteArray(jidBytes);
+				key = readKey(buf, inputStream);
+				blobFile = blobServer.getStorageLocation(jobId, key);
 			}
 			else if (contentAddressable == CONTENT_ADDRESSABLE) {
-				final BlobKey key = BlobKey.readFromInputStream(inputStream);
-				blobFile = blobServer.getStorageLocation(key);
-
-				if (!blobFile.exists()) {
-					blobStore.get(key, blobFile);
-				}
+				blobKey = BlobKey.readFromInputStream(inputStream);
+				blobFile = blobServer.getStorageLocation(blobKey);
 			}
 			else {
-				throw new IOException("Unknown type of BLOB addressing.");
-			}
-
-			// Check if BLOB exists
-			if (!blobFile.exists()) {
-				throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath());
-			}
-
-			if (blobFile.length() > Integer.MAX_VALUE) {
-				throw new IOException("BLOB size exceeds the maximum size (2 GB).");
+				throw new IOException("Unknown type of BLOB addressing: " + contentAddressable + '.');
 			}
 
-			outputStream.write(RETURN_OKAY);
-
 			// up to here, an error can give a good message
 		}
 		catch (Throwable t) {
@@ -235,8 +235,58 @@ class BlobServerConnection extends Thread {
 			return;
 		}
 
-		// from here on, we started sending data, so all we can do is close the connection when something happens
+		readLock.lock();
+
 		try {
+			try {
+				if (!blobFile.exists()) {
+					// first we have to release the read lock in order to acquire the write lock
+					readLock.unlock();
+					writeLock.lock();
+
+					try {
+						if (blobFile.exists()) {
+							LOG.debug("Blob file {} has been downloaded from the BlobStore by a different connection.", blobFile);
+						} else {
+							if (contentAddressable == NAME_ADDRESSABLE) {
+								blobStore.get(jobId, key, blobFile);
+							} else if (contentAddressable == CONTENT_ADDRESSABLE) {
+								blobStore.get(blobKey, blobFile);
+							} else {
+								throw new IOException("Unknown type of BLOB addressing: " + contentAddressable + '.');
+							}
+						}
+					} finally {
+						writeLock.unlock();
+					}
+
+					readLock.lock();
+
+					// Check if BLOB exists
+					if (!blobFile.exists()) {
+						throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath());
+					}
+				}
+
+				if (blobFile.length() > Integer.MAX_VALUE) {
+					throw new IOException("BLOB size exceeds the maximum size (2 GB).");
+				}
+
+				outputStream.write(RETURN_OKAY);
+			} catch (Throwable t) {
+				LOG.error("GET operation failed", t);
+				try {
+					writeErrorToStream(outputStream, t);
+				}
+				catch (IOException e) {
+					// since we are in an exception case, it means not much that we could not send the error
+					// ignore this
+				}
+				clientSocket.close();
+				return;
+			}
+
+			// from here on, we started sending data, so all we can do is close the connection when something happens
 
 			int blobLen = (int) blobFile.length();
 			writeLength(blobLen, outputStream);
@@ -251,14 +301,14 @@ class BlobServerConnection extends Thread {
 					bytesRemaining -= read;
 				}
 			}
-		}
-		catch (SocketException e) {
+		} catch (SocketException e) {
 			// happens when the other side disconnects
 			LOG.debug("Socket connection closed", e);
-		}
-		catch (Throwable t) {
+		} catch (Throwable t) {
 			LOG.error("GET operation failed", t);
 			clientSocket.close();
+		} finally {
+			readLock.unlock();
 		}
 	}
@@ -328,21 +378,83 @@ class BlobServerConnection extends Thread {
 			fos.close();
 
 			if (contentAddressable == NAME_ADDRESSABLE) {
-				File storageFile = this.blobServer.getStorageLocation(jobID, key);
-				Files.move(incomingFile, storageFile);
-				incomingFile = null;
+				File storageFile = blobServer.getStorageLocation(jobID, key);
 
-				blobStore.put(storageFile, jobID, key);
+				writeLock.lock();
+
+				try {
+					// first check whether the file already exists
+					if (!storageFile.exists()) {
+						try {
+							// only move the file if it does not yet exist
+							Files.move(incomingFile.toPath(), storageFile.toPath());
+
+							incomingFile = null;
+
+						} catch (FileAlreadyExistsException ignored) {
+							LOG.warn("Detected concurrent file modifications. This should only happen if multiple " +
+								"BlobServers use the same storage directory.");
+							// we cannot be sure at this point whether the file has already been uploaded to the blob
+							// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
+							// to persist the blob. Otherwise we might not be able to recover the job.
+						}
+
+						// only the one moving the incoming file to its final destination is allowed to upload the
+						// file to the blob store
+						blobStore.put(storageFile, jobID, key);
+					}
+				} catch (IOException ioe) {
+					// we failed to either create the local storage file or to upload it --> try to delete the local file
+					// while still having the write lock
+					if (storageFile.exists() && !storageFile.delete()) {
+						LOG.warn("Could not delete the storage file.");
+					}
+
+					throw ioe;
+				} finally {
+					writeLock.unlock();
+				}
 
 				outputStream.write(RETURN_OKAY);
 			}
 			else {
 				BlobKey blobKey = new BlobKey(md.digest());
 				File storageFile = blobServer.getStorageLocation(blobKey);
-				Files.move(incomingFile, storageFile);
-				incomingFile = null;
 
-				blobStore.put(storageFile, blobKey);
+				writeLock.lock();
+
+				try {
+					// first check whether the file already exists
+					if (!storageFile.exists()) {
+						try {
+							// only move the file if it does not yet exist
+							Files.move(incomingFile.toPath(), storageFile.toPath());
+
+							incomingFile = null;
+
+						} catch (FileAlreadyExistsException ignored) {
+							LOG.warn("Detected concurrent file modifications. This should only happen if multiple " +
+								"BlobServers use the same storage directory.");
+							// we cannot be sure at this point whether the file has already been uploaded to the blob
+							// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
+							// to persist the blob. Otherwise we might not be able to recover the job.
+						}
+
+						// only the one moving the incoming file to its final destination is allowed to upload the
+						// file to the blob store
+						blobStore.put(storageFile, blobKey);
+					}
+				} catch (IOException ioe) {
+					// we failed to either create the local storage file or to upload it --> try to delete the local file
+					// while still having the write lock
+					if (storageFile.exists() && !storageFile.delete()) {
+						LOG.warn("Could not delete the storage file.");
+					}
+
+					throw ioe;
+				} finally {
+					writeLock.unlock();
+				}
 
 				// Return computed key to client for validation
 				outputStream.write(RETURN_OKAY);
@@ -397,12 +509,21 @@ class BlobServerConnection extends Thread {
 			if (type == CONTENT_ADDRESSABLE) {
 				BlobKey key = BlobKey.readFromInputStream(inputStream);
-				File blobFile = this.blobServer.getStorageLocation(key);
-				if (blobFile.exists() && !blobFile.delete()) {
-					throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
-				}
+				File blobFile = blobServer.getStorageLocation(key);
+
+				writeLock.lock();
 
-				blobStore.delete(key);
+				try {
+					// we should make the local and remote file deletion atomic, otherwise we might risk not
+					// removing the remote file in case of a concurrent put operation
+					if (blobFile.exists() && !blobFile.delete()) {
+						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
+					}
+
+					blobStore.delete(key);
+				} finally {
+					writeLock.unlock();
+				}
 			}
 			else if (type == NAME_ADDRESSABLE) {
 				byte[] jidBytes = new byte[JobID.SIZE];
@@ -412,20 +533,37 @@ class BlobServerConnection extends Thread {
 				String key = readKey(buf, inputStream);
 
 				File blobFile = this.blobServer.getStorageLocation(jobID, key);
-				if (blobFile.exists() && !blobFile.delete()) {
-					throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
-				}
 
-				blobStore.delete(jobID, key);
+				writeLock.lock();
+
+				try {
+					// we should make the local and remote file deletion atomic, otherwise we might risk not
+					// removing the remote file in case of a concurrent put operation
+					if (blobFile.exists() && !blobFile.delete()) {
+						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
+					}
+
+					blobStore.delete(jobID, key);
+				} finally {
+					writeLock.unlock();
+				}
 			}
 			else if (type == JOB_ID_SCOPE) {
 				byte[] jidBytes = new byte[JobID.SIZE];
 				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
 				JobID jobID = JobID.fromByteArray(jidBytes);
 
-				blobServer.deleteJobDirectory(jobID);
+				writeLock.lock();
+
+				try {
+					// we should make the local and remote file deletion atomic, otherwise we might risk not
+					// removing the remote file in case of a concurrent put operation
+					blobServer.deleteJobDirectory(jobID);
 
-				blobStore.deleteAll(jobID);
+					blobStore.deleteAll(jobID);
+				} finally {
+					writeLock.unlock();
+				}
 			}
 			else {
 				throw new IOException("Unrecognized addressing type: " + type);

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index e8e28a1..5e1d86e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -20,23 +20,35 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests how DELETE requests behave.
  */
-public class BlobServerDeleteTest {
+public class BlobServerDeleteTest extends TestLogger {
 
 	private final Random rnd = new Random();
 
@@ -285,6 +297,65 @@ public class BlobServerDeleteTest {
 		}
 	}
 
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent delete operations don't interfere with each other.
+	 *
+	 * Note: The test checks that there cannot be two threads which have checked whether a given blob file exists
+	 * and then one of them fails deleting it. Without the introduced lock, this situation can occasionally occur
+	 * and make this test fail. Thus, if this test should become "unstable", then the delete atomicity is most likely
+	 * broken.
+	 */
+	@Test
+	public void testConcurrentDeleteOperations() throws IOException, ExecutionException, InterruptedException {
+		final Configuration configuration = new Configuration();
+		final BlobStore blobStore = mock(BlobStore.class);
+
+		final int concurrentDeleteOperations = 3;
+		final ExecutorService executor = Executors.newFixedThreadPool(concurrentDeleteOperations);
+
+		final List<Future<Void>> deleteFutures = new ArrayList<>(concurrentDeleteOperations);
+
+		final byte[] data = {1, 2, 3};
+
+		try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
+
+			final BlobKey blobKey;
+
+			try (BlobClient client = blobServer.createClient()) {
+				blobKey = client.put(data);
+			}
+
+			assertTrue(blobServer.getStorageLocation(blobKey).exists());
+
+			for (int i = 0; i < concurrentDeleteOperations; i++) {
+				Future<Void> deleteFuture = FlinkCompletableFuture.supplyAsync(new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						try (BlobClient blobClient = blobServer.createClient()) {
+							blobClient.delete(blobKey);
+						}
+
+						return null;
+					}
+				}, executor);
+
+				deleteFutures.add(deleteFuture);
+			}
+
+			Future<Void> waitFuture = FutureUtils.waitForAll(deleteFutures);
+
+			// make sure all delete operations have completed successfully
+			// in case of no lock, one of the delete operations should eventually fail
+			waitFuture.get();
+
+			assertFalse(blobServer.getStorageLocation(blobKey).exists());
+		} finally {
+			executor.shutdownNow();
+		}
+	}
+
 	private void cleanup(BlobServer server, BlobClient client) {
 		if (client != null) {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 6d1dba8..3209648 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -18,27 +18,57 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests how failing GET requests behave in the presence of failures.
 * Successful GET requests are tested in conjunction with the PUT
 * requests.
 */
-public class BlobServerGetTest {
+public class BlobServerGetTest extends TestLogger {
 
 	private final Random rnd = new Random();
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@Test
 	public void testGetFailsDuringLookup() throws IOException {
 		BlobServer server = null;
@@ -128,4 +158,87 @@ public class BlobServerGetTest {
 			}
 		}
 	}
+
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
+	 */
+	@Test
+	public void testConcurrentGetOperations() throws IOException, ExecutionException, InterruptedException {
+		final Configuration configuration = new Configuration();
+
+		configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+		final BlobStore blobStore = mock(BlobStore.class);
+
+		final int numberConcurrentGetOperations = 3;
+		final List<Future<InputStream>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
+
+		final byte[] data = {1, 2, 3, 4, 99, 42};
+		final ByteArrayInputStream bais = new ByteArrayInputStream(data);
+
+		MessageDigest md = BlobUtils.createMessageDigest();
+
+		// create the correct blob key by hashing our input data
+		final BlobKey blobKey = new BlobKey(md.digest(data));
+
+		doAnswer(
+			new Answer() {
+				@Override
+				public Object answer(InvocationOnMock invocation) throws Throwable {
+					File targetFile = (File) invocation.getArguments()[1];
+
+					FileUtils.copyInputStreamToFile(bais, targetFile);
+
+					return null;
+				}
+			}
+		).when(blobStore).get(any(BlobKey.class), any(File.class));
+
+		final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
+
+		try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
+			for (int i = 0; i < numberConcurrentGetOperations; i++) {
+				Future<InputStream> getOperation = FlinkCompletableFuture.supplyAsync(new Callable<InputStream>() {
+					@Override
+					public InputStream call() throws Exception {
+						try (BlobClient blobClient = blobServer.createClient();
+							InputStream inputStream = blobClient.get(blobKey)) {
+							byte[] buffer = new byte[data.length];
+
+							IOUtils.readFully(inputStream, buffer);
+
+							return new ByteArrayInputStream(buffer);
+						}
+					}
+				}, executor);
+
+				getOperations.add(getOperation);
+			}
+
+			Future<Collection<InputStream>> inputStreamsFuture = FutureUtils.combineAll(getOperations);
+
+			Collection<InputStream> inputStreams = inputStreamsFuture.get();
+
+			// check that we have read the right data
+			for (InputStream inputStream : inputStreams) {
+				ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length);
+
+				IOUtils.copy(inputStream, baos);
+
+				baos.close();
+				byte[] input = baos.toByteArray();
+
+				assertArrayEquals(data, input);
+
+				inputStream.close();
+			}
+
+			// verify that we downloaded the requested blob exactly once from the BlobStore
+			verify(blobStore, times(1)).get(eq(blobKey), any(File.class));
+		} finally {
+			executor.shutdownNow();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 441ca7d..35ef968 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -20,7 +20,12 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -28,16 +33,29 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for successful and failing PUT operations against the BLOB server,
 * and successful GET operations.
 */
-public class BlobServerPutTest {
+public class BlobServerPutTest extends TestLogger {
 
 	private final Random rnd = new Random();
 
@@ -299,6 +317,95 @@ public class BlobServerPutTest {
 		}
 	}
 
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent put operations will only upload the file once to the {@link BlobStore}.
+	 */
+	@Test
+	public void testConcurrentPutOperations() throws IOException, ExecutionException, InterruptedException {
+		final Configuration configuration = new Configuration();
+		BlobStore blobStore = mock(BlobStore.class);
+		int concurrentPutOperations = 2;
+		int dataSize = 1024;
+
+		final CountDownLatch countDownLatch = new CountDownLatch(concurrentPutOperations);
+		final byte[] data = new byte[dataSize];
+
+		ArrayList<Future<BlobKey>> allFutures = new ArrayList<>(concurrentPutOperations);
+
+		ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations);
+
+		try (
+			final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
+
+			for (int i = 0; i < concurrentPutOperations; i++) {
+				Future<BlobKey> putFuture = FlinkCompletableFuture.supplyAsync(new Callable<BlobKey>() {
+					@Override
+					public BlobKey call() throws Exception {
+						try (BlobClient blobClient = blobServer.createClient()) {
+							return blobClient.put(new BlockingInputStream(countDownLatch, data));
+						}
+					}
+				}, executor);
+
+				allFutures.add(putFuture);
+			}
+
+			FutureUtils.ConjunctFuture<Collection<BlobKey>> conjunctFuture = FutureUtils.combineAll(allFutures);
+
+			// wait until all operations have completed and check that no exception was thrown
+			Collection<BlobKey> blobKeys = conjunctFuture.get();
+
+			Iterator<BlobKey> blobKeyIterator = blobKeys.iterator();
+
+			assertTrue(blobKeyIterator.hasNext());
+
+			BlobKey blobKey = blobKeyIterator.next();
+
+			// make sure that all blob keys are the same
+			while (blobKeyIterator.hasNext()) {
+				assertEquals(blobKey, blobKeyIterator.next());
+			}
+
+			// check that we only uploaded the file once to the blob store
+			verify(blobStore, times(1)).put(any(File.class), eq(blobKey));
+		} finally {
+			executor.shutdownNow();
+		}
+	}
+
+	private static final class BlockingInputStream extends InputStream {
+
+		private final CountDownLatch countDownLatch;
+		private final byte[] data;
+		private int index = 0;
+
+		public BlockingInputStream(CountDownLatch countDownLatch, byte[] data) {
+			this.countDownLatch = Preconditions.checkNotNull(countDownLatch);
+			this.data = Preconditions.checkNotNull(data);
+		}
+
+		@Override
+		public int read() throws IOException {
+
+			countDownLatch.countDown();
+
+			try {
+				countDownLatch.await();
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				throw new IOException("Blocking operation was interrupted.", e);
+			}
+
+			if (index >= data.length) {
+				return -1;
+			} else {
+				return data[index++];
+			}
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private static final class ChunkedInputStream extends InputStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 98f136a..7ba1633 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, console
+log4j.rootLogger=OFF, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')
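[Editor's note] The atomicity argument in BlobServerDeleteTest's javadoc can be reproduced with plain JDK code. The demo below uses only illustrative names (it is not Flink code): a latch forces two threads past the exists() check before either calls delete(), so exactly one delete() returns false, which the unsynchronized DELETE path would have surfaced as a spurious "Cannot delete BLOB file" IOException.

import java.io.File;
import java.util.concurrent.CountDownLatch;

public class DeleteRaceDemo {

	public static void main(String[] args) throws Exception {
		final File blobFile = File.createTempFile("blob", ".bin");
		final CountDownLatch bothChecked = new CountDownLatch(2);

		Runnable deleteOp = new Runnable() {
			@Override
			public void run() {
				if (blobFile.exists()) {      // both threads can pass this check ...
					bothChecked.countDown();
					try {
						bothChecked.await();  // ... before either of them deletes
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
						return;
					}
					if (!blobFile.delete()) { // exactly one delete() loses the race
						System.out.println(Thread.currentThread().getName()
							+ ": delete() returned false -> spurious error in the unsynchronized code");
					}
				}
			}
		};

		Thread t1 = new Thread(deleteOp, "delete-1");
		Thread t2 = new Thread(deleteOp, "delete-2");
		t1.start();
		t2.start();
		t1.join();
		t2.join();
	}
}

Holding the BlobServer's write lock across the check-and-delete pair (and the subsequent BlobStore delete) makes the sequence atomic, which is what testConcurrentDeleteOperations asserts.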