flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [1/3] flink git commit: [FLINK-7975][QS] Wait for QS client to shutdown.
Date Wed, 06 Dec 2017 15:11:48 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5221a70e0 -> a3fd548e9


[FLINK-7975][QS] Wait for QS client to shutdown.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5760677b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5760677b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5760677b

Branch: refs/heads/master
Commit: 5760677b3bb26245ca4816548833da0257ec7c7a
Parents: 5221a70
Author: kkloudas <kkloudas@gmail.com>
Authored: Thu Nov 9 19:21:43 2017 +0100
Committer: kkloudas <kkloudas@gmail.com>
Committed: Wed Dec 6 14:33:16 2017 +0100

----------------------------------------------------------------------
 .../client/QueryableStateClient.java            |  30 +++-
 .../flink/queryablestate/network/Client.java    | 171 +++++++++++++------
 .../queryablestate/network/ClientTest.java      |  88 ++++++++--
 .../query/AbstractQueryableStateOperator.java   |   2 +
 4 files changed, 215 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 7abf6bc..f1c69ed 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -108,9 +108,33 @@ public class QueryableStateClient {
 				new DisabledKvStateRequestStats());
 	}
 
-	/** Shuts down the client. */
-	public void shutdown() {
-		client.shutdown();
+	/**
+	 * Shuts down the client and returns a {@link CompletableFuture} that
+	 * will be completed when the shutdown process is completed.
+	 *
+	 * <p>If an exception is thrown for any reason, then the returned future
+	 * will be completed exceptionally with that exception.
+	 *
+	 * @return A {@link CompletableFuture} for further handling of the
+	 * shutdown result.
+	 */
+	public CompletableFuture<?> shutdownAndHandle() {
+		return client.shutdown();
+	}
+
+	/**
+	 * Shuts down the client and waits until shutdown is completed.
+	 *
+	 * <p>If an exception is thrown, a warning is logged containing
+	 * the exception message.
+	 */
+	public void shutdownAndWait() {
+		try {
+			client.shutdown().get();
+			LOG.info("The Queryable State Client was shutdown successfully.");
+		} catch (Exception e) {
+			LOG.warn("The Queryable State Client shutdown failed: ", e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index 12286fa..364f835 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -42,15 +42,19 @@ import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChann
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -64,6 +68,8 @@ import java.util.concurrent.atomic.AtomicReference;
 @Internal
 public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 
+	private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+
 	/** The name of the client. Used for logging and stack traces.*/
 	private final String clientName;
 
@@ -82,8 +88,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 	/** Pending connections. */
 	private final Map<InetSocketAddress, PendingConnection> pendingConnections = new ConcurrentHashMap<>();
 
-	/** Atomic shut down flag. */
-	private final AtomicBoolean shutDown = new AtomicBoolean();
+	/** Atomic shut down future. */
+	private final AtomicReference<CompletableFuture<Void>> clientShutdownFuture = new AtomicReference<>(null);
 
 	/**
 	 * Creates a client with the specified number of event loop threads.
@@ -133,7 +139,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 	}
 
 	public CompletableFuture<RESP> sendRequest(final InetSocketAddress serverAddress, final REQ request) {
-		if (shutDown.get()) {
+		if (clientShutdownFuture.get() != null) {
 			return FutureUtils.getFailedFuture(new IllegalStateException(clientName + " is already shut down."));
 		}
 
@@ -166,28 +172,57 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 	 * Shuts down the client and closes all connections.
 	 *
 	 * <p>After a call to this method, all returned futures will be failed.
+	 *
+	 * @return A {@link CompletableFuture} that will be completed when the shutdown process is done.
 	 */
-	public void shutdown() {
-		if (shutDown.compareAndSet(false, true)) {
+	public CompletableFuture<Void> shutdown() {
+		final CompletableFuture<Void> newShutdownFuture = new CompletableFuture<>();
+		if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
+
+			final List<CompletableFuture<Void>> connectionFutures = new ArrayList<>();
+
 			for (Map.Entry<InetSocketAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
 				if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-					conn.getValue().close();
+					connectionFutures.add(conn.getValue().close());
 				}
 			}
 
 			for (Map.Entry<InetSocketAddress, PendingConnection> conn : pendingConnections.entrySet()) {
 				if (pendingConnections.remove(conn.getKey()) != null) {
-					conn.getValue().close();
+					connectionFutures.add(conn.getValue().close());
 				}
 			}
 
-			if (bootstrap != null) {
-				EventLoopGroup group = bootstrap.group();
-				if (group != null) {
-					group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+			CompletableFuture.allOf(
+					connectionFutures.toArray(new CompletableFuture<?>[connectionFutures.size()])
+			).whenComplete((result, throwable) -> {
+
+				if (throwable != null) {
+					LOG.warn("Problem while shutting down the connections at the {}: {}", clientName, throwable);
 				}
-			}
+
+				if (bootstrap != null) {
+					EventLoopGroup group = bootstrap.group();
+					if (group != null && !group.isShutdown()) {
+						group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
+								.addListener(finished -> {
+									if (finished.isSuccess()) {
+										newShutdownFuture.complete(null);
+									} else {
+										newShutdownFuture.completeExceptionally(finished.cause());
+									}
+								});
+					} else {
+						newShutdownFuture.complete(null);
+					}
+				} else {
+					newShutdownFuture.complete(null);
+				}
+			});
+
+			return newShutdownFuture;
 		}
+		return clientShutdownFuture.get();
 	}
 
 	/**
@@ -209,8 +244,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		/** The established connection after the connect succeeds. */
 		private EstablishedConnection established;
 
-		/** Closed flag. */
-		private boolean closed;
+		/** Atomic shut down future. */
+		private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture = new AtomicReference<>(null);
 
 		/** Failure cause if something goes wrong. */
 		private Throwable failureCause;
@@ -250,7 +285,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 			synchronized (connectLock) {
 				if (failureCause != null) {
 					return FutureUtils.getFailedFuture(failureCause);
-				} else if (closed) {
+				} else if (connectionShutdownFuture.get() != null) {
 					return FutureUtils.getFailedFuture(new ClosedChannelException());
 				} else {
 					if (established != null) {
@@ -272,7 +307,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		 */
 		private void handInChannel(Channel channel) {
 			synchronized (connectLock) {
-				if (closed || failureCause != null) {
+				if (connectionShutdownFuture.get() != null || failureCause != null) {
 					// Close the channel and we are done. Any queued requests
 					// are removed on the close/failure call and after that no
 					// new ones can be enqueued.
@@ -300,7 +335,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 					// Check shut down for possible race with shut down. We
 					// don't want any lingering connections after shut down,
 					// which can happen if we don't check this here.
-					if (shutDown.get()) {
+					if (clientShutdownFuture.get() != null) {
 						if (establishedConnections.remove(serverAddress, established)) {
 							established.close();
 						}
@@ -312,32 +347,40 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		/**
 		 * Close the connecting channel with a ClosedChannelException.
 		 */
-		private void close() {
-			close(new ClosedChannelException());
+		private CompletableFuture<Void> close() {
+			return close(new ClosedChannelException());
 		}
 
 		/**
 		 * Close the connecting channel with an Exception (can be {@code null})
 		 * or forward to the established channel.
 		 */
-		private void close(Throwable cause) {
-			synchronized (connectLock) {
-				if (!closed) {
+		private CompletableFuture<Void> close(Throwable cause) {
+			CompletableFuture<Void> future = new CompletableFuture<>();
+			if (connectionShutdownFuture.compareAndSet(null, future)) {
+				synchronized (connectLock) {
 					if (failureCause == null) {
 						failureCause = cause;
 					}
 
 					if (established != null) {
-						established.close();
+						established.close().whenComplete((result, throwable) -> {
+							if (throwable != null) {
+								future.completeExceptionally(throwable);
+							} else {
+								future.complete(null);
+							}
+						});
 					} else {
 						PendingRequest pending;
 						while ((pending = queuedRequests.poll()) != null) {
 							pending.completeExceptionally(cause);
 						}
+						future.complete(null);
 					}
-					closed = true;
 				}
 			}
+			return connectionShutdownFuture.get();
 		}
 
 		@Override
@@ -347,7 +390,7 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 						"serverAddress=" + serverAddress +
 						", queuedRequests=" + queuedRequests.size() +
 						", established=" + (established != null) +
-						", closed=" + closed +
+						", closed=" + (connectionShutdownFuture.get() != null) +
 						'}';
 			}
 		}
@@ -383,8 +426,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		/** Current request number used to assign unique request IDs. */
 		private final AtomicLong requestCount = new AtomicLong();
 
-		/** Reference to a failure that was reported by the channel. */
-		private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
+		/** Atomic shut down future. */
+		private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture = new AtomicReference<>(null);
 
 		/**
 		 * Creates an established connection with the given channel.
@@ -412,8 +455,8 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		/**
 		 * Close the channel with a ClosedChannelException.
 		 */
-		void close() {
-			close(new ClosedChannelException());
+		CompletableFuture<Void> close() {
+			return close(new ClosedChannelException());
 		}
 
 		/**
@@ -422,20 +465,33 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		 * @param cause The cause to close the channel with.
 		 * @return Channel close future
 		 */
-		private boolean close(Throwable cause) {
-			if (failureCause.compareAndSet(null, cause)) {
-				channel.close();
-				stats.reportInactiveConnection();
+		private CompletableFuture<Void> close(final Throwable cause) {
+			final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
 
-				for (long requestId : pendingRequests.keySet()) {
-					TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-					if (pending != null && pending.completeExceptionally(cause)) {
-						stats.reportFailedRequest();
+			if (connectionShutdownFuture.compareAndSet(null, shutdownFuture)) {
+				channel.close().addListener(finished -> {
+					stats.reportInactiveConnection();
+					for (long requestId : pendingRequests.keySet()) {
+						TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
+						if (pending != null && pending.completeExceptionally(cause)) {
+							stats.reportFailedRequest();
+						}
 					}
-				}
-				return true;
+
+					// when finishing, if netty successfully closes the channel, then the provided exception is used
+					// as the reason for the closing. If there was something wrong at the netty side, then that exception
+					// is prioritized over the provided one.
+					if (finished.isSuccess()) {
+						shutdownFuture.completeExceptionally(cause);
+					} else {
+						LOG.warn("Something went wrong when trying to close connection due to : ", cause);
+						shutdownFuture.completeExceptionally(finished.cause());
+					}
+				});
 			}
-			return false;
+
+			// in case we had a race condition, return the winner of the race.
+			return connectionShutdownFuture.get();
 		}
 
 		/**
@@ -464,16 +520,22 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 					}
 				});
 
-				// Check failure for possible race. We don't want any lingering
+				// Check for possible race. We don't want any lingering
 				// promises after a failure, which can happen if we don't check
 				// this here. Note that close is treated as a failure as well.
-				Throwable failure = failureCause.get();
-				if (failure != null) {
-					// Remove from pending requests to guard against concurrent
-					// removal and to make sure that we only count it once as failed.
+				CompletableFuture<Void> clShutdownFuture = clientShutdownFuture.get();
+				if (clShutdownFuture != null) {
 					TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-					if (pending != null && pending.completeExceptionally(failure)) {
-						stats.reportFailedRequest();
+					if (pending != null) {
+						clShutdownFuture.whenComplete((ignored, throwable) -> {
+							if (throwable != null && pending.completeExceptionally(throwable)) {
+								stats.reportFailedRequest();
+							} else {
+								// the shutdown future is always completed exceptionally so we should not arrive here.
+								// but in any case, we complete the pending connection request exceptionally.
+								pending.completeExceptionally(new ClosedChannelException());
+							}
+						});
 					}
 				}
 			} catch (Throwable t) {
@@ -486,27 +548,25 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 		@Override
 		public void onRequestResult(long requestId, RESP response) {
 			TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-			if (pending != null && pending.complete(response)) {
+			if (pending != null && !pending.isDone()) {
 				long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
 				stats.reportSuccessfulRequest(durationMillis);
+				pending.complete(response);
 			}
 		}
 
 		@Override
 		public void onRequestFailure(long requestId, Throwable cause) {
 			TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-			if (pending != null && pending.completeExceptionally(cause)) {
+			if (pending != null && !pending.isDone()) {
 				stats.reportFailedRequest();
+				pending.completeExceptionally(cause);
 			}
 		}
 
 		@Override
 		public void onFailure(Throwable cause) {
-			if (close(cause)) {
-				// Remove from established channels, otherwise future
-				// requests will be handled by this failed channel.
-				establishedConnections.remove(serverAddress, this);
-			}
+			close(cause).handle((cancelled, ignored) -> establishedConnections.remove(serverAddress, this));
 		}
 
 		@Override
@@ -516,7 +576,6 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> {
 					", channel=" + channel +
 					", pendingRequests=" + pendingRequests.size() +
 					", requestCount=" + requestCount +
-					", failureCause=" + failureCause +
 					'}';
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 1fa4deb..8638efa 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -54,7 +55,9 @@ import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
-import org.junit.AfterClass;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,15 +98,20 @@ public class ClientTest {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class);
 
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(20L, TimeUnit.SECONDS);
+
 	// Thread pool for client bootstrap (shared between tests)
-	private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
+	private NioEventLoopGroup nioGroup;
 
-	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+	@Before
+	public void setUp() throws Exception {
+		nioGroup = new NioEventLoopGroup();
+	}
 
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (NIO_GROUP != null) {
-			NIO_GROUP.shutdownGracefully();
+	@After
+	public void tearDown() throws Exception {
+		if (nioGroup != null) {
+			nioGroup.shutdownGracefully();
 		}
 	}
 
@@ -218,7 +226,24 @@ public class ClientTest {
 			assertEquals(expectedRequests, stats.getNumFailed());
 		} finally {
 			if (client != null) {
-				client.shutdown();
+				Exception exc = null;
+				try {
+
+					// todo here we were seeing this problem:
+					// https://github.com/netty/netty/issues/4357 if we do a get().
+					// this is why we now simply wait a bit so that everything is
+					// shut down and then we check
+
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					exc = e;
+					LOG.error("An exception occurred while shutting down netty.", e);
+				}
+
+				Assert.assertTrue(
+						ExceptionUtils.stringifyException(exc),
+						client.isEventGroupShutdown()
+				);
 			}
 
 			if (serverChannel != null) {
@@ -265,7 +290,12 @@ public class ClientTest {
 			}
 		} finally {
 			if (client != null) {
-				client.shutdown();
+				try {
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+				Assert.assertTrue(client.isEventGroupShutdown());
 			}
 
 			assertEquals("Channel leak", 0L, stats.getNumConnections());
@@ -366,7 +396,12 @@ public class ClientTest {
 			}
 
 			if (client != null) {
-				client.shutdown();
+				try {
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+				Assert.assertTrue(client.isEventGroupShutdown());
 			}
 
 			assertEquals("Channel leak", 0L, stats.getNumConnections());
@@ -467,7 +502,12 @@ public class ClientTest {
 			assertEquals(2L, stats.getNumFailed());
 		} finally {
 			if (client != null) {
-				client.shutdown();
+				try {
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+				Assert.assertTrue(client.isEventGroupShutdown());
 			}
 
 			if (serverChannel != null) {
@@ -548,7 +588,12 @@ public class ClientTest {
 			assertEquals(1L, stats.getNumFailed());
 		} finally {
 			if (client != null) {
-				client.shutdown();
+				try {
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+				Assert.assertTrue(client.isEventGroupShutdown());
 			}
 
 			if (serverChannel != null) {
@@ -661,7 +706,7 @@ public class ClientTest {
 					Collections.shuffle(random);
 
 					// Dispatch queries
-					List<Future<KvStateResponse>> futures = new ArrayList<>(batchSize);
+					List<CompletableFuture<KvStateResponse>> futures = new ArrayList<>(batchSize);
 
 					for (int j = 0; j < batchSize; j++) {
 						int targetServer = random.get(j) % numServers;
@@ -700,8 +745,12 @@ public class ClientTest {
 				LOG.info("Number of requests {}/100_000", numRequests);
 			}
 
-			// Shut down
-			client.shutdown();
+			try {
+				client.shutdown().get(10L, TimeUnit.SECONDS);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+			Assert.assertTrue(client.isEventGroupShutdown());
 
 			for (Future<Void> future : taskFutures) {
 				try {
@@ -739,7 +788,12 @@ public class ClientTest {
 			}
 		} finally {
 			if (client != null) {
-				client.shutdown();
+				try {
+					client.shutdown().get(10L, TimeUnit.SECONDS);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+				Assert.assertTrue(client.isEventGroupShutdown());
 			}
 
 			for (int i = 0; i < numServers; i++) {
@@ -761,7 +815,7 @@ public class ClientTest {
 				// Bind address and port
 				.localAddress(InetAddress.getLocalHost(), 0)
 				// NIO server channels
-				.group(NIO_GROUP)
+				.group(nioGroup)
 				.channel(NioServerSocketChannel.class)
 				// See initializer for pipeline details
 				.childHandler(new ChannelInitializer<SocketChannel>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/5760677b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
index 7522a61..5ca9c1e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
@@ -36,6 +36,8 @@ abstract class AbstractQueryableStateOperator<S extends State, IN>
 		extends AbstractStreamOperator<IN>
 		implements OneInputStreamOperator<IN, IN> {
 
+	private static final long serialVersionUID = 7842489558298787382L;
+
 	/** State descriptor for the queryable state instance. */
 	protected final StateDescriptor<? extends S, ?> stateDescriptor;
 


Mime
View raw message