activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [29/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:59 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnector.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnector.java
new file mode 100644
index 0000000..569ae20
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnector.java
@@ -0,0 +1,1266 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting.impl.netty;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.security.AccessController;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.base64.Base64;
+import io.netty.handler.codec.http.ClientCookieEncoder;
+import io.netty.handler.codec.http.Cookie;
+import io.netty.handler.codec.http.CookieDecoder;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.DefaultHttpRequest;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpRequestEncoder;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseDecoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.AttributeKey;
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import org.apache.activemq6.api.config.HornetQDefaultConfiguration;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.core.client.impl.ClientSessionFactoryImpl;
+import org.apache.activemq6.core.protocol.core.impl.HornetQClientProtocolManager;
+import org.apache.activemq6.core.remoting.impl.ssl.SSLSupport;
+import org.apache.activemq6.core.server.HornetQComponent;
+import org.apache.activemq6.spi.core.remoting.AbstractConnector;
+import org.apache.activemq6.spi.core.remoting.BufferHandler;
+import org.apache.activemq6.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq6.spi.core.remoting.Connection;
+import org.apache.activemq6.spi.core.remoting.ConnectionLifeCycleListener;
+import org.apache.activemq6.utils.ConfigurationHelper;
+import org.apache.activemq6.utils.FutureLatch;
+
+import static org.apache.activemq6.utils.Base64.encodeBytes;
+
+/**
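+ * A client-side connector that uses Netty to open connections to a remote host and port,
+ * optionally over SSL, HTTP or an HTTP upgrade handshake.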
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:tlee@redhat.com">Trustin Lee</a>
+ * @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
+ */
+public class NettyConnector extends AbstractConnector
+{
+   // Constants -----------------------------------------------------
+
+   // Standard JSSE system property names. When one of these is set, start() uses its value
+   // instead of the corresponding key/trust store path or password from the configuration.
+   public static final String JAVAX_KEYSTORE_PATH_PROP_NAME = "javax.net.ssl.keyStore";
+   public static final String JAVAX_KEYSTORE_PASSWORD_PROP_NAME = "javax.net.ssl.keyStorePassword";
+   public static final String JAVAX_TRUSTSTORE_PATH_PROP_NAME = "javax.net.ssl.trustStore";
+   public static final String JAVAX_TRUSTSTORE_PASSWORD_PROP_NAME = "javax.net.ssl.trustStorePassword";
+
+   // Connector-specific system property names. start() checks these after the javax.net.ssl.*
+   // ones, so when both are set these take precedence.
+   public static final String HORNETQ_KEYSTORE_PROVIDER_PROP_NAME = "org.apache.activemq6.ssl.keyStoreProvider";
+   public static final String HORNETQ_KEYSTORE_PATH_PROP_NAME = "org.apache.activemq6.ssl.keyStore";
+   public static final String HORNETQ_KEYSTORE_PASSWORD_PROP_NAME = "org.apache.activemq6.ssl.keyStorePassword";
+   public static final String HORNETQ_TRUSTSTORE_PROVIDER_PROP_NAME = "org.apache.activemq6.ssl.trustStoreProvider";
+   public static final String HORNETQ_TRUSTSTORE_PATH_PROP_NAME = "org.apache.activemq6.ssl.trustStore";
+   public static final String HORNETQ_TRUSTSTORE_PASSWORD_PROP_NAME = "org.apache.activemq6.ssl.trustStorePassword";
+
+   // Constants for HTTP upgrade
+   // These constants are exposed publicly as they are used on the server-side to fetch
+   // headers from the HTTP request, compute some values and fill the HTTP response
+   public static final String MAGIC_NUMBER = "CF70DEB8-70F9-4FBA-8B4F-DFC3E723B4CD";
+   public static final String SEC_HORNETQ_REMOTING_KEY = "Sec-HornetQRemoting-Key";
+   public static final String SEC_HORNETQ_REMOTING_ACCEPT = "Sec-HornetQRemoting-Accept";
+   public static final String HORNETQ_REMOTING = "hornetq-remoting";
+
+   // Netty attribute key named after the Sec-HornetQRemoting-Key header
+   // NOTE(review): presumably used to stash the generated key on the channel during the upgrade handshake - confirm
+   private static final AttributeKey<String> REMOTING_KEY = AttributeKey.valueOf(SEC_HORNETQ_REMOTING_KEY);
+
+   /**
+    * Unmodifiable default configuration holding only the default host and port.
+    */
+   public static final Map<String, Object> DEFAULT_CONFIG;
+
+   static
+   {
+      // Resource leak detection is switched off by default because of its performance cost
+      ResourceLeakDetector.setEnabled(false);
+
+      DEFAULT_CONFIG = Collections.unmodifiableMap(defaultConfiguration());
+   }
+
+   /**
+    * Builds the mutable map backing {@link #DEFAULT_CONFIG}.
+    */
+   private static Map<String, Object> defaultConfiguration()
+   {
+      final Map<String, Object> defaults = new HashMap<String, Object>();
+      defaults.put(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST);
+      defaults.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT);
+      return defaults;
+   }
+
+   // Attributes ----------------------------------------------------
+
+   // Netty channel implementation; also serves as the "started" flag:
+   // set by start(), reset to null by close(), tested by isStarted()
+   private Class<? extends Channel> channelClazz;
+
+   // Created by start(), set to null by close()
+   private Bootstrap bootstrap;
+
+   // Created by start(); close() closes it and waits for the close to complete
+   private ChannelGroup channelGroup;
+
+   private final BufferHandler handler;
+
+   // Notified with connectionDestroyed(...) for every remaining connection when the connector is closed
+   private final ConnectionLifeCycleListener listener;
+
+   private final boolean sslEnabled;
+
+   private final boolean httpEnabled;
+
+   // The next three values are read from the configuration only when httpEnabled is true;
+   // otherwise they are fixed to 0, -1 and false respectively
+   private final long httpMaxClientIdleTime;
+
+   private final long httpClientIdleScanPeriod;
+
+   private final boolean httpRequiresSessionId;
+
+   // if true, after the connection, the connector will send
+   // a HTTP GET request (+ Upgrade: hornetq-remoting) that
+   // will be handled by the server's http server.
+   private final boolean httpUpgradeEnabled;
+
+   // When true, start() does not add an SslHandler to the pipeline even if sslEnabled is set
+   private final boolean useServlet;
+
+   // Remote host and port that createConnection() connects to
+   private final String host;
+
+   private final int port;
+
+   // Optional local bind address; only used when localPort is non-zero
+   private final String localAddress;
+
+   // Local bind port; 0 means connect without binding to a specific local address
+   private final int localPort;
+
+   // SSL settings: read from the configuration only when sslEnabled is true,
+   // otherwise set to the TransportConstants defaults
+   private final String keyStoreProvider;
+
+   private final String keyStorePath;
+
+   private final String keyStorePassword;
+
+   private final String trustStoreProvider;
+
+   private final String trustStorePath;
+
+   private final String trustStorePassword;
+
+   // Comma separated list; null leaves the SSLEngine's cipher suites untouched
+   private final String enabledCipherSuites;
+
+   // Comma separated list; null makes start() restore the SSLEngine's original enabled protocols
+   private final String enabledProtocols;
+
+   private final boolean tcpNoDelay;
+
+   // -1 means the SO_SNDBUF option is not set on the bootstrap
+   private final int tcpSendBufferSize;
+
+   // -1 means the SO_RCVBUF option is not set on the bootstrap
+   private final int tcpReceiveBufferSize;
+
+   // Milliseconds; a value greater than 0 makes start() schedule a BatchFlusher with this delay
+   private final long batchDelay;
+
+   // Connections known to this connector; close() reports each one as destroyed and then clears the map
+   private final ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
+
+   private final String servletPath;
+
+   // -1 means "use the number of available processors * 3"
+   private final int nioRemotingThreads;
+
+   // When true the event loop group comes from SharedNioEventLoopGroup instead of a new NioEventLoopGroup
+   private final boolean useNioGlobalWorkerPool;
+
+   // Runs the BatchFlusher when batchDelay is greater than 0
+   private final ScheduledExecutorService scheduledThreadPool;
+
+   private final Executor closeExecutor;
+
+   // Both are non-null only between start() and close(), and only when batchDelay is greater than 0
+   private BatchFlusher flusher;
+
+   private ScheduledFuture<?> batchFlusherFuture;
+
+   // Event loop group created (or obtained) by start() and shut down gracefully by close()
+   private EventLoopGroup group;
+
+   // -1 means the CONNECT_TIMEOUT_MILLIS option is not set on the bootstrap
+   private int connectTimeoutMillis;
+
+   // Adds its own channel handlers to every new pipeline (see start())
+   private final ClientProtocolManager protocolManager;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   /**
+    * Creates a connector that uses a new {@link HornetQClientProtocolManager}.
+    * <p>
+    * Delegates to the seven-argument constructor; see it for the meaning of the arguments.
+    */
+   public NettyConnector(final Map<String, Object> configuration,
+                         final BufferHandler handler,
+                         final ConnectionLifeCycleListener listener,
+                         final Executor closeExecutor,
+                         final Executor threadPool,
+                         final ScheduledExecutorService scheduledThreadPool)
+   {
+      this(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, new HornetQClientProtocolManager());
+   }
+
+
+   /**
+    * Creates a connector and reads all transport settings from {@code configuration}.
+    * <p>
+    * Settings missing from the configuration fall back to the {@link TransportConstants} defaults.
+    * HTTP idle settings are only read when HTTP is enabled, and SSL store settings only when SSL is
+    * enabled. Nothing is connected or started here; call {@link #start()} for that.
+    *
+    * @param configuration       transport properties keyed by the {@link TransportConstants} property names
+    * @param handler             buffer handler passed to the channel handler of each new pipeline; must not be null
+    * @param listener            connection life-cycle listener; must not be null
+    * @param closeExecutor       executor stored for later use by the connector
+    * @param threadPool          not used by this constructor
+    * @param scheduledThreadPool scheduler used to run the batch flusher when a batch delay is configured
+    * @param protocolManager     protocol manager that contributes channel handlers to each new pipeline
+    * @throws RuntimeException the exception created by {@code HornetQClientMessageBundle} when
+    *                          {@code listener} or {@code handler} is null
+    */
+   public NettyConnector(final Map<String, Object> configuration,
+                         final BufferHandler handler,
+                         final ConnectionLifeCycleListener listener,
+                         final Executor closeExecutor,
+                         final Executor threadPool,
+                         final ScheduledExecutorService scheduledThreadPool,
+                         final ClientProtocolManager protocolManager)
+   {
+      super(configuration);
+
+      this.protocolManager = protocolManager;
+
+      if (listener == null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.nullListener();
+      }
+
+      if (handler == null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.nullHandler();
+      }
+
+      this.listener = listener;
+
+      this.handler = handler;
+
+      sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
+                                                          TransportConstants.DEFAULT_SSL_ENABLED,
+                                                          configuration);
+      httpEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME,
+                                                           TransportConstants.DEFAULT_HTTP_ENABLED,
+                                                           configuration);
+      servletPath = ConfigurationHelper.getStringProperty(TransportConstants.SERVLET_PATH,
+                                                          TransportConstants.DEFAULT_SERVLET_PATH,
+                                                          configuration);
+      if (httpEnabled)
+      {
+         httpMaxClientIdleTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME,
+                                                                     TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME,
+                                                                     configuration);
+         httpClientIdleScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD,
+                                                                        TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD,
+                                                                        configuration);
+         httpRequiresSessionId = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_REQUIRES_SESSION_ID,
+                                                                        TransportConstants.DEFAULT_HTTP_REQUIRES_SESSION_ID,
+                                                                        configuration);
+      }
+      else
+      {
+         // HTTP disabled: the idle settings are irrelevant, use inert values
+         httpMaxClientIdleTime = 0;
+         httpClientIdleScanPeriod = -1;
+         httpRequiresSessionId = false;
+      }
+
+      httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME,
+                                                                  TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED,
+                                                                  configuration);
+
+      // -1 is the "not configured" marker; start() then derives the thread count from the CPU count
+      nioRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,
+                                                              -1,
+                                                              configuration);
+
+      useNioGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME,
+                                                                      TransportConstants.DEFAULT_USE_NIO_GLOBAL_WORKER_POOL,
+                                                                      configuration);
+
+      useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME,
+                                                          TransportConstants.DEFAULT_USE_SERVLET,
+                                                          configuration);
+      host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
+                                                   TransportConstants.DEFAULT_HOST,
+                                                   configuration);
+      port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME,
+                                                TransportConstants.DEFAULT_PORT,
+                                                configuration);
+      localAddress = ConfigurationHelper.getStringProperty(TransportConstants.LOCAL_ADDRESS_PROP_NAME,
+                                                           TransportConstants.DEFAULT_LOCAL_ADDRESS,
+                                                           configuration);
+
+      localPort = ConfigurationHelper.getIntProperty(TransportConstants.LOCAL_PORT_PROP_NAME,
+                                                     TransportConstants.DEFAULT_LOCAL_PORT,
+                                                     configuration);
+      if (sslEnabled)
+      {
+         keyStoreProvider = ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME,
+                                                                  TransportConstants.DEFAULT_KEYSTORE_PROVIDER,
+                                                                  configuration);
+
+         keyStorePath = ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PATH_PROP_NAME,
+                                                              TransportConstants.DEFAULT_KEYSTORE_PATH,
+                                                              configuration);
+
+         // The last two arguments are the names of the "mask password" and "password codec"
+         // properties; they must be two different property names
+         keyStorePassword = ConfigurationHelper.getPasswordProperty(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME,
+                                                                    TransportConstants.DEFAULT_KEYSTORE_PASSWORD,
+                                                                    configuration,
+                                                                    HornetQDefaultConfiguration.getPropMaskPassword(),
+                                                                    HornetQDefaultConfiguration.getPropPasswordCodec());
+
+         trustStoreProvider = ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME,
+                                                                    TransportConstants.DEFAULT_TRUSTSTORE_PROVIDER,
+                                                                    configuration);
+
+         trustStorePath = ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PATH_PROP_NAME,
+                                                                TransportConstants.DEFAULT_TRUSTSTORE_PATH,
+                                                                configuration);
+
+         trustStorePassword = ConfigurationHelper.getPasswordProperty(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME,
+                                                                      TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD,
+                                                                      configuration,
+                                                                      HornetQDefaultConfiguration.getPropMaskPassword(),
+                                                                      HornetQDefaultConfiguration.getPropPasswordCodec());
+
+         enabledCipherSuites = ConfigurationHelper.getStringProperty(TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME,
+                                                                     TransportConstants.DEFAULT_ENABLED_CIPHER_SUITES,
+                                                                     configuration);
+
+         enabledProtocols = ConfigurationHelper.getStringProperty(TransportConstants.ENABLED_PROTOCOLS_PROP_NAME,
+                                                                  TransportConstants.DEFAULT_ENABLED_PROTOCOLS,
+                                                                  configuration);
+      }
+      else
+      {
+         // SSL disabled: ignore any SSL settings in the configuration
+         keyStoreProvider = TransportConstants.DEFAULT_KEYSTORE_PROVIDER;
+         keyStorePath = TransportConstants.DEFAULT_KEYSTORE_PATH;
+         keyStorePassword = TransportConstants.DEFAULT_KEYSTORE_PASSWORD;
+         trustStoreProvider = TransportConstants.DEFAULT_TRUSTSTORE_PROVIDER;
+         trustStorePath = TransportConstants.DEFAULT_TRUSTSTORE_PATH;
+         trustStorePassword = TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD;
+         enabledCipherSuites = TransportConstants.DEFAULT_ENABLED_CIPHER_SUITES;
+         enabledProtocols = TransportConstants.DEFAULT_ENABLED_PROTOCOLS;
+      }
+
+      tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME,
+                                                          TransportConstants.DEFAULT_TCP_NODELAY,
+                                                          configuration);
+      tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME,
+                                                             TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE,
+                                                             configuration);
+      tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME,
+                                                                TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE,
+                                                                configuration);
+
+      batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY,
+                                                       TransportConstants.DEFAULT_BATCH_DELAY,
+                                                       configuration);
+
+      connectTimeoutMillis = ConfigurationHelper.getIntProperty(TransportConstants.NETTY_CONNECT_TIMEOUT,
+                                                                TransportConstants.DEFAULT_NETTY_CONNECT_TIMEOUT,
+                                                                configuration);
+      this.closeExecutor = closeExecutor;
+      this.scheduledThreadPool = scheduledThreadPool;
+   }
+
+   /**
+    * Describes this connector by its main transport settings.
+    *
+    * @return a string of the form {@code NettyConnector [host=..., port=..., ...]}
+    */
+   @Override
+   public String toString()
+   {
+      final StringBuilder description = new StringBuilder("NettyConnector [host=");
+      description.append(host);
+      description.append(", port=").append(port);
+      description.append(", httpEnabled=").append(httpEnabled);
+      description.append(", httpUpgradeEnabled=").append(httpUpgradeEnabled);
+      description.append(", useServlet=").append(useServlet);
+      description.append(", servletPath=").append(servletPath);
+      description.append(", sslEnabled=").append(sslEnabled);
+      // start() always uses the NIO socket channel, so this flag is constant
+      description.append(", useNio=").append(true);
+      description.append("]");
+      return description.toString();
+   }
+
+   /**
+    * Starts the connector; does nothing when it is already started.
+    * <p>
+    * Creates the event loop group and the {@link Bootstrap}, applies the TCP options, builds the
+    * {@link SSLContext} when SSL is enabled, installs the channel initializer that assembles the
+    * pipeline (SSL, HTTP, HTTP upgrade, protocol handlers, client channel handler) and, when a
+    * batch delay is configured, schedules the batch flusher.
+    *
+    * @throws IllegalStateException if SSL is enabled and the SSL context cannot be created; the
+    *                               connector is closed again before the exception is thrown
+    */
+   public synchronized void start()
+   {
+      if (channelClazz != null)
+      {
+         return;
+      }
+
+      // -1 means "not configured": default to number of cores * 3
+      final int threadsToUse = nioRemotingThreads == -1 ? Runtime.getRuntime().availableProcessors() * 3 : nioRemotingThreads;
+
+      // NIO is the only channel type in use; only the source of the event loop group differs
+      channelClazz = NioSocketChannel.class;
+      if (useNioGlobalWorkerPool)
+      {
+         group = SharedNioEventLoopGroup.getInstance(threadsToUse);
+      }
+      else
+      {
+         group = new NioEventLoopGroup(threadsToUse);
+      }
+
+      bootstrap = new Bootstrap();
+      bootstrap.channel(channelClazz);
+      bootstrap.group(group);
+
+      bootstrap.option(ChannelOption.TCP_NODELAY, tcpNoDelay);
+
+      // For the next three settings -1 means "leave the option at its default"
+      if (connectTimeoutMillis != -1)
+      {
+         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
+      }
+      if (tcpReceiveBufferSize != -1)
+      {
+         bootstrap.option(ChannelOption.SO_RCVBUF, tcpReceiveBufferSize);
+      }
+      if (tcpSendBufferSize != -1)
+      {
+         bootstrap.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize);
+      }
+      bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+      bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+      bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false));
+      channelGroup = new DefaultChannelGroup("hornetq-connector", GlobalEventExecutor.INSTANCE);
+
+      final SSLContext context;
+      if (sslEnabled)
+      {
+         try
+         {
+            // HORNETQ-680 - override the server-side config if client-side system properties are set.
+            // Precedence (highest first): org.apache.activemq6.ssl.* property, javax.net.ssl.* property,
+            // configured value. Each property is read exactly once, so a property that is cleared
+            // concurrently can no longer replace a configured value with null.
+            final String realKeyStoreProvider = System.getProperty(HORNETQ_KEYSTORE_PROVIDER_PROP_NAME, keyStoreProvider);
+            final String realKeyStorePath = System.getProperty(HORNETQ_KEYSTORE_PATH_PROP_NAME,
+                                                               System.getProperty(JAVAX_KEYSTORE_PATH_PROP_NAME, keyStorePath));
+            final String realKeyStorePassword = System.getProperty(HORNETQ_KEYSTORE_PASSWORD_PROP_NAME,
+                                                                   System.getProperty(JAVAX_KEYSTORE_PASSWORD_PROP_NAME, keyStorePassword));
+
+            final String realTrustStoreProvider = System.getProperty(HORNETQ_TRUSTSTORE_PROVIDER_PROP_NAME, trustStoreProvider);
+            final String realTrustStorePath = System.getProperty(HORNETQ_TRUSTSTORE_PATH_PROP_NAME,
+                                                                 System.getProperty(JAVAX_TRUSTSTORE_PATH_PROP_NAME, trustStorePath));
+            final String realTrustStorePassword = System.getProperty(HORNETQ_TRUSTSTORE_PASSWORD_PROP_NAME,
+                                                                     System.getProperty(JAVAX_TRUSTSTORE_PASSWORD_PROP_NAME, trustStorePassword));
+
+            context = SSLSupport.createContext(realKeyStoreProvider, realKeyStorePath, realKeyStorePassword, realTrustStoreProvider, realTrustStorePath, realTrustStorePassword);
+         }
+         catch (Exception e)
+         {
+            // Release the resources created above before reporting the failure
+            close();
+            throw new IllegalStateException("Unable to create NettyConnector for " + host + ":" + port, e);
+         }
+      }
+      else
+      {
+         context = null; // Unused
+      }
+
+      if (context != null && useServlet)
+      {
+         // TODO: Fix me
+         //bootstrap.setOption("sslContext", context);
+      }
+
+      bootstrap.handler(new ChannelInitializer<Channel>()
+      {
+         public void initChannel(Channel channel) throws Exception
+         {
+            final ChannelPipeline pipeline = channel.pipeline();
+            if (sslEnabled && !useServlet)
+            {
+               SSLEngine engine = context.createSSLEngine();
+
+               engine.setUseClientMode(true);
+
+               engine.setWantClientAuth(true);
+
+               // setting the enabled cipher suites resets the enabled protocols so we need
+               // to save the enabled protocols so that after the customer cipher suite is enabled
+               // we can reset the enabled protocols if a customer protocol isn't specified
+               String[] originalProtocols = engine.getEnabledProtocols();
+
+               if (enabledCipherSuites != null)
+               {
+                  try
+                  {
+                     engine.setEnabledCipherSuites(SSLSupport.parseCommaSeparatedListIntoArray(enabledCipherSuites));
+                  }
+                  catch (IllegalArgumentException e)
+                  {
+                     // Log the suites the engine does support, then let the failure propagate
+                     HornetQClientLogger.LOGGER.invalidCipherSuite(SSLSupport.parseArrayIntoCommandSeparatedList(engine.getSupportedCipherSuites()));
+                     throw e;
+                  }
+               }
+
+               if (enabledProtocols != null)
+               {
+                  try
+                  {
+                     engine.setEnabledProtocols(SSLSupport.parseCommaSeparatedListIntoArray(enabledProtocols));
+                  }
+                  catch (IllegalArgumentException e)
+                  {
+                     // Log the protocols the engine does support, then let the failure propagate
+                     HornetQClientLogger.LOGGER.invalidProtocol(SSLSupport.parseArrayIntoCommandSeparatedList(engine.getSupportedProtocols()));
+                     throw e;
+                  }
+               }
+               else
+               {
+                  engine.setEnabledProtocols(originalProtocols);
+               }
+
+               // This local intentionally shadows the connector's BufferHandler field inside this block only
+               SslHandler handler = new SslHandler(engine);
+
+               pipeline.addLast(handler);
+            }
+
+            if (httpEnabled)
+            {
+               pipeline.addLast(new HttpRequestEncoder());
+
+               pipeline.addLast(new HttpResponseDecoder());
+
+               pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
+
+               pipeline.addLast(new HttpHandler());
+            }
+
+            if (httpUpgradeEnabled)
+            {
+               // prepare to handle a HTTP 101 response to upgrade the protocol.
+               final HttpClientCodec httpClientCodec = new HttpClientCodec();
+               pipeline.addLast(httpClientCodec);
+               pipeline.addLast("http-upgrade", new HttpUpgradeHandler(pipeline, httpClientCodec));
+            }
+
+            protocolManager.addChannelHandlers(pipeline);
+
+            // Here "handler" is the connector's BufferHandler field again
+            pipeline.addLast(new HornetQClientChannelHandler(channelGroup, handler, new Listener()));
+         }
+      });
+
+      if (batchDelay > 0)
+      {
+         flusher = new BatchFlusher();
+
+         batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
+      }
+
+      HornetQClientLogger.LOGGER.debug("Started Netty Connector version " + TransportConstants.NETTY_VERSION);
+   }
+
+   /**
+    * Stops the connector; does nothing when it is not started.
+    * <p>
+    * Cancels the batch flusher (if one was scheduled), closes all channels in the channel group
+    * and waits for that to finish, asks the event loop group to shut down gracefully, marks the
+    * connector as stopped and finally reports every remaining connection to the life-cycle
+    * listener as destroyed.
+    */
+   public synchronized void close()
+   {
+      if (channelClazz == null)
+      {
+         return;
+      }
+
+      if (batchFlusherFuture != null)
+      {
+         // cancel(false): do not interrupt a flush that is currently running
+         batchFlusherFuture.cancel(false);
+
+         flusher.cancel();
+
+         flusher = null;
+
+         batchFlusherFuture = null;
+      }
+
+      bootstrap = null;
+      channelGroup.close().awaitUninterruptibly();
+
+      // Shutdown the EventLoopGroup if no new task was added for 100ms or if
+      // 3000ms elapsed.
+      group.shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS);
+
+      // From here on isStarted() returns false and start() may be called again
+      channelClazz = null;
+
+      for (Connection connection : connections.values())
+      {
+         listener.connectionDestroyed(connection.getID());
+      }
+
+      connections.clear();
+   }
+
+   /**
+    * Tells whether the connector is currently started.
+    *
+    * @return {@code true} between a call to {@link #start()} and the next call to {@link #close()}
+    */
+   public boolean isStarted()
+   {
+      // channelClazz is set by start() and reset to null by close()
+      return channelClazz != null;
+   }
+
+   /**
+    * Opens a new connection to the configured remote {@code host} and {@code port}.
+    * <p>
+    * The call blocks until the connect attempt finishes and then, where they apply, until the SSL
+    * handshake (bounded to 30 seconds) and the HTTP upgrade handshake have completed. Whenever a
+    * handshake fails after the socket was opened, the channel is closed before returning.
+    *
+    * @return the established connection, or {@code null} if the connector is not started or the
+    *         connection could not be set up
+    * @throws IllegalArgumentException if an IPv6 address cannot be rebuilt without its scope-id
+    */
+   public Connection createConnection()
+   {
+      if (channelClazz == null)
+      {
+         return null;
+      }
+
+      // HORNETQ-907 - strip off IPv6 scope-id (if necessary)
+      SocketAddress remoteDestination = new InetSocketAddress(host, port);
+      InetAddress inetAddress = ((InetSocketAddress) remoteDestination).getAddress();
+      if (inetAddress instanceof Inet6Address)
+      {
+         Inet6Address inet6Address = (Inet6Address) inetAddress;
+         if (inet6Address.getScopeId() != 0)
+         {
+            try
+            {
+               remoteDestination = new InetSocketAddress(InetAddress.getByAddress(inet6Address.getAddress()), ((InetSocketAddress) remoteDestination).getPort());
+            }
+            catch (UnknownHostException e)
+            {
+               // Keep the original exception as the cause so its stack trace is not lost.
+               throw new IllegalArgumentException(e.getMessage(), e);
+            }
+         }
+      }
+
+      HornetQClientLogger.LOGGER.debug("Remote destination: " + remoteDestination);
+
+      ChannelFuture future;
+      //port 0 does not work so only use local address if set
+      if (localPort != 0)
+      {
+         SocketAddress localDestination;
+         if (localAddress != null)
+         {
+            localDestination = new InetSocketAddress(localAddress, localPort);
+         }
+         else
+         {
+            localDestination = new InetSocketAddress(localPort);
+         }
+         future = bootstrap.connect(remoteDestination, localDestination);
+      }
+      else
+      {
+         future = bootstrap.connect(remoteDestination);
+      }
+
+      future.awaitUninterruptibly();
+
+      if (future.isSuccess())
+      {
+         final Channel ch = future.channel();
+         SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
+         if (sslHandler != null)
+         {
+            Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
+            if (handshakeFuture.awaitUninterruptibly(30000))
+            {
+               if (handshakeFuture.isSuccess())
+               {
+                  ChannelPipeline channelPipeline = ch.pipeline();
+                  HornetQChannelHandler channelHandler = channelPipeline.get(HornetQChannelHandler.class);
+                  channelHandler.active = true;
+               }
+               else
+               {
+                  ch.close().awaitUninterruptibly();
+                  HornetQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause());
+                  return null;
+               }
+            }
+            else
+            {
+               // The SSL handshake did not complete within 30 seconds: give up on this channel.
+               ch.close().awaitUninterruptibly();
+               return null;
+            }
+
+         }
+         if (httpUpgradeEnabled)
+         {
+            // Send a HTTP GET + Upgrade request that will be handled by the http-upgrade handler.
+            try
+            {
+               //get this first incase it removes itself
+               HttpUpgradeHandler httpUpgradeHandler = (HttpUpgradeHandler) ch.pipeline().get("http-upgrade");
+               URI uri = new URI("http", null, host, port, null, null, null);
+               HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
+               request.headers().set(HttpHeaders.Names.HOST, host);
+               request.headers().set(HttpHeaders.Names.UPGRADE, HORNETQ_REMOTING);
+               request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.UPGRADE);
+
+               final String endpoint = ConfigurationHelper.getStringProperty(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME,
+                                                                             null,
+                                                                             configuration);
+               if (endpoint != null)
+               {
+                  request.headers().set(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, endpoint);
+               }
+
+               // Get a 16 byte nonce and base 64 encode it
+               byte[] nonce = randomBytes(16);
+               String key = base64(nonce);
+               request.headers().set(SEC_HORNETQ_REMOTING_KEY, key);
+               // The upgrade handler reads the key back from the channel to validate the response.
+               ch.attr(REMOTING_KEY).set(key);
+
+               HornetQClientLogger.LOGGER.debugf("Sending HTTP request %s", request);
+
+               // Send the HTTP request.
+               ch.writeAndFlush(request);
+
+               if (!httpUpgradeHandler.awaitHandshake())
+               {
+                  // Upgrade rejected or timed out: do not leave the socket open behind a null return.
+                  ch.close().awaitUninterruptibly();
+                  return null;
+               }
+            }
+            catch (URISyntaxException e)
+            {
+               HornetQClientLogger.LOGGER.errorCreatingNettyConnection(e);
+               // The channel is already connected at this point, so release it as well.
+               ch.close().awaitUninterruptibly();
+               return null;
+            }
+         }
+         else
+         {
+            ChannelPipeline channelPipeline = ch.pipeline();
+            HornetQChannelHandler channelHandler = channelPipeline.get(HornetQChannelHandler.class);
+            channelHandler.active = true;
+         }
+
+         // No acceptor on a client connection
+         Listener connectionListener = new Listener();
+         NettyConnection conn = new NettyConnection(configuration, ch, connectionListener, !httpEnabled && batchDelay > 0, false);
+         connectionListener.connectionCreated(null, conn, protocolManager.getName());
+         return conn;
+      }
+      else
+      {
+         Throwable t = future.cause();
+
+         // A plain ConnectException is not logged here; any other failure cause is.
+         if (t != null && !(t instanceof ConnectException))
+         {
+            HornetQClientLogger.LOGGER.errorCreatingNettyConnection(t);
+         }
+
+         return null;
+      }
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Client-side channel handler. It adds nothing to {@link HornetQChannelHandler} beyond
+    * forwarding its constructor arguments.
+    */
+   private static final class HornetQClientChannelHandler extends HornetQChannelHandler
+   {
+      HornetQClientChannelHandler(final ChannelGroup channels,
+                                  final BufferHandler bufferHandler,
+                                  final ConnectionLifeCycleListener lifeCycleListener)
+      {
+         super(channels, bufferHandler, lifeCycleListener);
+      }
+   }
+
+   /**
+    * Inbound handler that waits for the server's answer to the HTTP upgrade request.
+    * <p>
+    * On a valid {@code 101 Switching Protocols} response it removes the HTTP codec and itself from
+    * the pipeline and marks the {@link HornetQChannelHandler} as active. In every other case the
+    * channel is closed. A thread blocked in {@link #awaitHandshake()} is released as soon as the
+    * outcome is known, including when the handler fails with an exception.
+    */
+   private static class HttpUpgradeHandler extends SimpleChannelInboundHandler<HttpObject>
+   {
+      private final ChannelPipeline pipeline;
+      private final HttpClientCodec httpClientCodec;
+      private final CountDownLatch latch = new CountDownLatch(1);
+      // Written by the channel's thread, read by the thread calling awaitHandshake().
+      private volatile boolean handshakeComplete = false;
+
+      public HttpUpgradeHandler(ChannelPipeline pipeline, HttpClientCodec httpClientCodec)
+      {
+         this.pipeline = pipeline;
+         this.httpClientCodec = httpClientCodec;
+      }
+
+      @Override
+      public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
+      {
+         if (msg instanceof HttpResponse)
+         {
+            HttpResponse response = (HttpResponse) msg;
+            // Constant-first equals: a 101 response without an Upgrade header must not cause an NPE.
+            if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code()
+               && HORNETQ_REMOTING.equals(response.headers().get(HttpHeaders.Names.UPGRADE)))
+            {
+               String accept = response.headers().get(SEC_HORNETQ_REMOTING_ACCEPT);
+               String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
+
+               if (expectedResponse.equals(accept))
+               {
+                  // remove the http handlers and flag the hornetq channel handler as active
+                  pipeline.remove(httpClientCodec);
+                  pipeline.remove(this);
+                  handshakeComplete = true;
+                  HornetQChannelHandler channelHandler = pipeline.get(HornetQChannelHandler.class);
+                  channelHandler.active = true;
+               }
+               else
+               {
+                  HornetQClientLogger.LOGGER.httpHandshakeFailed(accept, expectedResponse);
+                  ctx.close();
+               }
+            }
+            else if (response.getStatus().code() == HttpResponseStatus.FORBIDDEN.code())
+            {
+               HornetQClientLogger.LOGGER.httpUpgradeNotSupportedByRemoteAcceptor();
+               ctx.close();
+            }
+            else
+            {
+               // Any other answer also means the upgrade did not happen; the channel is unusable.
+               ctx.close();
+            }
+            latch.countDown();
+         }
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
+      {
+         HornetQClientLogger.LOGGER.errorCreatingNettyConnection(cause);
+         ctx.close();
+         // Release the waiting thread right away instead of letting it run into the timeout.
+         latch.countDown();
+      }
+
+      /**
+       * Waits up to 30 seconds for the upgrade handshake to finish.
+       *
+       * @return {@code true} only if the server accepted the upgrade; {@code false} on rejection,
+       *         failure, timeout or interruption
+       */
+      public boolean awaitHandshake()
+      {
+         try
+         {
+            if (!latch.await(30000, TimeUnit.MILLISECONDS))
+            {
+               return false;
+            }
+         }
+         catch (InterruptedException e)
+         {
+            // Restore the interrupt flag so callers further up can still observe it.
+            Thread.currentThread().interrupt();
+            return false;
+         }
+         return handshakeComplete;
+      }
+   }
+
+   class HttpHandler extends ChannelDuplexHandler
+   {
+      private Channel channel;
+
+      private long lastSendTime = 0;
+
+      private boolean waitingGet = false;
+
+      private HttpIdleTimer task;
+
+      private final String url;
+
+      private final FutureLatch handShakeFuture = new FutureLatch();
+
+      private boolean active = false;
+
+      private boolean handshaking = false;
+
+      private String cookie;
+
+      public HttpHandler() throws Exception
+      {
+         url = new URI("http", null, host, port, servletPath, null, null).toString();
+      }
+
+      @Override
+      public void channelActive(final ChannelHandlerContext ctx) throws Exception
+      {
+         super.channelActive(ctx);
+         channel = ctx.channel();
+         if (httpClientIdleScanPeriod > 0)
+         {
+            task = new HttpIdleTimer();
+            java.util.concurrent.Future<?> future = scheduledThreadPool.scheduleAtFixedRate(task,
+                                                                                            httpClientIdleScanPeriod,
+                                                                                            httpClientIdleScanPeriod,
+                                                                                            TimeUnit.MILLISECONDS);
+            task.setFuture(future);
+         }
+      }
+
+      @Override
+      public void channelInactive(final ChannelHandlerContext ctx) throws Exception
+      {
+         if (task != null)
+         {
+            task.close();
+         }
+
+         super.channelInactive(ctx);
+      }
+
+      @Override
+      public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
+      {
+         FullHttpResponse response = (FullHttpResponse) msg;
+         if (httpRequiresSessionId && !active)
+         {
+            Set<Cookie> cookieMap = CookieDecoder.decode(response.headers().get(HttpHeaders.Names.SET_COOKIE));
+            for (Cookie cookie : cookieMap)
+            {
+               if (cookie.getName().equals("JSESSIONID"))
+               {
+                  this.cookie = ClientCookieEncoder.encode(cookie);
+               }
+            }
+            active = true;
+            handShakeFuture.run();
+         }
+         waitingGet = false;
+         ctx.fireChannelRead(response.content());
+      }
+
+      @Override
+      public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception
+      {
+         if (msg instanceof ByteBuf)
+         {
+            if (httpRequiresSessionId && !active)
+            {
+               if (handshaking)
+               {
+                  handshaking = true;
+               }
+               else
+               {
+                  if (!handShakeFuture.await(5000))
+                  {
+                     throw new RuntimeException("Handshake failed after timeout");
+                  }
+               }
+            }
+
+            ByteBuf buf = (ByteBuf) msg;
+            FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, url, buf);
+            httpRequest.headers().add(HttpHeaders.Names.HOST, NettyConnector.this.host);
+            if (cookie != null)
+            {
+               httpRequest.headers().add(HttpHeaders.Names.COOKIE, cookie);
+            }
+            httpRequest.headers().add(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.readableBytes()));
+            ctx.write(httpRequest, promise);
+            lastSendTime = System.currentTimeMillis();
+         }
+         else
+         {
+            ctx.write(msg, promise);
+            lastSendTime = System.currentTimeMillis();
+         }
+      }
+
+      private class HttpIdleTimer implements Runnable
+      {
+         private boolean closed = false;
+
+         private java.util.concurrent.Future<?> future;
+
+         public synchronized void run()
+         {
+            if (closed)
+            {
+               return;
+            }
+
+            if (!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime)
+            {
+               FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
+               httpRequest.headers().add(HttpHeaders.Names.HOST, NettyConnector.this.host);
+               waitingGet = true;
+               channel.writeAndFlush(httpRequest);
+            }
+         }
+
+         public synchronized void setFuture(final java.util.concurrent.Future<?> future)
+         {
+            this.future = future;
+         }
+
+         public void close()
+         {
+            if (future != null)
+            {
+               future.cancel(false);
+            }
+
+            closed = true;
+         }
+      }
+   }
+
+   /**
+    * Life-cycle listener given to the channel handler and to each connection. It keeps the
+    * connector's {@code connections} map current and relays destroy and exception events to the
+    * outer {@code listener} through {@code closeExecutor}.
+    */
+   private class Listener implements ConnectionLifeCycleListener
+   {
+      public void connectionCreated(final HornetQComponent component, final Connection connection, final String protocol)
+      {
+         final Connection previous = connections.putIfAbsent(connection.getID(), connection);
+         if (previous != null)
+         {
+            // An entry with this ID was already registered.
+            throw HornetQClientMessageBundle.BUNDLE.connectionExists(connection.getID());
+         }
+      }
+
+      public void connectionDestroyed(final Object connectionID)
+      {
+         if (connections.remove(connectionID) == null)
+         {
+            // Nothing was registered under this ID, so there is nothing to report.
+            return;
+         }
+
+         // Execute on different thread to avoid deadlocks
+         final Runnable notifyDestroyed = new Runnable()
+         {
+            public void run()
+            {
+               listener.connectionDestroyed(connectionID);
+            }
+         };
+         closeExecutor.execute(notifyDestroyed);
+      }
+
+      public void connectionException(final Object connectionID, final HornetQException me)
+      {
+         // Execute on different thread to avoid deadlocks
+         final Runnable notifyException = new Runnable()
+         {
+            public void run()
+            {
+               listener.connectionException(connectionID, me);
+            }
+         };
+         closeExecutor.execute(notifyException);
+      }
+
+      public void connectionReadyForWrites(Object connectionID, boolean ready)
+      {
+         // Intentionally empty.
+      }
+   }
+
+   /**
+    * Scheduled task that asks every registered connection to check its batch buffer, until
+    * {@link #cancel()} has been called.
+    */
+   private class BatchFlusher implements Runnable
+   {
+      private boolean cancelled;
+
+      public synchronized void run()
+      {
+         if (cancelled)
+         {
+            return;
+         }
+
+         for (Connection each : connections.values())
+         {
+            each.checkFlushBatchBuffer();
+         }
+      }
+
+      public synchronized void cancel()
+      {
+         cancelled = true;
+      }
+   }
+
+   /**
+    * Tells whether the given configuration points at the same target as this connector.
+    * <p>
+    * Only host and port are compared. Hosts that differ textually are resolved and compared by IP
+    * address, since one of them may be an alias of the other.
+    *
+    * @param configuration the transport configuration to compare against
+    * @return {@code true} if port and host (literally or by resolved address) match;
+    *         {@code false} otherwise, including when a host cannot be resolved
+    */
+   public boolean isEquivalent(Map<String, Object> configuration)
+   {
+      final String otherHost = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
+                                                                     TransportConstants.DEFAULT_HOST,
+                                                                     configuration);
+      final Integer otherPort = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME,
+                                                                   TransportConstants.DEFAULT_PORT,
+                                                                   configuration);
+
+      if (!otherPort.equals(this.port))
+      {
+         return false;
+      }
+
+      if (otherHost.equals(this.host))
+      {
+         return true;
+      }
+
+      // Same port, different host string: fall back to comparing the resolved addresses.
+      try
+      {
+         final String ip1 = InetAddress.getByName(otherHost).getHostAddress();
+         final String ip2 = InetAddress.getByName(this.host).getHostAddress();
+         HornetQClientLogger.LOGGER.debug(this + " host 1: " + otherHost + " ip address: " + ip1 + " host 2: " + this.host + " ip address: " + ip2);
+
+         return ip1.equals(ip2);
+      }
+      catch (UnknownHostException e)
+      {
+         HornetQClientLogger.LOGGER.error("Cannot resolve host", e);
+         return false;
+      }
+   }
+
+   /**
+    * Last-resort cleanup: closes the connector when it is finalized.
+    *
+    * @throws Throwable whatever {@code close()} or the superclass finalizer throws
+    */
+   @Override
+   public void finalize() throws Throwable
+   {
+      try
+      {
+         close();
+      }
+      finally
+      {
+         // Always give the superclass its turn, even if close() fails.
+         super.finalize();
+      }
+   }
+
+   //for test purpose only
+   /**
+    * Exposes the Netty bootstrap this connector currently holds.
+    *
+    * @return the bootstrap; {@code null} once {@code close()} has cleared it
+    */
+   public Bootstrap getBootStrap()
+   {
+      return this.bootstrap;
+   }
+
+   /**
+    * Shuts down the shared client event loop group by delegating to
+    * {@link SharedNioEventLoopGroup#forceShutdown()}.
+    */
+   public static void clearThreadPools()
+   {
+      SharedNioEventLoopGroup.forceShutdown();
+   }
+
+   /**
+    * Looks up, inside a privileged action, the class loader that loaded
+    * {@code ClientSessionFactoryImpl}.
+    *
+    * @return that class loader
+    */
+   private static ClassLoader getThisClassLoader()
+   {
+      final PrivilegedAction<ClassLoader> lookup = new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            return ClientSessionFactoryImpl.class.getClassLoader();
+         }
+      };
+
+      return AccessController.doPrivileged(lookup);
+   }
+
+   private static String base64(byte[] data)
+   {
+      ByteBuf encodedData = Unpooled.wrappedBuffer(data);
+      ByteBuf encoded = Base64.encode(encodedData);
+      String encodedString = encoded.toString(StandardCharsets.UTF_8);
+      encoded.release();
+      return encodedString;
+   }
+
+   /**
+    * Creates an arbitrary number of random bytes
+    *
+    * @param size the number of random bytes to create
+    * @return An array of random bytes
+    */
+   private static byte[] randomBytes(int size)
+   {
+      byte[] bytes = new byte[size];
+
+      for (int index = 0; index < size; index++)
+      {
+         bytes[index] = (byte) randomNumber(0, 255);
+      }
+
+      return bytes;
+   }
+
+   private static int randomNumber(int minimum, int maximum)
+   {
+      return (int) (Math.random() * maximum + minimum);
+   }
+
+   /**
+    * Computes the accept value expected from the server during the HTTP upgrade handshake: the
+    * SHA1 digest of {@code secretKey + magicNumber} (UTF-8 bytes), passed through
+    * {@code encodeBytes}.
+    *
+    * @param magicNumber the constant appended to the key
+    * @param secretKey   the key that was sent with the upgrade request
+    * @return the encoded digest
+    * @throws IOException if the SHA1 algorithm is not available
+    */
+   public static String createExpectedResponse(final String magicNumber, final String secretKey) throws IOException
+   {
+      try
+      {
+         final byte[] input = (secretKey + magicNumber).getBytes(StandardCharsets.UTF_8);
+         final MessageDigest sha1 = MessageDigest.getInstance("SHA1");
+         return encodeBytes(sha1.digest(input));
+      }
+      catch (NoSuchAlgorithmException e)
+      {
+         throw new IOException(e);
+      }
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnectorFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnectorFactory.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnectorFactory.java
new file mode 100644
index 0000000..e2c31b6
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnectorFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting.impl.netty;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.activemq6.spi.core.remoting.BufferHandler;
+import org.apache.activemq6.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq6.spi.core.remoting.ConnectionLifeCycleListener;
+import org.apache.activemq6.spi.core.remoting.Connector;
+import org.apache.activemq6.spi.core.remoting.ConnectorFactory;
+
+/**
+ * A NettyConnectorFactory
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
+ */
+public class NettyConnectorFactory implements ConnectorFactory
+{
+   /**
+    * Builds a {@link NettyConnector} from the supplied configuration and collaborators.
+    * <p>
+    * NOTE(review): {@code protocolManager} is accepted but not passed on to the
+    * {@link NettyConnector} constructor - confirm that this is intended.
+    */
+   public Connector createConnector(final Map<String, Object> configuration,
+                                    final BufferHandler handler,
+                                    final ConnectionLifeCycleListener listener,
+                                    final Executor closeExecutor,
+                                    final Executor threadPool,
+                                    final ScheduledExecutorService scheduledThreadPool,
+                                    final ClientProtocolManager protocolManager)
+   {
+      final Connector connector = new NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
+      return connector;
+   }
+
+   /**
+    * @return the connector keys listed in {@link TransportConstants#ALLOWABLE_CONNECTOR_KEYS}
+    */
+   public Set<String> getAllowableProperties()
+   {
+      final Set<String> allowedKeys = TransportConstants.ALLOWABLE_CONNECTOR_KEYS;
+      return allowedKeys;
+   }
+
+   /**
+    * @return always {@code false}
+    */
+   @Override
+   public boolean isReliable()
+   {
+      return false;
+   }
+
+   /**
+    * @return the default settings held in {@link NettyConnector#DEFAULT_CONFIG}
+    */
+   @Override
+   public Map<String, Object> getDefaults()
+   {
+      final Map<String, Object> defaults = NettyConnector.DEFAULT_CONFIG;
+      return defaults;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/PartialPooledByteBufAllocator.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
new file mode 100644
index 0000000..6c481c6
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting.impl.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+
+/**
+ * A {@link ByteBufAllocator} which is partial pooled. Which means only direct {@link ByteBuf}s are pooled. The rest
+ * is unpooled.
+ *
+ * @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
+ */
+public class PartialPooledByteBufAllocator implements ByteBufAllocator
+{
+   // Serves every direct buffer request.
+   private static final ByteBufAllocator POOLED = new PooledByteBufAllocator(false);
+   // Serves every heap buffer request, including the generic buffer()/ioBuffer() variants.
+   private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false);
+
+   public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator();
+
+   private PartialPooledByteBufAllocator()
+   {
+      // Singleton: use INSTANCE.
+   }
+
+   // --- generic buffers: always unpooled heap buffers ---
+
+   @Override
+   public ByteBuf buffer()
+   {
+      return UNPOOLED.heapBuffer();
+   }
+
+   @Override
+   public ByteBuf buffer(int initialCapacity)
+   {
+      return UNPOOLED.heapBuffer(initialCapacity);
+   }
+
+   @Override
+   public ByteBuf buffer(int initialCapacity, int maxCapacity)
+   {
+      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+   }
+
+   // --- I/O buffers: also unpooled heap buffers ---
+
+   @Override
+   public ByteBuf ioBuffer()
+   {
+      return UNPOOLED.heapBuffer();
+   }
+
+   @Override
+   public ByteBuf ioBuffer(int initialCapacity)
+   {
+      return UNPOOLED.heapBuffer(initialCapacity);
+   }
+
+   @Override
+   public ByteBuf ioBuffer(int initialCapacity, int maxCapacity)
+   {
+      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+   }
+
+   // --- heap buffers: unpooled ---
+
+   @Override
+   public ByteBuf heapBuffer()
+   {
+      return UNPOOLED.heapBuffer();
+   }
+
+   @Override
+   public ByteBuf heapBuffer(int initialCapacity)
+   {
+      return UNPOOLED.heapBuffer(initialCapacity);
+   }
+
+   @Override
+   public ByteBuf heapBuffer(int initialCapacity, int maxCapacity)
+   {
+      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+   }
+
+   // --- direct buffers: pooled ---
+
+   @Override
+   public ByteBuf directBuffer()
+   {
+      return POOLED.directBuffer();
+   }
+
+   @Override
+   public ByteBuf directBuffer(int initialCapacity)
+   {
+      return POOLED.directBuffer(initialCapacity);
+   }
+
+   @Override
+   public ByteBuf directBuffer(int initialCapacity, int maxCapacity)
+   {
+      return POOLED.directBuffer(initialCapacity, maxCapacity);
+   }
+
+   // --- composite buffers: heap flavour from UNPOOLED, direct flavour from POOLED ---
+
+   @Override
+   public CompositeByteBuf compositeBuffer()
+   {
+      return UNPOOLED.compositeHeapBuffer();
+   }
+
+   @Override
+   public CompositeByteBuf compositeBuffer(int maxNumComponents)
+   {
+      return UNPOOLED.compositeHeapBuffer(maxNumComponents);
+   }
+
+   @Override
+   public CompositeByteBuf compositeHeapBuffer()
+   {
+      return UNPOOLED.compositeHeapBuffer();
+   }
+
+   @Override
+   public CompositeByteBuf compositeHeapBuffer(int maxNumComponents)
+   {
+      return UNPOOLED.compositeHeapBuffer(maxNumComponents);
+   }
+
+   @Override
+   public CompositeByteBuf compositeDirectBuffer()
+   {
+      return POOLED.compositeDirectBuffer();
+   }
+
+   @Override
+   public CompositeByteBuf compositeDirectBuffer(int maxNumComponents)
+   {
+      // Pass the caller's limit through, as the other sized variants in this class do.
+      return POOLED.compositeDirectBuffer(maxNumComponents);
+   }
+
+   @Override
+   public boolean isDirectBufferPooled()
+   {
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/SharedNioEventLoopGroup.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/SharedNioEventLoopGroup.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/SharedNioEventLoopGroup.java
new file mode 100644
index 0000000..f7ed135
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/SharedNioEventLoopGroup.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting.impl.netty;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import io.netty.util.concurrent.Promise;
+import org.apache.activemq6.core.client.impl.ClientSessionFactoryImpl;
+import org.apache.activemq6.utils.HornetQThreadFactory;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * @author <a href="nmaurer@redhat.com">Norman Maurer</a>
+ */
+public class SharedNioEventLoopGroup extends NioEventLoopGroup
+{
+   // A single NioEventLoopGroup instance handed out by getInstance(). Each getInstance() call
+   // increments nioChannelFactoryCount and each shutdownGracefully() call decrements it. When the
+   // count reaches zero a delayed task is scheduled that performs the real shutdown, unless a later
+   // getInstance() call cancels that task first.
+
+   // The current shared instance; accessed while holding the SharedNioEventLoopGroup.class lock.
+   private static SharedNioEventLoopGroup instance;
+
+   // Handle of the pending delayed-shutdown task, or null when none is pending.
+   private final AtomicReference<ScheduledFuture<?>> shutdown = new AtomicReference<ScheduledFuture<?>>();
+   // Number of getInstance() calls not yet matched by a shutdownGracefully() call.
+   private final AtomicLong nioChannelFactoryCount = new AtomicLong();
+   // Completed from the listener registered on the real shutdown; returned by terminationFuture().
+   private final Promise<?> terminationPromise = ImmediateEventExecutor.INSTANCE.newPromise();
+
+   private SharedNioEventLoopGroup(int numThreads, ThreadFactory factory)
+   {
+      super(numThreads, factory);
+   }
+
+   // Returns the class loader of ClientSessionFactoryImpl, obtained inside a privileged action.
+   private static ClassLoader getThisClassLoader()
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            return ClientSessionFactoryImpl.class.getClassLoader();
+         }
+      });
+   }
+
+   /**
+    * Shuts the shared instance down immediately, regardless of how many users it still has, resets
+    * its usage count and forgets it so that the next {@link #getInstance(int)} creates a new one.
+    */
+   public static synchronized void forceShutdown()
+   {
+      if (instance != null)
+      {
+         instance.shutdown();
+         instance.nioChannelFactoryCount.set(0);
+         instance = null;
+      }
+   }
+
+   /**
+    * Returns the shared instance, creating it if there is none, and counts the caller as a user.
+    * A delayed shutdown that is pending on an existing instance is cancelled.
+    *
+    * @param numThreads number of threads for a newly created group; not used when an instance
+    *                   already exists
+    * @return the shared event loop group
+    */
+   public static synchronized SharedNioEventLoopGroup getInstance(int numThreads)
+   {
+      if (instance != null)
+      {
+         // Clearing the reference is what the delayed task checks before it shuts down.
+         ScheduledFuture f = instance.shutdown.getAndSet(null);
+         if (f != null)
+         {
+            f.cancel(false);
+         }
+      }
+      else
+      {
+         instance = new SharedNioEventLoopGroup(numThreads, new HornetQThreadFactory("HornetQ-client-netty-threads", true, getThisClassLoader()));
+      }
+      instance.nioChannelFactoryCount.incrementAndGet();
+      return instance;
+   }
+
+   /**
+    * @return this class's own termination promise rather than the superclass future
+    */
+   @Override
+   public Future<?> terminationFuture()
+   {
+      return terminationPromise;
+   }
+
+   /**
+    * Same as {@link #shutdownGracefully(long, long, TimeUnit)} with 100 and 3000 milliseconds.
+    */
+   @Override
+   public Future<?> shutdownGracefully()
+   {
+      return shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS);
+   }
+
+   /**
+    * Releases one usage of the shared group. Only when the usage count drops to zero is a task
+    * scheduled (first run after 10 seconds) that performs the real shutdown, and that task does
+    * nothing if {@link #getInstance(int)} has cleared the pending-shutdown reference in the meantime.
+    *
+    * @param l        first argument forwarded to the superclass shutdownGracefully
+    * @param l2       second argument forwarded to the superclass shutdownGracefully
+    * @param timeUnit unit of {@code l} and {@code l2}
+    * @return the termination promise, completed once the real shutdown has finished
+    */
+   @Override
+   public Future<?> shutdownGracefully(final long l, final long l2, final TimeUnit timeUnit)
+   {
+      if (nioChannelFactoryCount.decrementAndGet() == 0)
+      {
+         // NOTE(review): the task is scheduled before compareAndSet runs; if a shutdown is already
+         // pending the new future is not stored anywhere - confirm this cannot leave a stray task.
+         shutdown.compareAndSet(null, next().scheduleAtFixedRate(new Runnable()
+         {
+            @Override
+            public void run()
+            {
+               // Same lock as getInstance() and forceShutdown().
+               synchronized (SharedNioEventLoopGroup.class)
+               {
+                  // Still non-null means nobody reclaimed the group since the task was scheduled.
+                  if (shutdown.get() != null)
+                  {
+                     Future<?> future = SharedNioEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit);
+                     future.addListener(new FutureListener<Object>()
+                     {
+                        @Override
+                        public void operationComplete(Future future) throws Exception
+                        {
+                           // Mirror the outcome of the real shutdown onto the promise callers hold.
+                           if (future.isSuccess())
+                           {
+                              terminationPromise.setSuccess(null);
+                           }
+                           else
+                           {
+                              terminationPromise.setFailure(future.cause());
+                           }
+                        }
+                     });
+                     // The next getInstance() call has to create a new group.
+                     instance = null;
+                  }
+               }
+            }
+
+         }, 10, 10, TimeUnit.SECONDS));
+      }
+      return terminationPromise;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/TransportConstants.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/TransportConstants.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/TransportConstants.java
new file mode 100644
index 0000000..990e99e
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/TransportConstants.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting.impl.netty;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import io.netty.util.Version;
+import org.apache.activemq6.api.config.HornetQDefaultConfiguration;
+
+/**
+ * Constants for the Netty transport: configuration key names, the default values that go with them,
+ * and the sets of keys accepted for connectors and for acceptors.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class TransportConstants
+{
+   // Configuration key names ----------------------------------------
+
+   public static final String SSL_ENABLED_PROP_NAME = "ssl-enabled";
+
+   public static final String HTTP_ENABLED_PROP_NAME = "http-enabled";
+
+   public static final String HTTP_CLIENT_IDLE_PROP_NAME = "http-client-idle-time";
+
+   public static final String HTTP_CLIENT_IDLE_SCAN_PERIOD = "http-client-idle-scan-period";
+
+   public static final String HTTP_RESPONSE_TIME_PROP_NAME = "http-response-time";
+
+   public static final String HTTP_SERVER_SCAN_PERIOD_PROP_NAME = "http-server-scan-period";
+
+   public static final String HTTP_REQUIRES_SESSION_ID = "http-requires-session-id";
+
+   public static final String HTTP_UPGRADE_ENABLED_PROP_NAME = "http-upgrade-enabled";
+
+   public static final String HTTP_UPGRADE_ENDPOINT_PROP_NAME = "http-upgrade-endpoint";
+
+   public static final String USE_SERVLET_PROP_NAME = "use-servlet";
+
+   public static final String SERVLET_PATH = "servlet-path";
+
+   public static final String USE_NIO_PROP_NAME = "use-nio";
+
+   public static final String USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME = "use-nio-global-worker-pool";
+
+   public static final String USE_INVM_PROP_NAME = "use-invm";
+
+   public static final String PROTOCOL_PROP_NAME = "protocol";
+
+   public static final String PROTOCOLS_PROP_NAME = "protocols";
+
+   public static final String HOST_PROP_NAME = "host";
+
+   public static final String PORT_PROP_NAME = "port";
+
+   public static final String LOCAL_ADDRESS_PROP_NAME = "local-address";
+
+   public static final String LOCAL_PORT_PROP_NAME = "local-port";
+
+   public static final String KEYSTORE_PROVIDER_PROP_NAME = "key-store-provider";
+
+   public static final String KEYSTORE_PATH_PROP_NAME = "key-store-path";
+
+   public static final String KEYSTORE_PASSWORD_PROP_NAME = "key-store-password";
+
+   public static final String TRUSTSTORE_PROVIDER_PROP_NAME = "trust-store-provider";
+
+   public static final String TRUSTSTORE_PATH_PROP_NAME = "trust-store-path";
+
+   public static final String TRUSTSTORE_PASSWORD_PROP_NAME = "trust-store-password";
+
+   public static final String ENABLED_CIPHER_SUITES_PROP_NAME = "enabled-cipher-suites";
+
+   public static final String ENABLED_PROTOCOLS_PROP_NAME = "enabled-protocols";
+
+   public static final String NEED_CLIENT_AUTH_PROP_NAME = "need-client-auth";
+
+   public static final String BACKLOG_PROP_NAME = "backlog";
+
+   /**
+    * Artifact version reported by Netty for {@code netty-transport}, or {@code "unknown"} when Netty
+    * reports none. Assigned in the static initializer.
+    */
+   public static final String NETTY_VERSION;
+
+   /**
+    * Disable Nagle's algorithm.<br>
+    * Valid for (client) Sockets.
+    *
+    * @see <a
+    * href="http://design.jboss.org/jbossorg/branding/Javadocs/doc/api/org/jboss/netty/channel/socket/SocketChannelConfig.html#setTcpNoDelay%28boolean%29">
+    * Netty note on this option</a>
+    * @see <a href="http://docs.oracle.com/javase/6/docs/technotes/guides/net/socketOpt.html">Oracle
+    * doc on tcpNoDelay</a>
+    */
+   public static final String TCP_NODELAY_PROPNAME = "tcp-no-delay";
+
+   public static final String TCP_SENDBUFFER_SIZE_PROPNAME = "tcp-send-buffer-size";
+
+   public static final String TCP_RECEIVEBUFFER_SIZE_PROPNAME = "tcp-receive-buffer-size";
+
+   public static final String NIO_REMOTING_THREADS_PROPNAME = "nio-remoting-threads";
+
+   public static final String BATCH_DELAY = "batch-delay";
+
+   public static final String DIRECT_DELIVER = "direct-deliver";
+
+   public static final String CLUSTER_CONNECTION = "cluster-connection";
+
+   public static final String STOMP_CONSUMERS_CREDIT = "stomp-consumer-credits";
+
+   // Default values --------------------------------------------------
+
+   public static final int STOMP_DEFAULT_CONSUMERS_CREDIT = 10 * 1024; // 10K
+
+   public static final boolean DEFAULT_SSL_ENABLED = false;
+
+   public static final boolean DEFAULT_USE_NIO_GLOBAL_WORKER_POOL = true;
+
+   public static final boolean DEFAULT_USE_INVM = false;
+
+   public static final boolean DEFAULT_USE_SERVLET = false;
+
+   public static final String DEFAULT_HOST = "localhost";
+
+   public static final int DEFAULT_PORT = 5445;
+
+   public static final String DEFAULT_LOCAL_ADDRESS = null;
+
+   public static final int DEFAULT_LOCAL_PORT = 0;
+
+   public static final int DEFAULT_STOMP_PORT = 61613;
+
+   public static final String DEFAULT_KEYSTORE_PROVIDER = "JKS";
+
+   public static final String DEFAULT_KEYSTORE_PATH = null;
+
+   public static final String DEFAULT_KEYSTORE_PASSWORD = null;
+
+   public static final String DEFAULT_TRUSTSTORE_PROVIDER = "JKS";
+
+   public static final String DEFAULT_TRUSTSTORE_PATH = null;
+
+   public static final String DEFAULT_TRUSTSTORE_PASSWORD = null;
+
+   public static final String DEFAULT_ENABLED_CIPHER_SUITES = null;
+
+   public static final String DEFAULT_ENABLED_PROTOCOLS = null;
+
+   public static final boolean DEFAULT_NEED_CLIENT_AUTH = false;
+
+   public static final boolean DEFAULT_TCP_NODELAY = true;
+
+   public static final int DEFAULT_TCP_SENDBUFFER_SIZE = 32768;
+
+   public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
+
+   public static final boolean DEFAULT_HTTP_ENABLED = false;
+
+   public static final long DEFAULT_HTTP_CLIENT_IDLE_TIME = 500;
+
+   public static final long DEFAULT_HTTP_CLIENT_SCAN_PERIOD = 500;
+
+   public static final long DEFAULT_HTTP_RESPONSE_TIME = 10000;
+
+   public static final long DEFAULT_HTTP_SERVER_SCAN_PERIOD = 5000;
+
+   public static final boolean DEFAULT_HTTP_REQUIRES_SESSION_ID = false;
+
+   public static final boolean DEFAULT_HTTP_UPGRADE_ENABLED = false;
+
+   public static final String DEFAULT_SERVLET_PATH = "/messaging/HornetQServlet";
+
+   public static final long DEFAULT_BATCH_DELAY = 0;
+
+   public static final boolean DEFAULT_DIRECT_DELIVER = true;
+
+   /** Unmodifiable set of the keys accepted for connectors; filled in by the static initializer. */
+   public static final Set<String> ALLOWABLE_CONNECTOR_KEYS;
+
+   /** Unmodifiable set of the keys accepted for acceptors; filled in by the static initializer. */
+   public static final Set<String> ALLOWABLE_ACCEPTOR_KEYS;
+
+   public static final String CONNECTION_TTL = "connection-ttl";
+
+   public static final String STOMP_ENABLE_MESSAGE_ID = "stomp-enable-message-id";
+
+   public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size";
+
+   public static final String NETTY_CONNECT_TIMEOUT = "connect-timeout-millis";
+
+   public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1;
+
+   static
+   {
+      // Keys accepted for acceptors.
+      final Set<String> acceptorKeys = new HashSet<String>();
+      Collections.addAll(acceptorKeys,
+                         SSL_ENABLED_PROP_NAME,
+                         HTTP_RESPONSE_TIME_PROP_NAME,
+                         HTTP_SERVER_SCAN_PERIOD_PROP_NAME,
+                         HTTP_UPGRADE_ENABLED_PROP_NAME,
+                         USE_NIO_PROP_NAME,
+                         USE_INVM_PROP_NAME,
+                         PROTOCOL_PROP_NAME,
+                         PROTOCOLS_PROP_NAME,
+                         HOST_PROP_NAME,
+                         PORT_PROP_NAME,
+                         KEYSTORE_PROVIDER_PROP_NAME,
+                         KEYSTORE_PATH_PROP_NAME,
+                         KEYSTORE_PASSWORD_PROP_NAME,
+                         TRUSTSTORE_PROVIDER_PROP_NAME,
+                         TRUSTSTORE_PATH_PROP_NAME,
+                         TRUSTSTORE_PASSWORD_PROP_NAME,
+                         ENABLED_CIPHER_SUITES_PROP_NAME,
+                         ENABLED_PROTOCOLS_PROP_NAME,
+                         NEED_CLIENT_AUTH_PROP_NAME,
+                         TCP_NODELAY_PROPNAME,
+                         TCP_SENDBUFFER_SIZE_PROPNAME,
+                         TCP_RECEIVEBUFFER_SIZE_PROPNAME,
+                         NIO_REMOTING_THREADS_PROPNAME,
+                         BATCH_DELAY,
+                         DIRECT_DELIVER,
+                         CLUSTER_CONNECTION,
+                         STOMP_CONSUMERS_CREDIT,
+                         STOMP_MIN_LARGE_MESSAGE_SIZE,
+                         CONNECTION_TTL,
+                         STOMP_ENABLE_MESSAGE_ID,
+                         HornetQDefaultConfiguration.getPropMaskPassword(),
+                         HornetQDefaultConfiguration.getPropPasswordCodec());
+      ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(acceptorKeys);
+
+      // Keys accepted for connectors.
+      final Set<String> connectorKeys = new HashSet<String>();
+      Collections.addAll(connectorKeys,
+                         SSL_ENABLED_PROP_NAME,
+                         HTTP_ENABLED_PROP_NAME,
+                         HTTP_CLIENT_IDLE_PROP_NAME,
+                         HTTP_CLIENT_IDLE_SCAN_PERIOD,
+                         HTTP_REQUIRES_SESSION_ID,
+                         HTTP_UPGRADE_ENABLED_PROP_NAME,
+                         HTTP_UPGRADE_ENDPOINT_PROP_NAME,
+                         USE_SERVLET_PROP_NAME,
+                         SERVLET_PATH,
+                         USE_NIO_PROP_NAME,
+                         USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME,
+                         HOST_PROP_NAME,
+                         PORT_PROP_NAME,
+                         LOCAL_ADDRESS_PROP_NAME,
+                         LOCAL_PORT_PROP_NAME,
+                         KEYSTORE_PROVIDER_PROP_NAME,
+                         KEYSTORE_PATH_PROP_NAME,
+                         KEYSTORE_PASSWORD_PROP_NAME,
+                         TRUSTSTORE_PROVIDER_PROP_NAME,
+                         TRUSTSTORE_PATH_PROP_NAME,
+                         TRUSTSTORE_PASSWORD_PROP_NAME,
+                         ENABLED_CIPHER_SUITES_PROP_NAME,
+                         ENABLED_PROTOCOLS_PROP_NAME,
+                         TCP_NODELAY_PROPNAME,
+                         TCP_SENDBUFFER_SIZE_PROPNAME,
+                         TCP_RECEIVEBUFFER_SIZE_PROPNAME,
+                         NIO_REMOTING_THREADS_PROPNAME,
+                         BATCH_DELAY,
+                         HornetQDefaultConfiguration.getPropMaskPassword(),
+                         HornetQDefaultConfiguration.getPropPasswordCodec(),
+                         NETTY_CONNECT_TIMEOUT);
+      ALLOWABLE_CONNECTOR_KEYS = Collections.unmodifiableSet(connectorKeys);
+
+      // Look up the version Netty reports for its transport artifact, if any.
+      final Version nettyTransport = Version.identify().get("netty-transport");
+      NETTY_VERSION = nettyTransport == null ? "unknown" : nettyTransport.artifactVersion();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/ssl/SSLSupport.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/ssl/SSLSupport.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/ssl/SSLSupport.java
new file mode 100644
index 0000000..f2036af
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/ssl/SSLSupport.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.remoting.impl.ssl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.KeyStore;
+import java.security.PrivilegedAction;
+import java.security.SecureRandom;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.activemq6.utils.ClassloadingUtil;
+
+/**
+ * Static helpers for building an {@link SSLContext} from key store and trust store settings, and for
+ * converting between comma separated lists and arrays (used for cipher suite lists).
+ * <p>
+ * Please note, this class supports PKCS#11 keystores, but there are no specific tests in the HornetQ test-suite to
+ * validate/verify this works because this requires a functioning PKCS#11 provider which is not available by default
+ * (see java.security.Security#getProviders()).  The main thing to keep in mind is that PKCS#11 keystores will have a
+ * null keystore path.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author Justin Bertram
+ */
+public class SSLSupport
+{
+   /** Store type that is loaded even when no store path is given. */
+   private static final String PKCS11 = "PKCS11";
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Creates and initialises a "TLS" {@link SSLContext}.
+    * <p>
+    * A store is skipped (its managers are passed to {@link SSLContext#init} as {@code null}) when its path is
+    * {@code null} and its provider is not PKCS11.
+    *
+    * @param keystoreProvider   key store type handed to {@link KeyStore#getInstance(String)}
+    * @param keystorePath       URL, file path or classpath resource of the key store; may be {@code null}
+    * @param keystorePassword   key store password; may be {@code null}
+    * @param trustStoreProvider trust store type handed to {@link KeyStore#getInstance(String)}
+    * @param trustStorePath     URL, file path or classpath resource of the trust store; may be {@code null}
+    * @param trustStorePassword trust store password; may be {@code null}
+    * @return the initialised context
+    * @throws Exception if a store cannot be located or loaded, or the context cannot be initialised
+    */
+   public static SSLContext createContext(final String keystoreProvider, final String keystorePath, final String keystorePassword,
+                                          final String trustStoreProvider, final String trustStorePath, final String trustStorePassword) throws Exception
+   {
+      SSLContext context = SSLContext.getInstance("TLS");
+      KeyManager[] keyManagers = SSLSupport.loadKeyManagers(keystoreProvider, keystorePath, keystorePassword);
+      TrustManager[] trustManagers = SSLSupport.loadTrustManager(trustStoreProvider, trustStorePath, trustStorePassword);
+      context.init(keyManagers, trustManagers, new SecureRandom());
+      return context;
+   }
+
+   /**
+    * Splits a comma separated list and trims every element.
+    *
+    * @param suites the comma separated list; must not be {@code null}
+    * @return the trimmed elements, in their original order
+    */
+   public static String[] parseCommaSeparatedListIntoArray(String suites)
+   {
+      String[] cipherSuites = suites.split(",");
+      for (int i = 0; i < cipherSuites.length; i++)
+      {
+         cipherSuites[i] = cipherSuites[i].trim();
+      }
+      return cipherSuites;
+   }
+
+   /**
+    * Joins the elements with {@code ", "}.
+    *
+    * @param suites the elements to join; must not be {@code null}
+    * @return the joined list, or an empty string for an empty array
+    */
+   public static String parseArrayIntoCommandSeparatedList(String[] suites)
+   {
+      StringBuilder supportedSuites = new StringBuilder();
+
+      for (int i = 0; i < suites.length; i++)
+      {
+         // Separator goes before every element but the first, so there is nothing to trim afterwards
+         // and an empty array simply yields an empty string.
+         if (i > 0)
+         {
+            supportedSuites.append(", ");
+         }
+         supportedSuites.append(suites[i]);
+      }
+
+      return supportedSuites.toString();
+   }
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Tells whether a store should be skipped: it has no path and is not a PKCS11 store.
+    * The provider comparison ignores case and tolerates a {@code null} provider.
+    */
+   private static boolean isStoreUnconfigured(final String storeProvider, final String storePath)
+   {
+      return storePath == null && !PKCS11.equalsIgnoreCase(storeProvider);
+   }
+
+   /** Converts a possibly {@code null} password into the {@code char[]} form the JSSE APIs expect. */
+   private static char[] toPasswordChars(final String password)
+   {
+      return password == null ? null : password.toCharArray();
+   }
+
+   /**
+    * Loads the trust managers for the given trust store.
+    *
+    * @return the trust managers, or {@code null} when the store is skipped
+    * @throws Exception if the store cannot be located or loaded
+    */
+   private static TrustManager[] loadTrustManager(final String trustStoreProvider,
+                                                  final String trustStorePath,
+                                                  final String trustStorePassword) throws Exception
+   {
+      if (isStoreUnconfigured(trustStoreProvider, trustStorePath))
+      {
+         return null;
+      }
+
+      KeyStore trustStore = SSLSupport.loadKeystore(trustStoreProvider, trustStorePath, trustStorePassword);
+      TrustManagerFactory trustMgrFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+      trustMgrFactory.init(trustStore);
+      return trustMgrFactory.getTrustManagers();
+   }
+
+   /**
+    * Loads a key store of the given type.
+    *
+    * @param keystoreProvider store type handed to {@link KeyStore#getInstance(String)}
+    * @param keystorePath     location of the store; when {@code null} the store is loaded without a stream
+    * @param keystorePassword store password; may be {@code null}
+    * @return the loaded store
+    * @throws Exception if the store cannot be located or loaded
+    */
+   private static KeyStore loadKeystore(final String keystoreProvider, final String keystorePath, final String keystorePassword) throws Exception
+   {
+      KeyStore ks = KeyStore.getInstance(keystoreProvider);
+      InputStream in = null;
+      try
+      {
+         if (keystorePath != null)
+         {
+            URL keystoreURL = SSLSupport.validateStoreURL(keystorePath);
+            in = keystoreURL.openStream();
+         }
+         // A null password is legal for KeyStore.load; do not dereference it blindly.
+         ks.load(in, toPasswordChars(keystorePassword));
+      }
+      finally
+      {
+         if (in != null)
+         {
+            try
+            {
+               in.close();
+            }
+            catch (IOException ignored)
+            {
+               // best effort: the store has already been read (or loading has already failed)
+            }
+         }
+      }
+      return ks;
+   }
+
+   /**
+    * Loads the key managers for the given key store.
+    *
+    * @return the key managers, or {@code null} when the store is skipped
+    * @throws Exception if the store cannot be located or loaded, or its keys cannot be recovered
+    */
+   private static KeyManager[] loadKeyManagers(final String keyStoreProvider, final String keystorePath, final String keystorePassword) throws Exception
+   {
+      if (isStoreUnconfigured(keyStoreProvider, keystorePath))
+      {
+         return null;
+      }
+
+      KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+      KeyStore ks = SSLSupport.loadKeystore(keyStoreProvider, keystorePath, keystorePassword);
+      kmf.init(ks, toPasswordChars(keystorePassword));
+
+      return kmf.getKeyManagers();
+   }
+
+   /**
+    * Resolves a store location, trying in order: a URL, an existing regular file, a classpath resource.
+    *
+    * @param storePath the location to resolve; must not be {@code null}
+    * @return the URL of the store
+    * @throws Exception if none of the three lookups finds the store
+    */
+   private static URL validateStoreURL(final String storePath) throws Exception
+   {
+      assert storePath != null;
+
+      // First see if this is a URL
+      try
+      {
+         return new URL(storePath);
+      }
+      catch (MalformedURLException e)
+      {
+         // Not a URL: fall back to the file system, then to the classpath.
+         File file = new File(storePath);
+         if (file.isFile())
+         {
+            return file.toURI().toURL();
+         }
+
+         URL url = findResource(storePath);
+         if (url != null)
+         {
+            return url;
+         }
+      }
+
+      throw new Exception("Failed to find a store at " + storePath);
+   }
+
+   /** This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
+    *  utility class, as it would be a door to load anything you like in a safe VM.
+    *  For that reason any class trying to do a privileged block should do with the AccessController directly.
+    */
+   private static URL findResource(final String resourceName)
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<URL>()
+      {
+         public URL run()
+         {
+            return ClassloadingUtil.findResource(resourceName);
+         }
+      });
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/package-info.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/package-info.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/package-info.java
new file mode 100644
index 0000000..457daae
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+/**
+ * Remoting API.
+ * <br>
+ * This package defines the API used by HornetQ for remoting.
+ */
+package org.apache.activemq6.core.remoting;
+

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/HornetQPrincipal.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/HornetQPrincipal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/HornetQPrincipal.java
new file mode 100644
index 0000000..94f6752
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/HornetQPrincipal.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.security;
+
+/**
+ * Immutable holder pairing a user name with its password.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         1/30/12
+ */
+public class HornetQPrincipal
+{
+   /** User name given at construction time; never modified afterwards. */
+   private final String userName;
+
+   /** Password given at construction time; never modified afterwards. */
+   private final String password;
+
+   /**
+    * Stores both values as given; no validation is performed.
+    *
+    * @param userName the user name
+    * @param password the password
+    */
+   public HornetQPrincipal(final String userName, final String password)
+   {
+      this.password = password;
+      this.userName = userName;
+   }
+
+   /**
+    * @return the user name passed to the constructor
+    */
+   public String getUserName()
+   {
+      return this.userName;
+   }
+
+   /**
+    * @return the password passed to the constructor
+    */
+   public String getPassword()
+   {
+      return this.password;
+   }
+}


Mime
View raw message