flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [4/4] flink git commit: [FLINK-7343][utils] Add network proxy utility to simulate network failures
Date Tue, 08 Aug 2017 08:13:32 GMT
[FLINK-7343][utils] Add network proxy utility to simulate network failures


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7f96f79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7f96f79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7f96f79

Branch: refs/heads/master
Commit: b7f96f79e7665f10880333d816d1694a227c5437
Parents: e2d3e1f
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Tue Aug 1 18:11:27 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Tue Aug 8 10:13:02 2017 +0200

----------------------------------------------------------------------
 .../flink/networking/NetworkFailureHandler.java | 178 +++++++++++++++++++
 .../flink/networking/NetworkFailuresProxy.java  | 125 +++++++++++++
 .../org/apache/flink/networking/EchoServer.java | 113 ++++++++++++
 .../networking/NetworkFailuresProxyTest.java    | 124 +++++++++++++
 4 files changed, 540 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
new file mode 100644
index 0000000..0ce0b12
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * Handler that forwards inbound traffic from the source channel to a target channel connected to
+ * {@code remoteHost:remotePort}, and forwards the responses in the opposite direction. All of the
+ * network traffic can be blocked at any time using the shared {@code blocked} flag.
+ */
+class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
+	private static final Logger LOG = LoggerFactory.getLogger(NetworkFailureHandler.class);
+	private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler";
+
+	// Mapping between source and target channels, used for finding the correct target channel
+	// for a given source channel.
+	private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>();
+	// Callback invoked with this handler every time one of its source channels has been closed.
+	private final Consumer<NetworkFailureHandler> onClose;
+	private final ClientSocketChannelFactory channelFactory;
+	private final String remoteHost;
+	private final int remotePort;
+
+	// Shared flag: while it is set, new connections are refused and messages are dropped.
+	private final AtomicBoolean blocked;
+
+	/**
+	 * @param blocked flag that is consulted before accepting connections and forwarding messages
+	 * @param onClose callback invoked with this handler when a source channel is closed
+	 * @param channelFactory factory used to open the connections to the remote endpoint
+	 * @param remoteHost host that the traffic is forwarded to
+	 * @param remotePort port that the traffic is forwarded to
+	 */
+	public NetworkFailureHandler(
+			AtomicBoolean blocked,
+			Consumer<NetworkFailureHandler> onClose,
+			ClientSocketChannelFactory channelFactory,
+			String remoteHost,
+			int remotePort) {
+		this.blocked = blocked;
+		this.onClose = onClose;
+		this.channelFactory = channelFactory;
+		this.remoteHost = remoteHost;
+		this.remotePort = remotePort;
+	}
+
+	/**
+	 * Closes the specified channel after all queued write requests are flushed. A channel that
+	 * is not connected is left untouched.
+	 */
+	static void closeOnFlush(Channel channel) {
+		if (channel.isConnected()) {
+			channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+		}
+	}
+
+	/**
+	 * Closes every source channel currently tracked by this handler.
+	 */
+	public void closeConnections() {
+		for (Channel sourceChannel : sourceToTargetChannels.keySet()) {
+			// The matching target channel is closed from the source's channelClosed event.
+			sourceChannel.close();
+		}
+	}
+
+	/**
+	 * Opens the connection to the remote endpoint for a newly opened source channel, or closes the
+	 * source channel right away if the traffic is currently blocked.
+	 */
+	@Override
+	public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+		// Suspend incoming traffic until connected to the remote host.
+		final Channel sourceChannel = event.getChannel();
+		sourceChannel.setReadable(false);
+
+		if (blocked.get()) {
+			sourceChannel.close();
+			return;
+		}
+
+		// Start the connection attempt.
+		ClientBootstrap targetConnectionBootstrap = new ClientBootstrap(channelFactory);
+		targetConnectionBootstrap.getPipeline().addLast(
+			TARGET_CHANNEL_HANDLER_NAME,
+			new TargetChannelHandler(sourceChannel, blocked));
+		ChannelFuture connectFuture = targetConnectionBootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
+		// Register the pair before the connection completes so that a close of the source
+		// channel can always find its target channel.
+		sourceToTargetChannels.put(sourceChannel, connectFuture.getChannel());
+
+		connectFuture.addListener(future -> {
+			if (future.isSuccess()) {
+				// Connection attempt succeeded:
+				// Begin to accept incoming traffic.
+				sourceChannel.setReadable(true);
+			} else {
+				// Close the connection if the connection attempt has failed.
+				sourceChannel.close();
+			}
+		});
+	}
+
+	/**
+	 * Forwards a message received on a source channel to its target channel. The message is
+	 * dropped while the traffic is blocked.
+	 *
+	 * @throws IllegalStateException if no target channel is registered for the source channel
+	 */
+	@Override
+	public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception {
+		if (blocked.get()) {
+			return;
+		}
+
+		ChannelBuffer msg = (ChannelBuffer) event.getMessage();
+		Channel targetChannel = sourceToTargetChannels.get(event.getChannel());
+		if (targetChannel == null) {
+			throw new IllegalStateException("Could not find a target channel for the source channel");
+		}
+		targetChannel.write(msg);
+	}
+
+	/**
+	 * Closes the target channel paired with the closed source channel (if any) and notifies the
+	 * {@code onClose} callback.
+	 */
+	@Override
+	public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+		Channel targetChannel = sourceToTargetChannels.remove(event.getChannel());
+		if (targetChannel != null) {
+			closeOnFlush(targetChannel);
+		}
+		// Notify the callback even when no target channel was ever registered (the source was
+		// closed in channelOpen because the traffic was blocked); otherwise whoever tracks this
+		// handler through onClose would never learn that it is finished.
+		onClose.accept(this);
+	}
+
+	/**
+	 * Logs the exception and closes the channel on which it was raised.
+	 */
+	@Override
+	public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception {
+		LOG.error("Closing communication channel because of an exception", event.getCause());
+		closeOnFlush(event.getChannel());
+	}
+
+	/**
+	 * Handler installed on the target channel; it forwards the responses back to the source channel.
+	 */
+	private static class TargetChannelHandler extends SimpleChannelUpstreamHandler {
+		private final Channel sourceChannel;
+		private final AtomicBoolean blocked;
+
+		TargetChannelHandler(Channel sourceChannel, AtomicBoolean blocked) {
+			this.sourceChannel = sourceChannel;
+			this.blocked = blocked;
+		}
+
+		/**
+		 * Writes a message received from the target back to the source channel, unless the traffic
+		 * is blocked, in which case the message is dropped.
+		 */
+		@Override
+		public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception {
+			if (blocked.get()) {
+				return;
+			}
+			ChannelBuffer msg = (ChannelBuffer) event.getMessage();
+			sourceChannel.write(msg);
+		}
+
+		/**
+		 * Closes the source channel once the target channel has been closed.
+		 */
+		@Override
+		public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+			closeOnFlush(sourceChannel);
+		}
+
+		/**
+		 * Logs the exception and closes the channel on which it was raised.
+		 */
+		@Override
+		public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception {
+			LOG.error("Closing communication channel because of an exception", event.getCause());
+			closeOnFlush(event.getChannel());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
new file mode 100644
index 0000000..7030049
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Network proxy that listens on a local port and forwards all of the network traffic to the
+ * remote host/port. It allows simulating network failures in the communication.
+ */
+public class NetworkFailuresProxy implements AutoCloseable {
+	private static final Logger LOG = LoggerFactory.getLogger(NetworkFailuresProxy.class);
+	private static final String NETWORK_FAILURE_HANDLER_NAME = "network_failure_handler";
+
+	private final Executor executor = Executors.newCachedThreadPool();
+	private final ServerBootstrap serverBootstrap;
+	private final Channel channel;
+	private final AtomicBoolean blocked = new AtomicBoolean();
+	// Collection of the created handlers, kept so that NetworkFailureHandler#closeConnections
+	// can be called on each of them.
+	private final Set<NetworkFailureHandler> networkFailureHandlers =
+		Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+	/**
+	 * Binds the proxy to {@code localPort} and sets it up to forward to {@code remoteHost:remotePort}.
+	 *
+	 * @param localPort local port to listen on
+	 * @param remoteHost host that the traffic is forwarded to
+	 * @param remotePort port that the traffic is forwarded to
+	 */
+	public NetworkFailuresProxy(int localPort, String remoteHost, int remotePort) {
+		LOG.info("Proxying [*:{}] to [{}:{}]", localPort, remoteHost, remotePort);
+
+		// Server side: accepts the incoming connections.
+		serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(executor, executor));
+
+		// Client side: shared by all handlers for opening the connections to the remote endpoint.
+		final ClientSocketChannelFactory clientChannelFactory =
+			new NioClientSocketChannelFactory(executor, executor);
+		serverBootstrap.setOption("child.tcpNoDelay", true);
+		serverBootstrap.setOption("child.keepAlive", true);
+		serverBootstrap.setPipelineFactory(() -> createPipeline(clientChannelFactory, remoteHost, remotePort));
+
+		channel = serverBootstrap.bind(new InetSocketAddress(localPort));
+	}
+
+	/**
+	 * Builds a pipeline holding a freshly created and registered {@link NetworkFailureHandler}.
+	 */
+	private ChannelPipeline createPipeline(
+			ClientSocketChannelFactory clientChannelFactory,
+			String remoteHost,
+			int remotePort) {
+		final ChannelPipeline result = Channels.pipeline();
+
+		// Synchronized because of the race between blocking the traffic and creating new handlers.
+		synchronized (networkFailureHandlers) {
+			final NetworkFailureHandler handler = new NetworkFailureHandler(
+				blocked,
+				networkFailureHandlers::remove,
+				clientChannelFactory,
+				remoteHost,
+				remotePort);
+			networkFailureHandlers.add(handler);
+			result.addLast(NETWORK_FAILURE_HANDLER_NAME, handler);
+		}
+		return result;
+	}
+
+	/**
+	 * @return local port on which {@link NetworkFailuresProxy} is listening.
+	 */
+	public int getLocalPort() {
+		final InetSocketAddress localAddress = (InetSocketAddress) channel.getLocalAddress();
+		return localAddress.getPort();
+	}
+
+	/**
+	 * Sets the blocked flag and closes the connections of all currently registered handlers.
+	 */
+	public void blockTraffic() {
+		setTrafficBlocked(true);
+	}
+
+	/**
+	 * Clears the blocked flag so that normal communication can resume.
+	 */
+	public void unblockTraffic() {
+		setTrafficBlocked(false);
+	}
+
+	/**
+	 * Closes the listening server channel. The executor backing the channel factories is not
+	 * shut down by this method.
+	 */
+	@Override
+	public void close() throws Exception {
+		channel.close();
+	}
+
+	/**
+	 * Updates the blocked flag; when blocking, additionally closes the existing connections.
+	 */
+	private void setTrafficBlocked(boolean shouldBlock) {
+		blocked.set(shouldBlock);
+		if (!shouldBlock) {
+			return;
+		}
+
+		// Synchronized because of the race between blocking the traffic and creating new handlers.
+		synchronized (networkFailureHandlers) {
+			networkFailureHandlers.forEach(NetworkFailureHandler::closeConnections);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
new file mode 100644
index 0000000..06e77ea
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * TCP EchoServer for test purposes. Every accepted connection is served by its own worker thread
+ * that sends each received line back to the client.
+ */
+public class EchoServer extends Thread implements AutoCloseable {
+	private final ServerSocket serverSocket = new ServerSocket(0);
+	private final int socketTimeout;
+	private final List<EchoWorkerThread> workerThreads = Collections.synchronizedList(new ArrayList<>());
+
+	private volatile boolean close = false;
+	// Written by the acceptor thread, read by the thread calling close().
+	private volatile Exception threadException;
+
+	/**
+	 * @param socketTimeout timeout applied (via {@code setSoTimeout}) to the server socket and to
+	 *                      every accepted client socket
+	 * @throws IOException if the server socket cannot be opened or configured
+	 */
+	public EchoServer(int socketTimeout) throws IOException {
+		serverSocket.setSoTimeout(socketTimeout);
+		this.socketTimeout = socketTimeout;
+	}
+
+	/**
+	 * @return local port on which the server socket is listening.
+	 */
+	public int getLocalPort() {
+		return serverSocket.getLocalPort();
+	}
+
+	/**
+	 * Accepts connections until the server is closed and starts a worker thread for each of them.
+	 */
+	@Override
+	public void run() {
+		while (!close) {
+			try {
+				EchoWorkerThread thread = new EchoWorkerThread(serverSocket.accept(), socketTimeout);
+				// Register the worker before starting it so that close() is able to shut it down.
+				workerThreads.add(thread);
+				thread.start();
+			} catch (IOException e) {
+				// The failure of accept() caused by closing the server socket is expected.
+				if (!close) {
+					threadException = e;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Stops accepting connections, shuts down all worker threads and waits for them to finish.
+	 *
+	 * @throws Exception the first failure recorded by the acceptor thread or by a worker; further
+	 *                   failures are attached to it as suppressed exceptions
+	 */
+	@Override
+	public void close() throws Exception {
+		close = true;
+		// Release the server socket unconditionally; this also makes a blocked accept() fail so
+		// that the acceptor thread can observe the flag and terminate.
+		serverSocket.close();
+		this.join();
+
+		Exception failure = threadException;
+
+		// The acceptor thread has finished, so no more workers are going to be registered.
+		List<EchoWorkerThread> workers;
+		synchronized (workerThreads) {
+			workers = new ArrayList<>(workerThreads);
+		}
+		for (EchoWorkerThread thread : workers) {
+			try {
+				thread.close();
+			} catch (Exception e) {
+				if (failure == null) {
+					failure = e;
+				} else {
+					failure.addSuppressed(e);
+				}
+			}
+		}
+
+		if (failure != null) {
+			throw failure;
+		}
+	}
+
+	/**
+	 * Thread serving a single client connection.
+	 */
+	private static class EchoWorkerThread extends Thread implements AutoCloseable {
+		private final Socket clientSocket;
+		private final PrintWriter output;
+		private final BufferedReader input;
+
+		private volatile boolean close;
+		// Written by the worker thread, read by the thread calling close().
+		private volatile Exception threadException;
+
+		public EchoWorkerThread(Socket clientSocket, int socketTimeout) throws IOException {
+			this.clientSocket = clientSocket;
+			output = new PrintWriter(clientSocket.getOutputStream(), true);
+			input = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
+			clientSocket.setSoTimeout(socketTimeout);
+		}
+
+		/**
+		 * Echoes the received lines until the worker is closed or the end of the stream is reached.
+		 */
+		@Override
+		public void run() {
+			try {
+				String inputLine;
+				while (!close && (inputLine = input.readLine()) != null) {
+					output.println(inputLine);
+				}
+			} catch (IOException e) {
+				// The failure of readLine() caused by closing the socket is expected.
+				if (!close) {
+					threadException = e;
+				}
+			}
+		}
+
+		/**
+		 * Closes the client connection and waits for the worker thread to finish.
+		 *
+		 * @throws Exception the failure recorded by the worker thread, if any
+		 */
+		@Override
+		public void close() throws Exception {
+			close = true;
+			// Close the socket rather than the reader: closing the reader would have to wait for
+			// the lock held by a blocked readLine(), while closing the socket makes that read
+			// fail immediately. It also closes both streams of the socket.
+			clientSocket.close();
+			this.join();
+			if (threadException != null) {
+				throw threadException;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
new file mode 100644
index 0000000..0046868
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.SocketException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for NetworkFailuresProxy.
+ */
+public class NetworkFailuresProxyTest {
+	// Timeout handed to setSoTimeout of the echo server and echo client sockets.
+	public static final int SOCKET_TIMEOUT = 500_000;
+
+	/**
+	 * A single client talking through the proxy gets its messages echoed back.
+	 */
+	@Test
+	public void testProxy() throws Exception {
+		try (
+				EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
+				NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort());
+				EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+			echoServer.start();
+
+			assertEquals("42", echoClient.write("42"));
+			assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!"));
+		}
+	}
+
+	/**
+	 * Two clients connected through the same proxy each receive their own echoes.
+	 */
+	@Test
+	public void testMultipleConnections() throws Exception {
+		try (
+				EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
+				NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort());
+				EchoClient echoClient1 = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT);
+				EchoClient echoClient2 = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+			echoServer.start();
+
+			assertEquals("42", echoClient1.write("42"));
+			assertEquals("Ala ma kota!", echoClient2.write("Ala ma kota!"));
+			assertEquals("Ala hat eine Katze!", echoClient1.write("Ala hat eine Katze!"));
+		}
+	}
+
+	/**
+	 * Blocking the traffic cuts existing and new connections; unblocking restores the echoing.
+	 */
+	@Test
+	public void testBlockTraffic() throws Exception {
+		try (
+				EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
+				NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort())) {
+			echoServer.start();
+
+			// An established connection is cut once the traffic gets blocked.
+			try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+				assertEquals("42", echoClient.write("42"));
+				proxy.blockTraffic();
+				try {
+					// Nothing may be echoed back: the client sees either the end of the stream
+					// (null) or a connection reset.
+					assertEquals(null, echoClient.write("Ala ma kota!"));
+				} catch (SocketException ex) {
+					assertEquals("Connection reset", ex.getMessage());
+				}
+			}
+
+			// A connection opened while the traffic is blocked is not served either.
+			try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+				assertEquals(null, echoClient.write("42"));
+			} catch (SocketException ex) {
+				assertEquals("Connection reset", ex.getMessage());
+			}
+
+			// After unblocking, new connections work as usual.
+			proxy.unblockTraffic();
+			try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+				assertEquals("42", echoClient.write("42"));
+				assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!"));
+			}
+		}
+	}
+
+	/**
+	 * Simple echo client that sends a message over the network and waits for the answer.
+	 */
+	public static class EchoClient implements AutoCloseable {
+		private final Socket socket;
+		private final PrintWriter output;
+		private final BufferedReader input;
+
+		/**
+		 * Connects to {@code hostName:portNumber} and applies {@code socketTimeout} to the socket.
+		 */
+		public EchoClient(String hostName, int portNumber, int socketTimeout) throws IOException {
+			socket = new Socket(hostName, portNumber);
+			socket.setSoTimeout(socketTimeout);
+			output = new PrintWriter(socket.getOutputStream(), true);
+			input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+		}
+
+		/**
+		 * Sends {@code message} as one line and returns the next line read from the connection.
+		 *
+		 * @return the received line, or {@code null} if the end of the stream has been reached
+		 */
+		public String write(String message) throws IOException {
+			output.println(message);
+			return input.readLine();
+		}
+
+		@Override
+		public void close() throws Exception {
+			input.close();
+			output.close();
+			socket.close();
+		}
+	}
+}


Mime
View raw message