flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/5] flink git commit: [FLINK-7056][tests][hotfix] make sure the client and a created InputStream are closed
Date Fri, 18 Aug 2017 07:36:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6c6d90084 -> 7b2362406


[FLINK-7056][tests][hotfix] make sure the client and a created InputStream are closed

If not and the server has not yet sent all data packets, it may still occupy the
read lock and block any writing operations (also see FLINK-7467).

This closes #4558.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0a15060
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0a15060
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0a15060

Branch: refs/heads/master
Commit: d0a150609b46cabfe7f5f0d760c465dcee5588fb
Parents: 6c6d900
Author: Nico Kruber <nico@data-artisans.com>
Authored: Thu Aug 17 12:04:09 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Aug 18 09:29:18 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/blob/BlobClientTest.java      | 59 ++++++++++----------
 .../runtime/blob/BlobServerDeleteTest.java      | 11 ++--
 .../flink/runtime/blob/BlobServerGetTest.java   |  7 ++-
 .../flink/runtime/blob/BlobServerPutTest.java   | 25 +++------
 4 files changed, 47 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0a15060/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index cfec4c5..d511e86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -139,30 +139,35 @@ public class BlobClientTest {
 	 * the specified buffer.
 	 * 
 	 * @param inputStream
-	 *        the input stream returned from the GET operation
+	 *        the input stream returned from the GET operation (will be closed by this method)
 	 * @param buf
 	 *        the buffer to compare the input stream's data to
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while reading the input stream
 	 */
-	static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException
{
-		byte[] receivedBuffer = new byte[buf.length];
+	static void validateGetAndClose(final InputStream inputStream, final byte[] buf) throws
IOException {
+		try {
+			byte[] receivedBuffer = new byte[buf.length];
 
-		int bytesReceived = 0;
+			int bytesReceived = 0;
 
-		while (true) {
+			while (true) {
 
-			final int read = inputStream.read(receivedBuffer, bytesReceived, receivedBuffer.length
- bytesReceived);
-			if (read < 0) {
-				throw new EOFException();
-			}
-			bytesReceived += read;
+				final int read = inputStream
+					.read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived);
+				if (read < 0) {
+					throw new EOFException();
+				}
+				bytesReceived += read;
 
-			if (bytesReceived == receivedBuffer.length) {
-				assertEquals(-1, inputStream.read());
-				assertArrayEquals(buf, receivedBuffer);
-				return;
+				if (bytesReceived == receivedBuffer.length) {
+					assertEquals(-1, inputStream.read());
+					assertArrayEquals(buf, receivedBuffer);
+					return;
+				}
 			}
+		} finally {
+			inputStream.close();
 		}
 	}
 
@@ -171,13 +176,13 @@ public class BlobClientTest {
 	 * the specified file.
 	 * 
 	 * @param inputStream
-	 *        the input stream returned from the GET operation
+	 *        the input stream returned from the GET operation (will be closed by this method)
 	 * @param file
 	 *        the file to compare the input stream's data to
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while reading the input stream or the file
 	 */
-	private static void validateGet(final InputStream inputStream, final File file) throws IOException
{
+	private static void validateGetAndClose(final InputStream inputStream, final File file)
throws IOException {
 
 		InputStream inputStream2 = null;
 		try {
@@ -200,6 +205,7 @@ public class BlobClientTest {
 			if (inputStream2 != null) {
 				inputStream2.close();
 			}
+			inputStream.close();
 		}
 
 	}
@@ -231,14 +237,11 @@ public class BlobClientTest {
 			assertEquals(origKey, receivedKey);
 
 			// Retrieve the data
-			InputStream is = client.get(receivedKey);
-			validateGet(is, testBuffer);
-			is = client.get(jobId, receivedKey);
-			validateGet(is, testBuffer);
+			validateGetAndClose(client.get(receivedKey), testBuffer);
+			validateGetAndClose(client.get(jobId, receivedKey), testBuffer);
 
 			// Check reaction to invalid keys
-			try {
-				client.get(new BlobKey());
+			try (InputStream ignored = client.get(new BlobKey())) {
 				fail("Expected IOException did not occur");
 			}
 			catch (IOException fnfe) {
@@ -246,8 +249,7 @@ public class BlobClientTest {
 			}
 			// new client needed (closed from failure above)
 			client = new BlobClient(serverAddress, getBlobClientConfig());
-			try {
-				client.get(jobId, new BlobKey());
+			try (InputStream ignored = client.get(jobId, new BlobKey())) {
 				fail("Expected IOException did not occur");
 			}
 			catch (IOException fnfe) {
@@ -308,10 +310,8 @@ public class BlobClientTest {
 			is = null;
 
 			// Retrieve the data
-			is = client.get(receivedKey);
-			validateGet(is, testFile);
-			is = client.get(jobId, receivedKey);
-			validateGet(is, testFile);
+			validateGetAndClose(client.get(receivedKey), testFile);
+			validateGetAndClose(client.get(jobId, receivedKey), testFile);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -362,8 +362,7 @@ public class BlobClientTest {
 		assertEquals(1, blobKeys.size());
 
 		try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) {
-			InputStream is = blobClient.get(blobKeys.get(0));
-			validateGet(is, testFile);
+			validateGetAndClose(blobClient.get(blobKeys.get(0)), testFile);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0a15060/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index d91aae42..413e2e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -31,6 +31,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -40,6 +41,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -99,8 +101,7 @@ public class BlobServerDeleteTest extends TestLogger {
 			client.close();
 
 			client = new BlobClient(serverAddress, config);
-			try {
-				client.get(key1);
+			try (InputStream ignored = client.get(key1)) {
 				fail("BLOB should have been deleted");
 			}
 			catch (IOException e) {
@@ -111,12 +112,14 @@ public class BlobServerDeleteTest extends TestLogger {
 
 			client = new BlobClient(serverAddress, config);
 			try {
-				client.get(jobId, key1);
+				// NOTE: the server will stall in its send operation until either the data is fully
+				//       read or the socket is closed, e.g. via a client.close() call
+				validateGetAndClose(client.get(jobId, key1), data);
 			}
 			catch (IOException e) {
-				// expected
 				fail("Deleting a job-unrelated BLOB should not affect a job-related BLOB with the same
key");
 			}
+			client.close();
 
 			// delete a file directly on the server
 			server.delete(key2);

http://git-wip-us.apache.org/repos/asf/flink/blob/d0a15060/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 5ad8d95..7ccf075 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -122,7 +123,7 @@ public class BlobServerGetTest extends TestLogger {
 			assertNotNull(key);
 			assertEquals(key, key2);
 			// request for jobId2 should succeed
-			getFileHelper(client, jobId2, key);
+			validateGetAndClose(getFileHelper(client, jobId2, key), data);
 			// request for jobId1 should still fail
 			client = verifyDeleted(client, jobId1, key, serverAddress, config);
 
@@ -160,8 +161,7 @@ public class BlobServerGetTest extends TestLogger {
 	private static BlobClient verifyDeleted(
 			BlobClient client, JobID jobId, BlobKey key,
 			InetSocketAddress serverAddress, Configuration config) throws IOException {
-		try {
-			getFileHelper(client, jobId, key);
+		try (InputStream ignored = getFileHelper(client, jobId, key)) {
 			fail("This should not succeed.");
 		} catch (IOException e) {
 			// expected
@@ -227,6 +227,7 @@ public class BlobServerGetTest extends TestLogger {
 			catch (IOException e) {
 				// expected
 			}
+			is.close();
 		} finally {
 			if (client != null) {
 				client.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/d0a15060/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index f55adb7..2b8e2d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -46,8 +46,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.getFileHelper;
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -226,9 +226,9 @@ public class BlobServerPutTest extends TestLogger {
 	 * @param jobId
 	 * 		job ID or <tt>null</tt> if job-unrelated
 	 * @param key1
-	 * 		first key
+	 * 		first key for 44 bytes starting at byte 10 of data in the BLOB
 	 * @param key2
-	 * 		second key
+	 * 		second key for the complete data in the BLOB
 	 * @param data
 	 * 		expected data
 	 * @param serverAddress
@@ -241,12 +241,9 @@ public class BlobServerPutTest extends TestLogger {
 			InetSocketAddress serverAddress, Configuration config) throws IOException {
 
 		BlobClient client = new BlobClient(serverAddress, config);
-		InputStream is1 = null;
-		InputStream is2 = null;
 
-		try {
-			// one get request on the same client
-			is1 = getFileHelper(client, jobId, key2);
+		// one get request on the same client
+		try (InputStream is1 = getFileHelper(client, jobId, key2)) {
 			byte[] result1 = new byte[44];
 			BlobUtils.readFully(is1, result1, 0, result1.length, null);
 			is1.close();
@@ -255,20 +252,12 @@ public class BlobServerPutTest extends TestLogger {
 				assertEquals(data[j], result1[i]);
 			}
 
-			// close the client and create a new one for the remaining requests
+			// close the client and create a new one for the remaining request
 			client.close();
 			client = new BlobClient(serverAddress, config);
 
-			is2 = getFileHelper(client, jobId, key1);
-			BlobClientTest.validateGet(is2, data);
-			is2.close();
+			validateGetAndClose(getFileHelper(client, jobId, key1), data);
 		} finally {
-			if (is1 != null) {
-				is1.close();
-			}
-			if (is2 != null) {
-				is1.close();
-			}
 			client.close();
 		}
 	}


Mime
View raw message