qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject svn commit: r1143866 - in /qpid/trunk/qpid/java/common/src: main/java/org/apache/qpid/transport/ main/java/org/apache/qpid/transport/network/ main/java/org/apache/qpid/transport/network/io/ main/java/org/apache/qpid/transport/network/security/sasl/ mai...
Date Thu, 07 Jul 2011 15:09:14 GMT
Author: robbie
Date: Thu Jul  7 15:09:14 2011
New Revision: 1143866

URL: http://svn.apache.org/viewvc?rev=1143866&view=rev
Log:
QPID-3342: rationalise the existing 0-10 transport code and introduce new NetworkTransport
+ NetworkConnection abstraction. Decouple IoSender and IoReceiver, initiate their threads
after the constructor completes.

Applied patch by Keith Wall and myself

Added:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
      - copied, changed from r1143865, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
      - copied, changed from r1143865, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
      - copied, changed from r1143865, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
Removed:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
Modified:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Thu Jul  7 15:09:14 2011
@@ -27,6 +27,7 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.Connection.State.OPENING;
 import static org.apache.qpid.transport.Connection.State.RESUMING;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -40,6 +41,12 @@ import java.util.concurrent.atomic.Atomi
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslServer;
 
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.io.IoNetworkTransport;
 import org.apache.qpid.transport.network.security.SecurityLayer;
 import org.apache.qpid.transport.util.Logger;
 import org.apache.qpid.transport.util.Waiter;
@@ -235,13 +242,15 @@ public class Connection extends Connecti
             state = OPENING;
             userID = settings.getUsername();
             delegate = new ClientDelegate(settings);
-           
-            TransportBuilder transport = new TransportBuilder();
-            transport.init(this);
-            this.sender = transport.buildSenderPipe();
-            transport.buildReceiverPipe(this);
-            this.securityLayer = transport.getSecurityLayer();
-            
+
+            securityLayer = new SecurityLayer();
+            securityLayer.init(this);
+
+            OutgoingNetworkTransport transport = new IoNetworkTransport();
+            Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
+            NetworkConnection network = transport.connect(settings, receiver, null);
+            sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize());
+
             send(new ProtocolHeader(1, 0, 10));
 
             Waiter w = new Waiter(lock, timeout);

Copied: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
(from r1143865, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java?p2=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java&p1=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java&r1=1143865&r2=1143866&rev=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
Thu Jul  7 15:09:14 2011
@@ -1,5 +1,5 @@
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,29 +7,37 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
-package org.apache.qpid.transport.network.io;
+package org.apache.qpid.transport.network;
 
-import java.net.Socket;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 
 import org.apache.qpid.transport.Sender;
 
-public interface IoContext
+public interface NetworkConnection
 {
     Sender<ByteBuffer> getSender();
-    
-    IoReceiver getReceiver();
 
-    Socket getSocket();
-}
+    void close();
+
+    /**
+     * Returns the remote address of the underlying socket.
+     */
+    SocketAddress getRemoteAddress();
+
+    /**
+     * Returns the local address of the underlying socket.
+     */
+    SocketAddress getLocalAddress();
+}
\ No newline at end of file

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
Thu Jul  7 15:09:14 2011
@@ -20,19 +20,9 @@
  */
 package org.apache.qpid.transport.network;
 
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.ConnectionSettings;
-
 public interface NetworkTransport
 {
-    public void init(ConnectionSettings settings);
-    
-    public Sender<ByteBuffer> sender();
-    
-    public void receiver(Receiver<ByteBuffer> delegate);    
-    
     public void close();
-}
\ No newline at end of file
+
+    public NetworkConnection getConnection();
+}

Copied: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
(from r1143865, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java?p2=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java&p1=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java&r1=1143865&r2=1143866&rev=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
Thu Jul  7 15:09:14 2011
@@ -22,17 +22,11 @@ package org.apache.qpid.transport.networ
 
 import java.nio.ByteBuffer;
 
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.ssl.SSLContextFactory;
 import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
 
-public interface NetworkTransport
+public interface OutgoingNetworkTransport extends NetworkTransport
 {
-    public void init(ConnectionSettings settings);
-    
-    public Sender<ByteBuffer> sender();
-    
-    public void receiver(Receiver<ByteBuffer> delegate);    
-    
-    public void close();
+    public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory);
 }
\ No newline at end of file

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1143866&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
(added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
Thu Jul  7 15:09:14 2011
@@ -0,0 +1,82 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IoNetworkConnection implements NetworkConnection
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(IoNetworkConnection.class);
+    private final Socket _socket;
+    private final long _timeout;
+    private final IoSender _ioSender;
+    private final IoReceiver _ioReceiver;
+    
+    public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
+            int sendBufferSize, int receiveBufferSize, long timeout)
+    {
+        _socket = socket;
+        _timeout = timeout;
+
+        _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout);
+        _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
+        _ioSender.registerCloseListener(_ioReceiver);
+
+        _ioReceiver.initiate();
+        _ioSender.initiate();
+    }
+
+    public Sender<ByteBuffer> getSender()
+    {
+        return _ioSender;
+    }
+
+    public void close()
+    {
+        try
+        {
+            _ioSender.close();
+        }
+        finally
+        {
+            _ioReceiver.close(false);
+        }
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return _socket.getRemoteSocketAddress();
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return _socket.getLocalSocketAddress();
+    }
+    
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
Thu Jul  7 15:09:14 2011
@@ -27,14 +27,15 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.ssl.SSLContextFactory;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.NetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
 import org.apache.qpid.transport.util.Logger;
 
-public class IoNetworkTransport implements NetworkTransport, IoContext
+public class IoNetworkTransport implements OutgoingNetworkTransport
 {
     static
     {
@@ -44,34 +45,31 @@ public class IoNetworkTransport implemen
             (Boolean.getBoolean("amqj.enableDirectBuffers"));
     }
 
-    private static final Logger log = Logger.get(IoNetworkTransport.class);
+    private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
 
-    private Socket socket;
-    private Sender<ByteBuffer> sender;
-    private IoReceiver receiver;
-    private long timeout = 60000; 
-    private ConnectionSettings settings;    
+    private Socket _socket;
+    private IoNetworkConnection _connection;
+    private long _timeout = 60000;
     
-    public void init(ConnectionSettings settings)
+    public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory)
     {
+        int sendBufferSize = settings.getWriteBufferSize();
+        int receiveBufferSize = settings.getReadBufferSize();
+        
         try
         {
-            this.settings = settings;
-            InetAddress address = InetAddress.getByName(settings.getHost());
-            socket = new Socket();
-            socket.setReuseAddress(true);
-            socket.setTcpNoDelay(settings.isTcpNodelay());
-
-            log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
-            log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+            _socket = new Socket();
+            _socket.setReuseAddress(true);
+            _socket.setTcpNoDelay(settings.isTcpNodelay());
+            _socket.setSendBufferSize(sendBufferSize);
+            _socket.setReceiveBufferSize(receiveBufferSize);
 
-            socket.setSendBufferSize(settings.getWriteBufferSize());
-            socket.setReceiveBufferSize(settings.getReadBufferSize());
+            LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize());
+            LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize());
 
-            log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
-            log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+            InetAddress address = InetAddress.getByName(settings.getHost());
 
-            socket.connect(new InetSocketAddress(address, settings.getPort()));
+            _socket.connect(new InetSocketAddress(address, settings.getPort()));
         }
         catch (SocketException e)
         {
@@ -81,36 +79,35 @@ public class IoNetworkTransport implemen
         {
             throw new TransportException("Error connecting to broker", e);
         }
-    }
 
-    public void receiver(Receiver<ByteBuffer> delegate)
-    {
-        receiver = new IoReceiver(this, delegate,
-                2*settings.getReadBufferSize() , timeout);
-    }
-
-    public Sender<ByteBuffer> sender()
-    {
-        return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
-    }
+        try
+        {
+            _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
+        }
+        catch(Exception e)
+        {
+            try
+            {
+                _socket.close();
+            }
+            catch(IOException ioe)
+            {
+                //ignored, throw based on original exception
+            }
 
-    public void close()
-    {
-        
-    }
+            throw new TransportException("Error creating network connection", e);
+        }
 
-    public Sender<ByteBuffer> getSender()
-    {
-        return sender;
+        return _connection;
     }
 
-    public IoReceiver getReceiver()
+    public void close()
     {
-        return receiver;
+        _connection.close();
     }
 
-    public Socket getSocket()
+    public NetworkConnection getConnection()
     {
-        return socket;
+        return _connection;
     }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
Thu Jul  7 15:09:14 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.transport.network.io;
 
+import org.apache.qpid.common.Closeable;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.TransportException;
@@ -37,43 +38,54 @@ import java.util.concurrent.atomic.Atomi
  *
  */
 
-final class IoReceiver implements Runnable
+final class IoReceiver implements Runnable, Closeable
 {
 
     private static final Logger log = Logger.get(IoReceiver.class);
 
-    private final IoContext ioCtx;
     private final Receiver<ByteBuffer> receiver;
     private final int bufferSize;
     private final Socket socket;
     private final long timeout;
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Thread receiverThread;
-    private final boolean shutdownBroken =
-        ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
+    private static final boolean shutdownBroken;
+    static
+    {
+        String osName = System.getProperty("os.name");
+        shutdownBroken = osName == null ? false : osName.matches("(?i).*windows.*");
+    }
 
-    public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver,
-                      int bufferSize, long timeout)
+    public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
     {
-        this.ioCtx = ioCtx;
         this.receiver = receiver;
         this.bufferSize = bufferSize;
-        this.socket = ioCtx.getSocket();
+        this.socket = socket;
         this.timeout = timeout;
 
         try
         {
+            //Create but deliberately don't start the thread.
             receiverThread = Threading.getThreadFactory().createThread(this);
         }
         catch(Exception e)
         {
-            throw new Error("Error creating IOReceiver thread",e);
+            throw new RuntimeException("Error creating IOReceiver thread",e);
         }
         receiverThread.setDaemon(true);
         receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+    }
+
+    public void initiate()
+    {
         receiverThread.start();
     }
 
+    public void close()
+    {
+        close(false);
+    }
+
     void close(boolean block)
     {
         if (!closed.getAndSet(true))

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
Thu Jul  7 15:09:14 2011
@@ -24,8 +24,11 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.qpid.common.Closeable;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.SenderException;
@@ -43,7 +46,6 @@ public final class IoSender implements R
     // we can test other cases as well
     private final static int START = Integer.MAX_VALUE - 10;
 
-    private final IoContext ioCtx;
     private final long timeout;
     private final Socket socket;
     private final OutputStream out;
@@ -56,14 +58,13 @@ public final class IoSender implements R
     private final Object notEmpty = new Object();
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Thread senderThread;
+    private final List<Closeable> _listeners = new ArrayList<Closeable>();
     
     private volatile Throwable exception = null;
 
-
-    public IoSender(IoContext ioCtx, int bufferSize, long timeout)
+    public IoSender(Socket socket, int bufferSize, long timeout)
     {
-        this.ioCtx = ioCtx;
-        this.socket = ioCtx.getSocket();
+        this.socket = socket;
         this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
         this.timeout = timeout;
 
@@ -78,6 +79,7 @@ public final class IoSender implements R
 
         try
         {
+            //Create but deliberately don't start the thread.
             senderThread = Threading.getThreadFactory().createThread(this);             
        
         }
         catch(Exception e)
@@ -87,6 +89,10 @@ public final class IoSender implements R
         
         senderThread.setDaemon(true);
         senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+    }
+
+    public void initiate()
+    {
         senderThread.start();
     }
 
@@ -204,16 +210,20 @@ public final class IoSender implements R
                     senderThread.join(timeout);
                     if (senderThread.isAlive())
                     {
+                        log.error("join timed out");
                         throw new SenderException("join timed out");
                     }
                 }
-                ioCtx.getReceiver().close(false);
             }
             catch (InterruptedException e)
             {
+                log.error("interrupted whilst waiting for sender thread to stop");
                 throw new SenderException(e);
             }
-
+            finally
+            {
+                closeListeners();
+            }
             if (reportException && exception != null)
             {
                 throw new SenderException(exception);
@@ -221,6 +231,28 @@ public final class IoSender implements R
         }
     }
 
+    private void closeListeners()
+    {
+        Exception ex = null;
+        for(Closeable listener : _listeners)
+        {
+            try
+            {
+                listener.close();
+            }
+            catch(Exception e)
+            {
+                log.error("Exception closing listener: " + e.getMessage());
+                ex = e;
+            }
+        }
+
+        if (ex != null)
+        {
+            throw new SenderException(ex.getMessage(), ex);
+        }
+    }
+
     public void run()
     {
         final int size = buffer.length;       
@@ -304,4 +336,9 @@ public final class IoSender implements R
             throw new SenderException(e);
         }
     }
+
+    public void registerCloseListener(Closeable listener)
+    {
+        _listeners.add(listener);
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
Thu Jul  7 15:09:14 2011
@@ -42,7 +42,7 @@ import org.apache.qpid.transport.util.Lo
  * SO_RCVBUF    - amqj.receiveBufferSize
  * SO_SNDBUF    - amqj.sendBufferSize
  */
-public final class IoTransport<E> implements IoContext
+public final class IoTransport<E>
 {
 
     static
@@ -70,44 +70,63 @@ public final class IoTransport<E> implem
     IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl)
     {
         this.socket = socket;
-        
+
         if (ssl)
         {
-            SSLEngine engine = null;
-            SSLContext sslCtx;
-            try
-            {
-                sslCtx = createSSLContext();
-            }
-            catch (Exception e)
-            {
-                throw new TransportException("Error creating SSL Context", e);
-            }
-            
-            try
-            {
-                engine = sslCtx.createSSLEngine();
-                engine.setUseClientMode(true);
-            }
-            catch(Exception e)
-            {
-                throw new TransportException("Error creating SSL Engine", e);
-            }
-            
-            this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout));
-            this.endpoint = binding.endpoint(sender);
-            this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
-                                           2*readBufferSize, timeout);
-            
-            log.info("SSL Sender and Receiver initiated");
+            setupSSLTransport(socket, binding);
         }
         else
         {
-            this.sender = new IoSender(this, 2*writeBufferSize, timeout);
-            this.endpoint = binding.endpoint(sender);
-            this.receiver = new IoReceiver(this, binding.receiver(endpoint),
-                                           2*readBufferSize, timeout);
+            setupTransport(socket, binding);
+        }
+    }
+
+    private void setupTransport(Socket socket, Binding<E, ByteBuffer> binding)
+    {
+        IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
+        ios.initiate();
+
+        this.sender = ios;
+        this.endpoint = binding.endpoint(sender);
+        this.receiver = new IoReceiver(socket, binding.receiver(endpoint),
+                                       2*readBufferSize, timeout);
+        this.receiver.initiate();
+
+        ios.registerCloseListener(this.receiver);
+    }
+
+    private void setupSSLTransport(Socket socket, Binding<E, ByteBuffer> binding)
+    {
+        SSLEngine engine = null;
+        SSLContext sslCtx;
+        try
+        {
+            sslCtx = createSSLContext();
+        }
+        catch (Exception e)
+        {
+            throw new TransportException("Error creating SSL Context", e);
+        }
+
+        try
+        {
+            engine = sslCtx.createSSLEngine();
+            engine.setUseClientMode(true);
         }
+        catch(Exception e)
+        {
+            throw new TransportException("Error creating SSL Engine", e);
+        }
+        IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
+        ios.initiate();
+        this.sender = new SSLSender(engine,ios);
+        this.endpoint = binding.endpoint(sender);
+        this.receiver = new IoReceiver(socket, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
+                2*readBufferSize, timeout);
+        this.receiver.initiate();
+        ios.registerCloseListener(this.receiver);
+
+        log.info("SSL Sender and Receiver initiated");
     }
 
     public Sender<ByteBuffer> getSender()

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
Thu Jul  7 15:09:14 2011
@@ -43,8 +43,7 @@ public class SASLSender extends SASLEncr
         this.delegate = delegate;
         log.debug("SASL Sender enabled");
     }
-    
-    @Override
+
     public void close() 
     {
         
@@ -65,13 +64,11 @@ public class SASLSender extends SASLEncr
         }
     }
 
-    @Override
     public void flush() 
     {
        delegate.flush();
     }
 
-    @Override
     public void send(ByteBuffer buf) 
     {        
         if (closed.get())
@@ -108,7 +105,6 @@ public class SASLSender extends SASLEncr
         }        
     }
 
-    @Override
     public void setIdleTimeout(int i) 
     {
         delegate.setIdleTimeout(i);

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
Thu Jul  7 15:09:14 2011
@@ -48,51 +48,45 @@ public class QpidClientX509KeyManager ex
         kmf.init(ks, keyStorePassword.toCharArray());
         this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0];
     }
-        
-    @Override
+
     public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket)
     {
         log.debug("chooseClientAlias:Returning alias " + alias);
         return alias;
     }
 
-    @Override
     public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket)
     {
         return delegate.chooseServerAlias(keyType, issuers, socket);
     }
 
-    @Override
     public X509Certificate[] getCertificateChain(String alias)
     {
         return delegate.getCertificateChain(alias);
     }
 
-    @Override
     public String[] getClientAliases(String keyType, Principal[] issuers)
     {
         log.debug("getClientAliases:Returning alias " + alias);
         return new String[]{alias};
     }
 
-    @Override
     public PrivateKey getPrivateKey(String alias)
     {
         return delegate.getPrivateKey(alias);
     }
 
-    @Override
     public String[] getServerAliases(String keyType, Principal[] issuers)
     {
         return delegate.getServerAliases(keyType, issuers);
     }
-    
+
     public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine)
     {
         log.debug("chooseEngineClientAlias:Returning alias " + alias);
         return alias;
     }
-    
+
     public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) 
     {
         return delegate.chooseEngineServerAlias(keyType, issuers, engine);

Copied: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
(from r1143865, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java?p2=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java&p1=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java&r1=1143865&r2=1143866&rev=1143866&view=diff
==============================================================================
    (empty)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message