flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [tests] Harden SocketClientSinkTest
Date Wed, 23 Sep 2015 17:49:55 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5b46cbee2 -> 672979bac


[tests] Harden SocketClientSinkTest

Depending on the OS, the client may only notice that the server socket has been closed after sending more payload than a single message.

(I had started rewriting one of the tests before noticing this, and just kept the rewritten version as is.)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/672979ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/672979ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/672979ba

Branch: refs/heads/master
Commit: 672979bacf192744bb1620525987c34e3b17e47e
Parents: 5b46cbe
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Sep 23 19:33:20 2015 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Sep 23 19:49:44 2015 +0200

----------------------------------------------------------------------
 .../api/functions/sink/SocketClientSink.java    |   5 +-
 .../functions/sink/SocketClientSinkTest.java    | 235 ++++++++-----------
 2 files changed, 106 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/672979ba/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 96bc497..1356263 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -142,6 +142,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN>
{
 	@Override
 	public void invoke(IN value) throws Exception {
 		byte[] msg = schema.serialize(value);
+
 		try {
 			outputStream.write(msg);
 			if (autoFlush) {
@@ -260,6 +261,8 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN>
{
 	// ------------------------------------------------------------------------
 	
 	int getCurrentNumberOfRetries() {
-		return retries;
+		synchronized (lock) {
+			return retries;
+		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/672979ba/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index ee3d604..8f4acde 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -22,7 +22,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.TestLogger;
-
 import org.junit.Test;
 
 import java.io.BufferedReader;
@@ -30,6 +29,11 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
@@ -43,11 +47,10 @@ import static org.junit.Assert.fail;
 public class SocketClientSinkTest extends TestLogger {
 
 	private static final String TEST_MESSAGE = "testSocketSinkInvoke";
-	
+
 	private static final String EXCEPTION_MESSGAE = "Failed to send message '" + TEST_MESSAGE
+ "\n'";
 
 	private static final String host = "127.0.0.1";
-	
 
 	private SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String,
byte[]>() {
 		@Override
@@ -55,7 +58,6 @@ public class SocketClientSinkTest extends TestLogger {
 			return element.getBytes();
 		}
 	};
-	
 
 	@Test
 	public void testSocketSink() throws Exception {
@@ -63,7 +65,7 @@ public class SocketClientSinkTest extends TestLogger {
 		final int port = server.getLocalPort();
 
 		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-		
+
 		Thread sinkRunner = new Thread("Test sink runner") {
 			@Override
 			public void run() {
@@ -73,17 +75,17 @@ public class SocketClientSinkTest extends TestLogger {
 					simpleSink.invoke(TEST_MESSAGE + '\n');
 					simpleSink.close();
 				}
-				catch (Throwable t){
+				catch (Throwable t) {
 					error.set(t);
 				}
 			}
 		};
-		
+
 		sinkRunner.start();
 
 		Socket sk = server.accept();
 		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
-		
+
 		String value = rdr.readLine();
 
 		sinkRunner.join();
@@ -105,7 +107,7 @@ public class SocketClientSinkTest extends TestLogger {
 
 		final SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port,
simpleSchema, 0, true);
 		simpleSink.open(new Configuration());
-		
+
 		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 
 		Thread sinkRunner = new Thread("Test sink runner") {
@@ -115,7 +117,7 @@ public class SocketClientSinkTest extends TestLogger {
 					// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the
server is CLOSE_WAIT
 					simpleSink.invoke(TEST_MESSAGE + '\n');
 				}
-				catch (Throwable t){
+				catch (Throwable t) {
 					error.set(t);
 				}
 			}
@@ -144,12 +146,12 @@ public class SocketClientSinkTest extends TestLogger {
 	public void testSocketSinkNoRetry() throws Exception {
 		final ServerSocket server = new ServerSocket(0);
 		final int port = server.getLocalPort();
-		
+
 		try {
 			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-	
+
 			Thread serverRunner = new Thread("Test server runner") {
-	
+
 				@Override
 				public void run() {
 					try {
@@ -162,10 +164,10 @@ public class SocketClientSinkTest extends TestLogger {
 				}
 			};
 			serverRunner.start();
-	
+
 			SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema,
0, true);
 			simpleSink.open(new Configuration());
-	
+
 			// wait socket server to close
 			serverRunner.join();
 			if (error.get() != null) {
@@ -173,13 +175,13 @@ public class SocketClientSinkTest extends TestLogger {
 				t.printStackTrace();
 				fail("Error in server thread: " + t.getMessage());
 			}
-			
+
 			try {
 				// socket should be closed, so this should trigger a re-try
 				// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the
server is CLOSE_WAIT
-				simpleSink.invoke(TEST_MESSAGE + '\n');
-				simpleSink.invoke(TEST_MESSAGE + '\n');
-				fail("This should have failed with an exception");
+				while (true) { // we have to do this more often as the server side closed is not guaranteed
to be noticed immediately
+					simpleSink.invoke(TEST_MESSAGE + '\n');
+				}
 			}
 			catch (IOException e) {
 				// check whether throw a exception that reconnect failed.
@@ -188,7 +190,7 @@ public class SocketClientSinkTest extends TestLogger {
 			catch (Exception e) {
 				fail("wrong exception: " + e.getClass().getName() + " - " + e.getMessage());
 			}
-			
+
 			assertEquals(0, simpleSink.getCurrentNumberOfRetries());
 		}
 		finally {
@@ -197,138 +199,105 @@ public class SocketClientSinkTest extends TestLogger {
 	}
 
 	@Test
-	public void testSocketSinkRetryThreeTimes() throws Exception {
-		final ServerSocket server = new ServerSocket(0);
-		final int port = server.getLocalPort();
-		
-		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+	public void testRetry() throws Exception {
 
-		Thread serverRunner = new Thread("Test server runner") {
-			@Override
-			public void run() {
-				try {
-					Socket sk = server.accept();
-					sk.close();
-				}
-				catch (Throwable t) {
-					error.set(t);
-				}
-				finally {
-					// close the server now to prevent reconnects
-					IOUtils.closeQuietly(server);
+		final ServerSocket[] serverSocket = new ServerSocket[1];
+		final ExecutorService[] executor = new ExecutorService[1];
+
+		try {
+			serverSocket[0] = new ServerSocket(0);
+			executor[0] = Executors.newCachedThreadPool();
+
+			int port = serverSocket[0].getLocalPort();
+
+			Callable<Void> serverTask = new Callable<Void>() {
+				@Override
+				public Void call() throws Exception {
+					Socket socket = serverSocket[0].accept();
+
+					BufferedReader reader = new BufferedReader(new InputStreamReader(
+							socket.getInputStream()));
+
+					String value = reader.readLine();
+					assertEquals("0", value);
+
+					socket.close();
+					return null;
 				}
-			}
-		};
+			};
 
-		serverRunner.start();
+			Future<Void> serverFuture = executor[0].submit(serverTask);
 
-		SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema,
3);
-		simpleSink.open(new Configuration());
+			final SocketClientSink<String> sink = new SocketClientSink<>(
+					host, serverSocket[0].getLocalPort(), simpleSchema, -1, true);
 
-		// wait socket server to close
-		serverRunner.join();
-		if (error.get() != null) {
-			Throwable t = error.get();
-			t.printStackTrace();
-			fail("Error in server thread: " + t.getMessage());
-		}
+			// Create the connection
+			sink.open(new Configuration());
 
-		try {
-			// socket should be closed, so this should trigger a re-try
-			// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the
server is CLOSE_WAIT
-			simpleSink.invoke(TEST_MESSAGE + '\n');
-			simpleSink.invoke(TEST_MESSAGE + '\n');
-		}
-		catch (IOException e) {
-			// check whether throw a exception that reconnect failed.
-			assertTrue("Wrong exception", e.getMessage().contains(EXCEPTION_MESSGAE));
-		}
-		catch (Exception e) {
-			fail("wrong exception: " + e.getClass().getName() + " - " + e.getMessage());
-		}
+			// Initial payload => this will be received by the server an then the socket will be
+			// closed.
+			sink.invoke("0\n");
 
-		assertEquals(3, simpleSink.getCurrentNumberOfRetries());
-	}
+			// Get future an make sure there was no problem. This will rethrow any Exceptions from
+			// the server.
+			serverFuture.get();
 
-	/**
-	 * This test the reconnect to server success.
-	 * First close the server and let the sink get reconnecting.
-	 * Meanwhile, reopen the server to let the sink reconnect success to socket.
-	 */
-	@Test
-	public void testSocketSinkRetryAccess() throws Exception {
-		final ServerSocket server1 = new ServerSocket(0);
-		final int port = server1.getLocalPort();
+			// Shutdown the server socket
+			serverSocket[0].close();
+			assertTrue(serverSocket[0].isClosed());
 
-		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+			// No retries expected at this point
+			assertEquals(0, sink.getCurrentNumberOfRetries());
 
-		// start a server, for the sink's open() method to connect against
-		// the server is immediately shut down again
-		Thread serverRunner = new Thread("Test server runner") {
+			final CountDownLatch retryLatch = new CountDownLatch(1);
+			final CountDownLatch again = new CountDownLatch(1);
 
-			@Override
-			public void run() {
-				try {
-					Socket sk = server1.accept();
-					sk.close();
-				}
-				catch (Throwable t) {
-					error.set(t);
-				}
-				finally {
-					IOUtils.closeQuietly(server1);
+			Callable<Void> sinkTask = new Callable<Void>() {
+				@Override
+				public Void call() throws Exception {
+					// Send next payload => server is down, should try to reconnect.
+
+					// We need to send more than just one packet to notice the closed connection.
+					while (retryLatch.getCount() != 0) {
+						sink.invoke("1\n");
+					}
+
+					return null;
 				}
+			};
+
+			Future<Void> sinkFuture = executor[0].submit(sinkTask);
+
+			while (sink.getCurrentNumberOfRetries() == 0) {
+				// Wait for a retry
+				Thread.sleep(100);
 			}
-		};
-		serverRunner.start();
 
-		final SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port,
simpleSchema, -1, true);
-		simpleSink.open(new Configuration());
+			// OK the poor guy retried to write
+			retryLatch.countDown();
 
-		// wait until the server is shut down
-		serverRunner.join();
-		if (error.get() != null) {
-			Throwable t = error.get();
-			t.printStackTrace();
-			fail("Error in server thread: " + t.getMessage());
-		}
+			// Restart the server
+			serverSocket[0] = new ServerSocket(port);
+			Socket socket = serverSocket[0].accept();
 
-		// run some data output on the sink. this should fail due to the inactive server, but retry
-		Thread sinkRunner = new Thread("Test sink runner") {
+			BufferedReader reader = new BufferedReader(new InputStreamReader(
+					socket.getInputStream()));
 
-			@Override
-			public void run() {
-				try {
-					// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the
server is CLOSE_WAIT
-					simpleSink.invoke(TEST_MESSAGE + '\n');
-					simpleSink.invoke(TEST_MESSAGE + '\n');
-				}
-				catch (Throwable t) {
-					error.set(t);
-				}
-			}
-		};
-		sinkRunner.start();
-		
-		// we start another server now, which will make the sink complete its task
-		ServerSocket server2 = new ServerSocket(port);
-		Socket sk = server2.accept();
-		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
-		String value = rdr.readLine();
-		int retry = simpleSink.getCurrentNumberOfRetries();
-		
-		// let the sink finish
-		sinkRunner.join();
+			// Wait for the reconnect
+			String value = reader.readLine();
 
-		// make sure that the sink did not throw an error
-		if (error.get() != null) {
-			Throwable t = error.get();
-			t.printStackTrace();
-			fail("Error in spawned thread: " + t.getMessage());
+			assertEquals("1", value);
+
+			// OK the sink re-connected. :)
 		}
+		finally {
+			if (serverSocket[0] != null) {
+				serverSocket[0].close();
+			}
 
-		// validate state and results
-		assertEquals(TEST_MESSAGE, value);
-		assertTrue(retry > 0);
+			if (executor[0] != null) {
+				executor[0].shutdown();
+			}
+		}
 	}
 }
\ No newline at end of file


Mime
View raw message