activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [2/4] activemq-artemis git commit: ARTEMIS-994 Support Netty Native Epoll on Linux
Date Thu, 23 Mar 2017 23:20:14 GMT
ARTEMIS-994 Support Netty Native Epoll on Linux

The following changes are made to support Epoll.

Refactored SharedNioEventLoopGroup into the renamed SharedEventLoopGroup to be generic (so
that we can re-use it for both Nio and Epoll)

Add support and toggles for Epoll in NettyAcceptor and NettyConnector (with fallback to NIO
if Epoll cannot be loaded)

Removed PartialPooledByteBufAllocator from the code; it caused bad address errors when using
native transport and is no longer needed - see the JIRA discussion

New Connector Properties:

useEpoll - toggles whether to use epoll or not, default true (but we fall back to nio gracefully)
remotingThreads = same behaviour as nioRemotingThreads. Previous property is deprecated.
useGlobalWorkerPool = same behaviour as useNioGlobalWorkerPool. Old property is deprecated.

New Acceptor Properties:

useEpoll - toggles whether to use epoll or not, default true (but we fall back to nio gracefully)
useGlobalWorkerPool = same behaviour as useNioGlobalWorkerPool but for Epoll.

This closes #1093


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a610748c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a610748c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a610748c

Branch: refs/heads/master
Commit: a610748c09ef18f0f4f589a9bf6ac2a745b52977
Parents: 2c9b028
Author: Michael André Pearce <michael.andre.pearce@me.com>
Authored: Wed Mar 15 06:06:45 2017 +0000
Committer: Justin Bertram <jbertram@apache.org>
Committed: Thu Mar 23 18:16:48 2017 -0500

----------------------------------------------------------------------
 .../impl/netty/DelegatingEventLoopGroup.java    | 198 +++++++++++++++++++
 .../remoting/impl/netty/NettyConnection.java    |   3 +-
 .../remoting/impl/netty/NettyConnector.java     |  66 +++++--
 .../impl/netty/SharedEventLoopGroup.java        | 112 +++++++++++
 .../impl/netty/SharedNioEventLoopGroup.java     | 111 -----------
 .../remoting/impl/netty/TransportConstants.java |  15 ++
 .../core/remoting/impl/netty/NettyAcceptor.java |  64 ++++--
 7 files changed, 430 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a610748c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/DelegatingEventLoopGroup.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/DelegatingEventLoopGroup.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/DelegatingEventLoopGroup.java
new file mode 100644
index 0000000..f682893
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/DelegatingEventLoopGroup.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.remoting.impl.netty;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+
+public class DelegatingEventLoopGroup implements EventLoopGroup {
+
+   private final EventLoopGroup delegate;
+
+   public DelegatingEventLoopGroup(EventLoopGroup eventLoopGroup) {
+      this.delegate = eventLoopGroup;
+   }
+
+   @Override
+   public EventLoop next() {
+      return delegate.next();
+   }
+
+   @Override
+   public ChannelFuture register(Channel channel) {
+      return delegate.register(channel);
+   }
+
+   @Override
+   public ChannelFuture register(ChannelPromise channelPromise) {
+      return delegate.register(channelPromise);
+   }
+
+   @Override
+   @Deprecated
+   public ChannelFuture register(Channel channel, ChannelPromise channelPromise) {
+      return delegate.register(channel, channelPromise);
+   }
+
+   @Override
+   public boolean isShuttingDown() {
+      return delegate.isShuttingDown();
+   }
+
+   @Override
+   public Future<?> shutdownGracefully() {
+      return delegate.shutdownGracefully();
+   }
+
+   @Override
+   public Future<?> shutdownGracefully(long l, long l1, TimeUnit timeUnit) {
+      return delegate.shutdownGracefully(l, l1, timeUnit);
+   }
+
+   @Override
+   public Future<?> terminationFuture() {
+      return delegate.terminationFuture();
+   }
+
+   @Override
+   @Deprecated
+   public void shutdown() {
+      delegate.shutdown();
+   }
+
+   @Override
+   @Deprecated
+   public List<Runnable> shutdownNow() {
+      return delegate.shutdownNow();
+   }
+
+   @Override
+   public Iterator<EventExecutor> iterator() {
+      return delegate.iterator();
+   }
+
+   @Override
+   public Future<?> submit(Runnable runnable) {
+      return delegate.submit(runnable);
+   }
+
+   @Override
+   public <T> Future<T> submit(Runnable runnable, T t) {
+      return delegate.submit(runnable, t);
+   }
+
+   @Override
+   public <T> Future<T> submit(Callable<T> callable) {
+      return delegate.submit(callable);
+   }
+
+   @Override
+   public io.netty.util.concurrent.ScheduledFuture<?> schedule(Runnable runnable, long
l, TimeUnit timeUnit) {
+      return delegate.schedule(runnable, l, timeUnit);
+   }
+
+   @Override
+   public <V> io.netty.util.concurrent.ScheduledFuture<V> schedule(Callable<V>
callable, long l, TimeUnit timeUnit) {
+      return delegate.schedule(callable, l, timeUnit);
+   }
+
+   @Override
+   public io.netty.util.concurrent.ScheduledFuture<?> scheduleAtFixedRate(Runnable
runnable,
+                                                                          long l,
+                                                                          long l1,
+                                                                          TimeUnit timeUnit)
{
+      return delegate.scheduleAtFixedRate(runnable, l, l1, timeUnit);
+   }
+
+   @Override
+   public io.netty.util.concurrent.ScheduledFuture<?> scheduleWithFixedDelay(Runnable
runnable,
+                                                                             long l,
+                                                                             long l1,
+                                                                             TimeUnit timeUnit)
{
+      return delegate.scheduleWithFixedDelay(runnable, l, l1, timeUnit);
+   }
+
+   @Override
+   public boolean isShutdown() {
+      return delegate.isShutdown();
+   }
+
+   @Override
+   public boolean isTerminated() {
+      return delegate.isTerminated();
+   }
+
+   @Override
+   public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
{
+      return delegate.awaitTermination(timeout, unit);
+   }
+
+   @Override
+   public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<?
extends Callable<T>> tasks) throws InterruptedException {
+      return delegate.invokeAll(tasks);
+   }
+
+   @Override
+   public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<?
extends Callable<T>> tasks,
+                                                             long timeout,
+                                                             TimeUnit unit) throws InterruptedException
{
+      return delegate.invokeAll(tasks, timeout, unit);
+   }
+
+   @Override
+   public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws
InterruptedException, ExecutionException {
+      return delegate.invokeAny(tasks);
+   }
+
+   @Override
+   public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
+                          long timeout,
+                          TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
+      return delegate.invokeAny(tasks, timeout, unit);
+   }
+
+   @Override
+   public void execute(Runnable command) {
+      delegate.execute(command);
+   }
+
+   @Override
+   public void forEach(Consumer<? super EventExecutor> action) {
+      delegate.forEach(action);
+   }
+
+   @Override
+   public Spliterator<EventExecutor> spliterator() {
+      return delegate.spliterator();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a610748c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index c3a71c5..2355439 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.concurrent.Semaphore;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -216,7 +215,7 @@ public class NettyConnection implements Connection {
 
    @Override
    public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
-      return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size),
true);
+      return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a610748c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index 4a5e9e6..cc062d3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -58,6 +58,9 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -218,6 +221,12 @@ public class NettyConnector extends AbstractConnector {
 
    private boolean useNioGlobalWorkerPool;
 
+   private boolean useEpoll;
+
+   private int epollRemotingThreads;
+
+   private boolean useEpollGlobalWorkerPool;
+
    private ScheduledExecutorService scheduledThreadPool;
 
    private Executor closeExecutor;
@@ -288,6 +297,13 @@ public class NettyConnector extends AbstractConnector {
 
       useNioGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME,
TransportConstants.DEFAULT_USE_NIO_GLOBAL_WORKER_POOL, configuration);
 
+      useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME,
TransportConstants.DEFAULT_USE_EPOLL, configuration);
+
+      epollRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME,
-1, configuration);
+
+      useEpollGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME,
TransportConstants.DEFAULT_USE_EPOLL_GLOBAL_WORKER_POOL, configuration);
+
+
       useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME,
TransportConstants.DEFAULT_USE_SERVLET, configuration);
       host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST,
configuration);
       port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT,
configuration);
@@ -371,22 +387,46 @@ public class NettyConnector extends AbstractConnector {
          return;
       }
 
-      int threadsToUse;
+      // Default to number of cores * 3
+      int defaultThreadsToUse = Runtime.getRuntime().availableProcessors() * 3;
 
-      if (nioRemotingThreads == -1) {
-         // Default to number of cores * 3
+      if (useEpoll) {
+         if (Epoll.isAvailable()) {
+            int epollThreadsToUse;
+            if (epollRemotingThreads == -1) {
+               epollThreadsToUse = defaultThreadsToUse;
+            } else {
+               epollThreadsToUse = this.epollRemotingThreads;
+            }
+            if (useEpollGlobalWorkerPool) {
+               channelClazz = EpollSocketChannel.class;
+               group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(epollThreadsToUse,
threadFactory)));
+            } else {
+               channelClazz = EpollSocketChannel.class;
+               group = new EpollEventLoopGroup(epollThreadsToUse);
+            }
+            logger.info("Connector using native epoll");
 
-         threadsToUse = Runtime.getRuntime().availableProcessors() * 3;
-      } else {
-         threadsToUse = this.nioRemotingThreads;
+         } else {
+            logger.warn("Connector unable to load native epoll, will continue and load nio");
+         }
       }
 
-      if (useNioGlobalWorkerPool) {
-         channelClazz = NioSocketChannel.class;
-         group = SharedNioEventLoopGroup.getInstance(threadsToUse);
-      } else {
-         channelClazz = NioSocketChannel.class;
-         group = new NioEventLoopGroup(threadsToUse);
+      if (channelClazz == null || group == null) {
+         int nioThreadsToUse;
+         if (nioRemotingThreads == -1) {
+            nioThreadsToUse = defaultThreadsToUse;
+         } else {
+            nioThreadsToUse = this.nioRemotingThreads;
+         }
+         if (useNioGlobalWorkerPool) {
+            channelClazz = NioSocketChannel.class;
+            group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(nioThreadsToUse,
threadFactory)));
+         } else {
+            channelClazz = NioSocketChannel.class;
+            group = new NioEventLoopGroup(nioThreadsToUse);
+         }
+         logger.info("Connector using nio");
       }
       // if we are a servlet wrap the socketChannelFactory
 
@@ -1053,7 +1093,7 @@ public class NettyConnector extends AbstractConnector {
    }
 
    public static void clearThreadPools() {
-      SharedNioEventLoopGroup.forceShutdown();
+      SharedEventLoopGroup.forceShutdown();
    }
 
    private static ClassLoader getThisClassLoader() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a610748c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java
new file mode 100644
index 0000000..0af54de
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.remoting.impl.netty;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import io.netty.util.concurrent.Promise;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+
+public class SharedEventLoopGroup extends DelegatingEventLoopGroup {
+
+   private static SharedEventLoopGroup instance;
+
+   private final AtomicReference<ScheduledFuture<?>> shutdown = new AtomicReference<>();
+   private final AtomicLong channelFactoryCount = new AtomicLong();
+   private final Promise<?> terminationPromise = ImmediateEventExecutor.INSTANCE.newPromise();
+
+   private SharedEventLoopGroup(EventLoopGroup eventLoopGroup) {
+      super(eventLoopGroup);
+   }
+
+   public static synchronized void forceShutdown() {
+      if (instance != null) {
+         instance.shutdown();
+         instance.channelFactoryCount.set(0);
+         instance = null;
+      }
+   }
+
+   public static synchronized SharedEventLoopGroup getInstance(Function<ThreadFactory,
EventLoopGroup> eventLoopGroupSupplier) {
+      if (instance != null) {
+         ScheduledFuture f = instance.shutdown.getAndSet(null);
+         if (f != null) {
+            f.cancel(false);
+         }
+      } else {
+         instance = new SharedEventLoopGroup(eventLoopGroupSupplier.apply(AccessController.doPrivileged(new
PrivilegedAction<ThreadFactory>() {
+            @Override
+            public ThreadFactory run() {
+               return new ActiveMQThreadFactory("ActiveMQ-client-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
+            }
+         })));
+      }
+      instance.channelFactoryCount.incrementAndGet();
+      return instance;
+   }
+
+   @Override
+   public Future<?> terminationFuture() {
+      return terminationPromise;
+   }
+
+   @Override
+   public Future<?> shutdownGracefully() {
+      return shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS);
+   }
+
+   @Override
+   public Future<?> shutdownGracefully(final long l, final long l2, final TimeUnit
timeUnit) {
+      if (channelFactoryCount.decrementAndGet() == 0) {
+         shutdown.compareAndSet(null, next().scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+               synchronized (SharedEventLoopGroup.class) {
+                  if (shutdown.get() != null) {
+                     Future<?> future = SharedEventLoopGroup.super.shutdownGracefully(l,
l2, timeUnit);
+                     future.addListener(new FutureListener<Object>() {
+                        @Override
+                        public void operationComplete(Future future) throws Exception {
+                           if (future.isSuccess()) {
+                              terminationPromise.setSuccess(null);
+                           } else {
+                              terminationPromise.setFailure(future.cause());
+                           }
+                        }
+                     });
+                     instance = null;
+                  }
+               }
+            }
+
+         }, 10, 10, TimeUnit.SECONDS));
+      }
+      return terminationPromise;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a610748c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedNioEventLoopGroup.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedNioEventLoopGroup.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedNioEventLoopGroup.java
deleted file mode 100644
index 0750105..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedNioEventLoopGroup.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.remoting.impl.netty;
-
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.ImmediateEventExecutor;
-import io.netty.util.concurrent.Promise;
-import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-
-public class SharedNioEventLoopGroup extends NioEventLoopGroup {
-
-   private static SharedNioEventLoopGroup instance;
-
-   private final AtomicReference<ScheduledFuture<?>> shutdown = new AtomicReference<>();
-   private final AtomicLong nioChannelFactoryCount = new AtomicLong();
-   private final Promise<?> terminationPromise = ImmediateEventExecutor.INSTANCE.newPromise();
-
-   private SharedNioEventLoopGroup(int numThreads, ThreadFactory factory) {
-      super(numThreads, factory);
-   }
-
-   public static synchronized void forceShutdown() {
-      if (instance != null) {
-         instance.shutdown();
-         instance.nioChannelFactoryCount.set(0);
-         instance = null;
-      }
-   }
-
-   public static synchronized SharedNioEventLoopGroup getInstance(int numThreads) {
-      if (instance != null) {
-         ScheduledFuture f = instance.shutdown.getAndSet(null);
-         if (f != null) {
-            f.cancel(false);
-         }
-      } else {
-         instance = new SharedNioEventLoopGroup(numThreads, AccessController.doPrivileged(new
PrivilegedAction<ThreadFactory>() {
-            @Override
-            public ThreadFactory run() {
-               return new ActiveMQThreadFactory("ActiveMQ-client-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
-            }
-         }));
-      }
-      instance.nioChannelFactoryCount.incrementAndGet();
-      return instance;
-   }
-
-   @Override
-   public Future<?> terminationFuture() {
-      return terminationPromise;
-   }
-
-   @Override
-   public Future<?> shutdownGracefully() {
-      return shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS);
-   }
-
-   @Override
-   public Future<?> shutdownGracefully(final long l, final long l2, final TimeUnit
timeUnit) {
-      if (nioChannelFactoryCount.decrementAndGet() == 0) {
-         shutdown.compareAndSet(null, next().scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-               synchronized (SharedNioEventLoopGroup.class) {
-                  if (shutdown.get() != null) {
-                     Future<?> future = SharedNioEventLoopGroup.super.shutdownGracefully(l,
l2, timeUnit);
-                     future.addListener(new FutureListener<Object>() {
-                        @Override
-                        public void operationComplete(Future future) throws Exception {
-                           if (future.isSuccess()) {
-                              terminationPromise.setSuccess(null);
-                           } else {
-                              terminationPromise.setFailure(future.cause());
-                           }
-                        }
-                     });
-                     instance = null;
-                  }
-               }
-            }
-
-         }, 10, 10, TimeUnit.SECONDS));
-      }
-      return terminationPromise;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a610748c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index 14efb79..12840c1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -51,6 +51,10 @@ public class TransportConstants {
 
    public static final String USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME = "useNioGlobalWorkerPool";
 
+   public static final String USE_EPOLL_PROP_NAME = "useEpoll";
+
+   public static final String USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME = "useEpollGlobalWorkerPool";
+
    public static final String USE_INVM_PROP_NAME = "useInvm";
 
    public static final String ACTIVEMQ_SERVER_NAME = "activemqServerName";
@@ -113,6 +117,8 @@ public class TransportConstants {
 
    public static final String NIO_REMOTING_THREADS_PROPNAME = "nioRemotingThreads";
 
+   public static final String EPOLL_REMOTING_THREADS_PROPNAME = "epollRemotingThreads";
+
    public static final String BATCH_DELAY = "batchDelay";
 
    public static final String DIRECT_DELIVER = "directDeliver";
@@ -127,6 +133,10 @@ public class TransportConstants {
 
    public static final boolean DEFAULT_USE_NIO_GLOBAL_WORKER_POOL = true;
 
+   public static final boolean DEFAULT_USE_EPOLL_GLOBAL_WORKER_POOL = true;
+
+   public static final boolean DEFAULT_USE_EPOLL = true;
+
    public static final boolean DEFAULT_USE_INVM = false;
 
    public static final boolean DEFAULT_USE_SERVLET = false;
@@ -218,6 +228,7 @@ public class TransportConstants {
       allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
+      allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.PROTOCOLS_PROP_NAME);
@@ -237,6 +248,7 @@ public class TransportConstants {
       allowableAcceptorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
+      allowableAcceptorKeys.add(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME);
       allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
       allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
       allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
@@ -267,6 +279,8 @@ public class TransportConstants {
       allowableConnectorKeys.add(TransportConstants.SERVLET_PATH);
       allowableConnectorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
       allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.USE_EPOLL_GLOBAL_WORKER_POOL_PROP_NAME);
       allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
       allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);
       allowableConnectorKeys.add(TransportConstants.LOCAL_ADDRESS_PROP_NAME);
@@ -284,6 +298,7 @@ public class TransportConstants {
       allowableConnectorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
+      allowableConnectorKeys.add(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME);
       allowableConnectorKeys.add(TransportConstants.BATCH_DELAY);
       allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
       allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a610748c/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index a46684d..50faa46 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -44,6 +44,9 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.ServerChannel;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.ChannelGroupFuture;
 import io.netty.channel.group.DefaultChannelGroup;
@@ -117,6 +120,8 @@ public class NettyAcceptor extends AbstractAcceptor {
 
    private final boolean useInvm;
 
+   private final boolean useEpoll;
+
    private final ProtocolHandler protocolHandler;
 
    private final String host;
@@ -154,6 +159,8 @@ public class NettyAcceptor extends AbstractAcceptor {
 
    private final int nioRemotingThreads;
 
+   private final int epollRemotingThreads;
+
    private final ConcurrentMap<Object, NettyServerConnection> connections = new ConcurrentHashMap<>();
 
    private final Map<String, Object> configuration;
@@ -202,6 +209,11 @@ public class NettyAcceptor extends AbstractAcceptor {
       sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration);
 
       nioRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, -1, configuration);
+
+      epollRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.EPOLL_REMOTING_THREADS_PROPNAME, -1, configuration);
+      useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
+
+
       backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
       useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
 
@@ -270,22 +282,48 @@ public class NettyAcceptor extends AbstractAcceptor {
          channelClazz = LocalServerChannel.class;
          eventLoopGroup = new LocalEventLoopGroup();
       } else {
-         int threadsToUse;
-
-         if (nioRemotingThreads == -1) {
-            // Default to number of cores * 3
+         // Default to number of cores * 3
+         int defaultThreadsToUse = Runtime.getRuntime().availableProcessors() * 3;
+
+         if (useEpoll) {
+            if (Epoll.isAvailable()) {
+               int epollThreadsToUse;
+               if (epollRemotingThreads == -1) {
+                  epollThreadsToUse = defaultThreadsToUse;
+               } else {
+                  epollThreadsToUse = this.epollRemotingThreads;
+               }
 
-            threadsToUse = Runtime.getRuntime().availableProcessors() * 3;
-         } else {
-            threadsToUse = this.nioRemotingThreads;
+               channelClazz = EpollServerSocketChannel.class;
+               eventLoopGroup = new EpollEventLoopGroup(epollThreadsToUse, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
+                  @Override
+                  public ActiveMQThreadFactory run() {
+                     return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
+                  }
+               }));
+               logger.info("Acceptor using native epoll");
+            } else {
+               logger.warn("Acceptor unable to load native epoll, will continue and load nio");
+            }
          }
-         channelClazz = NioServerSocketChannel.class;
-         eventLoopGroup = new NioEventLoopGroup(threadsToUse, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
-            @Override
-            public ActiveMQThreadFactory run() {
-               return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
+
+         if (channelClazz == null || eventLoopGroup == null) {
+            int nioThreadsToUse;
+            if (nioRemotingThreads == -1) {
+               nioThreadsToUse = defaultThreadsToUse;
+            } else {
+               nioThreadsToUse = nioRemotingThreads;
             }
-         }));
+
+            channelClazz = NioServerSocketChannel.class;
+            eventLoopGroup = new NioEventLoopGroup(nioThreadsToUse, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
+               @Override
+               public ActiveMQThreadFactory run() {
+                  return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
+               }
+            }));
+            logger.info("Acceptor using nio");
+         }
       }
 
       bootstrap = new ServerBootstrap();


Mime
View raw message