activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: Activemq5 unit test fixes --Fix server wrapper startup/shutdown issues that mainly causes a lot of "Address already in use" in tests.
Date Mon, 13 Jul 2015 13:40:03 GMT
Activemq5 unit test fixes
  --Fix server wrapper startup/shutdown issues that mainly causes
    a lot of "Address already in use" in tests.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fbdf9cd0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fbdf9cd0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fbdf9cd0

Branch: refs/heads/master
Commit: fbdf9cd04c36397fe2a15a0a65b1d1b491ef923a
Parents: fe95e1f
Author: Howard Gao <hgao@redhat.com>
Authored: Mon Jul 13 14:36:44 2015 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Jul 13 09:39:52 2015 -0400

----------------------------------------------------------------------
 .../activemq/ActiveMQConnectionFactory.java     | 995 +++++++++++++++++++
 .../artemiswrapper/ArtemisBrokerHelper.java     |  20 +
 .../apache/activemq/broker/BrokerService.java   |  11 +-
 .../artemiswrapper/ArtemisBrokerBase.java       |   2 +-
 .../artemiswrapper/ArtemisBrokerWrapper.java    |  17 +-
 .../transport/tcp/TcpTransportFactory.java      | 169 ++++
 .../activemq/ActiveMQConnectionFactoryTest.java |   8 +
 .../activemq/ActiveMQInputStreamTest.java       |   5 +-
 8 files changed, 1215 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fbdf9cd0/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
new file mode 100644
index 0000000..c530ab7
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -0,0 +1,995 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.RejectedExecutionHandler;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.naming.Context;
+
+import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
+import org.apache.activemq.jndi.JNDIBaseStorable;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.management.StatsCapable;
+import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.URISupport.CompositeData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
+   private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class);
+   private static final String DEFAULT_BROKER_HOST;
+   private static final int DEFAULT_BROKER_PORT;
+   private static URI defaultTcpUri;
+   static{
+      String host = null;
+      String port = null;
+      try {
+         host = AccessController.doPrivileged(new PrivilegedAction<String>() {
+            @Override
+            public String run() {
+               String result = System.getProperty("org.apache.activemq.AMQ_HOST");
+               result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_HOST","localhost") : result;
+               return result;
+            }
+         });
+         port = AccessController.doPrivileged(new PrivilegedAction<String>() {
+            @Override
+            public String run() {
+               String result = System.getProperty("org.apache.activemq.AMQ_PORT");
+               result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_PORT","61616") : result;
+               return result;
+            }
+         });
+      }catch(Throwable e){
+         LOG.debug("Failed to look up System properties for host and port",e);
+      }
+      host = (host == null || host.isEmpty()) ? "localhost" : host;
+      port = (port == null || port.isEmpty()) ? "61616" : port;
+      DEFAULT_BROKER_HOST = host;
+      DEFAULT_BROKER_PORT = Integer.parseInt(port);
+   }
+
+
+   public static final String DEFAULT_BROKER_BIND_URL;
+
+   static{
+      final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
+      String bindURL = null;
+
+      try {
+         bindURL = AccessController.doPrivileged(new PrivilegedAction<String>() {
+            @Override
+            public String run() {
+               String result = System.getProperty("org.apache.activemq.BROKER_BIND_URL");
+               result = (result==null||result.isEmpty()) ?  System.getProperty("BROKER_BIND_URL",defaultURL) : result;
+               return result;
+            }
+         });
+      }catch(Throwable e){
+         LOG.debug("Failed to look up System properties for host and port",e);
+      }
+      bindURL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL;
+      DEFAULT_BROKER_BIND_URL = bindURL;
+        try {
+            defaultTcpUri = new URI(defaultURL);
+        } catch (URISyntaxException e) {
+            LOG.debug("Failed to build default tcp url",e);
+        }
+
+   }
+
+   public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
+   public static final String DEFAULT_USER = null;
+   public static final String DEFAULT_PASSWORD = null;
+   public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
+
+   protected URI brokerURL;
+   protected URI vmBrokerUri;
+   protected String userName;
+   protected String password;
+   protected String clientID;
+   protected boolean dispatchAsync=true;
+   protected boolean alwaysSessionAsync=true;
+
+   JMSStatsImpl factoryStats = new JMSStatsImpl();
+
+   private IdGenerator clientIdGenerator;
+   private String clientIDPrefix;
+   private IdGenerator connectionIdGenerator;
+   private String connectionIDPrefix;
+
+   // client policies
+   private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+   private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+   {
+      redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
+   }
+   private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
+   private MessageTransformer transformer;
+
+   private boolean disableTimeStampsByDefault;
+   private boolean optimizedMessageDispatch = true;
+   private long optimizeAcknowledgeTimeOut = 300;
+   private long optimizedAckScheduledAckInterval = 0;
+   private boolean copyMessageOnSend = true;
+   private boolean useCompression;
+   private boolean objectMessageSerializationDefered;
+   private boolean useAsyncSend;
+   private boolean optimizeAcknowledge;
+   private int closeTimeout = 15000;
+   private boolean useRetroactiveConsumer;
+   private boolean exclusiveConsumer;
+   private boolean nestedMapAndListEnabled = true;
+   private boolean alwaysSyncSend;
+   private boolean watchTopicAdvisories = true;
+   private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
+   private long warnAboutUnstartedConnectionTimeout = 500L;
+   private int sendTimeout = 0;
+   private boolean sendAcksAsync=true;
+   private TransportListener transportListener;
+   private ExceptionListener exceptionListener;
+   private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
+   private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
+   private boolean useDedicatedTaskRunner;
+   private long consumerFailoverRedeliveryWaitPeriod = 0;
+   private boolean checkForDuplicates = true;
+   private ClientInternalExceptionListener clientInternalExceptionListener;
+   private boolean messagePrioritySupported = false;
+   private boolean transactedIndividualAck = false;
+   private boolean nonBlockingRedelivery = false;
+   private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
+   private TaskRunnerFactory sessionTaskRunner;
+   private RejectedExecutionHandler rejectedTaskHandler = null;
+   protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
+   private boolean rmIdFromConnectionId = false;
+   private boolean consumerExpiryCheckEnabled = true;
+
+   // /////////////////////////////////////////////
+   //
+   // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
+   //
+   // /////////////////////////////////////////////
+
+   public ActiveMQConnectionFactory() {
+      this(DEFAULT_BROKER_URL);
+   }
+
+   public ActiveMQConnectionFactory(String brokerURL) {
+      this(createURI(brokerURL));
+      try
+      {
+         URI uri = new URI(brokerURL);
+         String scheme = uri.getScheme();
+         if ("vm".equals(scheme)) {
+            Map<String, String> params = URISupport.parseParameters(uri);
+            params.clear();
+
+            this.vmBrokerUri = URISupport.createRemainingURI(uri, params);;
+         }
+      }
+      catch (URISyntaxException e)
+      {
+      }
+
+   }
+
+   public ActiveMQConnectionFactory(URI brokerURL) {
+      setBrokerURL(brokerURL.toString());
+   }
+
+   public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
+      setUserName(userName);
+      setPassword(password);
+      setBrokerURL(brokerURL.toString());
+   }
+
+   public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
+      setUserName(userName);
+      setPassword(password);
+      setBrokerURL(brokerURL);
+   }
+
+   public ActiveMQConnectionFactory copy() {
+      try {
+         return (ActiveMQConnectionFactory)super.clone();
+      } catch (CloneNotSupportedException e) {
+         throw new RuntimeException("This should never happen: " + e, e);
+      }
+   }
+
+   private static URI createURI(String brokerURL) {
+      try {
+         URI uri = new URI(brokerURL);
+         String scheme = uri.getScheme();
+         if ("vm".equals(scheme)) {
+             Map<String, String> params = URISupport.parseParameters(uri);
+             params.put("invmBrokerId", uri.getHost() == null ? "localhost" : uri.getHost());
+             defaultTcpUri = URISupport.createRemainingURI(defaultTcpUri, params);
+             return defaultTcpUri;
+         }
+         return uri;
+      } catch (URISyntaxException e) {
+         throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
+      }
+   }
+
+   @Override
+   public Connection createConnection() throws JMSException {
+      return createActiveMQConnection();
+   }
+
+   @Override
+   public Connection createConnection(String userName, String password) throws JMSException {
+      return createActiveMQConnection(userName, password);
+   }
+
+   @Override
+   public QueueConnection createQueueConnection() throws JMSException {
+      return createActiveMQConnection().enforceQueueOnlyConnection();
+   }
+
+   @Override
+   public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
+      return createActiveMQConnection(userName, password).enforceQueueOnlyConnection();
+   }
+
+   @Override
+   public TopicConnection createTopicConnection() throws JMSException {
+      return createActiveMQConnection();
+   }
+
+   @Override
+   public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
+      return createActiveMQConnection(userName, password);
+   }
+
+   @Override
+   public StatsImpl getStats() {
+      return this.factoryStats;
+   }
+
+   // /////////////////////////////////////////////
+   //
+   // Implementation methods.
+   //
+   // /////////////////////////////////////////////
+
+   protected ActiveMQConnection createActiveMQConnection() throws JMSException {
+      return createActiveMQConnection(userName, password);
+   }
+
+   protected Transport createTransport() throws JMSException {
+      try {
+         System.out.println("xxxxxcreating conn: " + brokerURL.toString());
+         Transport t = TransportFactory.connect(brokerURL);
+         System.out.println("xxxxxxxxxxxx created transport" + t);
+         return t;
+      } catch (Exception e) {
+         throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
+      }
+   }
+
+   protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
+      if (brokerURL == null) {
+         throw new ConfigurationException("brokerURL not set.");
+      }
+      ActiveMQConnection connection = null;
+      try {
+         Transport transport = createTransport();
+         connection = createActiveMQConnection(transport, factoryStats);
+
+         connection.setUserName(userName);
+         connection.setPassword(password);
+
+         configureConnection(connection);
+
+         transport.start();
+
+         if (clientID != null) {
+            connection.setDefaultClientID(clientID);
+         }
+
+         return connection;
+      } catch (JMSException e) {
+         // Clean up!
+         try {
+            connection.close();
+         } catch (Throwable ignore) {
+         }
+         throw e;
+      } catch (Exception e) {
+         // Clean up!
+         try {
+            connection.close();
+         } catch (Throwable ignore) {
+         }
+         throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
+      }
+   }
+
+   protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
+      ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
+              getConnectionIdGenerator(), stats);
+      return connection;
+   }
+
+   protected void configureConnection(ActiveMQConnection connection) throws JMSException {
+      connection.setPrefetchPolicy(getPrefetchPolicy());
+      connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
+      connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
+      connection.setCopyMessageOnSend(isCopyMessageOnSend());
+      connection.setUseCompression(isUseCompression());
+      connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
+      connection.setDispatchAsync(isDispatchAsync());
+      connection.setUseAsyncSend(isUseAsyncSend());
+      connection.setAlwaysSyncSend(isAlwaysSyncSend());
+      connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
+      connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
+      connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
+      connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
+      connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
+      connection.setExclusiveConsumer(isExclusiveConsumer());
+      connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
+      connection.setTransformer(getTransformer());
+      connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
+      connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
+      connection.setProducerWindowSize(getProducerWindowSize());
+      connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
+      connection.setSendTimeout(getSendTimeout());
+      connection.setCloseTimeout(getCloseTimeout());
+      connection.setSendAcksAsync(isSendAcksAsync());
+      connection.setAuditDepth(getAuditDepth());
+      connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
+      connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
+      connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
+      connection.setCheckForDuplicates(isCheckForDuplicates());
+      connection.setMessagePrioritySupported(isMessagePrioritySupported());
+      connection.setTransactedIndividualAck(isTransactedIndividualAck());
+      connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
+      connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
+      connection.setSessionTaskRunner(getSessionTaskRunner());
+      connection.setRejectedTaskHandler(getRejectedTaskHandler());
+      connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
+      connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
+      connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
+      if (transportListener != null) {
+         connection.addTransportListener(transportListener);
+      }
+      if (exceptionListener != null) {
+         connection.setExceptionListener(exceptionListener);
+      }
+      if (clientInternalExceptionListener != null) {
+         connection.setClientInternalExceptionListener(clientInternalExceptionListener);
+      }
+   }
+
+   // /////////////////////////////////////////////
+   //
+   // Property Accessors
+   //
+   // /////////////////////////////////////////////
+
+   public String getBrokerURL() {
+      System.out.println("vm uri: " + vmBrokerUri);
+      if (vmBrokerUri != null) return vmBrokerUri.toString();
+      return brokerURL == null ? null : brokerURL.toString();
+   }
+
+   public void setBrokerURL(String brokerURL) {
+      URI uri = null;
+      try
+      {
+         uri = new URI(brokerURL);
+         String scheme = uri.getScheme();
+         if ("vm".equals(scheme)) {
+            this.vmBrokerUri = uri;
+         }
+      }
+      catch (URISyntaxException e)
+      {
+      }
+      this.brokerURL = createURI(brokerURL);
+
+      // Use all the properties prefixed with 'jms.' to set the connection
+      // factory
+      // options.
+      if (this.brokerURL.getQuery() != null) {
+         // It might be a standard URI or...
+         try {
+
+            Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery());
+            Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms.");
+            if (buildFromMap(jmsOptionsMap)) {
+               if (!jmsOptionsMap.isEmpty()) {
+                  String msg = "There are " + jmsOptionsMap.size()
+                          + " jms options that couldn't be set on the ConnectionFactory."
+                          + " Check the options are spelled correctly."
+                          + " Unknown parameters=[" + jmsOptionsMap + "]."
+                          + " This connection factory cannot be started.";
+                  throw new IllegalArgumentException(msg);
+               }
+
+               this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
+            }
+
+         } catch (URISyntaxException e) {
+         }
+
+      } else {
+
+         // It might be a composite URI.
+         try {
+            CompositeData data = URISupport.parseComposite(this.brokerURL);
+            Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms.");
+            if (buildFromMap(jmsOptionsMap)) {
+               if (!jmsOptionsMap.isEmpty()) {
+                  String msg = "There are " + jmsOptionsMap.size()
+                          + " jms options that couldn't be set on the ConnectionFactory."
+                          + " Check the options are spelled correctly."
+                          + " Unknown parameters=[" + jmsOptionsMap + "]."
+                          + " This connection factory cannot be started.";
+                  throw new IllegalArgumentException(msg);
+               }
+
+               this.brokerURL = data.toURI();
+            }
+         } catch (URISyntaxException e) {
+         }
+      }
+   }
+
+   public String getClientID() {
+      return clientID;
+   }
+
+   public void setClientID(String clientID) {
+      this.clientID = clientID;
+   }
+
+   public boolean isCopyMessageOnSend() {
+      return copyMessageOnSend;
+   }
+
+   public void setCopyMessageOnSend(boolean copyMessageOnSend) {
+      this.copyMessageOnSend = copyMessageOnSend;
+   }
+
+   public boolean isDisableTimeStampsByDefault() {
+      return disableTimeStampsByDefault;
+   }
+
+   public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
+      this.disableTimeStampsByDefault = disableTimeStampsByDefault;
+   }
+
+   public boolean isOptimizedMessageDispatch() {
+      return optimizedMessageDispatch;
+   }
+
+   public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
+      this.optimizedMessageDispatch = optimizedMessageDispatch;
+   }
+
+   public String getPassword() {
+      return password;
+   }
+
+   public void setPassword(String password) {
+      this.password = password;
+   }
+
+   public ActiveMQPrefetchPolicy getPrefetchPolicy() {
+      return prefetchPolicy;
+   }
+
+   public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
+      this.prefetchPolicy = prefetchPolicy;
+   }
+
+   public boolean isUseAsyncSend() {
+      return useAsyncSend;
+   }
+
+   public BlobTransferPolicy getBlobTransferPolicy() {
+      return blobTransferPolicy;
+   }
+
+   public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
+      this.blobTransferPolicy = blobTransferPolicy;
+   }
+
+   public void setUseAsyncSend(boolean useAsyncSend) {
+      this.useAsyncSend = useAsyncSend;
+   }
+
+   public synchronized boolean isWatchTopicAdvisories() {
+      return watchTopicAdvisories;
+   }
+
+   public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
+      this.watchTopicAdvisories = watchTopicAdvisories;
+   }
+
+   public boolean isAlwaysSyncSend() {
+      return this.alwaysSyncSend;
+   }
+
+   public void setAlwaysSyncSend(boolean alwaysSyncSend) {
+      this.alwaysSyncSend = alwaysSyncSend;
+   }
+
+   public String getUserName() {
+      return userName;
+   }
+
+   public void setUserName(String userName) {
+      this.userName = userName;
+   }
+
+   public boolean isUseRetroactiveConsumer() {
+      return useRetroactiveConsumer;
+   }
+
+   public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
+      this.useRetroactiveConsumer = useRetroactiveConsumer;
+   }
+
+   public boolean isExclusiveConsumer() {
+      return exclusiveConsumer;
+   }
+
+   public void setExclusiveConsumer(boolean exclusiveConsumer) {
+      this.exclusiveConsumer = exclusiveConsumer;
+   }
+
+   public RedeliveryPolicy getRedeliveryPolicy() {
+      return redeliveryPolicyMap.getDefaultEntry();
+   }
+
+   public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+      this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
+   }
+
+   public RedeliveryPolicyMap getRedeliveryPolicyMap() {
+      return this.redeliveryPolicyMap;
+   }
+
+   public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
+      this.redeliveryPolicyMap = redeliveryPolicyMap;
+   }
+
+   public MessageTransformer getTransformer() {
+      return transformer;
+   }
+
+   public int getSendTimeout() {
+      return sendTimeout;
+   }
+
+   public void setSendTimeout(int sendTimeout) {
+      this.sendTimeout = sendTimeout;
+   }
+
+   public boolean isSendAcksAsync() {
+      return sendAcksAsync;
+   }
+
+   public void setSendAcksAsync(boolean sendAcksAsync) {
+      this.sendAcksAsync = sendAcksAsync;
+   }
+
+   public boolean isMessagePrioritySupported() {
+      return this.messagePrioritySupported;
+   }
+
+   public void setMessagePrioritySupported(boolean messagePrioritySupported) {
+      this.messagePrioritySupported = messagePrioritySupported;
+   }
+
+
+   public void setTransformer(MessageTransformer transformer) {
+      this.transformer = transformer;
+   }
+
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   @Override
+   public void buildFromProperties(Properties properties) {
+
+      if (properties == null) {
+         properties = new Properties();
+      }
+
+      String temp = properties.getProperty(Context.PROVIDER_URL);
+      if (temp == null || temp.length() == 0) {
+         temp = properties.getProperty("brokerURL");
+      }
+      if (temp != null && temp.length() > 0) {
+         setBrokerURL(temp);
+      }
+
+      Map<String, Object> p = new HashMap(properties);
+      buildFromMap(p);
+   }
+
+   public boolean buildFromMap(Map<String, Object> properties) {
+      boolean rc = false;
+
+      ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
+      if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
+         setPrefetchPolicy(p);
+         rc = true;
+      }
+
+      RedeliveryPolicy rp = new RedeliveryPolicy();
+      if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
+         setRedeliveryPolicy(rp);
+         rc = true;
+      }
+
+      BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
+      if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
+         setBlobTransferPolicy(blobTransferPolicy);
+         rc = true;
+      }
+
+      rc |= IntrospectionSupport.setProperties(this, properties);
+
+      return rc;
+   }
+
+   @Override
+   public void populateProperties(Properties props) {
+      props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
+
+      if (getBrokerURL() != null) {
+         props.setProperty(Context.PROVIDER_URL, getBrokerURL());
+         props.setProperty("brokerURL", getBrokerURL());
+      }
+
+      if (getClientID() != null) {
+         props.setProperty("clientID", getClientID());
+      }
+
+      IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
+      IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
+      IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
+
+      props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
+      props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
+      props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
+      props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
+
+      if (getPassword() != null) {
+         props.setProperty("password", getPassword());
+      }
+
+      props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
+      props.setProperty("useCompression", Boolean.toString(isUseCompression()));
+      props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
+      props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
+
+      if (getUserName() != null) {
+         props.setProperty("userName", getUserName());
+      }
+
+      props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
+      props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
+      props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
+      props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
+      props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
+      props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
+      props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
+      props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
+      props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
+      props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
+      props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
+      props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
+      props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
+      props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery()));
+      props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
+      props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
+      props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
+      props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
+      props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled()));
+   }
+
+   public boolean isUseCompression() {
+      return useCompression;
+   }
+
+   public void setUseCompression(boolean useCompression) {
+      this.useCompression = useCompression;
+   }
+
+   public boolean isObjectMessageSerializationDefered() {
+      return objectMessageSerializationDefered;
+   }
+
+   public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
+      this.objectMessageSerializationDefered = objectMessageSerializationDefered;
+   }
+
+   public boolean isDispatchAsync() {
+      return dispatchAsync;
+   }
+
+   public void setDispatchAsync(boolean asyncDispatch) {
+      this.dispatchAsync = asyncDispatch;
+   }
+
+   public int getCloseTimeout() {
+      return closeTimeout;
+   }
+
+   public void setCloseTimeout(int closeTimeout) {
+      this.closeTimeout = closeTimeout;
+   }
+
+   public boolean isAlwaysSessionAsync() {
+      return alwaysSessionAsync;
+   }
+
+   public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
+      this.alwaysSessionAsync = alwaysSessionAsync;
+   }
+
+   public boolean isOptimizeAcknowledge() {
+      return optimizeAcknowledge;
+   }
+
+   public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
+      this.optimizeAcknowledge = optimizeAcknowledge;
+   }
+
+   public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
+      this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
+   }
+
+   public long getOptimizeAcknowledgeTimeOut() {
+      return optimizeAcknowledgeTimeOut;
+   }
+
+   public boolean isNestedMapAndListEnabled() {
+      return nestedMapAndListEnabled;
+   }
+
+   public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
+      this.nestedMapAndListEnabled = structuredMapsEnabled;
+   }
+
+   public String getClientIDPrefix() {
+      return clientIDPrefix;
+   }
+
+   public void setClientIDPrefix(String clientIDPrefix) {
+      this.clientIDPrefix = clientIDPrefix;
+   }
+
+   protected synchronized IdGenerator getClientIdGenerator() {
+      if (clientIdGenerator == null) {
+         if (clientIDPrefix != null) {
+            clientIdGenerator = new IdGenerator(clientIDPrefix);
+         } else {
+            clientIdGenerator = new IdGenerator();
+         }
+      }
+      return clientIdGenerator;
+   }
+
+   protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
+      this.clientIdGenerator = clientIdGenerator;
+   }
+
+   public void setConnectionIDPrefix(String connectionIDPrefix) {
+      this.connectionIDPrefix = connectionIDPrefix;
+   }
+
+   protected synchronized IdGenerator getConnectionIdGenerator() {
+      if (connectionIdGenerator == null) {
+         if (connectionIDPrefix != null) {
+            connectionIdGenerator = new IdGenerator(connectionIDPrefix);
+         } else {
+            connectionIdGenerator = new IdGenerator();
+         }
+      }
+      return connectionIdGenerator;
+   }
+
+   protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
+      this.connectionIdGenerator = connectionIdGenerator;
+   }
+
+   public boolean isStatsEnabled() {
+      return this.factoryStats.isEnabled();
+   }
+
+   public void setStatsEnabled(boolean statsEnabled) {
+      this.factoryStats.setEnabled(statsEnabled);
+   }
+
+   public synchronized int getProducerWindowSize() {
+      return producerWindowSize;
+   }
+
+   public synchronized void setProducerWindowSize(int producerWindowSize) {
+      this.producerWindowSize = producerWindowSize;
+   }
+
+   public long getWarnAboutUnstartedConnectionTimeout() {
+      return warnAboutUnstartedConnectionTimeout;
+   }
+
+   public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
+      this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
+   }
+
+   public TransportListener getTransportListener() {
+      return transportListener;
+   }
+
+   public void setTransportListener(TransportListener transportListener) {
+      this.transportListener = transportListener;
+   }
+
+
+   public ExceptionListener getExceptionListener() {
+      return exceptionListener;
+   }
+
+   public void setExceptionListener(ExceptionListener exceptionListener) {
+      this.exceptionListener = exceptionListener;
+   }
+
+   public int getAuditDepth() {
+      return auditDepth;
+   }
+
+   public void setAuditDepth(int auditDepth) {
+      this.auditDepth = auditDepth;
+   }
+
+   public int getAuditMaximumProducerNumber() {
+      return auditMaximumProducerNumber;
+   }
+
+   public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
+      this.auditMaximumProducerNumber = auditMaximumProducerNumber;
+   }
+
+   public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
+      this.useDedicatedTaskRunner = useDedicatedTaskRunner;
+   }
+
+   public boolean isUseDedicatedTaskRunner() {
+      return useDedicatedTaskRunner;
+   }
+
+   public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
+      this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
+   }
+
+   public long getConsumerFailoverRedeliveryWaitPeriod() {
+      return consumerFailoverRedeliveryWaitPeriod;
+   }
+
+   public ClientInternalExceptionListener getClientInternalExceptionListener() {
+      return clientInternalExceptionListener;
+   }
+
+   public void setClientInternalExceptionListener(
+           ClientInternalExceptionListener clientInternalExceptionListener) {
+      this.clientInternalExceptionListener = clientInternalExceptionListener;
+   }
+
+   public boolean isCheckForDuplicates() {
+      return this.checkForDuplicates;
+   }
+
+   public void setCheckForDuplicates(boolean checkForDuplicates) {
+      this.checkForDuplicates = checkForDuplicates;
+   }
+
+   public boolean isTransactedIndividualAck() {
+      return transactedIndividualAck;
+   }
+
+   public void setTransactedIndividualAck(boolean transactedIndividualAck) {
+      this.transactedIndividualAck = transactedIndividualAck;
+   }
+
+
+   public boolean isNonBlockingRedelivery() {
+      return nonBlockingRedelivery;
+   }
+
+   public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
+      this.nonBlockingRedelivery = nonBlockingRedelivery;
+   }
+
+   public int getMaxThreadPoolSize() {
+      return maxThreadPoolSize;
+   }
+
+   public void setMaxThreadPoolSize(int maxThreadPoolSize) {
+      this.maxThreadPoolSize = maxThreadPoolSize;
+   }
+
+   public TaskRunnerFactory getSessionTaskRunner() {
+      return sessionTaskRunner;
+   }
+
+   public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
+      this.sessionTaskRunner = sessionTaskRunner;
+   }
+
+   public RejectedExecutionHandler getRejectedTaskHandler() {
+      return rejectedTaskHandler;
+   }
+
+   public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
+      this.rejectedTaskHandler = rejectedTaskHandler;
+   }
+
+   public long getOptimizedAckScheduledAckInterval() {
+      return optimizedAckScheduledAckInterval;
+   }
+
+   public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
+      this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
+   }
+
+
+   public boolean isRmIdFromConnectionId() {
+      return rmIdFromConnectionId;
+   }
+
+   public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
+      this.rmIdFromConnectionId = rmIdFromConnectionId;
+   }
+
+   public boolean isConsumerExpiryCheckEnabled() {
+      return consumerExpiryCheckEnabled;
+   }
+
+   public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
+      this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fbdf9cd0/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/artemiswrapper/ArtemisBrokerHelper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/artemiswrapper/ArtemisBrokerHelper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/artemiswrapper/ArtemisBrokerHelper.java
index 8d5cdab..d586886 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/artemiswrapper/ArtemisBrokerHelper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/artemiswrapper/ArtemisBrokerHelper.java
@@ -21,6 +21,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URI;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
 
 public class ArtemisBrokerHelper {
@@ -72,5 +73,24 @@ public class ArtemisBrokerHelper {
       service = startedBroker;
    }
 
+   public static BrokerService getBroker() {
+      return (BrokerService)service;
+   }
+
+   public static void stopArtemisBroker() throws Exception
+   {
+      try
+      {
+         if (service != null)
+         {
+            Method startMethod = serviceClass.getMethod("stop");
+            startMethod.invoke(service, (Object[]) null);
+         }
+      }
+      finally
+      {
+         service = null;
+      }
+   }
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fbdf9cd0/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
index 34dc6e4..75eff1f 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -194,11 +194,13 @@ public class BrokerService implements Service
    @Override
    public void stop() throws Exception
    {
+      System.out.println("broker is: " + broker);
 
       LOG.info("Apache ActiveMQ Artemis{} ({}, {}) is shutting down", new Object[]{getBrokerVersion(), getBrokerName(), brokerId});
 
       if (broker != null)
       {
+         System.out.println("______________________stopping broker: " + broker.getClass().getName());
          broker.stop();
          broker = null;
       }
@@ -566,7 +568,10 @@ public class BrokerService implements Service
    public TransportConnector addConnector(URI bindAddress) throws Exception
    {
       Integer port = bindAddress.getPort();
-      this.extraConnectors.add(port);
+      if (port != 0)
+      {
+         this.extraConnectors.add(port);
+      }
       return null;
    }
 
@@ -719,6 +724,10 @@ public class BrokerService implements Service
       return null;
    }
 
+   public String getDefaultUri()
+   {
+      return "tcp://localhost:61616";
+   }
 }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fbdf9cd0/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
index 227ad1b..52f1f42 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
@@ -507,7 +507,7 @@ public abstract class ArtemisBrokerBase implements Broker {
 			throws Exception {
 		if (netty) {
 			return createDefaultConfig(new HashMap<String, Object>(),
-					INVM_ACCEPTOR_FACTORY, NETTY_ACCEPTOR_FACTORY);
+					NETTY_ACCEPTOR_FACTORY);
 		} else {
 			return createDefaultConfig(new HashMap<String, Object>(),
 					INVM_ACCEPTOR_FACTORY);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fbdf9cd0/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index ced7857..811d6e5 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -58,7 +58,8 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
    {
       testDir = temporaryFolder.getRoot().getAbsolutePath();
       clearDataRecreateServerDirs();
-      server = createServer(realStore, false);
+      server = createServer(realStore, true);
+      server.getConfiguration().getAcceptorConfigurations().clear();
       HashMap<String, Object> params = new HashMap<String, Object>();
       params.put(TransportConstants.PORT_PROP_NAME, "61616");
       params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE");
@@ -66,8 +67,6 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
 
       Configuration serverConfig = server.getConfiguration();
 
-      Set<TransportConfiguration> acceptors0 = serverConfig.getAcceptorConfigurations();
-
       Map<String, AddressSettings> addressSettingsMap = serverConfig.getAddressesSettings();
 
       //do policy translation
@@ -170,18 +169,18 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
          anySet.add(destRole);
       }
 
-      jmsServer = new JMSServerManagerImpl(server);
-      InVMNamingContext namingContext = new InVMNamingContext();
-      jmsServer.setRegistry(new JndiBindingRegistry(namingContext));
-      jmsServer.start();
-
       Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
       Iterator<TransportConfiguration> iter = acceptors.iterator();
-
       while (iter.hasNext())
       {
          System.out.println("acceptor =>: " + iter.next());
       }
+
+      jmsServer = new JMSServerManagerImpl(server);
+      InVMNamingContext namingContext = new InVMNamingContext();
+      jmsServer.setRegistry(new JndiBindingRegistry(namingContext));
+      jmsServer.start();
+
       server.start();
 
 /*

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fbdf9cd0/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
new file mode 100644
index 0000000..959dea4
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.TransportLoggerSupport;
+import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.*;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TcpTransportFactory extends TransportFactory {
+   private static final Logger LOG = LoggerFactory.getLogger(TcpTransportFactory.class);
+
+   private static volatile String brokerService = null;
+
+   //if a broker is started or stopped it should set this.
+   public static void setBrokerName(String name) {
+      brokerService = name;
+   }
+
+   @Override
+   public Transport doConnect(URI location) throws Exception {
+      //here check broker, if no broker, we start one
+      Map<String, String> params = URISupport.parseParameters(location);
+      String brokerId = params.remove("invmBrokerId");
+      params.clear();
+      location = URISupport.createRemainingURI(location, params);
+      if (brokerService == null) {
+
+         ArtemisBrokerHelper.startArtemisBroker(location);
+         brokerService = location.toString();
+
+         if (brokerId != null) {
+            BrokerRegistry.getInstance().bind(brokerId, ArtemisBrokerHelper.getBroker());
+            System.out.println("bound: " + brokerId);
+         }
+      }
+      return super.doConnect(location);
+   }
+
+   public TransportServer doBind(final URI location) throws IOException {
+      try {
+         Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
+
+         ServerSocketFactory serverSocketFactory = createServerSocketFactory();
+         TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
+         server.setWireFormatFactory(createWireFormatFactory(options));
+         IntrospectionSupport.setProperties(server, options);
+         Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
+         server.setTransportOption(transportOptions);
+         server.bind();
+
+         return server;
+      } catch (URISyntaxException e) {
+         throw IOExceptionSupport.create(e);
+      }
+   }
+
+   protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+      return new TcpTransportServer(this, location, serverSocketFactory);
+   }
+
+   @SuppressWarnings("rawtypes")
+   public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+
+      TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class);
+      IntrospectionSupport.setProperties(tcpTransport, options);
+
+      Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
+      tcpTransport.setSocketOptions(socketOptions);
+
+      if (tcpTransport.isTrace()) {
+         try {
+            transport = TransportLoggerSupport.createTransportLogger(transport, tcpTransport.getLogWriterName(), tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort());
+         } catch (Throwable e) {
+            LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e);
+         }
+      }
+
+      boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
+      if (useInactivityMonitor && isUseInactivityMonitor(transport)) {
+         transport = createInactivityMonitor(transport, format);
+         IntrospectionSupport.setProperties(transport, options);
+      }
+
+      // Only need the WireFormatNegotiator if using openwire
+      if (format instanceof OpenWireFormat) {
+         transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
+      }
+
+      return super.compositeConfigure(transport, format, options);
+   }
+
+   protected boolean isUseInactivityMonitor(Transport transport) {
+      return true;
+   }
+
+   protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+      URI localLocation = null;
+      String path = location.getPath();
+      // see if the path is a local URI location
+      if (path != null && path.length() > 0) {
+         int localPortIndex = path.indexOf(':');
+         try {
+            Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
+            String localString = location.getScheme() + ":/" + path;
+            localLocation = new URI(localString);
+         } catch (Exception e) {
+            LOG.warn("path isn't a valid local location for TcpTransport to use", e.getMessage());
+            if(LOG.isDebugEnabled()) {
+               LOG.debug("Failure detail", e);
+            }
+         }
+      }
+      SocketFactory socketFactory = createSocketFactory();
+      return createTcpTransport(wf, socketFactory, location, localLocation);
+   }
+
+   protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+      return new TcpTransport(wf, socketFactory, location, localLocation);
+   }
+
+   protected ServerSocketFactory createServerSocketFactory() throws IOException {
+      return ServerSocketFactory.getDefault();
+   }
+
+   protected SocketFactory createSocketFactory() throws IOException {
+      return SocketFactory.getDefault();
+   }
+
+   protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+      return new InactivityMonitor(transport, format);
+   }
+
+   public static void clearService()
+   {
+      brokerService = null;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fbdf9cd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
index 353f1d3..bb1776c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
@@ -27,9 +27,11 @@ import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Session;
 
+import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
 import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +67,11 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
             broker.stop();
         } catch (Throwable ignore) {
         }
+        try {
+            ArtemisBrokerHelper.stopArtemisBroker();
+        } catch (Throwable ignore) {
+        }
+        TcpTransportFactory.clearService();
     }
 
     public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException {
@@ -120,6 +127,7 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
     }
 
     public void testGetBrokerName() throws URISyntaxException, JMSException {
+        System.out.println("------------------beging testing...............");
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
         connection = (ActiveMQConnection)cf.createConnection();
         connection.start();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fbdf9cd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
index 77f422e..c3fd2d0 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
@@ -24,8 +24,10 @@ import javax.jms.Session;
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +55,8 @@ public class ActiveMQInputStreamTest extends TestCase {
         broker.start();
         broker.waitUntilStarted();
 
-        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+        //some internal api we don't implement
+        connectionUri = broker.getDefaultUri();
     }
 
     @Override


Mime
View raw message