qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject svn commit: r1143867 [2/2] - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/se...
Date Thu, 07 Jul 2011 15:10:32 GMT
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java?rev=1143867&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
(added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
Thu Jul  7 15:10:30 2011
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.util.SessionUtil;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MinaNetworkHandler extends IoHandlerAdapter
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(MinaNetworkHandler.class);
+
+    private ProtocolEngineFactory _factory;
+    private SSLContextFactory _sslFactory = null;
+
+    static
+    {
+        boolean directBuffers = Boolean.getBoolean("amqj.enableDirectBuffers");
+        LOGGER.debug("Using " + (directBuffers ? "direct" : "heap") + " buffers");
+        ByteBuffer.setUseDirectBuffers(directBuffers);
+
+        //override the MINA defaults to prevent use of the PooledByteBufferAllocator
+        ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+    }
+
+    public MinaNetworkHandler(SSLContextFactory sslFactory, ProtocolEngineFactory factory)
+    {
+        _sslFactory = sslFactory;
+        _factory = factory;
+    }
+
+    public MinaNetworkHandler(SSLContextFactory sslFactory)
+    {
+        this(sslFactory, null);
+    }
+
+    public void messageReceived(IoSession session, Object message)
+    {
+        ProtocolEngine engine = (ProtocolEngine) session.getAttachment();
+        ByteBuffer buf = (ByteBuffer) message;
+        try
+        {
+            engine.received(buf.buf());
+        }
+        catch (RuntimeException re)
+        {
+            engine.exception(re);
+        }
+    }
+
+    public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception
+    {
+        ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
+        if(engine != null)
+        {
+            LOGGER.error("Exception caught by Mina", throwable);
+            engine.exception(throwable);
+        }
+        else
+        {
+            LOGGER.error("Exception caught by Mina but without protocol engine to handle
it", throwable);
+        }
+    }
+
+    public void sessionCreated(IoSession ioSession) throws Exception
+    {
+        if(LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Created session: " + ioSession.getRemoteAddress());
+        }
+
+        SessionUtil.initialize(ioSession);
+
+        if (_sslFactory != null)
+        {
+            ioSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+                    new SSLFilter(_sslFactory.buildServerContext()));
+        }
+
+        if (_factory != null)
+        {
+           NetworkConnection netConn = new MinaNetworkConnection(ioSession);
+
+           ProtocolEngine engine = _factory.newProtocolEngine(netConn);
+           ioSession.setAttachment(engine);
+        }
+    }
+
+    public void sessionClosed(IoSession ioSession) throws Exception
+    {
+        if(LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("closed: " + ioSession.getRemoteAddress());
+        }
+
+        ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
+        if(engine != null)
+        {
+            engine.closed();
+        }
+        else
+        {
+            LOGGER.error("Unable to close ProtocolEngine as none was present");
+        }
+    }
+
+   
+    public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+    {
+        if (IdleStatus.WRITER_IDLE.equals(status))
+        {
+            ((ProtocolEngine) session.getAttachment()).writerIdle();
+        }
+        else if (IdleStatus.READER_IDLE.equals(status))
+        {
+            ((ProtocolEngine) session.getAttachment()).readerIdle();
+        }
+    }
+
+}

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java?rev=1143867&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
(added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
Thu Jul  7 15:10:30 2011
@@ -0,0 +1,250 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.mina;
+
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExecutorThreadModel;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.SocketConnectorFactory;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.VMBrokerMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+{
+    private static final int UNKNOWN = -1;
+    private static final int TCP = 0;
+    private static final int VM = 1;
+
+    public NetworkConnection _connection;
+    private SocketAcceptor _acceptor;
+    private InetSocketAddress _address;
+
+    public NetworkConnection connect(ConnectionSettings settings,
+            Receiver<java.nio.ByteBuffer> delegate, SSLContextFactory sslFactory)
+    {
+        int transport = getTransport(settings.getProtocol());
+        
+        IoConnectorCreator stc;
+        switch(transport)
+        {
+            case TCP:
+                stc = new IoConnectorCreator(new SocketConnectorFactory()
+                {
+                    public IoConnector newConnector()
+                    {
+                        return new SocketConnector(1, new QpidThreadExecutor()); // non-blocking
connector
+                    }
+                });
+                _connection = stc.connect(delegate, settings, sslFactory);
+                break;
+            case VM:
+                stc = new IoConnectorCreator(new SocketConnectorFactory()
+                {
+                    public IoConnector newConnector()
+                    {
+                        return new QpidVmPipeConnector();
+                    }
+                });
+                _connection = stc.connect(delegate, settings, sslFactory);
+                break;
+            case UNKNOWN:
+            default:
+                    throw new TransportException("Unknown protocol: " + settings.getProtocol());
+        }
+
+        return _connection;
+    }
+
+    private static int getTransport(String transport)
+    {
+        if (transport.equals(Transport.TCP))
+        {
+            return TCP;
+        }
+
+        if (transport.equals(Transport.VM))
+        {
+            return VM;
+        }
+
+        return -1;
+    }
+
+    public void close()
+    {
+        if(_connection != null)
+        {
+            _connection.close();
+        }
+        if (_acceptor != null)
+        {
+            _acceptor.unbindAll();
+        }
+    }
+
+    public NetworkConnection getConnection()
+    {
+        return _connection;
+    }
+
+    public void accept(final NetworkTransportConfiguration config, final ProtocolEngineFactory
factory,
+            final SSLContextFactory sslFactory)
+    {
+        int processors = config.getConnectorProcessors();
+        
+        if (Transport.TCP.equalsIgnoreCase(config.getTransport()))
+        {
+            _acceptor = new SocketAcceptor(processors, new NewThreadExecutor());
+    
+            SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
+            sconfig.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Acceptor)"));
+            SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
+            sc.setTcpNoDelay(config.getTcpNoDelay());
+            sc.setSendBufferSize(config.getSendBufferSize());
+            sc.setReceiveBufferSize(config.getReceiveBufferSize());
+
+            if (config.getHost().equals(WILDCARD_ADDRESS))
+            {
+                _address = new InetSocketAddress(config.getPort());
+            }
+            else
+            {
+                _address = new InetSocketAddress(config.getHost(), config.getPort());
+            }
+        }
+        else
+        {
+            throw new TransportException("Unknown transport: " + config.getTransport());
+        }
+
+        try
+        {
+            _acceptor.bind(_address, new MinaNetworkHandler(sslFactory, factory));
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Could not bind to " + _address, e);
+        }
+    }
+
+
+    private static class IoConnectorCreator
+    {
+        private static final Logger LOGGER = LoggerFactory.getLogger(IoConnectorCreator.class);
+        
+        private static final int CLIENT_DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+        private SocketConnectorFactory _ioConnectorFactory;
+        
+        public IoConnectorCreator(SocketConnectorFactory socketConnectorFactory)
+        {
+            _ioConnectorFactory = socketConnectorFactory;
+        }
+        
+        public NetworkConnection connect(Receiver<java.nio.ByteBuffer> receiver, ConnectionSettings
settings, SSLContextFactory sslFactory)
+        {
+            final IoConnector ioConnector = _ioConnectorFactory.newConnector();
+            final SocketAddress address;
+            final String protocol = settings.getProtocol();
+            final int port = settings.getPort();
+
+            if (Transport.TCP.equalsIgnoreCase(protocol))
+            {
+                address = new InetSocketAddress(settings.getHost(), port);
+            }
+            else if(Transport.VM.equalsIgnoreCase(protocol))
+            {
+                synchronized (VMBrokerMap.class)
+                {
+                    if(!VMBrokerMap.contains(port))
+                    {
+                        throw new TransportException("VM broker on port " + port + " does
not exist.");
+                    }
+                }
+
+                address = new VmPipeAddress(port);
+            }
+            else
+            {
+                throw new TransportException("Unknown transport: " + protocol);
+            }
+
+            LOGGER.info("Attempting connection to " + address);
+
+            if (ioConnector instanceof SocketConnector)
+            {
+                SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+                cfg.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Client)"));
+
+                SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+                scfg.setTcpNoDelay(true);
+                scfg.setSendBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
+                scfg.setReceiveBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
+
+                // Don't have the connector's worker thread wait around for other
+                // connections (we only use one SocketConnector per connection
+                // at the moment anyway). This allows short-running
+                // clients (like unit tests) to complete quickly.
+                ((SocketConnector) ioConnector).setWorkerTimeout(0);
+            }
+
+            ConnectFuture future = ioConnector.connect(address, new MinaNetworkHandler(sslFactory),
ioConnector.getDefaultConfig());
+            future.join();
+            if (!future.isConnected())
+            {
+                throw new TransportException("Could not open connection");
+            }
+
+            IoSession session = future.getSession();
+            session.setAttachment(receiver);
+
+            return new MinaNetworkConnection(session);
+        }
+    }
+}

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java?rev=1143867&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
(added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
Thu Jul  7 15:10:30 2011
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.mina;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.qpid.transport.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MinaSender
+ */
+public class MinaSender implements Sender<java.nio.ByteBuffer>
+{
+    private static final Logger _log = LoggerFactory.getLogger(MinaSender.class);
+    
+    private final IoSession _session;
+    private WriteFuture _lastWrite;
+
+    public MinaSender(IoSession session)
+    {
+        _session = session;
+    }
+
+    public synchronized void send(java.nio.ByteBuffer msg)
+    {
+        _log.debug("sending data:");
+        ByteBuffer mina = ByteBuffer.allocate(msg.limit());
+        mina.put(msg);
+        mina.flip();
+        _lastWrite = _session.write(mina);
+        _log.debug("sent data:");
+    }
+
+    public synchronized void flush()
+    {
+        if (_lastWrite != null)
+        {
+            _lastWrite.join();
+        }
+    }
+
+    public void close()
+    {
+        // MINA will sometimes throw away in-progress writes when you ask it to close
+        flush();
+        CloseFuture closed = _session.close();
+        closed.join();
+    }
+    
+    public void setIdleTimeout(int i)
+    {
+        //TODO:
+        //We are instead using the setMax[Read|Write]IdleTime methods in 
+        //MinaNetworkConnection for this. Should remove this method from
+        //sender interface, but currently being used by IoSender for 0-10.
+    }
+}

Copied: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/MockSender.java
(from r1143866, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/MockSender.java?p2=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/MockSender.java&p1=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java&r1=1143866&r2=1143867&rev=1143867&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
(original)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/MockSender.java Thu
Jul  7 15:10:30 2011
@@ -18,14 +18,30 @@
  * under the License.
  *
  */
-package org.apache.qpid.protocol;
+package org.apache.qpid.transport;
 
-import org.apache.qpid.transport.NetworkDriver;
+import java.nio.ByteBuffer;
 
-public interface ProtocolEngineFactory  
-{ 
- 
-  // Returns a new instance of a ProtocolEngine 
-  ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); 
-   
-} 
\ No newline at end of file
+public class MockSender implements Sender<ByteBuffer>
+{
+
+    public void setIdleTimeout(int i)
+    {
+
+    }
+
+    public void send(ByteBuffer msg)
+    {
+
+    }
+
+    public void flush()
+    {
+
+    }
+
+    public void close()
+    {
+
+    }
+}

Copied: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
(from r1143866, qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java?p2=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java&p1=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java&r1=1143866&r2=1143867&rev=1143867&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
(original)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
Thu Jul  7 15:10:30 2011
@@ -25,32 +25,32 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
 import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 /**
  * Test implementation of IoSession, which is required for some tests. Methods not being
used are not implemented,
  * so if this class is being used and some methods are to be used, then please update those.
  */
-public class TestNetworkDriver implements NetworkDriver
+public class TestNetworkConnection implements NetworkConnection
 {
-    private final ConcurrentMap attributes = new ConcurrentHashMap();
     private String _remoteHost = "127.0.0.1";
     private String _localHost = "127.0.0.1";
     private int _port = 1;
     private SocketAddress _localAddress = null;
     private SocketAddress _remoteAddress = null;
+    private final MockSender _sender;
 
-    public TestNetworkDriver()
+    public TestNetworkConnection()
     {
+        _sender = new MockSender();
     }
 
     public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
-            NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
+            NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException
     {
         
     }
@@ -65,7 +65,7 @@ public class TestNetworkDriver implement
         return (_remoteAddress != null) ? _remoteAddress : new InetSocketAddress(_remoteHost,
_port);
     }
 
-    public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration
config,
+    public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkTransportConfiguration
config,
             SSLContextFactory sslFactory) throws OpenException
     {
         
@@ -130,4 +130,9 @@ public class TestNetworkDriver implement
     {
         _remoteAddress = address;
     }
+
+    public Sender<ByteBuffer> getSender()
+    {
+        return _sender;
+    }
 }

Copied: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
(from r1143866, qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java?p2=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java&p1=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java&r1=1143866&r2=1143867&rev=1143867&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
(original)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
Thu Jul  7 15:10:30 2011
@@ -21,44 +21,58 @@
 
 package org.apache.qpid.transport.network.mina;
 
-import java.net.BindException;
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import junit.framework.TestCase;
-
+import org.apache.mina.util.AvailablePortFinder;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.OpenException;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
 
-public class MINANetworkDriverTest extends TestCase
+public class MinaNetworkHandlerTest extends QpidTestCase
 {
  
     private static final String TEST_DATA = "YHALOTHAR";
-    private static int TEST_PORT = 2323;
-    private NetworkDriver _server;
-    private NetworkDriver _client;
+    private int _testPort;
+    private IncomingNetworkTransport _server;
+    private OutgoingNetworkTransport _client;
     private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's
read
     private Exception _thrownEx;
+    private ConnectionSettings _clientSettings;
+    private NetworkConnection _network;
+    private TestNetworkTransportConfiguration _brokerSettings;
 
     @Override
-    public void setUp()
+    public void setUp() throws Exception
     {
-        _server = new MINANetworkDriver();
-        _client = new MINANetworkDriver();
+        String host = InetAddress.getLocalHost().getHostName();
+        _testPort = AvailablePortFinder.getNextAvailable(10000);
+
+        _clientSettings = new ConnectionSettings();
+        _clientSettings.setHost(host);
+        _clientSettings.setPort(_testPort);
+
+        _brokerSettings = new TestNetworkTransportConfiguration(_testPort, host);
+
+        _server = new MinaNetworkTransport();
+        _client = new MinaNetworkTransport();
         _thrownEx = null;
         _countingEngine = new CountingProtocolEngine();
-        // increment the port to prevent tests clashing with each other when
-        // the port is in TIMED_WAIT state.
-        TEST_PORT++;
     }
 
     @Override
@@ -78,46 +92,40 @@ public class MINANetworkDriverTest exten
     /**
      * Tests that a socket can't be opened if a driver hasn't been bound
      * to the port and can be opened if a driver has been bound.
-     * @throws BindException
-     * @throws UnknownHostException
-     * @throws OpenException 
      */
-    public void testBindOpen() throws BindException, UnknownHostException, OpenException
 
+    public void testBindOpen() throws Exception
     {
         try
         {
-            _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+            _client.connect(_clientSettings, _countingEngine, null);
         } 
-        catch (OpenException e)
+        catch (TransportException e)
         {
             _thrownEx = e;
         }
 
         assertNotNull("Open should have failed since no engine bound", _thrownEx);      
 
         
-        _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+        _server.accept(_brokerSettings, null, null);
         
-        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        _client.connect(_clientSettings, _countingEngine, null);
     } 
     
     /**
      * Tests that a socket can't be opened after a bound NetworkDriver has been closed
-     * @throws BindException
-     * @throws UnknownHostException
-     * @throws OpenException
      */
-    public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException
 
+    public void testBindOpenCloseOpen() throws Exception
     {
-        _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
-        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+        _client.connect(_clientSettings, _countingEngine, null);
         _client.close();
         _server.close();
         
         try
         {
-            _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+            _client.connect(_clientSettings, _countingEngine, null);
         } 
-        catch (OpenException e)
+        catch (TransportException e)
         {
             _thrownEx = e;
         }
@@ -132,43 +140,60 @@ public class MINANetworkDriverTest exten
     {
         try
         {
-            _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null,
null);
+            _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
         }
-        catch (BindException e)
+        catch (TransportException e)
         {
             fail("First bind should not fail");
         }
         
         try
         {
-            _client.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null,
null);
+            IncomingNetworkTransport second = new MinaNetworkTransport();
+            second.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
         }
-        catch (BindException e)
+        catch (TransportException e)
         {
             _thrownEx = e;
         }
         assertNotNull("Second bind should throw BindException", _thrownEx);
-    } 
-    
+    }
+
+    /**
+     * Tests that binding to the wildcard address succeeds and a client can
+     * connect via localhost.
+     */
+    public void testWildcardBind() throws Exception
+    {
+        TestNetworkTransportConfiguration serverSettings = 
+            new TestNetworkTransportConfiguration(_testPort, WILDCARD_ADDRESS);
+
+        _server.accept(serverSettings, null, null);
+
+        try
+        {
+            _client.connect(_clientSettings, _countingEngine, null);
+        } 
+        catch (TransportException e)
+        {
+            fail("Open should have succeeded since we used a wildcard bind");        
+        }
+    }
+
     /**
      * tests that bytes sent on a network driver are received at the other end
-     * 
-     * @throws UnknownHostException
-     * @throws OpenException
-     * @throws InterruptedException 
-     * @throws BindException 
      */
-    public void testSend() throws UnknownHostException, OpenException, InterruptedException,
BindException 
+    public void testSend() throws Exception 
     {
         // Open a connection from a counting engine to an echo engine
-        _server.bind(TEST_PORT, null,  new EchoProtocolEngineSingletonFactory(), null, null);
-        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+        _network = _client.connect(_clientSettings, _countingEngine, null);
         
         // Tell the counting engine how much data we're sending
         _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
         
         // Send the data and wait for up to 2 seconds to get it back 
-        _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+        _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes()));
         _countingEngine.getLatch().await(2, TimeUnit.SECONDS);
         
         // Check what we got
@@ -177,36 +202,30 @@ public class MINANetworkDriverTest exten
     
     /**
      * Opens a connection with a low read idle and check that it gets triggered
-     * @throws BindException 
-     * @throws OpenException 
-     * @throws UnknownHostException 
      * 
      */
-    public void testSetReadIdle() throws BindException, UnknownHostException, OpenException

+    public void testSetReadIdle() throws Exception
     {
         // Open a connection from a counting engine to an echo engine
-        _server.bind(TEST_PORT, null,  new EchoProtocolEngineSingletonFactory(), null, null);
-        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+        _network = _client.connect(_clientSettings, _countingEngine, null);
         assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle());
-        _client.setMaxReadIdle(1);
+        _network.setMaxReadIdle(1);
         sleepForAtLeast(1500);
         assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle());
     } 
     
     /**
      * Opens a connection with a low write idle and check that it gets triggered
-     * @throws BindException 
-     * @throws OpenException 
-     * @throws UnknownHostException 
      * 
      */
-    public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException

+    public void testSetWriteIdle() throws Exception
     {
         // Open a connection from a counting engine to an echo engine
-        _server.bind(TEST_PORT, null,  new EchoProtocolEngineSingletonFactory(), null, null);
-        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+        _network = _client.connect(_clientSettings, _countingEngine, null);
         assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle());
-        _client.setMaxWriteIdle(1);
+        _network.setMaxWriteIdle(1);
         sleepForAtLeast(1500);
         assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle());
     } 
@@ -216,16 +235,13 @@ public class MINANetworkDriverTest exten
      * Creates and then closes a connection from client to server and checks that the server
      * has its closed() method called. Then creates a new client and closes the server to
check
      * that the client has its closed() method called.  
-     * @throws BindException
-     * @throws UnknownHostException
-     * @throws OpenException
      */
-    public void testClosed() throws BindException, UnknownHostException, OpenException 
+    public void testClosed() throws Exception
     {
         // Open a connection from a counting engine to an echo engine
         EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory();
-        _server.bind(TEST_PORT, null, factory, null, null);
-        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        _server.accept(_brokerSettings, factory, null);
+        _network = _client.connect(_clientSettings, _countingEngine, null);
         EchoProtocolEngine serverEngine = null; 
         while (serverEngine == null)
         {
@@ -253,7 +269,7 @@ public class MINANetworkDriverTest exten
         }
         assertTrue("Server should have been closed", serverEngine.getClosed());
 
-        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        _client.connect(_clientSettings, _countingEngine, null);
         _countingEngine.setClosed(false);
         assertFalse("Client should not have been closed", _countingEngine.getClosed());
         _countingEngine.setNewLatch(1);
@@ -271,22 +287,18 @@ public class MINANetworkDriverTest exten
     /**
      * Create a connection and instruct the client to throw an exception when it gets some
data
      * and that the latch gets counted down. 
-     * @throws BindException
-     * @throws UnknownHostException
-     * @throws OpenException
-     * @throws InterruptedException
      */
-    public void testExceptionCaught() throws BindException, UnknownHostException, OpenException,
InterruptedException 
+    public void testExceptionCaught() throws Exception
     {
-        _server.bind(TEST_PORT, null,  new EchoProtocolEngineSingletonFactory(), null, null);
-        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+        _network = _client.connect(_clientSettings, _countingEngine, null);
 
 
         assertEquals("Exception should not have been thrown", 1, 
                 _countingEngine.getExceptionLatch().getCount());
         _countingEngine.setErrorOnNextRead(true);
         _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
-        _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+        _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes()));
         _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS);
         assertEquals("Exception should have been thrown", 0, 
                 _countingEngine.getExceptionLatch().getCount());
@@ -294,28 +306,24 @@ public class MINANetworkDriverTest exten
     
     /**
      * Opens a connection and checks that the remote address is the one that was asked for
-     * @throws BindException
-     * @throws UnknownHostException
-     * @throws OpenException
      */
-    public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException
+    public void testGetRemoteAddress() throws Exception
     {
-        _server.bind(TEST_PORT, null,  new EchoProtocolEngineSingletonFactory(), null, null);
-        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
-        assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT),
-                     _client.getRemoteAddress());
+        _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null);
+        _network = _client.connect(_clientSettings, _countingEngine, null);
+        assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), _testPort),
+                     _network.getRemoteAddress());
     }
 
     private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory
     {
-        EchoProtocolEngine _engine = null;
+        private EchoProtocolEngine _engine = null;
         
-        public ProtocolEngine newProtocolEngine(NetworkDriver driver)
+        public ProtocolEngine newProtocolEngine(NetworkConnection network)
         {
             if (_engine == null)
             {
-                _engine = new EchoProtocolEngine();
-                _engine.setNetworkDriver(driver);
+                _engine = new EchoProtocolEngine(network);
             }
             return getEngine();
         }
@@ -328,8 +336,6 @@ public class MINANetworkDriverTest exten
     
     public class CountingProtocolEngine implements ProtocolEngine
     {
-
-        protected NetworkDriver _driver;
         public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
         private int _readBytes;
         private CountDownLatch _latch = new CountDownLatch(0);
@@ -362,26 +368,12 @@ public class MINANetworkDriverTest exten
 
         public SocketAddress getRemoteAddress()
         {
-            if (_driver != null)
-            {
-                return _driver.getRemoteAddress();
-            } 
-            else
-            {
-                return null;
-            }
+            return _network.getRemoteAddress();
         }
         
         public SocketAddress getLocalAddress()
         {            
-            if (_driver != null)
-            {
-                return _driver.getLocalAddress();
-            } 
-            else
-            {
-                return null;
-            }
+            return _network.getLocalAddress();
         }
 
         public long getWrittenBytes()
@@ -394,11 +386,6 @@ public class MINANetworkDriverTest exten
             _readerHasBeenIdle = true;
         }
 
-        public void setNetworkDriver(NetworkDriver driver)
-        {
-            _driver = driver;
-        }
-
         public void writeFrame(AMQDataBlock frame)
         {
             
@@ -465,12 +452,18 @@ public class MINANetworkDriverTest exten
 
     private class EchoProtocolEngine extends CountingProtocolEngine
     {
+        private NetworkConnection _echoNetwork;
+
+        public EchoProtocolEngine(NetworkConnection network)
+        {
+            _echoNetwork = network;
+        }
 
         public void received(ByteBuffer msg)
         {
             super.received(msg);
             msg.rewind();
-            _driver.send(msg);
+            _echoNetwork.getSender().send(msg);
         }
     }
     
@@ -491,4 +484,52 @@ public class MINANetworkDriverTest exten
             timeLeft = period - (System.currentTimeMillis() - start);
         }
     }
+
+    private static class TestNetworkTransportConfiguration implements NetworkTransportConfiguration
+    {
+        private int _port;
+        private String _host;
+
+        public TestNetworkTransportConfiguration(final int port, final String host)
+        {
+            _port = port;
+            _host = host;
+        }
+
+        public Boolean getTcpNoDelay()
+        {
+            return true;
+        }
+
+        public Integer getReceiveBufferSize()
+        {
+            return 32768;
+        }
+
+        public Integer getSendBufferSize()
+        {
+            return 32768;
+        }
+
+        public Integer getPort()
+        {
+            return _port;
+        }
+
+        public String getHost()
+        {
+            return _host;
+        }
+
+        public String getTransport()
+        {
+            return Transport.TCP;
+        }
+
+        public Integer getConnectorProcessors()
+        {
+            return 4;
+        }
+        
+    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?rev=1143867&r1=1143866&r2=1143867&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
Thu Jul  7 15:10:30 2011
@@ -31,21 +31,21 @@ import org.apache.qpid.client.protocol.A
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.transport.TestNetworkDriver;
+import org.apache.qpid.transport.TestNetworkConnection;
 
 public class AMQProtocolSessionTest extends QpidBrokerTestCase
 {
-    private static class AMQProtSession extends AMQProtocolSession
+    private static class TestProtocolSession extends AMQProtocolSession
     {
 
-        public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+        public TestProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
         {
             super(protocolHandler,connection);
         }
 
-        public TestNetworkDriver getNetworkDriver()
+        public TestNetworkConnection getNetworkConnection()
         {
-            return (TestNetworkDriver) _protocolHandler.getNetworkDriver();
+            return (TestNetworkConnection) _protocolHandler.getNetworkConnection();
         }
 
         public AMQShortString genQueueName()
@@ -54,7 +54,7 @@ public class AMQProtocolSessionTest exte
         }
     }
 
-    private AMQProtSession _testSession;
+    private TestProtocolSession _testSession;
 
     protected void setUp() throws Exception
     {
@@ -62,10 +62,10 @@ public class AMQProtocolSessionTest exte
 
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
         AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con);
-        protocolHandler.setNetworkDriver(new TestNetworkDriver());
-        
+        protocolHandler.setNetworkConnection(new TestNetworkConnection());
+
         //don't care about the values set here apart from the dummy IoSession
-        _testSession = new AMQProtSession(protocolHandler , con);
+        _testSession = new TestProtocolSession(protocolHandler , con);
     }
     
     public void testTemporaryQueueWildcard() throws UnknownHostException
@@ -100,7 +100,7 @@ public class AMQProtocolSessionTest exte
     
     private void checkTempQueueName(SocketAddress address, String queueName)
     {
-        _testSession.getNetworkDriver().setLocalAddress(address);
+        _testSession.getNetworkConnection().setLocalAddress(address);
         assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString());
     }
 }

Modified: qpid/trunk/qpid/java/test-profiles/Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=1143867&r1=1143866&r2=1143867&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Excludes Thu Jul  7 15:10:30 2011
@@ -50,4 +50,3 @@ org.apache.qpid.test.unit.ack.Acknowledg
 
 // QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change,
so the test will fail.
 org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
-

Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1143867&r1=1143866&r2=1143867&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Thu Jul  7 15:10:30 2011
@@ -77,6 +77,5 @@ org.apache.qpid.test.unit.client.channel
 
 //Temporarily adding the following until the issues are sorted out.
 //Should probably raise JIRAs for them.
-org.apache.qpid.transport.network.mina.MINANetworkDriverTest#*
 org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchange
 org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message