activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1033933 - in /activemq/activemq-apollo/trunk: apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/ apollo-tcp/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-transport/src/main/java/org/apache/activemq...
Date Thu, 11 Nov 2010 13:30:53 GMT
Author: chirino
Date: Thu Nov 11 13:30:53 2010
New Revision: 1033933

URL: http://svn.apache.org/viewvc?rev=1033933&view=rev
Log:
Added a TLS/SSL transport.

Added:
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransportFactory.java
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransportServer.java
    activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/KeyManagerAware.java
      - copied, changed from r1033932, activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java
    activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/SSLContextAware.java
      - copied, changed from r1033932, activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java
    activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TrustManagerAware.java
      - copied, changed from r1033932, activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java
Modified:
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
    activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java

Added: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java?rev=1033933&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java
(added)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java
Thu Nov 11 13:30:53 2010
@@ -0,0 +1,288 @@
+package org.apache.activemq.apollo.transport.tcp;
+
+import org.apache.activemq.apollo.transport.SSLContextAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.*;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_WRAP;
+import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
+
+/**
+ * An SSL Transport for secure communications.
+ *
+ * <p>The protocol codec reads and writes plain text through an
+ * {@link SSLChannel}; that channel passes the data through an
+ * {@link SSLEngine} (see {@link #secure_read} / {@link #secure_write}) and the
+ * encrypted bytes are exchanged with the underlying socket channel.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class SslTransport extends TcpTransport implements SSLContextAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SslTransport.class);
+    private SSLContext sslContext;
+    private SSLEngine engine;
+
+    // Encrypted bytes received from the socket.  Kept flipped (readable) except
+    // while readUnderflow is set, when it is compacted so more bytes can be appended.
+    private ByteBuffer readBuffer;
+    private boolean readUnderflow;
+
+    // Encrypted bytes produced by the engine.  While writeFlushing is set the
+    // buffer is flipped and is being drained to the socket.
+    private ByteBuffer writeBuffer;
+    private boolean writeFlushing;
+
+    // Decrypted bytes that did not fit into the caller's buffer on the last unwrap.
+    private ByteBuffer readOverflowBuffer;
+
+    // Lazily created executor used to run the engine's delegated handshake tasks.
+    private ExecutorService blockingExecutor;
+
+    /**
+     * Sets the context used to create the {@link SSLEngine} for this transport.
+     */
+    public void setSSLContext(SSLContext ctx) {
+        this.sslContext = ctx;
+    }
+
+    /**
+     * Channel given to the codec: reads and writes are routed through the
+     * SSL engine, open/close are delegated to the socket channel.
+     */
+    class SSLChannel implements ReadableByteChannel, WritableByteChannel {
+
+        public int write(ByteBuffer plain) throws IOException {
+            return secure_write(plain);
+        }
+
+        public int read(ByteBuffer plain) throws IOException {
+            return secure_read(plain);
+        }
+
+        public boolean isOpen() {
+            return channel.isOpen();
+        }
+
+        public void close() throws IOException {
+            channel.close();
+        }
+    }
+
+    /**
+     * Points the codec at an {@link SSLChannel} instead of the raw socket channel.
+     */
+    @Override
+    protected void initializeCodec() {
+        SSLChannel channel = new SSLChannel();
+        codec.setReadableByteChannel(channel);
+        codec.setWritableByteChannel(channel);
+    }
+
+
+    /**
+     * Creates the engine in client mode before the outbound connection is started.
+     */
+    @Override
+    public void connecting(URI remoteLocation, URI localLocation) throws Exception {
+        assert engine == null;
+        engine = sslContext.createSSLEngine();
+        engine.setUseClientMode(true);
+        super.connecting(remoteLocation, localLocation);
+    }
+
+    /**
+     * Allocates the network buffers.  When no engine exists yet (i.e.
+     * {@link #connecting} was not called) one is created in server mode.
+     */
+    @Override
+    public void connected(SocketChannel channel) throws Exception {
+        if (engine == null) {
+            engine = sslContext.createSSLEngine();
+            engine.setUseClientMode(false);
+        }
+        SSLSession session = engine.getSession();
+        readBuffer = ByteBuffer.allocateDirect(session.getPacketBufferSize());
+        // start out empty but in "readable" mode, which is what unwrap expects.
+        readBuffer.flip();
+        writeBuffer = ByteBuffer.allocateDirect(session.getPacketBufferSize());
+
+        super.connected(channel);
+
+
+    }
+
+    /**
+     * Starts the SSL handshake once the socket is connected.
+     */
+    @Override
+    protected void onConnected() throws IOException {
+        super.onConnected();
+        engine.setWantClientAuth(true);
+        engine.beginHandshake();
+        handshake_done();
+    }
+
+    /**
+     * Only drains outbound data once {@link #handshake_done()} reports true.
+     */
+    @Override
+    protected void drainOutbound() {
+        if ( handshake_done() ) {
+            super.drainOutbound();
+        }
+    }
+
+    /**
+     * Only drains inbound data once {@link #handshake_done()} reports true.
+     */
+    @Override
+    protected void drainInbound() {
+        if ( handshake_done() ) {
+            super.drainInbound();
+        }
+    }
+
+    /**
+     * Writes pending encrypted bytes from the write buffer to the socket.
+     *
+     * @return true if fully flushed.
+     * @throws IOException if the socket write fails.
+     */
+    protected boolean flush() throws IOException {
+        while (true) {
+            if(writeFlushing) {
+                channel.write(writeBuffer);
+                if( !writeBuffer.hasRemaining() ) {
+                    writeBuffer.clear();
+                    writeFlushing = false;
+                    return true;
+                } else {
+                    return false;
+                }
+            } else {
+                if( writeBuffer.position()!=0 ) {
+                    // switch the buffer to drain mode and loop to write it out.
+                    writeBuffer.flip();
+                    writeFlushing = true;
+                } else {
+                    return true;
+                }
+            }
+        }
+    }
+
+    /**
+     * Encrypts bytes from <code>plain</code> and sends them to the socket.
+     *
+     * @return the number of plain text bytes consumed; 0 when previously
+     *         encrypted data could not be fully flushed yet.
+     */
+    private int secure_write(ByteBuffer plain) throws IOException {
+        if( !flush() ) {
+            // can't write anymore until the write_secured_buffer gets fully flushed out..
+            return 0;
+        }
+        int rc = 0;
+        while ( plain.hasRemaining() || engine.getHandshakeStatus()==NEED_WRAP ) {
+            SSLEngineResult result = engine.wrap(plain, writeBuffer);
+            assert result.getStatus()!= BUFFER_OVERFLOW;
+            rc += result.bytesConsumed();
+            if( !flush() ) {
+                break;
+            }
+        }
+        return rc;
+    }
+
+    /**
+     * Reads encrypted bytes from the socket and decrypts them into <code>plain</code>.
+     *
+     * @return the number of plain text bytes stored in <code>plain</code>, or -1
+     *         when the peer closed the connection and nothing was read.
+     */
+    private int secure_read(ByteBuffer plain) throws IOException {
+        int rc=0;
+        while ( plain.hasRemaining() || engine.getHandshakeStatus() == NEED_UNWRAP ) {
+            if( readOverflowBuffer !=null ) {
+                // lets drain the overflow buffer before trying to suck down anymore
+                // network bytes.
+                int size = Math.min(plain.remaining(), readOverflowBuffer.remaining());
+                // copy from the current position and advance it, so bytes that were
+                // already delivered are not handed out again and the buffer
+                // eventually empties.
+                int offset = readOverflowBuffer.arrayOffset() + readOverflowBuffer.position();
+                plain.put(readOverflowBuffer.array(), offset, size);
+                readOverflowBuffer.position(readOverflowBuffer.position() + size);
+                if( !readOverflowBuffer.hasRemaining() ) {
+                    readOverflowBuffer = null;
+                }
+                rc += size;
+            } else if( readUnderflow ) {
+                int count = channel.read(readBuffer);
+                if( count == -1 ) {  // peer closed socket.
+                    if (rc==0) {
+                        engine.closeInbound();
+                        return -1;
+                    } else {
+                        return rc;
+                    }
+                }
+                if( count==0 ) {  // no data available right now.
+                    return rc;
+                }
+                // read in some more data, perhaps now we can unwrap.
+                readUnderflow = false;
+                readBuffer.flip();
+            } else {
+                SSLEngineResult result = engine.unwrap(readBuffer, plain);
+                rc += result.bytesProduced();
+                if( result.getStatus() == BUFFER_OVERFLOW ) {
+                    // the caller's buffer is too small for the next record: decrypt
+                    // it into a side buffer that is drained on the next iteration.
+                    readOverflowBuffer = ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
+                    result = engine.unwrap(readBuffer, readOverflowBuffer);
+                    readOverflowBuffer.flip();
+                }
+                switch( result.getStatus() ) {
+                    case CLOSED:
+                        if (rc==0) {
+                            engine.closeInbound();
+                            return -1;
+                        } else {
+                            return rc;
+                        }
+                    case OK:
+                        break;
+                    case BUFFER_UNDERFLOW:
+                        // switch the buffer to fill mode so the socket can append to it.
+                        readBuffer.compact();
+                        readUnderflow = true;
+                        break;
+                    case BUFFER_OVERFLOW:
+                        throw new AssertionError("Unexpected case.");
+                }
+            }
+        }
+        return rc;
+    }
+
+    /**
+     * Drives the SSL handshake as far as it can go without blocking.
+     *
+     * @return true when the engine is not (or no longer) handshaking, or when
+     *         the handshake failed and the failure was reported through
+     *         <code>onTransportFailure</code>; false when the handshake is
+     *         waiting on a delegated task or on socket readiness.
+     */
+    public boolean handshake_done() {
+        while (true) {
+            switch (engine.getHandshakeStatus()) {
+                case NEED_TASK:
+                    final Runnable task = engine.getDelegatedTask();
+                    if( task!=null ) {
+                        if( blockingExecutor==null ) {
+                             blockingExecutor = Executors.newSingleThreadExecutor();
+                        }
+                        // run the task off the dispatch queue, then come back to the
+                        // dispatch queue to continue the handshake.
+                        blockingExecutor.execute(new Runnable(){
+                            public void run() {
+                                task.run();
+                                dispatchQueue.execute(new Runnable(){
+                                    public void run() {
+                                        if( isConnected() ) {
+                                            handshake_done();
+                                        }
+                                    }
+                                });
+                            }
+                        });
+                        return false;
+                    }
+                    break;
+
+                case NEED_WRAP:
+                    try {
+                        secure_write(ByteBuffer.allocate(0));
+                        if( writeFlushing && writeSource.isSuspended() ) {
+                            writeSource.resume();
+                            return false;
+                        }
+                    } catch(IOException e) {
+                        onTransportFailure(e);
+                        // stop here like the NEED_UNWRAP case does; looping would just
+                        // retry the failed write forever.
+                        return true;
+                    }
+                    break;
+
+                case NEED_UNWRAP:
+                    try {
+                        secure_read(ByteBuffer.allocate(0));
+                        if( readUnderflow && readSource.isSuspended() ) {
+                            readSource.resume();
+                            return false;
+                        }
+                    } catch(IOException e) {
+                        onTransportFailure(e);
+                        return true;
+                    }
+                    break;
+
+                case FINISHED:
+                case NOT_HANDSHAKING:
+                    // getHandshakeStatus() never reports FINISHED (only wrap/unwrap
+                    // results do), so the executor has to be released here as well
+                    // or its thread would never be shut down.
+                    if( blockingExecutor!=null ) {
+                        blockingExecutor.shutdown();
+                        blockingExecutor = null;
+                    }
+                    return true;
+
+                default:
+                    SSLEngineResult.HandshakeStatus status = engine.getHandshakeStatus();
+                    LOG.warn("Unexpected ssl engine handshake status: "+ status);
+                    // an unknown state cannot be driven forward; fail the transport
+                    // instead of spinning in this loop.
+                    onTransportFailure(new IOException("Unexpected ssl engine handshake status: "+ status));
+                    return true;
+            }
+        }
+    }
+
+}
+
+

Added: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransportFactory.java?rev=1033933&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransportFactory.java
(added)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransportFactory.java
Thu Nov 11 13:30:53 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.transport.tcp;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+
+public class SslTransportFactory extends TcpTransportFactory {
+
+
+    /**
+     * Creates an {@link SslTransportServer} for URIs whose scheme maps to an
+     * SSL/TLS protocol (see {@link #protocol(String)}).
+     *
+     * @return the server, or null when the scheme is not handled by this factory.
+     */
+    @Override
+    protected TcpTransportServer createTcpTransportServer(final URI uri) throws Exception {
+        String protocol = protocol(uri.getScheme());
+        if( protocol!=null ) {
+            return new SslTransportServer(uri).protocol(protocol);
+        }
+        return null;
+
+    }
+
+
+    /**
+     * Creates an {@link SslTransport} for URIs whose scheme maps to an
+     * SSL/TLS protocol (see {@link #protocol(String)}).
+     *
+     * @return the transport, or null when the scheme is not handled by this factory.
+     */
+    @Override
+    protected TcpTransport createTransport(URI uri) throws Exception {
+        String protocol = protocol(uri.getScheme());
+        if( protocol !=null ) {
+            SslTransport rc = new SslTransport();
+            SSLContext context = SSLContext.getInstance(protocol);
+            // an SSLContext must be initialized before it can create engines;
+            // null arguments select the default key managers, trust managers
+            // and source of randomness.
+            context.init(null, null, null);
+            rc.setSSLContext(context);
+            return rc;
+        }
+        return null;
+    }
+
+    /**
+     * Maps a URI scheme to an SSLContext protocol algorithm name.
+     * Valid algorithm names are listed at:
+     * http://download.oracle.com/javase/6/docs/technotes/guides/security/StandardNames.html#SSLContext
+     *
+     * @param scheme the URI scheme; may be null (e.g. for a URI without a scheme).
+     * @return the protocol name, or null if the scheme is not an SSL/TLS scheme.
+     */
+    protected String protocol(String scheme) {
+        // constant-first comparisons so that a null scheme yields null
+        // instead of a NullPointerException.
+        if( "tls".equals(scheme) ) {
+            return "TLS";
+        } else if( "tlsv1".equals(scheme) ) {
+            return "TLSv1";
+        } else if( "tlsv1.1".equals(scheme) ) {
+            return "TLSv1.1";
+        } else if( "ssl".equals(scheme) ) {
+            return "SSL";
+        } else if( "sslv2".equals(scheme) ) {
+            return "SSLv2";
+        } else if( "sslv3".equals(scheme) ) {
+            return "SSLv3";
+        }
+        return null;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransportServer.java?rev=1033933&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransportServer.java
(added)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransportServer.java
Thu Nov 11 13:30:53 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.transport.tcp;
+
+import org.apache.activemq.apollo.transport.KeyManagerAware;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import java.io.IOException;
+import java.net.URI;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+
+public class SslTransportServer extends TcpTransportServer implements KeyManagerAware {
+
+    /** Key managers used to initialize the SSL context; may be left unset. */
+    protected KeyManager[] keyManagers;
+    /** SSLContext protocol name used when key managers have been supplied. */
+    protected String protocol = "TLS";
+    /** Context handed to every transport created by this server; set by start(). */
+    protected SSLContext sslContext;
+
+    public SslTransportServer(URI location) {
+        super(location);
+    }
+
+    public void setKeyManagers(KeyManager[] keyManagers) {
+        this.keyManagers = keyManagers;
+    }
+
+    /**
+     * Selects the SSL context and then starts the underlying TCP server.
+     * Without key managers the JVM default context is used; otherwise a
+     * context for the configured protocol is created and initialized with them.
+     */
+    public void start(Runnable onCompleted) throws Exception {
+        if( keyManagers==null ) {
+            sslContext = SSLContext.getDefault();
+        } else {
+            sslContext = SSLContext.getInstance(protocol);
+            sslContext.init(keyManagers, null, null);
+        }
+        super.start(onCompleted);
+    }
+
+    /**
+     * Creates an SslTransport that shares this server's SSL context.
+     */
+    protected TcpTransport createTransport() {
+        SslTransport transport = new SslTransport();
+        transport.setSSLContext(sslContext);
+        return transport;
+    }
+
+    /**
+     * Sets the SSLContext protocol name and returns this server for chaining.
+     */
+    protected SslTransportServer protocol(String value) {
+        protocol = value;
+        return this;
+    }
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1033933&r1=1033932&r2=1033933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
Thu Nov 11 13:30:53 2010
@@ -27,9 +27,9 @@ import org.fusesource.hawtdispatch.Dispa
 import org.fusesource.hawtdispatch.DispatchSource;
 import org.fusesource.hawtdispatch.Retained;
 
+import javax.net.ssl.SSLException;
 import java.io.IOException;
 import java.net.*;
-import java.nio.channels.Channel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.LinkedList;
@@ -44,7 +44,7 @@ public class TcpTransport extends JavaBa
 
     private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
 
-    private Map<String, Object> socketOptions;
+    protected Map<String, Object> socketOptions;
 
     abstract static class SocketState {
         void onStop(Runnable onCompleted) {
@@ -161,20 +161,20 @@ public class TcpTransport extends JavaBa
 
     protected URI remoteLocation;
     protected URI localLocation;
-    private TransportListener listener;
-    private String remoteAddress;
-    private ProtocolCodec wireformat;
+    protected TransportListener listener;
+    protected String remoteAddress;
+    protected ProtocolCodec codec;
 
-    private SocketChannel channel;
+    protected SocketChannel channel;
 
-    private SocketState socketState = new DISCONNECTED();
+    protected SocketState socketState = new DISCONNECTED();
 
-    private DispatchQueue dispatchQueue;
-    private DispatchSource readSource;
-    private DispatchSource writeSource;
+    protected DispatchQueue dispatchQueue;
+    protected DispatchSource readSource;
+    protected DispatchSource writeSource;
 
     protected boolean useLocalHost = true;
-    boolean full = false;
+    protected boolean full = false;
 
     private final Runnable CANCEL_HANDLER = new Runnable() {
         public void run() {
@@ -192,32 +192,32 @@ public class TcpTransport extends JavaBa
         }
     }
 
-    public void connected(SocketChannel channel) throws IOException {
+    public void connected(SocketChannel channel) throws IOException, Exception {
         this.channel = channel;
 
-        if( wireformat!=null ) {
-            wireformat.setReadableByteChannel(this.channel);
-            wireformat.setWritableByteChannel(this.channel);
+        if( codec !=null ) {
+            initializeCodec();
         }
 
         this.channel.configureBlocking(false);
         this.remoteAddress = channel.socket().getRemoteSocketAddress().toString();
         channel.socket().setSoLinger(true, 0);
+        channel.socket().setTcpNoDelay(true);
 
         this.socketState = new CONNECTED();
     }
 
-    public void connecting(URI remoteLocation, URI localLocation) throws IOException {
+    protected void initializeCodec() {
+        codec.setReadableByteChannel(this.channel);
+        codec.setWritableByteChannel(this.channel);
+    }
+
+    public void connecting(URI remoteLocation, URI localLocation) throws IOException, Exception
{
         this.channel = SocketChannel.open();
         this.channel.configureBlocking(false);
         this.remoteLocation = remoteLocation;
         this.localLocation = localLocation;
 
-        if( wireformat!=null ) {
-            wireformat.setReadableByteChannel(this.channel);
-            wireformat.setWritableByteChannel(this.channel);
-        }
-
         if (localLocation != null) {
             InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
localLocation.getPort());
             channel.socket().bind(localAddress);
@@ -300,7 +300,7 @@ public class TcpTransport extends JavaBa
         return host;
     }
 
-    private void onConnected() throws SocketException {
+    protected void onConnected() throws IOException {
 
         readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
         writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
@@ -338,7 +338,7 @@ public class TcpTransport extends JavaBa
         }
         
         dispatchQueue.release();
-        this.wireformat = null;
+        this.codec = null;
     }
 
     public void onTransportFailure(IOException error) {
@@ -361,7 +361,7 @@ public class TcpTransport extends JavaBa
                 throw new IOException("Not running.");
             }
 
-            ProtocolCodec.BufferState rc = wireformat.write(command);
+            ProtocolCodec.BufferState rc = codec.write(command);
             switch (rc ) {
                 case FULL:
                     return false;
@@ -378,15 +378,15 @@ public class TcpTransport extends JavaBa
     }
 
     /**
-     * @retruns true if there are no in progress writes.
+     *
      */
-    private void drainOutbound() {
+    protected void drainOutbound() {
         assert Dispatch.getCurrentQueue() == dispatchQueue;
         if (getServiceState() != STARTED || !socketState.is(CONNECTED.class)) {
             return;
         }
         try {
-            if( wireformat.flush() == ProtocolCodec.BufferState.EMPTY ) {
+            if( codec.flush() == ProtocolCodec.BufferState.EMPTY && flush() ) {
                 writeSource.suspend();
                 listener.onRefill();
             }
@@ -395,12 +395,16 @@ public class TcpTransport extends JavaBa
         }
     }
 
-    private void drainInbound() {
+    protected boolean flush() throws IOException {
+        return true;
+    }
+
+    protected void drainInbound() {
         if (!getServiceState().isStarted() || readSource.isSuspended()) {
             return;
         }
         try {
-            Object command = wireformat.read();
+            Object command = codec.read();
             while ( command!=null ) {
                 try {
                     listener.onTransportCommand(command);
@@ -414,7 +418,7 @@ public class TcpTransport extends JavaBa
                     return;
                 }
 
-                command = wireformat.read();
+                command = codec.read();
             }
         } catch (IOException e) {
             onTransportFailure(e);
@@ -480,14 +484,13 @@ public class TcpTransport extends JavaBa
     }
 
     public ProtocolCodec getProtocolCodec() {
-        return wireformat;
+        return codec;
     }
 
     public void setProtocolCodec(ProtocolCodec protocolCodec) {
-        this.wireformat = protocolCodec;
-        if( channel!=null ) {
-            protocolCodec.setReadableByteChannel(this.channel);
-            protocolCodec.setWritableByteChannel(this.channel);
+        this.codec = protocolCodec;
+        if( channel!=null && codec!=null ) {
+            initializeCodec();
         }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java?rev=1033933&r1=1033932&r2=1033933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java
Thu Nov 11 13:30:53 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.apollo.trans
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -42,38 +43,27 @@ public class TcpTransportFactory impleme
     private static final Logger LOG = LoggerFactory.getLogger(TcpTransportFactory.class);
 
     public TransportServer bind(String location) throws Exception {
-        if( !location.startsWith("tcp:") ) {
-            return null;
-        }
 
         URI uri = new URI(location);
-        Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
         TcpTransportServer server = createTcpTransportServer(uri);
+        if (server == null) return null;
+
+        Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
         IntrospectionSupport.setProperties(server, options);
         Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options,
"transport.");
         server.setTransportOption(transportOptions);
         return server;
     }
 
-    /**
-     * Allows subclasses of TcpTransportFactory to create custom instances of
-     * TcpTransportServer.
-     */
-    protected TcpTransportServer createTcpTransportServer(final URI location) throws IOException,
URISyntaxException {
-        return new TcpTransportServer(location);
-    }
-
 
     public Transport connect(String location) throws Exception {
-        if( !location.startsWith("tcp:") ) {
-            return null;
-        }
-
         URI uri = new URI(location);
+        TcpTransport transport = createTransport(uri);
+        if (transport == null) return null;
+
         Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
         URI localLocation = getLocalLocation(uri);
 
-        TcpTransport transport = new TcpTransport();
         transport.connecting(uri, localLocation);
 
         Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options,
"socket.");
@@ -83,7 +73,30 @@ public class TcpTransportFactory impleme
         return verify(transport, options);
     }
 
-    private URI getLocalLocation(URI location) {
+    /**
+     * Allows subclasses of TcpTransportFactory to create custom instances of
+     * TcpTransportServer.
+     */
+    protected TcpTransportServer createTcpTransportServer(final URI location) throws IOException,
URISyntaxException, Exception {
+        if( !location.getScheme().equals("tcp") ) {
+            return null;
+        }
+        return new TcpTransportServer(location);
+    }
+
+    /**
+     * Allows subclasses of TcpTransportFactory to create custom instances of
+     * TcpTransport.
+     */
+    protected TcpTransport createTransport(URI uri) throws NoSuchAlgorithmException, Exception
{
+        if( !uri.getScheme().equals("tcp") ) {
+            return null;
+        }
+        TcpTransport transport = new TcpTransport();
+        return transport;
+    }
+
+    protected URI getLocalLocation(URI location) {
         URI localLocation = null;
         String path = location.getPath();
         // see if the path is a local URI location

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java?rev=1033933&r1=1033932&r2=1033933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
Thu Nov 11 13:30:53 2010
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.apollo.transport.tcp;
 
-import org.apache.activemq.apollo.transport.TransportServer;
 import org.apache.activemq.apollo.transport.Transport;
 import org.apache.activemq.apollo.transport.TransportAcceptListener;
+import org.apache.activemq.apollo.transport.TransportServer;
 import org.apache.activemq.apollo.util.IOExceptionSupport;
 import org.apache.activemq.apollo.util.IntrospectionSupport;
 import org.fusesource.hawtdispatch.Dispatch;
@@ -85,7 +85,7 @@ public class TcpTransportServer implemen
     public void start() throws Exception {
         start(null);
     }
-    public void start(Runnable onCompleted) throws IOException {
+    public void start(Runnable onCompleted) throws Exception {
         URI bind = bindURI;
 
         String host = bind.getHost();
@@ -125,7 +125,7 @@ public class TcpTransportServer implemen
                         handleSocket(client);
                         client = channel.accept();
                     }
-                } catch (IOException e) {
+                } catch (Exception e) {
                     listener.onAcceptError(e);
                 }
             }
@@ -187,7 +187,7 @@ public class TcpTransportServer implemen
         this.backlog = backlog;
     }
 
-    protected final void handleSocket(SocketChannel socket) throws IOException {
+    protected final void handleSocket(SocketChannel socket) throws Exception {
         HashMap<String, Object> options = new HashMap<String, Object>();
 //      options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
 //      options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
@@ -202,8 +202,8 @@ public class TcpTransportServer implemen
         listener.onAccept(transport);
     }
 
-    private Transport createTransport(SocketChannel socketChannel, HashMap<String, Object> options) throws IOException {
-        TcpTransport transport = new TcpTransport();
+    protected Transport createTransport(SocketChannel socketChannel, HashMap<String, Object> options) throws Exception {
+        TcpTransport transport = createTransport();
         transport.connected(socketChannel);
         if( options!=null ) {
             IntrospectionSupport.setProperties(transport, options);
@@ -214,6 +214,10 @@ public class TcpTransportServer implemen
         return transport;
     }
 
+    protected TcpTransport createTransport() {
+        return new TcpTransport();
+    }
+
     public void setTransportOption(Map<String, Object> transportOptions) {
         this.transportOptions = transportOptions;
     }

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index?rev=1033933&r1=1033932&r2=1033933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
(original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
Thu Nov 11 13:30:53 2010
@@ -15,3 +15,4 @@
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
 org.apache.activemq.apollo.transport.tcp.TcpTransportFactory
+org.apache.activemq.apollo.transport.tcp.SslTransportFactory

Copied: activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/KeyManagerAware.java
(from r1033932, activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/KeyManagerAware.java?p2=activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/KeyManagerAware.java&p1=activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java&r1=1033932&r2=1033933&rev=1033933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/KeyManagerAware.java
Thu Nov 11 13:30:53 2010
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.apollo.transport;
 
+import javax.net.ssl.KeyManager;
 
-public interface TransportAcceptListener {
-    
-    void onAccept(Transport transport);
-    
-    void onAcceptError(Exception error);
-
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface KeyManagerAware {
+    void setKeyManagers(KeyManager[] manager);
 }

Copied: activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/SSLContextAware.java
(from r1033932, activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/SSLContextAware.java?p2=activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/SSLContextAware.java&p1=activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java&r1=1033932&r2=1033933&rev=1033933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/SSLContextAware.java
Thu Nov 11 13:30:53 2010
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.apollo.transport;
 
+import javax.net.ssl.SSLContext;
 
-public interface TransportAcceptListener {
-    
-    void onAccept(Transport transport);
-    
-    void onAcceptError(Exception error);
-
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface SSLContextAware {
+    void setSSLContext(SSLContext ctx);
 }

Modified: activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java?rev=1033933&r1=1033932&r2=1033933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java
Thu Nov 11 13:30:53 2010
@@ -17,6 +17,11 @@
 package org.apache.activemq.apollo.transport;
 
 
+/**
+ * Implemented by objects that need to be notified when a transport is accepted or when an accept error occurs.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public interface TransportAcceptListener {
     
     void onAccept(Transport transport);

Copied: activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TrustManagerAware.java
(from r1033932, activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TrustManagerAware.java?p2=activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TrustManagerAware.java&p1=activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java&r1=1033932&r2=1033933&rev=1033933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TransportAcceptListener.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/TrustManagerAware.java
Thu Nov 11 13:30:53 2010
@@ -16,11 +16,12 @@
  */
 package org.apache.activemq.apollo.transport;
 
+import javax.net.ssl.TrustManager;
 
-public interface TransportAcceptListener {
-    
-    void onAccept(Transport transport);
-    
-    void onAcceptError(Exception error);
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface TrustManagerAware {
 
+    void setKeyManagers(TrustManager[] manager);
 }



Mime
View raw message