activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r781177 [1/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transpo...
Date Tue, 02 Jun 2009 21:29:35 GMT
Author: chirino
Date: Tue Jun  2 21:29:30 2009
New Revision: 781177

URL: http://svn.apache.org/viewvc?rev=781177&view=rev
Log:
Copied the client classes into the client module and the tcp/ssl classes into the bio module.


Added:
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java   (with props)
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportServer.java
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java   (with props)
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java   (with props)
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java   (with props)
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java   (with props)
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java   (with props)
    activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/package.html   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQDispatcher.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSender.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQQueueSession.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSession.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASession.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/AlreadyClosedException.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/BlobMessage.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ClientInternalExceptionListener.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Closeable.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConfigurationException.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionAudit.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionClosedException.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ConnectionFailedException.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/CustomDestination.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Disposable.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/EnhancedConnection.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/LocalTransactionEventListener.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Message.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableConsumer.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerListener.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerStartedEvent.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerStoppedEvent.java
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/package.html   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/BoundaryStatisticImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/BoundedRangeStatisticImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/CountStatisticImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionPoolStatsImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionStatsImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/JCAStatsImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/JMSConnectionStatsImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/JMSConsumerStatsImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/JMSProducerStatsImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/JMSSessionStatsImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/JMSStatsImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/PollCountStatisticImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/RangeStatisticImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/Resettable.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/StatsCapable.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/TimeStatisticImpl.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/management/package.html   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/package.html   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java   (with props)
    activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java   (with props)
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java   (with props)
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java   (with props)
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java   (with props)
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java   (with props)
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java   (with props)
Modified:
    activemq/sandbox/activemq-flow/activemq-bio/pom.xml
    activemq/sandbox/activemq-flow/activemq-client/pom.xml
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java

Modified: activemq/sandbox/activemq-flow/activemq-bio/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/pom.xml?rev=781177&r1=781176&r2=781177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-bio/pom.xml Tue Jun  2 21:29:30 2009
@@ -37,6 +37,10 @@
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-transport</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-client</artifactId>
+    </dependency>
 
   <!--   In case we want to look at mina..
     <dependency>

Added: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/ActiveMQSslConnectionFactory.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import java.security.SecureRandom;
+import javax.jms.JMSException;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.TrustManager;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.SslTransportFactory;
+import org.apache.activemq.util.JMSExceptionSupport;
+
+/**
+ * An ActiveMQConnectionFactory that allows access to the key and trust managers
+ * used for SSL connections. There is no reason to use this class unless SSL is
+ * being used AND the key and trust managers need to be specified from within
+ * code. If the broker URI does not have an "ssl" scheme (or has no scheme at
+ * all), this class passes all work on to its superclass.
+ *
+ * @author sepandm@gmail.com
+ */
+public class ActiveMQSslConnectionFactory extends ActiveMQConnectionFactory {
+    // The key managers, trust managers and SecureRandom handed to the SSL transport factory.
+    protected KeyManager[] keyManager;
+    protected TrustManager[] trustManager;
+    protected SecureRandom secureRandom;
+
+    /**
+     * Sets the key and trust managers used when creating SSL connections.
+     *
+     * @param km The KeyManagers used.
+     * @param tm The TrustManagers used.
+     * @param random The SecureRandom used.
+     */
+    public void setKeyAndTrustManagers(final KeyManager[] km, final TrustManager[] tm, final SecureRandom random) {
+        keyManager = km;
+        trustManager = tm;
+        secureRandom = random;
+    }
+
+    /**
+     * Creates the transport for {@code brokerURL}. A non-"ssl" (or missing)
+     * scheme is delegated to the superclass; for "ssl" an SslTransportFactory
+     * is configured with the key managers, trust managers and SecureRandom.
+     *
+     * @return the Transport produced by the superclass or the SSL factory.
+     * @throws JMSException if the transport could not be created.
+     */
+    protected Transport createTransport() throws JMSException {
+        // "ssl".equals(...) is null-safe: a URI without a scheme is treated as
+        // non-ssl and handed to the superclass instead of raising an NPE.
+        if (!"ssl".equals(brokerURL.getScheme())) {
+            return super.createTransport();
+        }
+        try {
+            SslTransportFactory sslFactory = new SslTransportFactory();
+            sslFactory.setKeyAndTrustManagers(keyManager, trustManager, secureRandom);
+            return sslFactory.doConnect(brokerURL);
+        } catch (Exception e) {
+            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java (added)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import org.apache.activemq.command.Response;
+
+/**
+ * ResponseHolder utility
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ResponseHolder {
+    protected Response response;
+    protected Object lock = new Object();
+    protected boolean notified;
+
+    /**
+     * Construct a receipt holder
+     */
+    public ResponseHolder() {
+    }
+
+    /**
+     * Set the Response for this holder
+     * 
+     * @param r
+     */
+    public void setResponse(Response r) {
+        synchronized (lock) {
+            this.response = r;
+            notified = true;
+            lock.notify();
+        }
+    }
+
+    /**
+     * Get the Response
+     * 
+     * @return the Response or null if it is closed
+     */
+    public Response getResponse() {
+        return getResponse(0);
+    }
+
+    /**
+     * wait upto <Code>timeout</Code> timeout ms to get a receipt
+     * 
+     * @param timeout
+     * @return
+     */
+    public Response getResponse(int timeout) {
+        synchronized (lock) {
+            if (!notified) {
+                try {
+                    lock.wait(timeout);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        return this.response;
+    }
+
+    /**
+     * close this holder
+     */
+    public void close() {
+        synchronized (lock) {
+            notified = true;
+            lock.notifyAll();
+        }
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ResponseHolder.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * TcpTransport variant that runs over an SSLSocket and can require client-side
+ * certificate authentication. Client authentication has to be requested
+ * through the constructor; otherwise the socket that was handed in decides.
+ * Every ConnectionInfo command consumed by this transport gets its
+ * transportContext set to the peer's certificate chain. NOTE: there is
+ * deliberately no accessor for needClientAuth, because its value must be set
+ * before the socket is connected. Otherwise, unexpected situations may
+ * occur.
+ */
+public class SslTransport extends TcpTransport {
+    /**
+     * Connect to a remote node such as a Broker.
+     *
+     * @param wireFormat The WireFormat to be used.
+     * @param socketFactory The SSLSocketFactory used to create the socket.
+     * @param remoteLocation The remote location.
+     * @param localLocation The local location.
+     * @param needClientAuth If true, client certificate authentication is
+     *                required on the underlying socket (when one exists).
+     * @throws UnknownHostException If TcpTransport throws.
+     * @throws IOException If TcpTransport throws.
+     */
+    public SslTransport(WireFormat wireFormat, SSLSocketFactory socketFactory, URI remoteLocation, URI localLocation, boolean needClientAuth) throws IOException {
+        super(wireFormat, socketFactory, remoteLocation, localLocation);
+        SSLSocket sslSocket = (SSLSocket)this.socket;
+        if (sslSocket != null) {
+            sslSocket.setNeedClientAuth(needClientAuth);
+        }
+    }
+
+    /**
+     * Initialize from an existing SSLSocket. needClientAuth is not a
+     * parameter here since it is already set within the provided socket.
+     *
+     * @param wireFormat The WireFormat to be used.
+     * @param socket The Socket to be used. Forcing SSL.
+     * @throws IOException If TcpTransport throws.
+     */
+    public SslTransport(WireFormat wireFormat, SSLSocket socket) throws IOException {
+        super(wireFormat, socket);
+    }
+
+    /**
+     * Overridden to attach the peer's certificates to ConnectionInfo commands
+     * before the superclass processes them; other commands pass untouched.
+     *
+     * @param command The Command coming in.
+     */
+    public void doConsume(Object command) {
+        // An instanceof test is used because avoiding it would mean modifying
+        // the Command class tree.
+        if (!(command instanceof ConnectionInfo)) {
+            super.doConsume(command);
+            return;
+        }
+
+        ConnectionInfo info = (ConnectionInfo)command;
+        SSLSession session = ((SSLSocket)this.socket).getSession();
+
+        // A peer that was not verified contributes a null chain.
+        X509Certificate[] peerChain = null;
+        try {
+            peerChain = (X509Certificate[])session.getPeerCertificates();
+        } catch (SSLPeerUnverifiedException e) {
+            // Leave peerChain null.
+        }
+
+        info.setTransportContext(peerChain);
+
+        super.doConsume(command);
+    }
+
+    /**
+     * @return "ssl://" followed by the socket's address and port
+     */
+    public String toString() {
+        final String endpoint = socket.getInetAddress() + ":" + socket.getPort();
+        return "ssl://" + endpoint;
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+
+import org.apache.activemq.broker.SslContext;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An implementation of the TcpTransportFactory using SSL. The major
+ * contribution from this class is that it is aware of SslTransportServer and
+ * SslTransport classes. All Transports and TransportServers created from this
+ * factory will have their needClientAuth option set to false.
+ * 
+ * @author sepandm@gmail.com (Sepand)
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
+ * @version $Revision$
+ */
+public class SslTransportFactory extends TcpTransportFactory {
+    // Logger for this factory.
+    private static final Log LOG = LogFactory.getLog(SslTransportFactory.class);
+
+    /**
+     * Builds, configures and binds an {@link SslTransportServer} for the given
+     * location. URI parameters are applied to the server via reflection, and
+     * those prefixed with "transport." are handed over as transport options.
+     *
+     * @param location the URI to bind to.
+     * @return the bound server.
+     * @throws IOException if binding fails or the URI cannot be parsed.
+     */
+    public TransportServer doBind(final URI location) throws IOException {
+        try {
+            Map<String, String> params = new HashMap<String, String>(URISupport.parseParamters(location));
+
+            SSLServerSocketFactory sslServerSocketFactory = (SSLServerSocketFactory)createServerSocketFactory();
+            SslTransportServer sslServer = new SslTransportServer(this, location, sslServerSocketFactory);
+            sslServer.setWireFormatFactory(createWireFormatFactory(params));
+            IntrospectionSupport.setProperties(sslServer, params);
+
+            Map<String, Object> transportParams = IntrospectionSupport.extractProperties(params, "transport.");
+            sslServer.setTransportOption(transportParams);
+
+            sslServer.bind();
+            return sslServer;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Applies the given options to the underlying {@link SslTransport} via
+     * reflection and wraps the transport with an inactivity monitor and, for
+     * OpenWire, a wire format negotiator.
+     *
+     * @param transport the transport stack to configure.
+     * @param format the wire format in use.
+     * @param options configuration options; "socket."-prefixed entries become socket options.
+     * @return the wrapped transport.
+     */
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+
+        SslTransport ssl = (SslTransport)transport.narrow(SslTransport.class);
+        IntrospectionSupport.setProperties(ssl, options);
+
+        Map<String, Object> socketParams = IntrospectionSupport.extractProperties(options, "socket.");
+        ssl.setSocketOptions(socketParams);
+
+        // Transport logging is currently disabled in this module:
+//        if (sslTransport.isTrace()) {
+//            try {
+//                transport = TransportLoggerFactory.getInstance().createTransportLogger(transport,
+//                        sslTransport.getLogWriterName(), sslTransport.isDynamicManagement(), sslTransport.isStartLogging(), sslTransport.getJmxPort());
+//            } catch (Throwable e) {
+//                LOG.error("Could not create TransportLogger object for: " + sslTransport.getLogWriterName() + ", reason: " + e, e);
+//            }
+//        }
+
+        Transport wrapped = new InactivityMonitor(transport, format);
+
+        // The WireFormatNegotiator is only needed when using openwire.
+        if (format instanceof OpenWireFormat) {
+            wrapped = new WireFormatNegotiator(wrapped, (OpenWireFormat)format, ssl.getMinmumWireFormatVersion());
+        }
+        return wrapped;
+    }
+
+    /**
+     * Creates a client-side {@link SslTransport}. When the path of the
+     * location parses as a local address, it is passed on as the local
+     * location; otherwise the local location is null.
+     */
+    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+        URI localLocation = resolveLocalLocation(location);
+        SSLSocketFactory sslSocketFactory = (SSLSocketFactory)createSocketFactory();
+        return new SslTransport(wf, sslSocketFactory, location, localLocation, false);
+    }
+
+    /**
+     * Interprets the path of the given location as a local URI whose text
+     * after the first ':' is a port number.
+     *
+     * @return the local URI, or null when the path is empty or not usable.
+     */
+    private URI resolveLocalLocation(URI location) {
+        String path = location.getPath();
+        if (path == null || path.length() == 0) {
+            return null;
+        }
+        try {
+            // Parsed only to validate that the trailing part is numeric.
+            Integer.parseInt(path.substring(path.indexOf(':') + 1));
+            return new URI(location.getScheme() + ":/" + path);
+        } catch (Exception e) {
+            LOG.warn("path isn't a valid local location for SslTransport to use", e);
+            return null;
+        }
+    }
+
+    /**
+     * Creates an SSL ServerSocketFactory, taken from the current
+     * {@link SslContext} when one is set and from the JVM default otherwise.
+     *
+     * @return the (Ssl)ServerSocketFactory to use.
+     * @throws IOException if the SSL context cannot be obtained.
+     */
+    protected ServerSocketFactory createServerSocketFactory() throws IOException {
+        SslContext current = SslContext.getCurrentSslContext();
+        if (current == null) {
+            return SSLServerSocketFactory.getDefault();
+        }
+        try {
+            return current.getSSLContext().getServerSocketFactory();
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Creates an SSL SocketFactory, taken from the current {@link SslContext}
+     * when one is set and from the JVM default otherwise.
+     *
+     * @return the (Ssl)SocketFactory to use.
+     * @throws IOException if the SSL context cannot be obtained.
+     */
+    protected SocketFactory createSocketFactory() throws IOException {
+        SslContext current = SslContext.getCurrentSslContext();
+        if (current == null) {
+            return SSLSocketFactory.getDefault();
+        }
+        try {
+            return current.getSSLContext().getSocketFactory();
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Installs a new current {@link SslContext} built from the given managers.
+     *
+     * @param km key managers for the new context.
+     * @param tm trust managers for the new context.
+     * @param random source of randomness for the new context.
+     * @deprecated "Do not use anymore... using static initializers like this method only allows the JVM to use 1 SSL configuration per broker."
+     * @see org.apache.activemq.broker.SslContext#setCurrentSslContext(SslContext)
+     * @see org.apache.activemq.broker.SslContext#getSSLContext()
+     */
+    public void setKeyAndTrustManagers(KeyManager[] km, TrustManager[] tm, SecureRandom random) {
+        SslContext.setCurrentSslContext(new SslContext(km, tm, random));
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportServer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportServer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/SslTransportServer.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocket;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ *  An SSL TransportServer.
+ * 
+ *  Allows for client certificate authentication (refer to setNeedClientAuth for
+ *      details).
+ *  NOTE: Client certificate authentication is disabled by default. 
+ *
+ */
+public class SslTransportServer extends TcpTransportServer {
+    
+    // Specifies if sockets created from this server should needClientAuth.
+    private boolean needClientAuth;
+    
+    // Specifies if sockets created from this server should wantClientAuth.
+    private boolean wantClientAuth;
+    
+    
+    /**
+     * Creates a ssl transport server for the specified url using the provided
+     * serverSocketFactory
+     * 
+     * @param transportFactory The factory used to create transports when connections arrive.
+     * @param location The location of the broker to bind to.
+     * @param serverSocketFactory The factory used to create this server.
+     * @throws IOException passed up from TcpTransportFactory.
+     * @throws URISyntaxException passed up from TcpTransportFactory.
+     */
+    public SslTransportServer(
+            SslTransportFactory transportFactory,
+            URI location,
+            SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        super(transportFactory, location, serverSocketFactory);
+    }
+    
+    /**
+     * Sets whether client authentication should be required
+     * Must be called before {@link #bind()}
+     * Note: Calling this method clears the wantClientAuth flag
+     * in the underlying implementation.
+     */
+    public void setNeedClientAuth(boolean needAuth) {
+        this.needClientAuth = needAuth;
+    }
+    
+    /**
+     * Returns whether client authentication should be required.
+     */
+    public boolean getNeedClientAuth() {
+        return this.needClientAuth;
+    }
+    
+    /**
+     * Returns whether client authentication should be requested.
+     */
+    public boolean getWantClientAuth() {
+        return this.wantClientAuth;
+    }
+    
+    /**
+     * Sets whether client authentication should be requested.
+     * Must be called before {@link #bind()}
+     * Note: Calling this method clears the needClientAuth flag
+     * in the underlying implementation.
+     */
+    public void setWantClientAuth(boolean wantAuth) {
+        this.wantClientAuth = wantAuth;
+    }
+    
+    /**
+     * Binds this socket to the previously specified URI.
+     * 
+     * Overridden to allow for proper handling of needClientAuth.
+     * 
+     * @throws IOException passed up from TcpTransportServer. 
+     */
+    public void bind() throws IOException {
+        super.bind();
+        if (needClientAuth) {
+            ((SSLServerSocket)this.serverSocket).setNeedClientAuth(true);
+        } else if (wantClientAuth) {
+            ((SSLServerSocket)this.serverSocket).setWantClientAuth(true);
+        }
+    }
+    
+    /**
+     * Used to create Transports for this server.
+     * 
+     * Overridden to allow the use of SslTransports (instead of TcpTransports).
+     * 
+     * @param socket The incoming socket that will be wrapped into the new Transport. 
+     * @param format The WireFormat being used.
+     * @return The newly return (SSL) Transport.
+     * @throws IOException
+     */
+    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+        return new SslTransport(format, (SSLSocket)socket);
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An optimized buffered input stream for Tcp
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class TcpBufferedInputStream extends FilterInputStream {
+    private static final int DEFAULT_BUFFER_SIZE = 8192;
+    /** Buffer holding bytes already pulled from the wrapped stream. */
+    protected byte internalBuffer[];
+    /** Number of valid bytes currently held in the buffer. */
+    protected int count;
+    /** Index of the next unread byte in the buffer. */
+    protected int position;
+
+    /**
+     * Wraps the given stream using the default buffer size.
+     */
+    public TcpBufferedInputStream(InputStream in) {
+        this(in, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Wraps the given stream using a buffer of the given size.
+     *
+     * @throws IllegalArgumentException if size <= 0.
+     */
+    public TcpBufferedInputStream(InputStream in, int size) {
+        super(in);
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        internalBuffer = new byte[size];
+    }
+
+    /**
+     * Discards the buffer contents and refills it with a single read from the
+     * wrapped stream. The buffer is left empty when that read returns nothing.
+     */
+    private void fill() throws IOException {
+        position = 0;
+        count = 0;
+        int bytesRead = in.read(internalBuffer, 0, internalBuffer.length);
+        if (bytesRead > 0) {
+            count = bytesRead;
+        }
+    }
+
+    /**
+     * Reads one byte, refilling the buffer first when it is exhausted.
+     *
+     * @return the byte as an unsigned value, or -1 when no byte could be read.
+     */
+    public int read() throws IOException {
+        if (position >= count) {
+            fill();
+        }
+        if (position >= count) {
+            return -1;
+        }
+        int value = internalBuffer[position] & 0xff;
+        position++;
+        return value;
+    }
+
+    /**
+     * Performs one read step: serves from the buffer when possible, bypasses
+     * the buffer for requests at least as large as it, and otherwise refills
+     * the buffer once before copying.
+     */
+    private int readStream(byte[] b, int off, int len) throws IOException {
+        int buffered = count - position;
+        if (buffered <= 0) {
+            if (len >= internalBuffer.length) {
+                // Large request with an empty buffer: read straight through.
+                return in.read(b, off, len);
+            }
+            fill();
+            buffered = count - position;
+            if (buffered <= 0) {
+                return -1;
+            }
+        }
+        int toCopy = Math.min(buffered, len);
+        System.arraycopy(internalBuffer, position, b, off, toCopy);
+        position += toCopy;
+        return toCopy;
+    }
+
+    /**
+     * Reads up to len bytes into b starting at off. Keeps reading until the
+     * request is satisfied, a read step returns nothing, or the wrapped stream
+     * reports no more available bytes.
+     *
+     * @throws IndexOutOfBoundsException if off/len do not describe a range within b.
+     */
+    public int read(byte b[], int off, int len) throws IOException {
+        int end = off + len;
+        int slack = b.length - end;
+        if (off < 0 || len < 0 || end < 0 || slack < 0) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            return 0;
+        }
+        int total = 0;
+        while (true) {
+            int got = readStream(b, off + total, len - total);
+            if (got <= 0) {
+                return (total == 0) ? got : total;
+            }
+            total += got;
+            if (total >= len) {
+                return total;
+            }
+            // if not closed but no bytes available, return
+            InputStream source = in;
+            if (source != null && source.available() <= 0) {
+                return total;
+            }
+        }
+    }
+
+    /**
+     * Skips up to n bytes: within the buffer when it holds unread data,
+     * otherwise by delegating to the wrapped stream.
+     */
+    public long skip(long n) throws IOException {
+        if (n <= 0) {
+            return 0;
+        }
+        long buffered = count - position;
+        if (buffered <= 0) {
+            return in.skip(n);
+        }
+        long step = Math.min(buffered, n);
+        position += step;
+        return step;
+    }
+
+    /**
+     * @return bytes available from the wrapped stream plus unread buffered bytes.
+     */
+    public int available() throws IOException {
+        int unread = count - position;
+        return in.available() + unread;
+    }
+
+    /**
+     * Mark/reset is not supported.
+     */
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Closes the wrapped stream, if there is one.
+     */
+    public void close() throws IOException {
+        if (in == null) {
+            return;
+        }
+        in.close();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.tcp;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An optimized buffered outputstream for Tcp
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+
+public class TcpBufferedOutputStream extends FilterOutputStream {
+    private static final int BUFFER_SIZE = 8192;
+    private byte[] buffer;
+    private int bufferlen;
+    private int count;
+    private volatile long writeTimestamp = -1;//concurrent reads of this value
+    
+
+    /**
+     * Constructor
+     * 
+     * @param out
+     */
+    public TcpBufferedOutputStream(OutputStream out) {
+        this(out, BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a new buffered output stream to write data to the specified
+     * underlying output stream with the specified buffer size.
+     * 
+     * @param out the underlying output stream.
+     * @param size the buffer size.
+     * @throws IllegalArgumentException if size <= 0.
+     */
+    public TcpBufferedOutputStream(OutputStream out, int size) {
+        super(out);
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buffer = new byte[size];
+        bufferlen = size;
+    }
+
+    /**
+     * write a byte on to the stream
+     * 
+     * @param b - byte to write
+     * @throws IOException
+     */
+    public void write(int b) throws IOException {
+        if ((bufferlen - count) < 1) {
+            flush();
+        }
+        buffer[count++] = (byte)b;
+    }
+
+    /**
+     * write a byte array to the stream
+     * 
+     * @param b the byte buffer
+     * @param off the offset into the buffer
+     * @param len the length of data to write
+     * @throws IOException
+     */
+    public void write(byte b[], int off, int len) throws IOException {
+        if (b != null) {
+            if ((bufferlen - count) < len) {
+                flush();
+            }
+            if (buffer.length >= len) {
+                System.arraycopy(b, off, buffer, count, len);
+                count += len;
+            } else {
+                try {
+                    writeTimestamp = System.currentTimeMillis();
+                    out.write(b, off, len);
+                } finally {
+                    writeTimestamp = -1;
+                }
+            }
+        }
+    }
+
+    /**
+     * flush the data to the output stream This doesn't call flush on the
+     * underlying outputstream, because Tcp is particularly efficent at doing
+     * this itself ....
+     * 
+     * @throws IOException
+     */
+    public void flush() throws IOException {
+        if (count > 0 && out != null) {
+            try {
+                writeTimestamp = System.currentTimeMillis();
+                out.write(buffer, 0, count);
+            } finally {
+            	writeTimestamp = -1;
+            }
+            count = 0;
+        }
+    }
+
+    /**
+     * close this stream
+     * 
+     * @throws IOException
+     */
+    public void close() throws IOException {
+        super.close();
+    }
+
+    public boolean isWriting() {
+        return writeTimestamp > 0;
+    }
+    
+    public long getWriteTimestamp() {
+    	return writeTimestamp;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.transport.Transport;
+//import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.TransportThreadSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An implementation of the {@link Transport} interface using raw tcp/ip
+ * 
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
+ * @version $Revision$
+ */
+public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
+    private static final Log LOG = LogFactory.getLog(TcpTransport.class);
+    private static final ThreadPoolExecutor SOCKET_CLOSE;
+    protected final URI remoteLocation;
+    protected final URI localLocation;
+    protected final WireFormat wireFormat;
+
+    protected int connectionTimeout = 30000;
+    protected int soTimeout;
+    protected int socketBufferSize = 64 * 1024;
+    protected int ioBufferSize = 8 * 1024;
+    protected boolean closeAsync=true;
+    protected Socket socket;
+    protected DataOutputStream dataOut;
+    protected DataInputStream dataIn;
+    protected TcpBufferedOutputStream buffOut = null;
+    /**
+     * trace=true -> the Transport stack where this TcpTransport
+     * object will be, will have a TransportLogger layer
+     * trace=false -> the Transport stack where this TcpTransport
+     * object will be, will NOT have a TransportLogger layer, and therefore
+     * will never be able to print logging messages.
+     * This parameter is most probably set in Connection or TransportConnector URIs.
+     */
+    protected boolean trace = false;
+//    /**
+//     * Name of the LogWriter implementation to use.
+//     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
+//     * This parameter is most probably set in Connection or TransportConnector URIs.
+//     */
+//    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
+    /**
+     * Specifies if the TransportLogger will be manageable by JMX or not.
+     * Also, as long as there is at least 1 TransportLogger which is manageable,
+     * a TransportLoggerControl MBean will be created.
+     */
+    protected boolean dynamicManagement = false;
+    /**
+     * startLogging=true -> the TransportLogger object of the Transport stack
+     * will initially write messages to the log.
+     * startLogging=false -> the TransportLogger object of the Transport stack
+     * will initially NOT write messages to the log.
+     * This parameter only has an effect if trace == true.
+     * This parameter is most probably set in Connection or TransportConnector URIs.
+     */
+    protected boolean startLogging = true;
+    /**
+     * Specifies the port that will be used by the JMX server to manage
+     * the TransportLoggers.
+     * This should only be set in an URI by a client (producer or consumer) since
+     * a broker will already create a JMX server.
+     * It is useful for people who test a broker and clients in the same machine
+     * and want to control both via JMX; a different port will be needed.
+     */
+    protected int jmxPort = 1099;
+    protected boolean useLocalHost = true;
+    protected int minmumWireFormatVersion;
+    protected SocketFactory socketFactory;
+    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
+
+    private Map<String, Object> socketOptions;
+    private Boolean keepAlive;
+    private Boolean tcpNoDelay;
+    private Thread runnerThread;
+
+    /**
+     * Connect to a remote Node - e.g. a Broker
+     * 
+     * @param wireFormat
+     * @param socketFactory
+     * @param remoteLocation
+     * @param localLocation - e.g. local InetAddress and local port
+     * @throws IOException
+     * @throws UnknownHostException
+     */
+    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
+                        URI localLocation) throws UnknownHostException, IOException {
+        this.wireFormat = wireFormat;
+        this.remoteLocation = remoteLocation;
+        this.localLocation = localLocation;
+        this.socketFactory = socketFactory;
+
+        // A SocketException from the factory is tolerated here: the transport
+        // is then constructed with a null socket.
+        Socket created = null;
+        try {
+            created = socketFactory.createSocket();
+        } catch (SocketException e) {
+            created = null;
+        }
+        this.socket = created;
+
+        setDaemon(false);
+    }
+
+    /**
+     * Initialize from a server Socket
+     * 
+     * @param wireFormat
+     * @param socket
+     * @throws IOException
+     */
+    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
+        // Server-side transports wrap an already existing socket, so neither
+        // a remote nor a local location is recorded.
+        this.remoteLocation = null;
+        this.localLocation = null;
+        this.wireFormat = wireFormat;
+        this.socket = socket;
+        setDaemon(true);
+    }
+
+    /**
+     * A one way asynchronous send
+     */
+    public void oneway(Object command) throws IOException {
+        checkStarted();
+        // Marshal the command onto the outgoing data stream, then flush it.
+        wireFormat.marshal(command, dataOut);
+        dataOut.flush();
+    }
+
+    /**
+     * @return pretty print of 'this'
+     */
+    public String toString() {
+        // The client-side constructor leaves 'socket' null when the socket
+        // factory throws a SocketException. Fall back to the configured remote
+        // location instead of failing with a NullPointerException (toString()
+        // is used when building log messages, e.g. in run()).
+        if (socket == null) {
+            return "tcp://" + (remoteLocation == null
+                ? "unconnected"
+                : remoteLocation.getHost() + ":" + remoteLocation.getPort());
+        }
+        return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
+    }
+
+    /**
+     * reads packets from a Socket
+     */
+    public void run() {
+        // Build the trace message only when tracing is enabled, so that the
+        // string concatenation (and the toString() call it implies) is skipped
+        // otherwise.
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("TCP consumer thread for " + this + " starting");
+        }
+        this.runnerThread=Thread.currentThread();
+        try {
+            // Keep consuming commands until the transport is stopped.
+            while (!isStopped()) {
+                doRun();
+            }
+        } catch (IOException e) {
+            // Release the stopped latch before notifying about the failure.
+            stoppedLatch.get().countDown();
+            onException(e);
+        } catch (Throwable e){
+            stoppedLatch.get().countDown();
+            // Report any other failure as an IOException carrying the cause.
+            IOException ioe=new IOException("Unexpected error occured");
+            ioe.initCause(e);
+            onException(ioe);
+        }finally {
+            // Also covers the normal exit path of the read loop.
+            stoppedLatch.get().countDown();
+        }
+    }
+
+    protected void doRun() throws IOException {
+        try {
+            doConsume(readCommand());
+        } catch (InterruptedIOException ignored) {
+            // Deliberately ignored. This also covers SocketTimeoutException,
+            // which is a subclass of InterruptedIOException; the caller's loop
+            // simply tries again.
+        }
+    }
+
+    protected Object readCommand() throws IOException {
+        return wireFormat.unmarshal(dataIn);
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+
+    /**
+     * @return the current value of the trace flag
+     */
+    public boolean isTrace() {
+        return trace;
+    }
+
+    /**
+     * Stores the trace flag.
+     * NOTE(review): presumably enables transport-level trace logging - confirm where the flag is consumed.
+     *
+     * @param trace the new value of the trace flag
+     */
+    public void setTrace(boolean trace) {
+        this.trace = trace;
+    }
+    
+//    public String getLogWriterName() {
+//        return logWriterName;
+//    }
+//
+//    public void setLogWriterName(String logFormat) {
+//        this.logWriterName = logFormat;
+//    }
+
+    /**
+     * @return the current value of the dynamicManagement flag
+     */
+    public boolean isDynamicManagement() {
+        return dynamicManagement;
+    }
+
+    /**
+     * Stores the dynamicManagement flag.
+     *
+     * @param useJmx the new value; the parameter name suggests the flag
+     *               controls JMX-based management - TODO confirm
+     */
+    public void setDynamicManagement(boolean useJmx) {
+        this.dynamicManagement = useJmx;
+    }
+
+    /**
+     * @return the current value of the startLogging flag
+     */
+    public boolean isStartLogging() {
+        return startLogging;
+    }
+
+    /**
+     * Stores the startLogging flag.
+     * NOTE(review): presumably decides whether logging is active from the start - confirm where the flag is consumed.
+     *
+     * @param startLogging the new value of the flag
+     */
+    public void setStartLogging(boolean startLogging) {
+        this.startLogging = startLogging;
+    }
+
+    /**
+     * @return the configured JMX port value
+     */
+    public int getJmxPort() {
+        return jmxPort;
+    }
+
+    /**
+     * Stores the JMX port value.
+     *
+     * @param jmxPort the port number to remember; no validation is performed here
+     */
+    public void setJmxPort(int jmxPort) {
+        this.jmxPort = jmxPort;
+    }
+    
+    /**
+     * Returns the minimum wire format version configured for this transport.
+     * The misspelling in the method name is part of the public API.
+     *
+     * @return the configured minimum wire format version
+     */
+    public int getMinmumWireFormatVersion() {
+        return minmumWireFormatVersion;
+    }
+
+    /**
+     * Stores the minimum wire format version.
+     * The misspelling in the method name is part of the public API.
+     *
+     * @param minmumWireFormatVersion the minimum version to remember
+     */
+    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
+        this.minmumWireFormatVersion = minmumWireFormatVersion;
+    }
+
+    /**
+     * @return true if a remote host equal to this machine's own host name
+     *         should be replaced by 'localhost' when resolving the host
+     */
+    public boolean isUseLocalHost() {
+        return useLocalHost;
+    }
+
+    /**
+     * Sets whether 'localhost' or the actual local host name should be used to
+     * make local connections. On some operating systems, such as Macs, it is
+     * not possible to connect using the local host name, so 'localhost' is
+     * the better choice there.
+     *
+     * @param useLocalHost true to substitute 'localhost' for the local host name
+     */
+    public void setUseLocalHost(boolean useLocalHost) {
+        this.useLocalHost = useLocalHost;
+    }
+
+    /**
+     * @return the size requested for both the receive and the send buffer of
+     *         the socket
+     */
+    public int getSocketBufferSize() {
+        return socketBufferSize;
+    }
+
+    /**
+     * Sets the buffer size to use on the socket. The same value is applied to
+     * both the receive and the send buffer when the socket is initialised.
+     *
+     * @param socketBufferSize the requested buffer size
+     */
+    public void setSocketBufferSize(int socketBufferSize) {
+        this.socketBufferSize = socketBufferSize;
+    }
+
+    /**
+     * @return the value passed to {@code Socket.setSoTimeout} when the socket
+     *         is initialised
+     */
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    /**
+     * Sets the socket timeout. The value is handed to
+     * {@code Socket.setSoTimeout} when the socket is initialised.
+     *
+     * @param soTimeout the timeout value to apply to the socket
+     */
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    /**
+     * @return the timeout used when connecting the socket; a negative value
+     *         means the connect call is made without a timeout argument
+     */
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    /**
+     * Sets the timeout used to connect to the socket. Values of zero or more
+     * are passed as the timeout argument of {@code Socket.connect}; with a
+     * negative value the connect call is made without a timeout argument.
+     *
+     * @param connectionTimeout the connect timeout to remember
+     */
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    /**
+     * @return the keep-alive setting to apply to the socket, or null when the
+     *         socket's keep-alive option is to be left untouched
+     */
+    public Boolean getKeepAlive() {
+        return keepAlive;
+    }
+
+    /**
+     * Enable/disable TCP KEEP_ALIVE mode
+     *
+     * @param keepAlive the setting to apply when the socket is initialised;
+     *                  null leaves the socket's keep-alive option untouched
+     */
+    public void setKeepAlive(Boolean keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    /**
+     * @return the TCP_NODELAY setting to apply to the socket, or null when
+     *         the socket's option is to be left untouched
+     */
+    public Boolean getTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    /**
+     * Enable/disable the TCP_NODELAY option on the socket
+     *
+     * @param tcpNoDelay the setting to apply when the socket is initialised;
+     *                   null leaves the socket's option untouched
+     */
+    public void setTcpNoDelay(Boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    /**
+     * Returns the size given to the buffered input and output streams that
+     * are layered over the socket's streams.
+     *
+     * @return the ioBufferSize
+     */
+    public int getIoBufferSize() {
+        return this.ioBufferSize;
+    }
+
+    /**
+     * Sets the size given to the buffered input and output streams that are
+     * layered over the socket's streams. The value is read when the streams
+     * are created.
+     *
+     * @param ioBufferSize the ioBufferSize to set
+     */
+    public void setIoBufferSize(int ioBufferSize) {
+        this.ioBufferSize = ioBufferSize;
+    }
+    
+    /**
+     * Tells whether stopping the transport closes the socket on a separate
+     * executor thread instead of on the calling thread.
+     *
+     * @return the closeAsync
+     */
+    public boolean isCloseAsync() {
+        return closeAsync;
+    }
+
+    /**
+     * Chooses whether stopping the transport closes the socket on a separate
+     * executor thread (true) or directly on the calling thread (false).
+     *
+     * @param closeAsync the closeAsync to set
+     */
+    public void setCloseAsync(boolean closeAsync) {
+        this.closeAsync = closeAsync;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    /**
+     * Maps the given host to {@code "localhost"} when local-host substitution
+     * is enabled and the host equals this machine's own host name.
+     * <p>
+     * The local host name is only looked up when {@code isUseLocalHost()} is
+     * true, so a machine whose own name cannot be resolved can still connect
+     * to other hosts while the option is switched off.
+     *
+     * @param host the host name to check; may be null
+     * @return {@code "localhost"} if the substitution applies, otherwise
+     *         {@code host} unchanged
+     * @throws UnknownHostException if substitution is enabled and the local
+     *                              host cannot be determined
+     */
+    protected String resolveHostName(String host) throws UnknownHostException {
+        if (!isUseLocalHost()) {
+            // Substitution is off: skip the local host lookup, which can fail.
+            return host;
+        }
+        String localName = InetAddress.getLocalHost().getHostName();
+        if (localName != null && localName.equals(host)) {
+            return "localhost";
+        }
+        return host;
+    }
+
+    /**
+     * Configures the socket for use
+     * <p>
+     * Applies, in order: any generic socket options that were supplied via
+     * {@code setSocketOptions}, the receive and send buffer sizes, the read
+     * timeout, and - only when they have been set to a non-null value - the
+     * keep-alive and TCP_NODELAY flags. A failure to set the buffer sizes is
+     * logged and otherwise ignored.
+     *
+     * @param sock the socket to configure
+     * @throws SocketException if the timeout, keep-alive or no-delay option
+     *                         cannot be applied
+     */
+    protected void initialiseSocket(Socket sock) throws SocketException {
+        if (socketOptions != null) {
+            // Apply the options to the socket being configured rather than to
+            // the field, so every setting in this method targets the same
+            // socket whichever one the caller passes in.
+            IntrospectionSupport.setProperties(sock, socketOptions);
+        }
+
+        try {
+            sock.setReceiveBufferSize(socketBufferSize);
+            sock.setSendBufferSize(socketBufferSize);
+        } catch (SocketException se) {
+            // Buffer sizes are best effort: report the problem and carry on.
+            LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
+            LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
+        }
+        sock.setSoTimeout(soTimeout);
+
+        // A null Boolean means "not configured": leave the socket's own setting alone.
+        if (keepAlive != null) {
+            sock.setKeepAlive(keepAlive.booleanValue());
+        }
+        if (tcpNoDelay != null) {
+            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
+        }
+    }
+
+    /**
+     * Connects the socket, installs a fresh stopped-latch with a count of one
+     * and then delegates to the superclass start logic.
+     *
+     * @throws Exception if connecting or the superclass start fails
+     */
+    protected void doStart() throws Exception {
+        connect();
+        // run() dereferences this latch and stop() waits on it, so it is installed before super.doStart()
+        // (which presumably launches the runner thread - TODO confirm).
+        stoppedLatch.set(new CountDownLatch(1));
+        super.doStart();
+    }
+
+    /**
+     * Establishes the socket connection and prepares the data streams.
+     * <p>
+     * If a socket already exists it is bound to the local location (when one
+     * is configured) and connected to the remote location (when one is
+     * configured), honouring the connection timeout. If no socket exists, one
+     * is created through the socket factory already connected to the remote
+     * location. In both cases the socket is then initialised and the streams
+     * are set up.
+     *
+     * @throws IllegalStateException if neither a socket nor a socket factory
+     *         is available, or if a socket has to be created through the
+     *         factory but no remote location is configured
+     * @throws UnknownHostException if a socket has to be created through the
+     *         factory and the remote host cannot be resolved
+     * @throws Exception any other failure while binding, connecting or
+     *         initialising
+     */
+    protected void connect() throws Exception {
+
+        if (socket == null && socketFactory == null) {
+            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+        }
+
+        InetSocketAddress localAddress = null;
+        InetSocketAddress remoteAddress = null;
+
+        if (localLocation != null) {
+            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
+                                                 localLocation.getPort());
+        }
+
+        if (remoteLocation != null) {
+            String host = resolveHostName(remoteLocation.getHost());
+            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+        }
+
+        if (socket != null) {
+
+            if (localAddress != null) {
+                socket.bind(localAddress);
+            }
+
+            // If it's a server accepted socket.. we don't need to connect it
+            // to a remote address.
+            if (remoteAddress != null) {
+                if (connectionTimeout >= 0) {
+                    socket.connect(remoteAddress, connectionTimeout);
+                } else {
+                    socket.connect(remoteAddress);
+                }
+            }
+
+        } else {
+            // For SSL sockets.. you can't create an unconnected socket :(
+            // This means the timeout options are not supported either.
+
+            // The factory needs a concrete remote address; fail with a clear
+            // message instead of a NullPointerException on remoteAddress.
+            if (remoteAddress == null) {
+                throw new IllegalStateException("Cannot create a socket from the socketFactory without a remote location");
+            }
+            // An unresolved address has no InetAddress, so getAddress() would
+            // hand null to the factory. Report the unknown host explicitly.
+            if (remoteAddress.isUnresolved()) {
+                throw new UnknownHostException(remoteAddress.getHostName());
+            }
+
+            if (localAddress != null) {
+                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
+                                                    localAddress.getAddress(), localAddress.getPort());
+            } else {
+                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
+            }
+        }
+
+        initialiseSocket(socket);
+        initializeStreams();
+    }
+
+    /**
+     * Closes the socket, if there is one.
+     * <p>
+     * With {@code closeAsync} enabled the close is handed to the shared
+     * socket-close executor and this method waits at most one second for it
+     * to finish; otherwise the socket is closed on the calling thread. An
+     * IOException raised by the close is logged at debug level and not
+     * propagated.
+     *
+     * @param stopper not used by this implementation
+     * @throws Exception if waiting for the asynchronous close is interrupted
+     */
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Stopping transport " + this);
+        }
+
+        // closeStreams() is intentionally not called: closing the streams
+        // flushes the socket first, and a hung socket would hang the stop.
+        if (socket == null) {
+            return;
+        }
+
+        if (!closeAsync) {
+            try {
+                socket.close();
+            } catch (IOException e) {
+                LOG.debug("Caught exception closing socket",e);
+            }
+            return;
+        }
+
+        // Closing the socket can hang as well, so do it on another thread and
+        // bound the time spent waiting for it.
+        final CountDownLatch closed = new CountDownLatch(1);
+        SOCKET_CLOSE.execute(new Runnable() {
+            public void run() {
+                try {
+                    socket.close();
+                } catch (IOException e) {
+                    LOG.debug("Caught exception closing socket",e);
+                } finally {
+                    closed.countDown();
+                }
+            }
+        });
+        closed.await(1,TimeUnit.SECONDS);
+    }
+
+    /**
+     * Stops the transport and then waits, for at most one second, for the
+     * runner thread to leave {@code run()}.
+     * <p>
+     * The wait is skipped when no latch has been installed or when the caller
+     * is the runner thread itself.
+     *
+     * @throws Exception if the superclass stop fails or the wait is interrupted
+     */
+    @Override
+    public void stop() throws Exception {
+        super.stop();
+        final CountDownLatch runnerExit = stoppedLatch.get();
+        final boolean calledByRunner = Thread.currentThread() == this.runnerThread;
+        if (runnerExit == null || calledByRunner) {
+            return;
+        }
+        runnerExit.await(1,TimeUnit.SECONDS);
+    }
+
+    /**
+     * Layers buffered data streams over the socket's raw streams. The input
+     * side is set up first, then the output side; both buffers are sized by
+     * {@code ioBufferSize}.
+     *
+     * @throws Exception if the socket's streams cannot be obtained
+     */
+    protected void initializeStreams() throws Exception {
+        this.dataIn = new DataInputStream(new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize));
+        // The buffered output stream is kept in its own field as well; narrow() hands it out.
+        this.buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+        this.dataOut = new DataOutputStream(this.buffOut);
+    }
+
+    /**
+     * Closes the data output stream and the data input stream, skipping any
+     * that has not been created.
+     * <p>
+     * The input stream is closed even when closing the output stream fails,
+     * so a failure on one side does not leave the other side open.
+     *
+     * @throws IOException if closing either stream fails; when both fail,
+     *                     the failure from the input stream is the one
+     *                     reported
+     */
+    protected void closeStreams() throws IOException {
+        try {
+            if (dataOut != null) {
+                dataOut.close();
+            }
+        } finally {
+            // Runs even if closing the output side threw.
+            if (dataIn != null) {
+                dataIn.close();
+            }
+        }
+    }
+
+    /**
+     * Remembers the generic socket options that are applied to the socket
+     * when it is initialised. The map is copied, so later changes to the
+     * caller's map do not affect this transport.
+     *
+     * @param socketOptions the options to copy; must not be null
+     */
+    public void setSocketOptions(Map<String, Object> socketOptions) {
+        final HashMap<String, Object> optionsCopy = new HashMap<String, Object>(socketOptions);
+        this.socketOptions = optionsCopy;
+    }
+
+    /**
+     * Returns the remote socket address of the underlying socket in string
+     * form.
+     *
+     * @return the string form of the socket's remote address, or null when
+     *         this transport has no socket
+     */
+    public String getRemoteAddress() {
+        if (socket == null) {
+            return null;
+        }
+        return "" + socket.getRemoteSocketAddress();
+    }
+    
+    /**
+     * Exposes internals of this transport by type: the socket for
+     * {@code Socket.class} and the buffered output stream for
+     * {@code TcpBufferedOutputStream.class}. Every other request is passed to
+     * the superclass.
+     *
+     * @param target the type being asked for
+     * @return the matching object cast to {@code target}, or whatever the
+     *         superclass returns
+     */
+    @Override
+    public <T> T narrow(Class<T> target) {
+        if (target == Socket.class) {
+            return target.cast(socket);
+        }
+        if (target == TcpBufferedOutputStream.class) {
+            return target.cast(buffOut);
+        }
+        return super.narrow(target);
+    }
+    
+
+    static {
+        SOCKET_CLOSE =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
+                thread.setPriority(Thread.MAX_PRIORITY);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+//import org.apache.activemq.transport.TransportLoggerFactory;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
+ * @version $Revision$
+ */
+public class TcpTransportFactory extends TransportFactory {
+    private static final Log LOG = LogFactory.getLog(TcpTransportFactory.class);
+
+    public TransportServer doBind(final URI location) throws IOException {
+        try {
+            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+
+            ServerSocketFactory serverSocketFactory = createServerSocketFactory();
+            TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
+            server.setWireFormatFactory(createWireFormatFactory(options));
+            IntrospectionSupport.setProperties(server, options);
+            Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
+            server.setTransportOption(transportOptions);
+            server.bind();
+
+            return server;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    /**
+     * Allows subclasses of TcpTransportFactory to create custom instances of
+     * TcpTransportServer.
+     * 
+     * @param location
+     * @param serverSocketFactory
+     * @return
+     * @throws IOException
+     * @throws URISyntaxException
+     */
+    protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        return new TcpTransportServer(this, location, serverSocketFactory);
+    }
+
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+
+        TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class);
+        IntrospectionSupport.setProperties(tcpTransport, options);
+
+        Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
+        tcpTransport.setSocketOptions(socketOptions);
+        
+//        if (tcpTransport.isTrace()) {
+//            try {
+//                transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, tcpTransport.getLogWriterName(),
+//                        tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort());
+//            } catch (Throwable e) {
+//                LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e);
+//            }
+//        }
+
+        boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
+        if (useInactivityMonitor && isUseInactivityMonitor(transport)) {
+            transport = new InactivityMonitor(transport, format);
+        }
+
+        // Only need the WireFormatNegotiator if using openwire
+        if (format instanceof OpenWireFormat) {
+            transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
+        }
+
+        return transport;
+    }
+
+    private String getOption(Map options, String key, String def) {
+        String rc = (String) options.remove(key);
+        if( rc == null ) {
+            rc = def;
+        }
+        return rc;
+    }
+
+    /**
+     * Returns true if the inactivity monitor should be used on the transport
+     */
+    protected boolean isUseInactivityMonitor(Transport transport) {
+        return true;
+    }
+
+    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+        URI localLocation = null;
+        String path = location.getPath();
+        // see if the path is a local URI location
+        if (path != null && path.length() > 0) {
+            int localPortIndex = path.indexOf(':');
+            try {
+                Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
+                String localString = location.getScheme() + ":/" + path;
+                localLocation = new URI(localString);
+            } catch (Exception e) {
+                LOG.warn("path isn't a valid local location for TcpTransport to use", e);
+            }
+        }
+        SocketFactory socketFactory = createSocketFactory();
+        return createTcpTransport(wf, socketFactory, location, localLocation);
+    }
+
+    /**
+     * Allows subclasses of TcpTransportFactory to provide a create custom
+     * TcpTransport intances.
+     * 
+     * @param location
+     * @param wf
+     * @param socketFactory
+     * @param localLocation
+     * @return
+     * @throws UnknownHostException
+     * @throws IOException
+     */
+    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+        return new TcpTransport(wf, socketFactory, location, localLocation);
+    }
+
+    protected ServerSocketFactory createServerSocketFactory() throws IOException {
+        return ServerSocketFactory.getDefault();
+    }
+
+    protected SocketFactory createSocketFactory() throws IOException {
+        return SocketFactory.getDefault();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-bio/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message