Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DC3DB184A9 for ; Mon, 22 Feb 2016 15:54:29 +0000 (UTC) Received: (qmail 62473 invoked by uid 500); 22 Feb 2016 15:38:46 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 62446 invoked by uid 500); 22 Feb 2016 15:38:46 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 62436 invoked by uid 99); 22 Feb 2016 15:38:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2016 15:38:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 81D00E040E; Mon, 22 Feb 2016 15:38:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Mon, 22 Feb 2016 15:38:41 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [17/47] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34074d71/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java new file mode 100644 index 0000000..b01a4e1 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/SocketProxy.java @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.util; + +import javax.net.ssl.SSLServerSocketFactory; +import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SocketProxy { + + private static final transient Logger LOG = LoggerFactory.getLogger(SocketProxy.class); + + public static final int ACCEPT_TIMEOUT_MILLIS = 100; + + private URI proxyUrl; + private URI target; + + private Acceptor acceptor; + private ServerSocket serverSocket; + + private CountDownLatch closed = new CountDownLatch(1); + + public final List connections = new LinkedList(); + + private int listenPort = 0; + + private int receiveBufferSize = -1; + + private boolean pauseAtStart = false; + + private int acceptBacklog = 50; + + public SocketProxy() throws Exception { + } + + public SocketProxy(URI uri) throws Exception { + this(0, uri); + } + + public SocketProxy(int port, URI uri) throws Exception { + listenPort = port; + target = uri; + open(); + } + + public void setReceiveBufferSize(int receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + } + + public void setTarget(URI tcpBrokerUri) { + target = tcpBrokerUri; + } + + public void open() throws Exception { + serverSocket = createServerSocket(target); + serverSocket.setReuseAddress(true); + if (receiveBufferSize > 0) { + serverSocket.setReceiveBufferSize(receiveBufferSize); + } + if (proxyUrl == null) { + serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog); + proxyUrl = urlFromSocket(target, serverSocket); + } else { + serverSocket.bind(new InetSocketAddress(proxyUrl.getPort())); + } + acceptor = new Acceptor(serverSocket, target); + if (pauseAtStart) { + acceptor.pause(); + } + new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start(); + closed = new CountDownLatch(1); + } + + private boolean isSsl(URI target) { + return "ssl".equals(target.getScheme()); + } + + private ServerSocket createServerSocket(URI target) throws Exception { + if (isSsl(target)) { + return SSLServerSocketFactory.getDefault().createServerSocket(); + } + return new ServerSocket(); + } + + private Socket createSocket(URI target) throws Exception { + if (isSsl(target)) { + return SSLSocketFactory.getDefault().createSocket(); + } + return new Socket(); + } + + public URI getUrl() { + return proxyUrl; + } + + /* + * close all proxy connections and acceptor + */ + public void close() { + List connections; + synchronized(this.connections) { + connections = new ArrayList(this.connections); + } + LOG.info("close, numConnections=" + connections.size()); + for (Bridge con : connections) { + closeConnection(con); + } + acceptor.close(); + closed.countDown(); + } + + /* + * close all proxy receive connections, leaving acceptor + * open + */ + public void halfClose() { + List connections; + synchronized(this.connections) { + connections = new ArrayList(this.connections); + } + LOG.info("halfClose, numConnections=" + connections.size()); + for (Bridge con : connections) { + halfCloseConnection(con); + } + } + + public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException { + return closed.await(timeoutSeconds, TimeUnit.SECONDS); + } + + /* + * called after a close to restart the acceptor on the same port + */ + public void reopen() { + LOG.info("reopen"); + try { + open(); + } catch (Exception e) { + LOG.debug("exception on reopen url:" + getUrl(), e); + } + } + + /* + * pause accepting new connections and data transfer through existing proxy + * connections. All sockets remain open + */ + public void pause() { + synchronized(connections) { + LOG.info("pause, numConnections=" + connections.size()); + acceptor.pause(); + for (Bridge con : connections) { + con.pause(); + } + } + } + + /* + * continue after pause + */ + public void goOn() { + synchronized(connections) { + LOG.info("goOn, numConnections=" + connections.size()); + for (Bridge con : connections) { + con.goOn(); + } + } + acceptor.goOn(); + } + + private void closeConnection(Bridge c) { + try { + c.close(); + } catch (Exception e) { + LOG.debug("exception on close of: " + c, e); + } + } + + private void halfCloseConnection(Bridge c) { + try { + c.halfClose(); + } catch (Exception e) { + LOG.debug("exception on half close of: " + c, e); + } + } + + public boolean isPauseAtStart() { + return pauseAtStart; + } + + public void setPauseAtStart(boolean pauseAtStart) { + this.pauseAtStart = pauseAtStart; + } + + public int getAcceptBacklog() { + return acceptBacklog; + } + + public void setAcceptBacklog(int acceptBacklog) { + this.acceptBacklog = acceptBacklog; + } + + private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception { + int listenPort = serverSocket.getLocalPort(); + + return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), listenPort, uri.getPath(), uri.getQuery(), uri.getFragment()); + } + + public class Bridge { + + private Socket receiveSocket; + private Socket sendSocket; + private Pump requestThread; + private Pump responseThread; + + public Bridge(Socket socket, URI target) throws Exception { + receiveSocket = socket; + sendSocket = createSocket(target); + if (receiveBufferSize > 0) { + sendSocket.setReceiveBufferSize(receiveBufferSize); + } + sendSocket.connect(new InetSocketAddress(target.getHost(), target.getPort())); + linkWithThreads(receiveSocket, sendSocket); + LOG.info("proxy connection " + sendSocket + ", receiveBufferSize=" + sendSocket.getReceiveBufferSize()); + } + + public void goOn() { + responseThread.goOn(); + requestThread.goOn(); + } + + public void pause() { + requestThread.pause(); + responseThread.pause(); + } + + public void close() throws Exception { + synchronized(connections) { + connections.remove(this); + } + receiveSocket.close(); + sendSocket.close(); + } + + public void halfClose() throws Exception { + receiveSocket.close(); + } + + private void linkWithThreads(Socket source, Socket dest) { + requestThread = new Pump(source, dest); + requestThread.start(); + responseThread = new Pump(dest, source); + responseThread.start(); + } + + public class Pump extends Thread { + + protected Socket src; + private Socket destination; + private AtomicReference pause = new AtomicReference(); + + public Pump(Socket source, Socket dest) { + super("SocketProxy-DataTransfer-" + source.getPort() + ":" + dest.getPort()); + src = source; + destination = dest; + pause.set(new CountDownLatch(0)); + } + + public void pause() { + pause.set(new CountDownLatch(1)); + } + + public void goOn() { + pause.get().countDown(); + } + + public void run() { + byte[] buf = new byte[1024]; + try { + InputStream in = src.getInputStream(); + OutputStream out = destination.getOutputStream(); + while (true) { + int len = in.read(buf); + if (len == -1) { + LOG.debug("read eof from:" + src); + break; + } + pause.get().await(); + out.write(buf, 0, len); + } + } catch (Exception e) { + LOG.debug("read/write failed, reason: " + e.getLocalizedMessage()); + try { + if (!receiveSocket.isClosed()) { + // for halfClose, on read/write failure if we close the + // remote end will see a close at the same time. + close(); + } + } catch (Exception ignore) { + } + } + } + } + } + + public class Acceptor implements Runnable { + + private ServerSocket socket; + private URI target; + private AtomicReference pause = new AtomicReference(); + + + public Acceptor(ServerSocket serverSocket, URI uri) { + socket = serverSocket; + target = uri; + pause.set(new CountDownLatch(0)); + try { + socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS); + } catch (SocketException e) { + e.printStackTrace(); + } + } + + public void pause() { + pause.set(new CountDownLatch(1)); + } + + public void goOn() { + pause.get().countDown(); + } + + public void run() { + try { + while(!socket.isClosed()) { + pause.get().await(); + try { + Socket source = socket.accept(); + pause.get().await(); + if (receiveBufferSize > 0) { + source.setReceiveBufferSize(receiveBufferSize); + } + LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize()); + synchronized(connections) { + connections.add(new Bridge(source, target)); + } + } catch (SocketTimeoutException expected) { + } + } + } catch (Exception e) { + LOG.debug("acceptor: finished for reason: " + e.getLocalizedMessage()); + } + } + + public void close() { + try { + socket.close(); + closed.countDown(); + goOn(); + } catch (IOException ignored) { + } + } + } + +} + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34074d71/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java new file mode 100644 index 0000000..244db59 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/Wait.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.util; + + +import java.util.concurrent.TimeUnit; + +public class Wait { + + public static final long MAX_WAIT_MILLIS = 30*1000; + public static final long SLEEP_MILLIS = 1000; + + public interface Condition { + boolean isSatisified() throws Exception; + } + + public static boolean waitFor(Condition condition) throws Exception { + return waitFor(condition, MAX_WAIT_MILLIS); + } + + public static boolean waitFor(final Condition condition, final long duration) throws Exception { + return waitFor(condition, duration, SLEEP_MILLIS); + } + + public static boolean waitFor(final Condition condition, final long duration, final long sleepMillis) throws Exception { + + final long expiry = System.currentTimeMillis() + duration; + boolean conditionSatisified = condition.isSatisified(); + while (!conditionSatisified && System.currentTimeMillis() < expiry) { + TimeUnit.MILLISECONDS.sleep(sleepMillis); + conditionSatisified = condition.isSatisified(); + } + return conditionSatisified; + } +}