qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject svn commit: r1715194 - in /qpid/java/trunk: qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
Date Thu, 19 Nov 2015 15:16:18 GMT
Author: orudyy
Date: Thu Nov 19 15:16:18 2015
New Revision: 1715194

URL: http://svn.apache.org/viewvc?rev=1715194&view=rev
Log:
QPID-6869: Add system test that ensures messaging is reliable when the client is disconnected
abruptly

Added:
    qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java

Added: qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java?rev=1715194&view=auto
==============================================================================
--- qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
(added)
+++ qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
Thu Nov 19 15:16:18 2015
@@ -0,0 +1,553 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A basic implementation of TCP traffic forwarder between ports.
+ * It is intended to use in tests.
+ */
+public class TCPTunneler
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(TCPTunneler.class);
+
+    private final TCPWorker _tcpWorker;
+    private final ExecutorService _executor;
+
+    public TCPTunneler(final String targetHost,
+                       final int targetPort,
+                       final int proxyPort,
+                       final int numberOfConcurrentClients)
+    {
+        _executor = Executors.newFixedThreadPool(numberOfConcurrentClients * 2 + 1);
+        _tcpWorker = new TCPWorker(proxyPort, targetHost, targetPort, _executor);
+    }
+
+    public void start() throws IOException
+    {
+        _tcpWorker.start();
+    }
+
+    public void stop()
+    {
+        try
+        {
+            _tcpWorker.stop();
+        }
+        finally
+        {
+            _executor.shutdown();
+        }
+    }
+
+    public void addClientListener(TunnelListener listener)
+    {
+        _tcpWorker.addClientListener(listener);
+    }
+
+    public void removeClientListener(TunnelListener listener)
+    {
+        _tcpWorker.removeClientListener(listener);
+    }
+
+    /**
+     * Forcibly closes the tunnel serving the client with the given remote address.
+     *
+     * @param address remote address of the tunnelled client; a {@code null} address
+     *                (for example, when no client has connected yet) is ignored
+     */
+    public void disconnect(InetSocketAddress address)
+    {
+        // Check first so that we never report "Disconnecting null" and then do nothing.
+        if (address == null)
+        {
+            LOGGER.info("No client address given, nothing to disconnect");
+            return;
+        }
+
+        LOGGER.info("Disconnecting {}", address);
+        _tcpWorker.disconnect(address);
+    }
+
+    interface TunnelListener
+    {
+        void clientConnected(InetSocketAddress clientAddress);
+
+        void clientDisconnected(InetSocketAddress clientAddress);
+    }
+
+    public static class NoopTunnelListener implements TunnelListener
+    {
+        @Override
+        public void clientConnected(final InetSocketAddress clientAddress)
+        {
+        }
+
+        @Override
+        public void clientDisconnected(final InetSocketAddress clientAddress)
+        {
+        }
+    }
+
+    public static class TCPWorker implements Runnable
+    {
+        private final String _targetHost;
+        private final int _targetPort;
+        private final int _localPort;
+        private final String _hostPort;
+        private final AtomicBoolean _closed;
+        private final Collection<SocketTunnel> _tunnels;
+        private final Collection<TunnelListener> _tunnelListeners;
+        private final TunnelListener _notifyingListener;
+        private volatile ServerSocket _serverSocket;
+        private volatile ExecutorService _executor;
+
+        public TCPWorker(final int localPort,
+                         final String targetHost,
+                         final int targetPort,
+                         final ExecutorService executor)
+        {
+            _closed = new AtomicBoolean();
+            _targetHost = targetHost;
+            _targetPort = targetPort;
+            _localPort = localPort;
+            _hostPort = _targetHost + ":" + _targetPort;
+            _executor = executor;
+            _tunnels = new CopyOnWriteArrayList<>();
+            _tunnelListeners = new CopyOnWriteArrayList<>();
+            _notifyingListener = new NoopTunnelListener()
+            {
+                @Override
+                public void clientConnected(final InetSocketAddress clientAddress)
+                {
+                    notifyClientConnected(clientAddress);
+                }
+
+                @Override
+                public void clientDisconnected(final InetSocketAddress clientAddress)
+                {
+                    try
+                    {
+                        notifyClientDisconnected(clientAddress);
+                    }
+                    finally
+                    {
+                        removeTunnel(clientAddress);
+                    }
+                }
+            };
+        }
+
+        /**
+         * Accept loop: waits for incoming client connections on the listening socket and
+         * creates a tunnel for each of them until the worker is closed or accepting fails.
+         * The thread is renamed for the duration of the loop and its original name is
+         * restored on exit.
+         */
+        @Override
+        public void run()
+        {
+            String threadName = Thread.currentThread().getName();
+            try
+            {
+                Thread.currentThread().setName("TCPTunnelerAcceptingThread");
+                while (!_closed.get())
+                {
+                    // stop() may close and null the field between the check above and the
+                    // accept() call; work on a snapshot to avoid a NullPointerException.
+                    final ServerSocket acceptingSocket = _serverSocket;
+                    if (acceptingSocket == null)
+                    {
+                        break;
+                    }
+
+                    Socket clientSocket = acceptingSocket.accept();
+                    LOGGER.debug("Client opened socket {}", clientSocket);
+
+                    createTunnel(clientSocket);
+                }
+            }
+            catch (IOException e)
+            {
+                // An I/O failure after stop() is the expected result of closing the socket.
+                if (!_closed.get())
+                {
+                    LOGGER.error("Exception in accepting thread", e);
+                }
+            }
+            finally
+            {
+                closeServerSocket();
+                _closed.set(true);
+                Thread.currentThread().setName(threadName);
+            }
+        }
+
+        /**
+         * Opens the listening socket on the local port and schedules the accept loop
+         * on the executor.
+         *
+         * @throws RuntimeException if the port cannot be bound or the accept loop
+         *                          cannot be scheduled
+         */
+        public void start()
+        {
+            LOGGER.info("Starting TCPTunneler forwarding from port {} to {}", _localPort, _hostPort);
+
+            ServerSocket serverSocket = null;
+            try
+            {
+                // SO_REUSEADDR has to be set before the socket is bound; enabling it on an
+                // already bound socket has undefined behaviour. Hence create unbound, then bind.
+                serverSocket = new ServerSocket();
+                serverSocket.setReuseAddress(true);
+                serverSocket.bind(new InetSocketAddress(_localPort));
+            }
+            catch (IOException e)
+            {
+                if (serverSocket != null)
+                {
+                    try
+                    {
+                        serverSocket.close();
+                    }
+                    catch (IOException closeException)
+                    {
+                        LOGGER.warn("Exception on closing of accepting socket", closeException);
+                    }
+                }
+                throw new RuntimeException("Cannot start TCPTunneler on port " + _localPort, e);
+            }
+
+            _serverSocket = serverSocket;
+            LOGGER.info("Listening on port {}", _localPort);
+            try
+            {
+                _executor.execute(this);
+            }
+            catch (RuntimeException e)
+            {
+                // Do not leave the port bound when no thread is going to accept on it.
+                closeServerSocket();
+                throw new RuntimeException("Cannot start acceptor thread for TCPTunneler on port " + _localPort, e);
+            }
+        }
+
+        public void stop()
+        {
+            if (_closed.compareAndSet(false, true))
+            {
+                LOGGER.info("Stopping TCPTunneler forwarding from port {} to {}",
+                            _localPort,
+                            _hostPort);
+                try
+                {
+                    for (SocketTunnel tunnel : _tunnels)
+                    {
+                        tunnel.close();
+                    }
+                }
+                finally
+                {
+                    closeServerSocket();
+                }
+
+                LOGGER.info("TCPTunneler forwarding from port {} to {} is stopped",
+                            _localPort,
+                            _hostPort);
+            }
+        }
+
+        public void addClientListener(TunnelListener listener)
+        {
+            _tunnelListeners.add(listener);
+            for (SocketTunnel socketTunnel : _tunnels)
+            {
+                try
+                {
+                    listener.clientConnected(socketTunnel.getClientAddress());
+                }
+                catch (Exception e)
+                {
+                    LOGGER.warn("Exception on notifying client listener about connected client",
e);
+                }
+            }
+        }
+
+        public void removeClientListener(TunnelListener listener)
+        {
+            _tunnelListeners.remove(listener);
+        }
+
+        /**
+         * Removes the tunnel registered for the given client address and closes it
+         * unless it is already closed.
+         *
+         * @param address remote address of the tunnelled client
+         */
+        public void disconnect(final InetSocketAddress address)
+        {
+            SocketTunnel client = removeTunnel(address);
+            if (client == null)
+            {
+                LOGGER.info("Tunnel for {} not found", address);
+            }
+            else if (client.isClosed())
+            {
+                // The tunnel existed but was closed concurrently; do not report it as missing.
+                LOGGER.info("Tunnel for {} is already closed", address);
+            }
+            else
+            {
+                client.close();
+                LOGGER.info("Tunnel for {} is disconnected", address);
+            }
+        }
+
+
+        private void createTunnel(final Socket clientSocket)
+        {
+            Socket serverSocket = null;
+            try
+            {
+                LOGGER.debug("Opening socket to {} for {}", _hostPort, clientSocket);
+                serverSocket = new Socket(_targetHost, _targetPort);
+                LOGGER.debug("Opened socket to {} for {}", serverSocket, clientSocket);
+                SocketTunnel tunnel = new SocketTunnel(clientSocket, serverSocket, _notifyingListener);
+                LOGGER.debug("Socket tunnel is created from {} to {}", clientSocket, serverSocket);
+                _tunnels.add(tunnel);
+                tunnel.start(_executor);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Cannot forward i/o traffic between {} and {}", clientSocket,
_hostPort, e);
+                SocketTunnel.closeSocket(clientSocket);
+                SocketTunnel.closeSocket(serverSocket);
+            }
+        }
+
+        private void notifyClientConnected(final InetSocketAddress clientAddress)
+        {
+            for (TunnelListener listener : _tunnelListeners)
+            {
+                try
+                {
+                    listener.clientConnected(clientAddress);
+                }
+                catch (Exception e)
+                {
+                    LOGGER.warn("Exception on notifying client listener about connected client",
e);
+                }
+            }
+        }
+
+
+        private void notifyClientDisconnected(final InetSocketAddress clientAddress)
+        {
+            for (TunnelListener listener : _tunnelListeners)
+            {
+                try
+                {
+                    listener.clientDisconnected(clientAddress);
+                }
+                catch (Exception e)
+                {
+                    LOGGER.warn("Exception on notifying client listener about disconnected
client", e);
+                }
+            }
+        }
+
+        private void closeServerSocket()
+        {
+            if (_serverSocket != null)
+            {
+                try
+                {
+                    _serverSocket.close();
+                }
+                catch (IOException e)
+                {
+                    LOGGER.warn("Exception on closing of accepting socket", e);
+                }
+                finally
+                {
+                    _serverSocket = null;
+                }
+            }
+        }
+
+
+        private SocketTunnel removeTunnel(final InetSocketAddress clientAddress)
+        {
+            SocketTunnel client = null;
+            for (SocketTunnel c : _tunnels)
+            {
+                if (c.isClientAddress(clientAddress))
+                {
+                    client = c;
+                    break;
+                }
+            }
+            if (client != null)
+            {
+                _tunnels.remove(client);
+            }
+            return client;
+        }
+
+    }
+
+    public static class SocketTunnel
+    {
+        private final Socket _clientSocket;
+        private final Socket _serverSocket;
+        private final TunnelListener _tunnelListener;
+        private final AtomicBoolean _closed;
+        private final ClosableStreamForwarder _inputStreamForwarder;
+        private final ClosableStreamForwarder _outputStreamForwarder;
+        private final InetSocketAddress _clientSocketAddress;
+
+        public SocketTunnel(final Socket clientSocket,
+                            final Socket serverSocket,
+                            final TunnelListener tunnelListener) throws IOException
+        {
+            _clientSocket = clientSocket;
+            _clientSocketAddress =
+                    new InetSocketAddress(clientSocket.getInetAddress().getHostName(), _clientSocket.getPort());
+            _serverSocket = serverSocket;
+            _closed = new AtomicBoolean();
+            _tunnelListener = tunnelListener;
+            _clientSocket.setKeepAlive(true);
+            _serverSocket.setKeepAlive(true);
+            _inputStreamForwarder = new ClosableStreamForwarder(new StreamForwarder(_clientSocket,
_serverSocket));
+            _outputStreamForwarder = new ClosableStreamForwarder(new StreamForwarder(_serverSocket,
_clientSocket));
+        }
+
+        /**
+         * Closes both ends of the tunnel and notifies the tunnel listener about the
+         * client disconnect. Only the first invocation has an effect; subsequent calls
+         * return immediately.
+         */
+        public void close()
+        {
+            // compareAndSet guarantees that sockets are closed and the listener is
+            // notified at most once, whichever thread gets here first.
+            if (_closed.compareAndSet(false, true))
+            {
+                try
+                {
+                    closeSocket(_serverSocket);
+                    closeSocket(_clientSocket);
+                }
+                finally
+                {
+                    // notify even if closing of the sockets failed unexpectedly
+                    _tunnelListener.clientDisconnected(getClientAddress());
+                }
+            }
+        }
+
+        public void start(Executor executor) throws IOException
+        {
+            executor.execute(_inputStreamForwarder);
+            executor.execute(_outputStreamForwarder);
+            _tunnelListener.clientConnected(getClientAddress());
+        }
+
+        public boolean isClosed()
+        {
+            return _closed.get();
+        }
+
+        public boolean isClientAddress(final InetSocketAddress clientAddress)
+        {
+            return getClientAddress().equals(clientAddress);
+        }
+
+        public InetSocketAddress getClientAddress()
+        {
+            return _clientSocketAddress;
+        }
+
+
+        private static void closeSocket(Socket socket)
+        {
+            if (socket != null)
+            {
+                try
+                {
+                    socket.close();
+                }
+                catch (IOException e)
+                {
+                    LOGGER.warn("Exception on closing of socket {}", socket, e);
+                }
+            }
+        }
+
+
+        private class ClosableStreamForwarder implements Runnable
+        {
+            private StreamForwarder _streamForwarder;
+
+            public ClosableStreamForwarder(StreamForwarder streamForwarder)
+            {
+                _streamForwarder = streamForwarder;
+            }
+
+            @Override
+            public void run()
+            {
+                Thread currentThread = Thread.currentThread();
+                String originalThreadName = currentThread.getName();
+                try
+                {
+                    currentThread.setName(_streamForwarder.getName());
+                    _streamForwarder.run();
+                }
+                finally
+                {
+                    close();
+                    currentThread.setName(originalThreadName);
+                }
+            }
+        }
+    }
+
+    public static class StreamForwarder implements Runnable
+    {
+        private static final int BUFFER_SIZE = 4096;
+
+        private final InputStream _inputStream;
+        private final OutputStream _outputStream;
+        private final String _name;
+
+        public StreamForwarder(Socket input, Socket output) throws IOException
+        {
+            _inputStream = input.getInputStream();
+            _outputStream = output.getOutputStream();
+            _name = "Forwarder-" + input.getInetAddress().getHostName() + ":" + input.getPort()
+ "->"
+                    + output.getInetAddress().getHostName() + ":" + output.getPort();
+        }
+
+        @Override
+        public void run()
+        {
+            byte[] buffer = new byte[BUFFER_SIZE];
+            int bytesRead;
+            try
+            {
+                while ((bytesRead = _inputStream.read(buffer)) != -1)
+                {
+                    _outputStream.write(buffer, 0, bytesRead);
+                    _outputStream.flush();
+                }
+            }
+            catch (IOException e)
+            {
+                LOGGER.warn("Exception on forwarding data for {}: {}", _name, e.getMessage());
+            }
+            finally
+            {
+                try
+                {
+                    _inputStream.close();
+                }
+                catch (IOException e)
+                {
+                    // ignore
+                }
+
+                try
+                {
+                    _outputStream.close();
+                }
+                catch (IOException e)
+                {
+                    // ignore
+                }
+            }
+        }
+
+
+        public String getName()
+        {
+            return _name;
+        }
+    }
+}

Added: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java?rev=1715194&view=auto
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
(added)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/AbruptClientDisconnectTest.java
Thu Nov 19 15:16:18 2015
@@ -0,0 +1,491 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TCPTunneler;
+import org.apache.qpid.url.URLSyntaxException;
+
+public class AbruptClientDisconnectTest extends QpidBrokerTestCase
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbruptClientDisconnectTest.class);
+    private static final String CONNECTION_URL_TEMPLATE =
+            "amqp://guest:guest@clientid/?brokerlist='localhost:%d?failover='false''";
+
+    private TCPTunneler _tcpTunneler;
+    private Connection _tunneledConnection;
+    private ExecutorService _executorService;
+    private Queue _testQueue;
+    private Connection _utilityConnection;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _executorService = Executors.newFixedThreadPool(3);
+
+        _testQueue = getTestQueue();
+        _utilityConnection = getConnection();
+        _utilityConnection.start();
+
+        // create queue
+        consumeIgnoringLastSeenOmission(_utilityConnection, _testQueue, 1, 0, -1);
+
+        _tcpTunneler = new TCPTunneler("localhost", getPort(), getFailingPort(), 1);
+        _tcpTunneler.start();
+    }
+
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            if (_tunneledConnection != null)
+            {
+                _tunneledConnection.close();
+            }
+        }
+        finally
+        {
+            try
+            {
+                if (_tcpTunneler != null)
+                {
+                    _tcpTunneler.stop();
+                }
+            }
+            finally
+            {
+                if (_executorService != null)
+                {
+                    _executorService.shutdown();
+                }
+                super.tearDown();
+            }
+        }
+
+    }
+
+    public void testMessagingOnAbruptConnectivityLostWhilstPublishing() throws Exception
+    {
+        final ClientMonitor clientMonitor = new ClientMonitor();
+        _tunneledConnection = createTunneledConnection(clientMonitor);
+        Producer producer =
+                new Producer(_tunneledConnection, _testQueue, Session.SESSION_TRANSACTED,
0, 10, new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        _tcpTunneler.disconnect(clientMonitor.getClientAddress());
+                    }
+                }
+                );
+        _executorService.submit(producer);
+        boolean disconnected = clientMonitor.awaitDisconnect(10, TimeUnit.SECONDS);
+        producer.stop();
+        assertTrue("Client disconnect did not happen", disconnected);
+        assertTrue("Unexpected number of published messages " + producer.getNumberOfPublished(),
+                   producer.getNumberOfPublished() >= 10);
+
+        consumeIgnoringLastSeenOmission(_utilityConnection, _testQueue, 0, producer.getNumberOfPublished(),
-1);
+    }
+
+
+    public void testMessagingOnAbruptConnectivityLostWhilstConsuming() throws Exception
+    {
+        int minimumNumberOfMessagesToProduce = 40;
+        int minimumNumberOfMessagesToConsume = 20;
+
+        // produce minimum required number of messages before starting consumption
+        final CountDownLatch queueDataWaiter = new CountDownLatch(1);
+        final Producer producer = new Producer(_utilityConnection,
+                                               _testQueue,
+                                               Session.SESSION_TRANSACTED,
+                                               0,
+                                               minimumNumberOfMessagesToProduce,
+                                               new Runnable()
+                                               {
+                                                   @Override
+                                                   public void run()
+                                                   {
+                                                       queueDataWaiter.countDown();
+                                                   }
+                                               });
+
+        // create tunneled connection to consume messages
+        final ClientMonitor clientMonitor = new ClientMonitor();
+        _tunneledConnection = createTunneledConnection(clientMonitor);
+        _tunneledConnection.start();
+
+        // consumer will consume minimum number of messages before abrupt disconnect
+        Consumer consumer = new Consumer(_tunneledConnection,
+                                         _testQueue,
+                                         Session.SESSION_TRANSACTED,
+                                         minimumNumberOfMessagesToConsume,
+                                         new Runnable()
+                                         {
+                                             @Override
+                                             public void run()
+                                             {
+                                                 producer.stop();
+                                                 _tcpTunneler.disconnect(clientMonitor.getClientAddress());
+                                             }
+                                         }
+        );
+
+        LOGGER.debug("Waiting for producer to produce {} messages before consuming", minimumNumberOfMessagesToProduce);
+        _executorService.submit(producer);
+
+        assertTrue("Latch waiting for produced messages was not count down", queueDataWaiter.await(10,
TimeUnit.SECONDS));
+
+        LOGGER.debug("Producer sent {} messages. Starting consumption...", producer.getNumberOfPublished());
+
+        _executorService.submit(consumer);
+
+        boolean disconnectOccurred = clientMonitor.awaitDisconnect(10, TimeUnit.SECONDS);
+        consumer.stop();
+        producer.stop();
+
+        LOGGER.debug("Producer sent {} messages. Consumer received {} messages",
+                     producer.getNumberOfPublished(),
+                     consumer.getNumberOfConsumed());
+
+        assertTrue("Client disconnect did not happen", disconnectOccurred);
+        assertTrue("Unexpected number of published messages " + producer.getNumberOfPublished(),
+                   producer.getNumberOfPublished() >= minimumNumberOfMessagesToProduce);
+        assertTrue("Unexpected number of consumed messages " + consumer.getNumberOfConsumed(),
+                   consumer.getNumberOfConsumed() >= minimumNumberOfMessagesToConsume);
+
+        LOGGER.debug("Remaining number to consume {}.",
+                     (producer.getNumberOfPublished() - consumer.getNumberOfConsumed()));
+        consumeIgnoringLastSeenOmission(_utilityConnection,
+                                        _testQueue,
+                                        consumer.getNumberOfConsumed(),
+                                        producer.getNumberOfPublished(),
+                                        consumer.getLastSeenMessageIndex());
+
+    }
+
+
+    /**
+     * Creates a connection to the broker which goes through the TCP tunneler and
+     * registers the given monitor as a tunnel listener. When the connection reports an
+     * exception, the exception is recorded and the corresponding tunnel is disconnected.
+     *
+     * @param clientMonitor listener tracking the address of the tunnelled client
+     * @return the tunnelled connection
+     */
+    private Connection createTunneledConnection(final ClientMonitor clientMonitor)
+            throws URLSyntaxException, JMSException
+    {
+        final ConnectionURL url = new AMQConnectionURL(String.format(CONNECTION_URL_TEMPLATE, getFailingPort()));
+        Connection tunneledConnection = getConnection(url);
+        _tcpTunneler.addClientListener(clientMonitor);
+        // parameterized instead of the raw type to avoid unchecked operations
+        final AtomicReference<JMSException> connectionException = new AtomicReference<>();
+        tunneledConnection.setExceptionListener(new ExceptionListener()
+        {
+            @Override
+            public void onException(final JMSException exception)
+            {
+                connectionException.set(exception);
+                _tcpTunneler.disconnect(clientMonitor.getClientAddress());
+            }
+        });
+        return tunneledConnection;
+    }
+
+    private void consumeIgnoringLastSeenOmission(final Connection connection,
+                                                 final Queue testQueue,
+                                                 int fromIndex,
+                                                 int toIndex,
+                                                 int consumerLastSeenMessageIndex)
+            throws JMSException
+    {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(testQueue);
+        int expectedIndex = fromIndex;
+        while (expectedIndex < toIndex)
+        {
+            Message message = consumer.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Expected message with index " + expectedIndex + " but got null",
message);
+            int messageIndex = message.getIntProperty(INDEX);
+            LOGGER.debug("Received message with index {}, expected index is {}", messageIndex,
expectedIndex);
+            if (messageIndex != expectedIndex
+                && expectedIndex == fromIndex
+                && messageIndex == consumerLastSeenMessageIndex + 1)
+            {
+                LOGGER.debug( "Broker transaction was completed for message {}"
+                              + " but there was no network to notify client about its completion.",
+                        consumerLastSeenMessageIndex);
+                expectedIndex = messageIndex;
+            }
+            assertEquals("Unexpected message index", expectedIndex, messageIndex);
+            expectedIndex++;
+        }
+        session.close();
+    }
+
+    /**
+     * Tunnel listener remembering the address of the most recently connected client
+     * and allowing a test to wait until that client gets disconnected.
+     */
+    private class ClientMonitor extends TCPTunneler.NoopTunnelListener
+    {
+        private final CountDownLatch _closeLatch = new CountDownLatch(1);
+        // diamond instead of the raw constructor to avoid an unchecked conversion
+        private final AtomicReference<InetSocketAddress> _clientAddress = new AtomicReference<>();
+
+        @Override
+        public void clientConnected(final InetSocketAddress clientAddress)
+        {
+            _clientAddress.set(clientAddress);
+        }
+
+        @Override
+        public void clientDisconnected(final InetSocketAddress clientAddress)
+        {
+            // only the disconnect of the monitored client releases the latch
+            if (clientAddress.equals(getClientAddress()))
+            {
+                _closeLatch.countDown();
+            }
+        }
+
+        /**
+         * Waits for the monitored client to be disconnected.
+         *
+         * @return {@code true} if the disconnect was observed within the given period
+         */
+        public boolean awaitDisconnect(int period, TimeUnit timeUnit) throws InterruptedException
+        {
+            return _closeLatch.await(period, timeUnit);
+        }
+
+        /**
+         * @return address of the last connected client or {@code null} if none connected yet
+         */
+        public InetSocketAddress getClientAddress()
+        {
+            return _clientAddress.get();
+        }
+    }
+
+    /**
+     * Runnable that keeps publishing one reusable message, stamped with an increasing
+     * {@code INDEX} property, until {@link #stop()} is called or an exception is thrown.
+     * When the session is transacted every send is committed individually.
+     * An optional callback is handed to {@code _executorService} right before the message
+     * whose index equals {@code numberOfMessagesToInvokeRunnableAfter} is published.
+     */
+    private class Producer implements Runnable
+    {
+        private final Runnable _runnable;
+        private final Session _session;
+        private final MessageProducer _messageProducer;
+        private final int _numberOfMessagesToInvokeRunnableAfter;
+        /** Pause between publications, in milliseconds (passed to {@link Object#wait(long)}). */
+        private final int _delay;
+        /** Written only by the publishing thread; volatile so other threads can read it. */
+        private volatile int _publishedMessageCounter;
+        private volatile Exception _exception;
+        private volatile Thread _thread;
+        private final AtomicBoolean _closed = new AtomicBoolean();
+
+        /**
+         * @param connection connection used to create the publishing session
+         * @param queue destination to publish to
+         * @param acknowledgeMode session acknowledge mode; the session is transacted when this
+         *                        is {@code Session.SESSION_TRANSACTED}
+         * @param publishDelay pause between publications in milliseconds; no pause when not positive
+         * @param numberOfMessagesToInvokeRunnableAfter number of published messages after which
+         *                                              {@code runnableToInvoke} is scheduled
+         * @param runnableToInvoke callback to schedule, may be {@code null}
+         * @throws JMSException if the session or the producer cannot be created
+         */
+        public Producer(Connection connection, Destination queue, int acknowledgeMode,
+                        int publishDelay,
+                        int numberOfMessagesToInvokeRunnableAfter, Runnable runnableToInvoke)
+                throws JMSException
+        {
+            _session = connection.createSession(acknowledgeMode == Session.SESSION_TRANSACTED,
+                                                acknowledgeMode);
+            _messageProducer = _session.createProducer(queue);
+            _runnable = runnableToInvoke;
+            _numberOfMessagesToInvokeRunnableAfter = numberOfMessagesToInvokeRunnableAfter;
+            _delay = publishDelay;
+        }
+
+        /**
+         * Publishing loop. Any exception ends the loop and is kept for {@link #getException()}.
+         */
+        @Override
+        public void run()
+        {
+            _thread = Thread.currentThread();
+            try
+            {
+                Message message = _session.createMessage();
+                while (!_closed.get())
+                {
+                    if (_publishedMessageCounter == _numberOfMessagesToInvokeRunnableAfter
+                        && _runnable != null)
+                    {
+                        _executorService.execute(_runnable);
+                    }
+
+                    message.setIntProperty(INDEX, _publishedMessageCounter);
+                    _messageProducer.send(message);
+                    if (_session.getTransacted())
+                    {
+                        _session.commit();
+                    }
+                    LOGGER.debug("Produced message with index {}", _publishedMessageCounter);
+                    _publishedMessageCounter++;
+
+                    if (_delay > 0)
+                    {
+                        synchronized (this)
+                        {
+                            // Re-check the flag while holding the monitor: stop() sets the flag
+                            // before it notifies under the same monitor, so its wake-up cannot
+                            // slip in between this check and the wait.
+                            if (!_closed.get())
+                            {
+                                this.wait(_delay);
+                            }
+                        }
+                    }
+                }
+                LOGGER.debug("Stopping producer gracefully");
+            }
+            catch (Exception e)
+            {
+                LOGGER.debug("Stopping producer due to exception", e);
+                _exception = e;
+            }
+        }
+
+        /**
+         * Requests the publishing loop to finish, wakes it up if it is pausing and waits up to
+         * two seconds for the publishing thread to end. Only the first call has any effect.
+         */
+        public void stop()
+        {
+            if (_closed.compareAndSet(false, true))
+            {
+                synchronized (this)
+                {
+                    this.notify();
+                }
+
+                final Thread publishingThread = _thread;
+                if (publishingThread != null)
+                {
+                    try
+                    {
+                        publishingThread.join(2000);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // Give up waiting: nudge the publishing thread and keep the caller's
+                        // interrupt status so code further up the stack can still observe it.
+                        publishingThread.interrupt();
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+
+        /**
+         * @return number of messages published (and committed, for transacted sessions) so far
+         */
+        public int getNumberOfPublished()
+        {
+            return _publishedMessageCounter;
+        }
+
+        /**
+         * @return the exception that ended the publishing loop, or {@code null} if there was none
+         */
+        public Exception getException()
+        {
+            return _exception;
+        }
+
+    }
+
+    private class Consumer implements Runnable
+    {
+        private final Runnable _runnable;
+        private final Session _session;
+        private final MessageConsumer _messageConsumer;
+        private final int _numberOfMessagesToInvokeRunnableAfter;
+        private volatile int _consumedMessageCounter;
+        private volatile Exception _exception;
+        private volatile Thread _thread;
+        private AtomicBoolean _closed = new AtomicBoolean();
+        private volatile int _lastSeenMessageIndex;
+
+        public Consumer(Connection connection,
+                        Destination queue,
+                        int acknowledgeMode,
+                        int numberOfMessagesToInvokeRunnableAfter,
+                        Runnable runnableToInvoke)
+                throws JMSException
+        {
+            _session = connection.createSession(acknowledgeMode == Session.SESSION_TRANSACTED,
acknowledgeMode);
+            _messageConsumer = _session.createConsumer(queue);
+            _runnable = runnableToInvoke;
+            _numberOfMessagesToInvokeRunnableAfter = numberOfMessagesToInvokeRunnableAfter;
+        }
+
+        @Override
+        public void run()
+        {
+            _thread = Thread.currentThread();
+            try
+            {
+                while (!_closed.get())
+                {
+                    if (_consumedMessageCounter == _numberOfMessagesToInvokeRunnableAfter
&& _runnable != null)
+                    {
+                        _executorService.execute(_runnable);
+                    }
+
+                    Message message = _messageConsumer.receive(RECEIVE_TIMEOUT);
+                    if (message != null)
+                    {
+                        int messageIndex = message.getIntProperty(INDEX);
+                        _lastSeenMessageIndex = messageIndex;
+                        LOGGER.debug("Received message with index {}, expected index {}",
+                                     messageIndex,
+                                     _consumedMessageCounter);
+                        assertEquals("Unexpected message index",
+                                     _consumedMessageCounter,
+                                     messageIndex);
+
+                        if (_session.getTransacted())
+                        {
+                            _session.commit();
+                            LOGGER.debug("Committed message with index {}", messageIndex);
+                        }
+                        _consumedMessageCounter++;
+                    }
+                }
+                LOGGER.debug("Stopping consumer gracefully");
+            }
+            catch (Exception e)
+            {
+                LOGGER.debug("Stopping consumer due to exception, number of consumed {}",
_consumedMessageCounter, e);
+                _exception = e;
+            }
+        }
+
+        public void stop()
+        {
+            if (_closed.compareAndSet(false, true))
+            {
+                if (_thread != null)
+                {
+                    try
+                    {
+                        _thread.join(2000);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        _thread.interrupt();
+                    }
+                }
+            }
+        }
+
+        public int getNumberOfConsumed()
+        {
+            return _consumedMessageCounter;
+        }
+
+        public Exception getException()
+        {
+            return _exception;
+        }
+
+        public int getLastSeenMessageIndex()
+        {
+            return _lastSeenMessageIndex;
+        }
+
+    }
+}




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message