activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961062 [2/14] - in /activemq/sandbox/activemq-apollo-actor: ./ activemq-all/ activemq-all/src/test/ide-resources/ activemq-all/src/test/java/org/apache/activemq/jaxb/ activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/...
Date Wed, 07 Jul 2010 03:24:44 GMT

Modified: activemq/sandbox/activemq-apollo-actor/activemq-bio/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-bio/pom.xml?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-bio/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-bio/pom.xml Wed Jul  7 03:24:02 2010
@@ -25,11 +25,11 @@
   </parent>
 
   <groupId>org.apache.activemq</groupId>
-  <artifactId>activemq-bio</artifactId>
+  <artifactId>activemq-nio</artifactId>
   <packaging>jar</packaging>
   <version>6.0-SNAPSHOT</version>
 
-  <name>ActiveMQ :: BIO</name>
+  <name>ActiveMQ :: NIO</name>
 
   <dependencies>
 
@@ -37,6 +37,7 @@
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-transport</artifactId>
     </dependency>
+    
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
@@ -48,24 +49,6 @@
       <scope>test</scope>
     </dependency>
 
-  <!--   In case we want to look at mina..
-    <dependency>
-      <groupId>org.apache.mina</groupId>
-      <artifactId>mina-core</artifactId>
-      <version>2.0.0-M4</version>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-jcl</artifactId>
-    </dependency>
--->
-    
-    <!-- Testing Dependencies -->
-
   </dependencies>
 
 </project>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:24:02 2010
@@ -16,398 +16,152 @@
  */
 package org.apache.activemq.transport.tcp;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import org.apache.activemq.transport.CompletionCallback;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.buffer.ByteArrayOutputStream;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.DispatchSource;
+
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.HashMap;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.net.SocketFactory;
-
-import org.apache.activemq.Service;
-import org.apache.activemq.transport.Transport; //import org.apache.activemq.transport.TransportLoggerFactory;
-import org.apache.activemq.transport.TransportThreadSupport;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.activemq.transport.tcp.TcpTransport.SocketState.*;
+import static org.apache.activemq.transport.tcp.TcpTransport.TransportState.*;
 
 /**
  * An implementation of the {@link Transport} interface using raw tcp/ip
  * 
- * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging
- *         improvement modifications)
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
-    private static final Log LOG = LogFactory.getLog(TcpTransport.class);
-    private static final ThreadPoolExecutor SOCKET_CLOSE;
-    protected final URI remoteLocation;
-    protected final URI localLocation;
-    protected final WireFormat wireFormat;
-
-    protected int connectionTimeout = 30000;
-    protected int soTimeout;
-    protected int socketBufferSize = 64 * 1024;
-    protected int ioBufferSize = 8 * 1024;
-    protected boolean closeAsync = true;
-    protected Socket socket;
-    protected DataOutputStream dataOut;
-    protected DataInputStream dataIn;
-    protected TcpBufferedOutputStream buffOut = null;
-
-    private static final boolean ASYNC_WRITE = false;
-    /**
-     * trace=true -> the Transport stack where this TcpTransport object will be,
-     * will have a TransportLogger layer trace=false -> the Transport stack
-     * where this TcpTransport object will be, will NOT have a TransportLogger
-     * layer, and therefore will never be able to print logging messages. This
-     * parameter is most probably set in Connection or TransportConnector URIs.
-     */
-    protected boolean trace = false;
-    /**
-     * Name of the LogWriter implementation to use. Names are mapped to classes
-     * in the
-     * resources/META-INF/services/org/apache/activemq/transport/logwriters
-     * directory. This parameter is most probably set in Connection or
-     * TransportConnector URIs.
-     */
-    //    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
-    /**
-     * Specifies if the TransportLogger will be manageable by JMX or not. Also,
-     * as long as there is at least 1 TransportLogger which is manageable, a
-     * TransportLoggerControl MBean will me created.
-     */
-    protected boolean dynamicManagement = false;
-    /**
-     * startLogging=true -> the TransportLogger object of the Transport stack
-     * will initially write messages to the log. startLogging=false -> the
-     * TransportLogger object of the Transport stack will initially NOT write
-     * messages to the log. This parameter only has an effect if trace == true.
-     * This parameter is most probably set in Connection or TransportConnector
-     * URIs.
-     */
-    protected boolean startLogging = true;
-    /**
-     * Specifies the port that will be used by the JMX server to manage the
-     * TransportLoggers. This should only be set in an URI by a client (producer
-     * or consumer) since a broker will already create a JMX server. It is
-     * useful for people who test a broker and clients in the same machine and
-     * want to control both via JMX; a different port will be needed.
-     */
-    protected int jmxPort = 1099;
-    protected boolean useLocalHost = true;
-    protected int minmumWireFormatVersion;
-    protected SocketFactory socketFactory;
-    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
-
+public class TcpTransport implements Transport {
     private Map<String, Object> socketOptions;
-    private Boolean keepAlive;
-    private Boolean tcpNoDelay;
-    private Thread runnerThread;
-
-    protected boolean useActivityMonitor;
-
-    /**
-     * Connect to a remote Node - e.g. a Broker
-     * 
-     * @param wireFormat
-     * @param socketFactory
-     * @param remoteLocation
-     * @param localLocation
-     *            - e.g. local InetAddress and local port
-     * @throws IOException
-     * @throws UnknownHostException
-     */
-    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
-        this.wireFormat = wireFormat;
-        this.socketFactory = socketFactory;
-        try {
-            this.socket = socketFactory.createSocket();
-        } catch (SocketException e) {
-            this.socket = null;
-        }
-        this.remoteLocation = remoteLocation;
-        this.localLocation = localLocation;
-        setDaemon(false);
-    }
-
-    /**
-     * Initialize from a server Socket
-     * 
-     * @param wireFormat
-     * @param socket
-     * @throws IOException
-     */
-    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
-        this.wireFormat = wireFormat;
-        this.socket = socket;
-        this.remoteLocation = null;
-        this.localLocation = null;
-        setDaemon(true);
-    }
-
-    LinkedBlockingQueue<Object> outbound = new LinkedBlockingQueue<Object>();
-    private Thread onewayThread;
-
-    /**
-     * A one way asynchronous send
-     */
-    public void oneway(Object command) throws IOException {
-        checkStarted();
-        try {
-            if (ASYNC_WRITE) {
-                outbound.put(command);
-            } else {
-                wireFormat.marshal(command, dataOut);
-                dataOut.flush();
-            }
-        } catch (InterruptedException e) {
-            throw new InterruptedIOException();
-        }
-    }
-
-    protected void sendOneways() {
-        try {
-            LOG.debug("Started oneway thead");
-            while (!isStopped()) {
-                Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
-                if (command != null) {
-                    try {
-                        // int count=0;
-                        while (command != null) {
-                            wireFormat.marshal(command, dataOut);
-                            // count++;
-                            command = outbound.poll();
-                        }
-                        // System.out.println(count);
-                        dataOut.flush();
-                    } catch (IOException e) {
-                        getTransportListener().onException(e);
-                    }
-                }
-            }
-        } catch (InterruptedException e) {
-        }
-    }
-
-    /**
-     * @return pretty print of 'this'
-     */
-    public String toString() {
-        return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
-    }
-
-    /**
-     * reads packets from a Socket
-     */
-    public void run() {
-        LOG.trace("TCP consumer thread for " + this + " starting");
-        this.runnerThread = Thread.currentThread();
-        try {
-            while (!isStopped()) {
-                doRun();
-            }
-        } catch (IOException e) {
-            stoppedLatch.get().countDown();
-            onException(e);
-        } catch (Throwable e) {
-            stoppedLatch.get().countDown();
-            IOException ioe = new IOException("Unexpected error occured");
-            ioe.initCause(e);
-            onException(ioe);
-        } finally {
-            stoppedLatch.get().countDown();
-        }
-    }
 
-    protected void doRun() throws IOException {
-        try {
-            Object command = readCommand();
-            doConsume(command);
-        } catch (SocketTimeoutException e) {
-        } catch (InterruptedIOException e) {
-        }
+    enum SocketState {
+        CONNECTING,
+        CONNECTED,
+        DISCONNECTED
     }
 
-    protected Object readCommand() throws IOException {
-        return wireFormat.unmarshal(dataIn);
+    enum TransportState {
+        CREATED,
+        RUNNING,
+        DISPOSED
     }
 
-    // Properties
-    // -------------------------------------------------------------------------
+    protected URI remoteLocation;
+    protected URI localLocation;
+    private TransportListener listener;
+    private String remoteAddress;
+    private WireFormat wireformat;
 
-    public boolean isTrace() {
-        return trace;
-    }
+    private SocketChannel channel;
 
-    public void setTrace(boolean trace) {
-        this.trace = trace;
-    }
+    private SocketState socketState = DISCONNECTED;
+    private TransportState transportState = CREATED;
 
-    void setUseInactivityMonitor(boolean val) {
-        useActivityMonitor = val;
-    }
+    private DispatchQueue dispatchQueue;
+    private DispatchSource readSource;
+    private DispatchSource writeSource;
 
-    public boolean isUseInactivityMonitor() {
-        return useActivityMonitor;
-    }
-
-    //    public String getLogWriterName() {
-    //        return logWriterName;
-    //    }
-    //
-    //    public void setLogWriterName(String logFormat) {
-    //        this.logWriterName = logFormat;
-    //    }
-
-    public boolean isDynamicManagement() {
-        return dynamicManagement;
-    }
-
-    public void setDynamicManagement(boolean useJmx) {
-        this.dynamicManagement = useJmx;
-    }
-
-    public boolean isStartLogging() {
-        return startLogging;
-    }
-
-    public void setStartLogging(boolean startLogging) {
-        this.startLogging = startLogging;
-    }
-
-    public int getJmxPort() {
-        return jmxPort;
-    }
-
-    public void setJmxPort(int jmxPort) {
-        this.jmxPort = jmxPort;
-    }
-
-    public int getMinmumWireFormatVersion() {
-        return minmumWireFormatVersion;
-    }
-
-    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
-        this.minmumWireFormatVersion = minmumWireFormatVersion;
-    }
-
-    public boolean isUseLocalHost() {
-        return useLocalHost;
-    }
-
-    /**
-     * Sets whether 'localhost' or the actual local host name should be used to
-     * make local connections. On some operating systems such as Macs its not
-     * possible to connect as the local host name so localhost is better.
-     */
-    public void setUseLocalHost(boolean useLocalHost) {
-        this.useLocalHost = useLocalHost;
-    }
-
-    public int getSocketBufferSize() {
-        return socketBufferSize;
-    }
+    final LinkedList<OneWay> outbound = new LinkedList<OneWay>();
+    int maxOutbound = 1024*32;
+    ByteBuffer outbound_frame;
+    protected boolean useLocalHost = true;
 
-    /**
-     * Sets the buffer size to use on the socket
-     */
-    public void setSocketBufferSize(int socketBufferSize) {
-        this.socketBufferSize = socketBufferSize;
-    }
+    static final class OneWay {
+        final Buffer buffer;
+        final CompletionCallback callback;
 
-    public int getSoTimeout() {
-        return soTimeout;
+        public OneWay(Buffer buffer, CompletionCallback callback) {
+            this.callback = callback;
+            this.buffer = buffer;
+        }
     }
 
-    /**
-     * Sets the socket timeout
-     */
-    public void setSoTimeout(int soTimeout) {
-        this.soTimeout = soTimeout;
+    public void connected(SocketChannel channel) {
+        this.channel = channel;
+        this.socketState = CONNECTED;
     }
 
-    public int getConnectionTimeout() {
-        return connectionTimeout;
+    public void connecting(URI remoteLocation, URI localLocation) throws IOException {
+        this.remoteLocation = remoteLocation;
+        this.localLocation = localLocation;
+        this.socketState = CONNECTING;
     }
 
-    /**
-     * Sets the timeout used to connect to the socket
-     */
-    public void setConnectionTimeout(int connectionTimeout) {
-        this.connectionTimeout = connectionTimeout;
-    }
 
-    public Boolean getKeepAlive() {
-        return keepAlive;
+    public DispatchQueue getDispatchQueue() {
+        return dispatchQueue;
     }
 
-    /**
-     * Enable/disable TCP KEEP_ALIVE mode
-     */
-    public void setKeepAlive(Boolean keepAlive) {
-        this.keepAlive = keepAlive;
+    public void setDispatchQueue(DispatchQueue queue) {
+        if( dispatchQueue!=null ) {
+            dispatchQueue.release();
+        }
+        this.dispatchQueue = queue;
+        if( dispatchQueue!=null ) {
+            dispatchQueue.retain();
+        }
     }
 
-    public Boolean getTcpNoDelay() {
-        return tcpNoDelay;
-    }
+    public void start() throws Exception {
+        if (dispatchQueue == null) {
+            throw new IllegalArgumentException("dispatchQueue is not set");
+        }
+        if (listener == null) {
+            throw new IllegalArgumentException("listener is not set");
+        }
+        if( transportState!=CREATED ) {
+            throw new IllegalStateException("can only be started from the created stae");
+        }
+        transportState=RUNNING;
 
-    /**
-     * Enable/disable the TCP_NODELAY option on the socket
-     */
-    public void setTcpNoDelay(Boolean tcpNoDelay) {
-        this.tcpNoDelay = tcpNoDelay;
-    }
+        if( socketState == CONNECTING ) {
+            channel = SocketChannel.open();
+        }
+        channel.configureBlocking(false);
+        if( socketState == CONNECTING ) {
 
-    /**
-     * @return the ioBufferSize
-     */
-    public int getIoBufferSize() {
-        return this.ioBufferSize;
-    }
+            if (localLocation != null) {
+                InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+                channel.socket().bind(localAddress);
+            }
 
-    /**
-     * @param ioBufferSize
-     *            the ioBufferSize to set
-     */
-    public void setIoBufferSize(int ioBufferSize) {
-        this.ioBufferSize = ioBufferSize;
-    }
+            String host = resolveHostName(remoteLocation.getHost());
+            InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+            channel.connect(remoteAddress);
 
-    /**
-     * @return the closeAsync
-     */
-    public boolean isCloseAsync() {
-        return closeAsync;
+            final DispatchSource connectSource = Dispatch.createSource(channel, SelectionKey.OP_CONNECT, dispatchQueue);
+            connectSource.setEventHandler(new Runnable() {
+                public void run() {
+                    if( transportState==RUNNING ) {
+                        try {
+                            socketState = CONNECTED;
+                            channel.finishConnect();
+                            connectSource.release();
+                            fireConnected();
+                        } catch (IOException e) {
+                            listener.onException(e);
+                        }
+                    }
+                }
+            });
+            connectSource.resume();
+        } else {
+            fireConnected();
+        }
     }
 
-    /**
-     * @param closeAsync
-     *            the closeAsync to set
-     */
-    public void setCloseAsync(boolean closeAsync) {
-        this.closeAsync = closeAsync;
-    }
 
-    // Implementation methods
-    // -------------------------------------------------------------------------
     protected String resolveHostName(String host) throws UnknownHostException {
         String localName = InetAddress.getLocalHost().getHostName();
         if (localName != null && isUseLocalHost()) {
@@ -418,201 +172,759 @@ public class TcpTransport extends Transp
         return host;
     }
 
-    /**
-     * Configures the socket for use
-     * 
-     * @param sock
-     * @throws SocketException
-     */
-    protected void initialiseSocket(Socket sock) throws SocketException {
-        if (socketOptions != null) {
-            IntrospectionSupport.setProperties(socket, socketOptions);
-        }
+    private void fireConnected() {
 
         try {
-            sock.setReceiveBufferSize(socketBufferSize);
-            sock.setSendBufferSize(socketBufferSize);
-        } catch (SocketException se) {
-            LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
-            LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
+            channel.socket().setSendBufferSize(maxOutbound);
+            channel.socket().setReceiveBufferSize(maxOutbound);
+        } catch (SocketException e) {
         }
-        sock.setSoTimeout(soTimeout);
 
-        if (keepAlive != null) {
-            sock.setKeepAlive(keepAlive.booleanValue());
-        }
-        if (tcpNoDelay != null) {
-            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
-        }
-    }
+        readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
+        readSource.setEventHandler(new Runnable(){
+            public void run() {
+                drainInbound();
+            }
+        });
 
-    protected void doStart() throws Exception {
-        connect();
-        if (ASYNC_WRITE) {
-            onewayThread = new Thread() {
-                @Override
-                public void run() {
-                    sendOneways();
+        writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
+        writeSource.setEventHandler(new Runnable(){
+            public void run() {
+                if( transportState==RUNNING ) {
+                    // once the outbound is drained.. we can suspend getting
+                    // write events.
+                    if( drainOutbound() ) {
+                        writeSource.suspend();
+                    }
                 }
-            };
-            onewayThread.start();
-        }
+            }
+        });
 
-        stoppedLatch.set(new CountDownLatch(1));
-        super.doStart();
+        remoteAddress = channel.socket().getRemoteSocketAddress().toString();
+        listener.onConnected();
+        readSource.resume();
     }
 
-    protected void connect() throws Exception {
-
-        if (socket == null && socketFactory == null) {
-            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
-        }
-
-        InetSocketAddress localAddress = null;
-        InetSocketAddress remoteAddress = null;
 
-        if (localLocation != null) {
-            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+    public void stop() throws Exception {
+        if( readSource!=null ) {
+            readSource.release();
+            readSource = null;
         }
-
-        if (remoteLocation != null) {
-            String host = resolveHostName(remoteLocation.getHost());
-            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+        if( writeSource!=null ) {
+            writeSource.release();
+            writeSource = null;
         }
+        setDispatchQueue(null);
+        transportState=DISPOSED;
+    }
 
-        if (socket != null) {
+    @Deprecated
+    public void oneway(Object command) {
+        oneway(command, null);
+    }
 
-            if (localAddress != null) {
-                socket.bind(localAddress);
+    public void oneway(Object command, CompletionCallback callback) {
+        try {
+            if( socketState != CONNECTED ) {
+                throw new IllegalStateException("Not connected.");
             }
-
-            // If it's a server accepted socket.. we don't need to connect it
-            // to a remote address.
-            if (remoteAddress != null) {
-                if (connectionTimeout >= 0) {
-                    socket.connect(remoteAddress, connectionTimeout);
-                } else {
-                    socket.connect(remoteAddress);
-                }
+        } catch (IllegalStateException e) {
+            if( callback!=null ) {
+                callback.onFailure(e);
             }
+        }
 
-        } else {
-            // For SSL sockets.. you can't create an unconnected socket :(
-            // This means the timout option are not supported either.
-            if (localAddress != null) {
-                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
-            } else {
-                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
-            }
+        // Marshall the command.
+        Buffer buffer = null;
+        try {
+            buffer = wireformat.marshal(command);
+        } catch (IOException e) {
+            callback.onFailure(e);
+            return;
         }
 
-        initialiseSocket(socket);
-        initializeStreams();
-    }
+        outbound.add(new OneWay(buffer, callback));
 
-    protected void doStop(ServiceStopper stopper) throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Stopping transport " + this);
+        // wait for write ready events if this write
+        // cannot be drained.
+        if( outbound.size()==1 && !drainOutbound() ) {
+            writeSource.resume();
         }
+    }
 
-        // Closing the streams flush the sockets before closing.. if the socket
-        // is hung.. then this hangs the close.
-        // closeStreams();
-        if (socket != null) {
-            if (closeAsync) {
-                // closing the socket can hang also
-                final CountDownLatch latch = new CountDownLatch(1);
+    /**
+    * @return true if the outbound has been drained of all objects and there are no in-progress writes.
+    */
+    private boolean drainOutbound() {
+        try {
+            
+            while(socketState == CONNECTED) {
+                
+              // if we have a pending write that is being sent over the socket...
+              if( outbound_frame!=null ) {
+
+                channel.write(outbound_frame);
+                if( outbound_frame.remaining() != 0 ) {
+                  return false;
+                } else {
+                  outbound_frame = null;
+                }
 
-                SOCKET_CLOSE.execute(new Runnable() {
+              } else {
 
-                    public void run() {
-                        try {
-                            socket.close();
-                        } catch (IOException e) {
-                            LOG.debug("Caught exception closing socket", e);
-                        } finally {
-                            latch.countDown();
-                        }
+                // marshall all the available frames..
+                ByteArrayOutputStream buffer = new ByteArrayOutputStream(maxOutbound << 2);
+                OneWay oneWay = outbound.poll();
+
+                while( oneWay!=null) {
+                    buffer.write(oneWay.buffer);
+                    if( oneWay.callback!=null ) {
+                        oneWay.callback.onCompletion();
                     }
+                    if( buffer.size() < maxOutbound ) {
+                        oneWay = outbound.poll();
+                    } else {
+                        oneWay = null;
+                    }
+                }
+
 
-                });
-                latch.await(1, TimeUnit.SECONDS);
-            } else {
-                try {
-                    socket.close();
-                } catch (IOException e) {
-                    LOG.debug("Caught exception closing socket", e);
+                if( buffer.size()==0 ) {
+                  // the source is now drained...
+                  return true;
+                } else {
+                  outbound_frame = buffer.toBuffer().toByteBuffer();
                 }
+              }
 
             }
-            if (ASYNC_WRITE) {
-                onewayThread.join();
-            }
+
+        } catch (IOException e) {
+            listener.onException(e);
         }
+        
+        return outbound.isEmpty() && outbound_frame==null;
     }
 
-    /**
-     * Override so that stop() blocks until the run thread is no longer running.
-     */
-    @Override
-    public void stop() throws Exception {
-        super.stop();
-        CountDownLatch countDownLatch = stoppedLatch.get();
-        if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
-            countDownLatch.await(1, TimeUnit.SECONDS);
+    private void drainInbound() {
+        Object command = null;
+        // the transport may be suspended after processing a command.
+        while( !readSource.isSuspended() && (command=wireformat.unmarshal(channel))!=null ) {
+            listener.onCommand(command);
         }
     }
 
-    protected void initializeStreams() throws Exception {
-        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
-        this.dataIn = new DataInputStream(buffIn);
-        buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
-        this.dataOut = new DataOutputStream(buffOut);
+
+    public String getRemoteAddress() {
+        return remoteAddress;
     }
 
-    protected void closeStreams() throws IOException {
-        if (dataOut != null) {
-            dataOut.close();
-        }
-        if (dataIn != null) {
-            dataIn.close();
+    public <T> T narrow(Class<T> target) {
+        if (target.isAssignableFrom(getClass())) {
+            return target.cast(this);
         }
+        return null;
     }
 
-    public void setSocketOptions(Map<String, Object> socketOptions) {
-        this.socketOptions = new HashMap<String, Object>(socketOptions);
+    public void suspend() {
+        readSource.suspend();
     }
 
-    public String getRemoteAddress() {
-        if (socket != null) {
-            return "" + socket.getRemoteSocketAddress();
-        }
-        return null;
+    public void resume() {
+        readSource.resume();
+    }
+    
+    public void reconnect(URI uri, CompletionCallback callback) {
+        throw new UnsupportedOperationException();
     }
 
-    @Override
-    public <T> T narrow(Class<T> target) {
-        if (target == Socket.class) {
-            return target.cast(socket);
-        } else if (target == TcpBufferedOutputStream.class) {
-            return target.cast(buffOut);
-        }
-        return super.narrow(target);
+    public TransportListener getTransportListener() {
+        return listener;
+    }
+    public void setTransportListener(TransportListener listener) {
+        this.listener = listener;
     }
 
-    static {
-        SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "TcpSocketClose: " + runnable);
-                thread.setPriority(Thread.MAX_PRIORITY);
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
+    public WireFormat getWireformat() {
+        return wireformat;
+    }
+    public void setWireformat(WireFormat wireformat) {
+        this.wireformat = wireformat;
+    }
+
+    public boolean isConnected() {
+        return socketState == CONNECTED;
+    }
+
+    public boolean isDisposed() {
+        return transportState == DISPOSED;
+    }
+    public boolean isFaultTolerant() {
+        return false;
+    }
+
+    public void setSocketOptions(Map<String, Object> socketOptions) {
+        this.socketOptions = socketOptions;
     }
 
-    public WireFormat getWireformat()
-    {
-        return wireFormat;
+//    private static final Log LOG = LogFactory.getLog(TcpTransport.class);
+//    private static final ThreadPoolExecutor SOCKET_CLOSE;
+//    protected final URI remoteLocation;
+//    protected final URI localLocation;
+//    protected final WireFormat wireFormat;
+//
+//    protected int connectionTimeout = 30000;
+//    protected int soTimeout;
+//    protected int socketBufferSize = 64 * 1024;
+//    protected int ioBufferSize = 8 * 1024;
+//    protected boolean closeAsync = true;
+//    protected Socket socket;
+//    protected DataOutputStream dataOut;
+//    protected DataInputStream dataIn;
+//    protected TcpBufferedOutputStream buffOut = null;
+//
+//    private static final boolean ASYNC_WRITE = false;
+//    /**
+//     * trace=true -> the Transport stack where this TcpTransport object will be,
+//     * will have a TransportLogger layer trace=false -> the Transport stack
+//     * where this TcpTransport object will be, will NOT have a TransportLogger
+//     * layer, and therefore will never be able to print logging messages. This
+//     * parameter is most probably set in Connection or TransportConnector URIs.
+//     */
+//    protected boolean trace = false;
+//    /**
+//     * Name of the LogWriter implementation to use. Names are mapped to classes
+//     * in the
+//     * resources/META-INF/services/org/apache/activemq/transport/logwriters
+//     * directory. This parameter is most probably set in Connection or
+//     * TransportConnector URIs.
+//     */
+//    //    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+//    /**
+//     * Specifies if the TransportLogger will be manageable by JMX or not. Also,
+//     * as long as there is at least 1 TransportLogger which is manageable, a
+//     * TransportLoggerControl MBean will be created.
+//     */
+//    protected boolean dynamicManagement = false;
+//    /**
+//     * startLogging=true -> the TransportLogger object of the Transport stack
+//     * will initially write messages to the log. startLogging=false -> the
+//     * TransportLogger object of the Transport stack will initially NOT write
+//     * messages to the log. This parameter only has an effect if trace == true.
+//     * This parameter is most probably set in Connection or TransportConnector
+//     * URIs.
+//     */
+//    protected boolean startLogging = true;
+//    /**
+//     * Specifies the port that will be used by the JMX server to manage the
+//     * TransportLoggers. This should only be set in an URI by a client (producer
+//     * or consumer) since a broker will already create a JMX server. It is
+//     * useful for people who test a broker and clients in the same machine and
+//     * want to control both via JMX; a different port will be needed.
+//     */
+//    protected int jmxPort = 1099;
+//    protected int minmumWireFormatVersion;
+//    protected SocketFactory socketFactory;
+//    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
+//
+//    private Map<String, Object> socketOptions;
+//    private Boolean keepAlive;
+//    private Boolean tcpNoDelay;
+//    private Thread runnerThread;
+//
+//    protected boolean useActivityMonitor;
+//
+//    /**
+//     * Connect to a remote Node - e.g. a Broker
+//     *
+//     * @param wireFormat
+//     * @param socketFactory
+//     * @param remoteLocation
+//     * @param localLocation
+//     *            - e.g. local InetAddress and local port
+//     * @throws IOException
+//     * @throws UnknownHostException
+//     */
+//    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+//        this.wireFormat = wireFormat;
+//        this.socketFactory = socketFactory;
+//        try {
+//            this.socket = socketFactory.createSocket();
+//        } catch (SocketException e) {
+//            this.socket = null;
+//        }
+//        this.remoteLocation = remoteLocation;
+//        this.localLocation = localLocation;
+//        setDaemon(false);
+//    }
+//
+//    /**
+//     * Initialize from a server Socket
+//     *
+//     * @param wireFormat
+//     * @param socket
+//     * @throws IOException
+//     */
+//    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
+//        this.wireFormat = wireFormat;
+//        this.socket = socket;
+//        this.remoteLocation = null;
+//        this.localLocation = null;
+//        setDaemon(true);
+//    }
+//
+//    LinkedBlockingQueue<Object> outbound = new LinkedBlockingQueue<Object>();
+//    private Thread onewayThread;
+//
+//    /**
+//     * A one way asynchronous send
+//     */
+//    public void oneway(Object command) throws IOException {
+//        checkStarted();
+//        try {
+//            if (ASYNC_WRITE) {
+//                outbound.put(command);
+//            } else {
+//                wireFormat.marshal(command, dataOut);
+//                dataOut.flush();
+//            }
+//        } catch (InterruptedException e) {
+//            throw new InterruptedIOException();
+//        }
+//    }
+//
+//    protected void sendOneways() {
+//        try {
+//            LOG.debug("Started oneway thead");
+//            while (!isStopped()) {
+//                Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
+//                if (command != null) {
+//                    try {
+//                        // int count=0;
+//                        while (command != null) {
+//                            wireFormat.marshal(command, dataOut);
+//                            // count++;
+//                            command = outbound.poll();
+//                        }
+//                        // System.out.println(count);
+//                        dataOut.flush();
+//                    } catch (IOException e) {
+//                        getTransportListener().onException(e);
+//                    }
+//                }
+//            }
+//        } catch (InterruptedException e) {
+//        }
+//    }
+//
+//    /**
+//     * @return pretty print of 'this'
+//     */
+//    public String toString() {
+//        return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
+//    }
+//
+//    /**
+//     * reads packets from a Socket
+//     */
+//    public void run() {
+//        LOG.trace("TCP consumer thread for " + this + " starting");
+//        this.runnerThread = Thread.currentThread();
+//        try {
+//            while (!isStopped()) {
+//                doRun();
+//            }
+//        } catch (IOException e) {
+//            stoppedLatch.get().countDown();
+//            onException(e);
+//        } catch (Throwable e) {
+//            stoppedLatch.get().countDown();
+//            IOException ioe = new IOException("Unexpected error occured");
+//            ioe.initCause(e);
+//            onException(ioe);
+//        } finally {
+//            stoppedLatch.get().countDown();
+//        }
+//    }
+//
+//    protected void doRun() throws IOException {
+//        try {
+//            Object command = readCommand();
+//            doConsume(command);
+//        } catch (SocketTimeoutException e) {
+//        } catch (InterruptedIOException e) {
+//        }
+//    }
+//
+//    protected Object readCommand() throws IOException {
+//        return wireFormat.unmarshal(dataIn);
+//    }
+//
+//    // Properties
+//    // -------------------------------------------------------------------------
+//
+//    public boolean isTrace() {
+//        return trace;
+//    }
+//
+//    public void setTrace(boolean trace) {
+//        this.trace = trace;
+//    }
+//
+//    void setUseInactivityMonitor(boolean val) {
+//        useActivityMonitor = val;
+//    }
+//
+//    public boolean isUseInactivityMonitor() {
+//        return useActivityMonitor;
+//    }
+//
+//    //    public String getLogWriterName() {
+//    //        return logWriterName;
+//    //    }
+//    //
+//    //    public void setLogWriterName(String logFormat) {
+//    //        this.logWriterName = logFormat;
+//    //    }
+//
+//    public boolean isDynamicManagement() {
+//        return dynamicManagement;
+//    }
+//
+//    public void setDynamicManagement(boolean useJmx) {
+//        this.dynamicManagement = useJmx;
+//    }
+//
+//    public boolean isStartLogging() {
+//        return startLogging;
+//    }
+//
+//    public void setStartLogging(boolean startLogging) {
+//        this.startLogging = startLogging;
+//    }
+//
+//    public int getJmxPort() {
+//        return jmxPort;
+//    }
+//
+//    public void setJmxPort(int jmxPort) {
+//        this.jmxPort = jmxPort;
+//    }
+//
+//    public int getMinmumWireFormatVersion() {
+//        return minmumWireFormatVersion;
+//    }
+//
+//    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
+//        this.minmumWireFormatVersion = minmumWireFormatVersion;
+//    }
+//
+    public boolean isUseLocalHost() {
+        return useLocalHost;
+    }
+
+    /**
+     * Sets whether 'localhost' or the actual local host name should be used to
+     * make local connections. On some operating systems such as Macs it's not
+     * possible to connect as the local host name so localhost is better.
+     */
+    public void setUseLocalHost(boolean useLocalHost) {
+        this.useLocalHost = useLocalHost;
     }
+
+//    public int getSocketBufferSize() {
+//        return socketBufferSize;
+//    }
+//
+//    /**
+//     * Sets the buffer size to use on the socket
+//     */
+//    public void setSocketBufferSize(int socketBufferSize) {
+//        this.socketBufferSize = socketBufferSize;
+//    }
+//
+//    public int getSoTimeout() {
+//        return soTimeout;
+//    }
+//
+//    /**
+//     * Sets the socket timeout
+//     */
+//    public void setSoTimeout(int soTimeout) {
+//        this.soTimeout = soTimeout;
+//    }
+//
+//    public int getConnectionTimeout() {
+//        return connectionTimeout;
+//    }
+//
+//    /**
+//     * Sets the timeout used to connect to the socket
+//     */
+//    public void setConnectionTimeout(int connectionTimeout) {
+//        this.connectionTimeout = connectionTimeout;
+//    }
+//
+//    public Boolean getKeepAlive() {
+//        return keepAlive;
+//    }
+//
+//    /**
+//     * Enable/disable TCP KEEP_ALIVE mode
+//     */
+//    public void setKeepAlive(Boolean keepAlive) {
+//        this.keepAlive = keepAlive;
+//    }
+//
+//    public Boolean getTcpNoDelay() {
+//        return tcpNoDelay;
+//    }
+//
+//    /**
+//     * Enable/disable the TCP_NODELAY option on the socket
+//     */
+//    public void setTcpNoDelay(Boolean tcpNoDelay) {
+//        this.tcpNoDelay = tcpNoDelay;
+//    }
+//
+//    /**
+//     * @return the ioBufferSize
+//     */
+//    public int getIoBufferSize() {
+//        return this.ioBufferSize;
+//    }
+//
+//    /**
+//     * @param ioBufferSize
+//     *            the ioBufferSize to set
+//     */
+//    public void setIoBufferSize(int ioBufferSize) {
+//        this.ioBufferSize = ioBufferSize;
+//    }
+//
+//    /**
+//     * @return the closeAsync
+//     */
+//    public boolean isCloseAsync() {
+//        return closeAsync;
+//    }
+//
+//    /**
+//     * @param closeAsync
+//     *            the closeAsync to set
+//     */
+//    public void setCloseAsync(boolean closeAsync) {
+//        this.closeAsync = closeAsync;
+//    }
+//
+//    // Implementation methods
+//    // -------------------------------------------------------------------------
+//    protected String resolveHostName(String host) throws UnknownHostException {
+//        String localName = InetAddress.getLocalHost().getHostName();
+//        if (localName != null && isUseLocalHost()) {
+//            if (localName.equals(host)) {
+//                return "localhost";
+//            }
+//        }
+//        return host;
+//    }
+//
+//    /**
+//     * Configures the socket for use
+//     *
+//     * @param sock
+//     * @throws SocketException
+//     */
+//    protected void initialiseSocket(Socket sock) throws SocketException {
+//        if (socketOptions != null) {
+//            IntrospectionSupport.setProperties(socket, socketOptions);
+//        }
+//
+//        try {
+//            sock.setReceiveBufferSize(socketBufferSize);
+//            sock.setSendBufferSize(socketBufferSize);
+//        } catch (SocketException se) {
+//            LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
+//            LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
+//        }
+//        sock.setSoTimeout(soTimeout);
+//
+//        if (keepAlive != null) {
+//            sock.setKeepAlive(keepAlive.booleanValue());
+//        }
+//        if (tcpNoDelay != null) {
+//            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
+//        }
+//    }
+//
+//    protected void doStart() throws Exception {
+//        connect();
+//        if (ASYNC_WRITE) {
+//            onewayThread = new Thread() {
+//                @Override
+//                public void run() {
+//                    sendOneways();
+//                }
+//            };
+//            onewayThread.start();
+//        }
+//
+//        stoppedLatch.set(new CountDownLatch(1));
+//        super.doStart();
+//    }
+//
+//    protected void connect() throws Exception {
+//
+//        if (socket == null && socketFactory == null) {
+//            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+//        }
+//
+//        InetSocketAddress localAddress = null;
+//        InetSocketAddress remoteAddress = null;
+//
+//        if (localLocation != null) {
+//            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
+//        }
+//
+//        if (remoteLocation != null) {
+//            String host = resolveHostName(remoteLocation.getHost());
+//            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+//        }
+//
+//        if (socket != null) {
+//
+//            if (localAddress != null) {
+//                socket.bind(localAddress);
+//            }
+//
+//            // If it's a server accepted socket.. we don't need to connect it
+//            // to a remote address.
+//            if (remoteAddress != null) {
+//                if (connectionTimeout >= 0) {
+//                    socket.connect(remoteAddress, connectionTimeout);
+//                } else {
+//                    socket.connect(remoteAddress);
+//                }
+//            }
+//
+//        } else {
+//            // For SSL sockets.. you can't create an unconnected socket :(
+//            // This means the timeout options are not supported either.
+//            if (localAddress != null) {
+//                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
+//            } else {
+//                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
+//            }
+//        }
+//
+//        initialiseSocket(socket);
+//        initializeStreams();
+//    }
+//
+//    protected void doStop(ServiceStopper stopper) throws Exception {
+//        if (LOG.isDebugEnabled()) {
+//            LOG.debug("Stopping transport " + this);
+//        }
+//
+//        // Closing the streams flush the sockets before closing.. if the socket
+//        // is hung.. then this hangs the close.
+//        // closeStreams();
+//        if (socket != null) {
+//            if (closeAsync) {
+//                // closing the socket can hang also
+//                final CountDownLatch latch = new CountDownLatch(1);
+//
+//                SOCKET_CLOSE.execute(new Runnable() {
+//
+//                    public void run() {
+//                        try {
+//                            socket.close();
+//                        } catch (IOException e) {
+//                            LOG.debug("Caught exception closing socket", e);
+//                        } finally {
+//                            latch.countDown();
+//                        }
+//                    }
+//
+//                });
+//                latch.await(1, TimeUnit.SECONDS);
+//            } else {
+//                try {
+//                    socket.close();
+//                } catch (IOException e) {
+//                    LOG.debug("Caught exception closing socket", e);
+//                }
+//
+//            }
+//            if (ASYNC_WRITE) {
+//                onewayThread.join();
+//            }
+//        }
+//    }
+//
+//    /**
+//     * Override so that stop() blocks until the run thread is no longer running.
+//     */
+//    @Override
+//    public void stop() throws Exception {
+//        super.stop();
+//        CountDownLatch countDownLatch = stoppedLatch.get();
+//        if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
+//            countDownLatch.await(1, TimeUnit.SECONDS);
+//        }
+//    }
+//
+//    protected void initializeStreams() throws Exception {
+//        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
+//        this.dataIn = new DataInputStream(buffIn);
+//        buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+//        this.dataOut = new DataOutputStream(buffOut);
+//    }
+//
+//    protected void closeStreams() throws IOException {
+//        if (dataOut != null) {
+//            dataOut.close();
+//        }
+//        if (dataIn != null) {
+//            dataIn.close();
+//        }
+//    }
+//
+//    public void setSocketOptions(Map<String, Object> socketOptions) {
+//        this.socketOptions = new HashMap<String, Object>(socketOptions);
+//    }
+//
+//    public String getRemoteAddress() {
+//        if (socket != null) {
+//            return "" + socket.getRemoteSocketAddress();
+//        }
+//        return null;
+//    }
+//
+//    @Override
+//    public <T> T narrow(Class<T> target) {
+//        if (target == Socket.class) {
+//            return target.cast(socket);
+//        } else if (target == TcpBufferedOutputStream.class) {
+//            return target.cast(buffOut);
+//        }
+//        return super.narrow(target);
+//    }
+//
+//    static {
+//        SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+//            public Thread newThread(Runnable runnable) {
+//                Thread thread = new Thread(runnable, "TcpSocketClose: " + runnable);
+//                thread.setPriority(Thread.MAX_PRIORITY);
+//                thread.setDaemon(true);
+//                return thread;
+//            }
+//        });
+//    }
+//
+//    public WireFormat getWireformat()
+//    {
+//        return wireFormat;
+//    }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Jul  7 03:24:02 2010
@@ -29,6 +29,7 @@ import javax.net.SocketFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 //import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.TransportFactorySupport;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -37,87 +38,50 @@ import org.apache.activemq.wireformat.Wi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import static org.apache.activemq.transport.TransportFactorySupport.configure;
+import static org.apache.activemq.transport.TransportFactorySupport.verify;
+
 /**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
  */
-public class TcpTransportFactory extends TransportFactory {
+public class TcpTransportFactory implements TransportFactory.TransportFactorySPI {
     private static final Log LOG = LogFactory.getLog(TcpTransportFactory.class);
 
-    public TransportServer doBind(final URI location) throws IOException {
-        try {
-            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
-
-            ServerSocketFactory serverSocketFactory = createServerSocketFactory();
-            TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
-            server.setWireFormatFactory(createWireFormatFactory(options));
-            IntrospectionSupport.setProperties(server, options);
-            Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
-            server.setTransportOption(transportOptions);
-            server.bind();
-
-            return server;
-        } catch (URISyntaxException e) {
-            throw IOExceptionSupport.create(e);
-        }
+    public TransportServer bind(URI location) throws Exception {
+        Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+        TcpTransportServer server = createTcpTransportServer(location);
+        server.setWireFormatFactory(TransportFactorySupport.createWireFormatFactory(options));
+        IntrospectionSupport.setProperties(server, options);
+        Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
+        server.setTransportOption(transportOptions);
+        return server;
     }
 
     /**
      * Allows subclasses of TcpTransportFactory to create custom instances of
      * TcpTransportServer.
-     * 
-     * @param location
-     * @param serverSocketFactory
-     * @return
-     * @throws IOException
-     * @throws URISyntaxException
      */
-    protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
-        return new TcpTransportServer(this, location, serverSocketFactory);
+    protected TcpTransportServer createTcpTransportServer(final URI location) throws IOException, URISyntaxException {
+        return new TcpTransportServer(location);
     }
 
-    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
 
-        TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class);
-        IntrospectionSupport.setProperties(tcpTransport, options);
+    public Transport connect(URI location) throws Exception {
+        Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+        URI localLocation = getLocalLocation(location);
 
-        Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
-        tcpTransport.setSocketOptions(socketOptions);
-        
-        if (tcpTransport.isTrace()) {
-            throw new UnsupportedOperationException("Trace not implemented");
-//            try {
-//                transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, tcpTransport.getLogWriterName(),
-//                        tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort());
-//            } catch (Throwable e) {
-//                LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e);
-//            }
-        }
-        
-        boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
-        tcpTransport.setUseInactivityMonitor(useInactivityMonitor && isUseInactivityMonitor(transport));
-        
-
-        transport = format.createTransportFilters(transport, options);
-        
-        return transport;
-    }
+        TcpTransport transport = new TcpTransport();
+        transport.connecting(location, localLocation);
 
-    protected String getOption(Map options, String key, String def) {
-        String rc = (String) options.remove(key);
-        if( rc == null ) {
-            rc = def;
-        }
-        return rc;
-    }
+        Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
+        transport.setSocketOptions(socketOptions);
 
-    /**
-     * Returns true if the inactivity monitor should be used on the transport
-     */
-    protected boolean isUseInactivityMonitor(Transport transport) {
-        return true;
+        configure(transport, options);
+        return verify(transport, options);
     }
 
-    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+    private URI getLocalLocation(URI location) {
         URI localLocation = null;
         String path = location.getPath();
         // see if the path is a local URI location
@@ -131,31 +95,15 @@ public class TcpTransportFactory extends
                 LOG.warn("path isn't a valid local location for TcpTransport to use", e);
             }
         }
-        SocketFactory socketFactory = createSocketFactory();
-        return createTcpTransport(wf, socketFactory, location, localLocation);
+        return localLocation;
     }
 
-    /**
-     * Allows subclasses of TcpTransportFactory to provide a create custom
-     * TcpTransport intances.
-     * 
-     * @param location
-     * @param wf
-     * @param socketFactory
-     * @param localLocation
-     * @return
-     * @throws UnknownHostException
-     * @throws IOException
-     */
-    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
-        return new TcpTransport(wf, socketFactory, location, localLocation);
-    }
-
-    protected ServerSocketFactory createServerSocketFactory() throws IOException {
-        return ServerSocketFactory.getDefault();
+    protected String getOption(Map options, String key, String def) {
+        String rc = (String) options.remove(key);
+        if( rc == null ) {
+            rc = def;
+        }
+        return rc;
     }
 
-    protected SocketFactory createSocketFactory() throws IOException {
-        return SocketFactory.getDefault();
-    }
 }



Mime
View raw message