activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-898 - Adding Plugin Support
Date Wed, 03 May 2017 16:17:12 GMT
ARTEMIS-898 - Adding Plugin Support

Adding a new ActievMQServerPlugin interface to support adding custom
behavior to the broker at certain events such as connection or session
creation.

https://issues.apache.org/jira/browse/ARTEMIS-898


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1e1ede84
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1e1ede84
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1e1ede84

Branch: refs/heads/master
Commit: 1e1ede84c0483099f27741bc046ef95c08e1d090
Parents: 303d97c
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Tue May 2 09:46:17 2017 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Wed May 3 11:21:32 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/config/Configuration.java      |  21 ++
 .../core/config/impl/ConfigurationImpl.java     |  24 ++
 .../core/postoffice/impl/PostOfficeImpl.java    |   4 +
 .../server/impl/RemotingServiceImpl.java        |   7 +-
 .../artemis/core/server/ActiveMQServer.java     |  18 +-
 .../core/server/cluster/ClusterManager.java     |   3 +
 .../core/server/impl/ActiveMQServerImpl.java    |  57 +++-
 .../core/server/impl/LastValueQueue.java        |   6 +-
 .../core/server/impl/QueueFactoryImpl.java      |  19 +-
 .../artemis/core/server/impl/QueueImpl.java     |  31 +-
 .../core/server/impl/ServerConsumerImpl.java    |   7 +
 .../core/server/impl/ServerSessionImpl.java     |  33 +-
 .../server/plugin/ActiveMQPluginRunnable.java   |  24 ++
 .../server/plugin/ActiveMQServerPlugin.java     | 336 +++++++++++++++++++
 .../integration/amqp/AmqpClientTestSupport.java |   7 +
 .../integration/client/HangConsumerTest.java    |  37 +-
 .../client/InterruptedLargeMessageTest.java     |  18 +-
 .../jms/client/TopicCleanupTest.java            |  12 +-
 .../openwire/amq/JmsResourceProvider.java       |   2 +-
 .../integration/plugin/AmqpPluginTest.java      | 131 ++++++++
 .../integration/plugin/CorePluginTest.java      | 257 ++++++++++++++
 .../plugin/MethodCalledVerifier.java            | 276 +++++++++++++++
 .../integration/plugin/MqttPluginTest.java      | 132 ++++++++
 .../integration/plugin/OpenwirePluginTest.java  | 109 ++++++
 .../integration/plugin/StompPluginTest.java     | 126 +++++++
 .../timing/core/server/impl/QueueImplTest.java  |  15 +-
 .../unit/core/server/impl/QueueImplTest.java    |   3 +-
 .../server/impl/fakes/FakeQueueFactory.java     |   7 +-
 28 files changed, 1664 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 7dfb1a5..7da5b02 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
 import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
 
@@ -1081,4 +1082,24 @@ public interface Configuration {
    Configuration setNetworkCheckPing6Command(String command);
 
    String getInternalNamingPrefix();
+
+   /**
+    * @param plugins
+    */
+   void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
+
+   /**
+    * @param plugin
+    */
+   void registerBrokerPlugin(ActiveMQServerPlugin plugin);
+
+   /**
+    * @param plugin
+    */
+   void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
+
+   /**
+    * @return
+    */
+   List<ActiveMQServerPlugin> getBrokerPlugins();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 2a538ca..8edeb5b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
@@ -63,6 +64,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
 import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
 import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
 import org.apache.activemq.artemis.utils.Env;
@@ -232,6 +234,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
 
    private List<SecuritySettingPlugin> securitySettingPlugins = new ArrayList<>();
 
+   private final List<ActiveMQServerPlugin> brokerPlugins = new CopyOnWriteArrayList<>();
+
    private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>();
 
    protected List<ConnectorServiceConfiguration> connectorServiceConfigurations = new ArrayList<>();
@@ -1321,6 +1325,26 @@ public class ConfigurationImpl implements Configuration, Serializable {
    }
 
    @Override
+   public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
+      brokerPlugins.addAll(plugins);
+   }
+
+   @Override
+   public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
+      brokerPlugins.add(plugin);
+   }
+
+   @Override
+   public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
+      brokerPlugins.remove(plugin);
+   }
+
+   @Override
+   public List<ActiveMQServerPlugin> getBrokerPlugins() {
+      return brokerPlugins;
+   }
+
+   @Override
    public File getBrokerInstance() {
       if (artemisInstance != null) {
          return artemisInstance;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 2ef7657..a927768 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -783,7 +783,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          }
       } else {
          try {
+            server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
             processRoute(message, context, direct);
+            final RoutingStatus finalResult = result;
+            server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct,
+                  rejectDuplicates, finalResult) : null);
          } catch (ActiveMQAddressFullException e) {
             if (startedTX.get()) {
                context.getTransaction().rollback();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index e0e5b52..7c9c675 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -514,6 +514,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
       }
 
       ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConnection(entry.connection) : null);
 
       if (logger.isTraceEnabled()) {
          logger.trace("Connection created " + connection);
@@ -534,8 +535,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
       ConnectionEntry conn = connections.get(connectionID);
 
       if (conn != null && !conn.connection.isSupportReconnect()) {
-         removeConnection(connectionID);
-
+         RemotingConnection removedConnection = removeConnection(connectionID);
+         if (removedConnection != null) {
+            server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDestroyConnection(removedConnection) : null);
+         }
          conn.connection.fail(new ActiveMQRemoteDisconnectException());
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index bfd9aec..e16557f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.server;
 
-import javax.management.MBeanServer;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +23,8 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import javax.management.MBeanServer;
+
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -47,6 +48,8 @@ import org.apache.activemq.artemis.core.server.impl.Activation;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ConnectorsService;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.server.reload.ReloadManager;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -186,6 +189,18 @@ public interface ActiveMQServer extends ServiceComponent {
     */
    void callPostQueueDeletionCallbacks(SimpleString address, SimpleString queueName) throws Exception;
 
+   void registerBrokerPlugin(ActiveMQServerPlugin plugin);
+
+   void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
+
+   void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
+
+   List<ActiveMQServerPlugin> getBrokerPlugins();
+
+   void callBrokerPlugins(ActiveMQPluginRunnable pluginRun);
+
+   boolean hasBrokerPlugins();
+
    void checkQueueCreationLimit(String username) throws Exception;
 
    ServerSession createSession(String name,
@@ -447,4 +462,5 @@ public interface ActiveMQServer extends ServiceComponent {
    void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
 
    String getInternalNamingPrefix();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index d2219c2..70edb68 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -405,6 +405,8 @@ public final class ClusterManager implements ActiveMQComponent {
          return;
       }
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeployBridge(config) : null);
+
       Queue queue = (Queue) binding.getBindable();
 
       ServerLocatorInternal serverLocator;
@@ -478,6 +480,7 @@ public final class ClusterManager implements ActiveMQComponent {
 
       bridge.start();
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeployBridge(bridge) : null);
    }
 
    public static class IncomingInterceptorLookingForExceptionMessage implements Interceptor {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 8482cb3..06964ee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
-import javax.management.MBeanServer;
-import javax.security.cert.X509Certificate;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -48,6 +46,9 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.management.MBeanServer;
+import javax.security.cert.X509Certificate;
+
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -144,6 +145,8 @@ import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
 import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
 import org.apache.activemq.artemis.core.server.reload.ReloadManager;
 import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
@@ -1309,10 +1312,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       checkSessionLimit(validatedUser);
 
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
+            autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
+
       final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes);
 
       sessions.put(name, session);
 
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateSession(session) : null);
+
       return session;
    }
 
@@ -1705,6 +1713,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          return;
       }
 
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount,
+            removeConsumers, autoDeleteAddress) : null);
+
       addressSettingsRepository.clearCache();
 
       Binding binding = postOffice.getBinding(queueName);
@@ -1743,6 +1754,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
 
       callPostQueueDeletionCallbacks(address, queueName);
+
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
+            removeConsumers, autoDeleteAddress) : null);
    }
 
    @Override
@@ -1808,6 +1822,38 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
+      configuration.registerBrokerPlugins(plugins);
+   }
+
+   @Override
+   public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
+      configuration.registerBrokerPlugin(plugin);
+   }
+
+   @Override
+   public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
+      configuration.unRegisterBrokerPlugin(plugin);
+   }
+
+   @Override
+   public List<ActiveMQServerPlugin> getBrokerPlugins() {
+      return configuration.getBrokerPlugins();
+   }
+
+   @Override
+   public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) {
+      if (pluginRun != null) {
+         getBrokerPlugins().forEach(plugin -> pluginRun.run(plugin));
+      }
+   }
+
+   @Override
+   public boolean hasBrokerPlugins() {
+      return !getBrokerPlugins().isEmpty();
+   }
+
+   @Override
    public ExecutorFactory getExecutorFactory() {
       return executorFactory;
    }
@@ -2103,7 +2149,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       securityStore = new SecurityStoreImpl(securityRepository, securityManager, configuration.getSecurityInvalidationInterval(), configuration.isSecurityEnabled(), configuration.getClusterUser(), configuration.getClusterPassword(), managementService);
 
-      queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager);
+      queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager, this);
 
       pagingManager = createPagingManager();
 
@@ -2508,6 +2554,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).build();
 
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
+
       final Queue queue = queueFactory.createQueueWith(queueConfig);
 
       if (transientQueue) {
@@ -2550,6 +2598,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       callPostQueueCreationCallbacks(queue.getName());
 
+      callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);
+
       return queue;
    }
 
@@ -2763,4 +2813,5 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          deployAddressesFromConfiguration(config);
       }
    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index ceec92c..8370839 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -63,8 +64,9 @@ public class LastValueQueue extends QueueImpl {
                          final PostOffice postOffice,
                          final StorageManager storageManager,
                          final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                         final Executor executor) {
-      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+                         final Executor executor,
+                         final ActiveMQServer server) {
+      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 9258a07..3d8ceb1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.QueueFactory;
@@ -49,17 +50,19 @@ public class QueueFactoryImpl implements QueueFactory {
 
    protected final ExecutorFactory executorFactory;
 
+   protected final ActiveMQServer server;
+
    public QueueFactoryImpl(final ExecutorFactory executorFactory,
                            final ScheduledExecutorService scheduledExecutor,
                            final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                           final StorageManager storageManager) {
-      this.addressSettingsRepository = addressSettingsRepository;
+                           final StorageManager storageManager,
+                           final ActiveMQServer server) {
 
+      this.addressSettingsRepository = addressSettingsRepository;
       this.scheduledExecutor = scheduledExecutor;
-
       this.storageManager = storageManager;
-
       this.executorFactory = executorFactory;
+      this.server = server;
    }
 
    @Override
@@ -72,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory {
       final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
       final Queue queue;
       if (addressSettings.isLastValueQueue()) {
-         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
       } else {
-         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
       }
       return queue;
    }
@@ -98,9 +101,9 @@ public class QueueFactoryImpl implements QueueFactory {
 
       Queue queue;
       if (addressSettings.isLastValueQueue()) {
-         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),  scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),  scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
       } else {
-         queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+         queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
       }
 
       return queue;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a62ae79..c2cfdef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@@ -69,7 +70,6 @@ import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
@@ -198,6 +198,8 @@ public class QueueImpl implements Queue {
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
+   private final ActiveMQServer server;
+
    private final ScheduledExecutorService scheduledExecutor;
 
    private final SimpleString address;
@@ -330,8 +332,9 @@ public class QueueImpl implements Queue {
                     final PostOffice postOffice,
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                    final Executor executor) {
-      this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+                    final Executor executor,
+                    final ActiveMQServer server) {
+      this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
    }
 
    public QueueImpl(final long id,
@@ -347,8 +350,9 @@ public class QueueImpl implements Queue {
                     final PostOffice postOffice,
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                    final Executor executor) {
-      this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+                    final Executor executor,
+                    final ActiveMQServer server) {
+      this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
    }
 
    public QueueImpl(final long id,
@@ -367,7 +371,8 @@ public class QueueImpl implements Queue {
                     final PostOffice postOffice,
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                    final Executor executor) {
+                    final Executor executor,
+                    final ActiveMQServer server) {
 
       this.id = id;
 
@@ -401,6 +406,8 @@ public class QueueImpl implements Queue {
 
       this.scheduledExecutor = scheduledExecutor;
 
+      this.server = server;
+
       scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
 
       if (addressSettingsRepository != null) {
@@ -1078,6 +1085,9 @@ public class QueueImpl implements Queue {
          messagesAcknowledged++;
       }
 
+      if (server != null) {
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null);
+      }
    }
 
    @Override
@@ -1112,6 +1122,10 @@ public class QueueImpl implements Queue {
       } else {
          messagesAcknowledged++;
       }
+
+      if (server != null) {
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageAcknowledged(ref, reason) : null);
+      }
    }
 
    @Override
@@ -1195,6 +1209,11 @@ public class QueueImpl implements Queue {
          }
          acknowledge(ref, AckReason.EXPIRED);
       }
+
+      if (server != null) {
+         final SimpleString expiryAddress = messageExpiryAddress;
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.messageExpired(ref, expiryAddress) : null);
+      }
    }
 
    private SimpleString expiryAddressFromMessageAddress(MessageReference ref) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 9e33602..af8524d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -416,6 +416,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       try {
          Message message = reference.getMessage();
 
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeDeliver(reference) : null);
+
          if (message.isLargeMessage() && supportLargeMessage) {
             if (largeMessageDeliverer == null) {
                // This can't really happen as handle had already crated the deliverer
@@ -432,6 +434,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       } finally {
          lockDelivery.readLock().unlock();
          callback.afterDelivery();
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterDeliver(reference) : null);
       }
 
    }
@@ -447,6 +450,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
       }
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseConsumer(this, failed) : null);
+
       setStarted(false);
 
       LargeMessageDeliverer del = largeMessageDeliverer;
@@ -501,6 +506,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
          managementService.sendNotification(notification);
       }
+
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseConsumer(this, failed) : null);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index edd7afc..7245843 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -16,10 +16,8 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
-import javax.json.JsonArrayBuilder;
-import javax.json.JsonObjectBuilder;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.Xid;
+import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,6 +29,11 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonObjectBuilder;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+
 import org.apache.activemq.artemis.Closeable;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@@ -89,8 +92,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
 import org.apache.activemq.artemis.utils.TypedProperties;
 import org.jboss.logging.Logger;
 
-import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
-
 /**
  * Server side Session implementation
  */
@@ -345,6 +346,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    protected void doClose(final boolean failed) throws Exception {
       synchronized (this) {
+         if (!closed) {
+            server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseSession(this, failed) : null);
+         }
          this.setStarted(false);
          if (closed)
             return;
@@ -395,6 +399,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          }
 
          closed = true;
+
+         server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCloseSession(this, failed) : null);
       }
    }
 
@@ -444,9 +450,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       Filter filter = FilterImpl.createFilter(filterString);
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCreateConsumer(consumerID, unPrefixedQueueName,
+            filterString, browseOnly, supportLargeMessage) : null);
+
       ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
       consumers.put(consumer.getID(), consumer);
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterCreateConsumer(consumer) : null);
+
       if (!browseOnly) {
          TypedProperties props = new TypedProperties();
 
@@ -1290,6 +1301,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                                           final boolean direct,
                                           boolean noAutoCreateQueue) throws Exception {
 
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(tx, message, direct, noAutoCreateQueue) : null);
+
       // If the protocol doesn't support flow control, we have no choice other than fail the communication
       if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
          ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
@@ -1333,10 +1346,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       if (message.getAddressSimpleString().equals(managementAddress)) {
          // It's a management message
 
-         handleManagementMessage(tx, message, direct);
+         result = handleManagementMessage(tx, message, direct);
       } else {
          result = doSend(tx, message, address, direct, noAutoCreateQueue);
       }
+
+      final RoutingStatus finalResult = result;
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSend(tx, message, direct, noAutoCreateQueue, finalResult) : null);
+
       return result;
    }
 
@@ -1367,10 +1384,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public void addMetaData(String key, String data) {
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSessionMetadataAdded(this, key, data) : null);
       if (metaData == null) {
          metaData = new HashMap<>();
       }
       metaData.put(key, data);
+      server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSessionMetadataAdded(this, key, data) : null);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
new file mode 100644
index 0000000..bc85475
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQPluginRunnable.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.plugin;
+
+public interface ActiveMQPluginRunnable {
+
+   void run(ActiveMQServerPlugin plugin);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
new file mode 100644
index 0000000..95296f0
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.plugin;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
+import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.cluster.Bridge;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+
+
+public interface ActiveMQServerPlugin {
+
+
+   /**
+    * A connection has been created.
+    *
+    * @param connection The newly created connection
+    */
+   default void afterCreateConnection(RemotingConnection connection) {
+
+   }
+
+   /**
+    * A connection has been destroyed.
+    *
+    * @param connection
+    */
+   default void afterDestroyConnection(RemotingConnection connection) {
+
+   }
+
+   /**
+    * Before a session is created.
+    *
+    * @param name
+    * @param username
+    * @param minLargeMessageSize
+    * @param connection
+    * @param autoCommitSends
+    * @param autoCommitAcks
+    * @param preAcknowledge
+    * @param xa
+    * @param defaultAddress
+    * @param callback
+    * @param autoCreateQueues
+    * @param context
+    * @param prefixes
+    */
+   default void beforeCreateSession(String name, String username, int minLargeMessageSize,
+         RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
+         boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
+         Map<SimpleString, RoutingType> prefixes) {
+
+   }
+
+   /**
+    * After a session has been created.
+    *
+    * @param session The newly created session
+    */
+   default void afterCreateSession(ServerSession session) {
+
+   }
+
+   /**
+    * Before a session is closed
+    *
+    * @param session
+    * @param failed
+    */
+   default void beforeCloseSession(ServerSession session, boolean failed) {
+
+   }
+
+   /**
+    * After a session is closed
+    *
+    * @param session
+    * @param failed
+    */
+   default void afterCloseSession(ServerSession session, boolean failed) {
+
+   }
+
+   /**
+    * Before session metadata is added to the session
+    *
+    * @param session
+    * @param key
+    * @param data
+    */
+   default void beforeSessionMetadataAdded(ServerSession session, String key, String data) {
+
+   }
+
+   /**
+    * After session metadata is added to the session
+    *
+    * @param session
+    * @param key
+    * @param data
+    */
+   default void afterSessionMetadataAdded(ServerSession session, String key, String data) {
+
+   }
+
+   /**
+    * Before a consumer is created
+    *
+    * @param consumerID
+    * @param queueName
+    * @param filterString
+    * @param browseOnly
+    * @param supportLargeMessage
+    */
+   default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
+         boolean browseOnly, boolean supportLargeMessage) {
+
+   }
+
+   /**
+    * After a consumer has been created
+    *
+    * @param consumer the created consumer
+    */
+   default void afterCreateConsumer(ServerConsumer consumer) {
+
+   }
+
+   /**
+    * Before a consumer is closed
+    *
+    * @param consumer
+    * @param failed
+    */
+   default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) {
+
+   }
+
+   /**
+    * After a consumer is closed
+    *
+    * @param consumer
+    * @param failed
+    */
+   default void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
+
+   }
+
+   /**
+    * Before a queue is created
+    *
+    * @param queueConfig
+    */
+   default void beforeCreateQueue(QueueConfig queueConfig) {
+
+   }
+
+   /**
+    * After a queue has been created
+    *
+    * @param queue The newly created queue
+    */
+   default void afterCreateQueue(Queue queue) {
+
+   }
+
+   /**
+    * Before a queue is destroyed
+    *
+    * @param queueName
+    * @param session
+    * @param checkConsumerCount
+    * @param removeConsumers
+    * @param autoDeleteAddress
+    */
+   default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount,
+         boolean removeConsumers, boolean autoDeleteAddress) {
+
+   }
+
+   /**
+    * After a queue has been destroyed
+    *
+    * @param queue
+    * @param address
+    * @param session
+    * @param checkConsumerCount
+    * @param removeConsumers
+    * @param autoDeleteAddress
+    */
+   default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount,
+         boolean removeConsumers, boolean autoDeleteAddress) {
+
+   }
+
+   /**
+    * Before a message is sent
+    *
+    * @param tx
+    * @param message
+    * @param direct
+    * @param noAutoCreateQueue
+    */
+   default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
+
+   }
+
+   /**
+    * After a message is sent
+    *
+    * @param tx
+    * @param message
+    * @param direct
+    * @param noAutoCreateQueue
+    * @param result
+    */
+   default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
+         RoutingStatus result) {
+
+   }
+
+   /**
+    * Before a message is routed
+    *
+    * @param message
+    * @param context
+    * @param direct
+    * @param rejectDuplicates
+    */
+   default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
+
+   }
+
+   /**
+    * After a message is routed
+    *
+    * @param message
+    * @param context
+    * @param direct
+    * @param rejectDuplicates
+    * @param result
+    */
+   default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
+         RoutingStatus result) {
+
+   }
+
+   /**
+    * Before a message is delivered to a client consumer
+    *
+    * @param reference
+    */
+   default void beforeDeliver(MessageReference reference) {
+
+   }
+
+   /**
+    * After a message is delivered to a client consumer
+    *
+    * @param reference
+    */
+   default void afterDeliver(MessageReference reference) {
+
+   }
+
+   /**
+    * A message has been expired
+    *
+    * @param message The expired message
+    * @param messageExpiryAddress The message expiry address if exists
+    */
+   default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
+
+   }
+
+   /**
+    * A message has been acknowledged
+    *
+    * @param ref The acked message
+    * @param reason The ack reason
+    */
+   default void messageAcknowledged(MessageReference ref, AckReason reason) {
+
+   }
+
+   /**
+    * Before a bridge is deployed
+    *
+    * @param config The bridge configuration
+    */
+   default void beforeDeployBridge(BridgeConfiguration config) {
+
+   }
+
+   /**
+    * After a bridge has been deployed
+    *
+    * @param bridge The newly deployed bridge
+    */
+   default void afterDeployBridge(Bridge bridge) {
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 8d27895..60b9b74 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -166,6 +166,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
       // Add optional security for tests that need it
       configureBrokerSecurity(server);
 
+      // Add extra configuration
+      addConfiguration(server);
+
       server.start();
 
       // Prepare all addresses and queues for client tests.
@@ -174,6 +177,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
       return server;
    }
 
+   protected void addConfiguration(ActiveMQServer server) {
+
+   }
+
    protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
       HashMap<String, Object> params = new HashMap<>();
       params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 201a96b..da695ca 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
-import javax.management.MBeanServer;
 import java.lang.management.ManagementFactory;
 import java.util.LinkedList;
 import java.util.Map;
@@ -26,10 +25,12 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+import javax.management.MBeanServer;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Interceptor;
-
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -53,7 +54,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
@@ -234,8 +234,10 @@ public class HangConsumerTest extends ActiveMQTestBase {
                              final PostOffice postOffice,
                              final StorageManager storageManager,
                              final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                             final Executor executor) {
-            super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+                             final Executor executor, final ActiveMQServer server) {
+            super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode,
+                  maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager,
+                  addressSettingsRepository, executor, server);
          }
 
          @Override
@@ -256,13 +258,18 @@ public class HangConsumerTest extends ActiveMQTestBase {
          LocalFactory(final ExecutorFactory executorFactory,
                       final ScheduledExecutorService scheduledExecutor,
                       final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                      final StorageManager storageManager) {
-            super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager);
+                      final StorageManager storageManager, final ActiveMQServer server) {
+            super(executorFactory, scheduledExecutor, addressSettingsRepository, storageManager, server);
          }
 
          @Override
          public Queue createQueueWith(final QueueConfig config) {
-            queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+            queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(),
+                                            config.user(), config.pageSubscription(), config.isDurable(),
+                                            config.isTemporary(), config.isAutoCreated(), config.deliveryMode(),
+                                            config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor,
+                                            postOffice, storageManager, addressSettingsRepository,
+                                            executorFactory.getExecutor(), server);
             return queue;
          }
 
@@ -277,13 +284,18 @@ public class HangConsumerTest extends ActiveMQTestBase {
                                   final boolean durable,
                                   final boolean temporary,
                                   final boolean autoCreated) {
-            queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+            queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable,
+                                            temporary, autoCreated, RoutingType.MULTICAST, null, null,
+                                            scheduledExecutor, postOffice, storageManager, addressSettingsRepository,
+                                            executorFactory.getExecutor(), server);
             return queue;
          }
 
       }
 
-      LocalFactory queueFactory = new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(), server.getAddressSettingsRepository(), server.getStorageManager());
+      LocalFactory queueFactory =
+               new LocalFactory(server.getExecutorFactory(), server.getScheduledPool(),
+                                server.getAddressSettingsRepository(), server.getStorageManager(), server);
 
       queueFactory.setPostOffice(server.getPostOffice());
 
@@ -359,7 +371,10 @@ public class HangConsumerTest extends ActiveMQTestBase {
       long txID = server.getStorageManager().generateID();
 
       // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
-      LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID());
+      LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE,
+                                                           new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false,
+                                                                         false, null, null, null, null, null, null),
+                                                           server.getNodeID());
       server.getStorageManager().addQueueBinding(txID, newBinding);
       server.getStorageManager().commitBindings(txID);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
index 540baf6..44015e1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.concurrent.CountDownLatch;
@@ -29,9 +23,17 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -52,7 +54,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.QueueFactory;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -518,7 +519,8 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
                         StorageManager storageManager,
                         HierarchicalRepository<AddressSettings> addressSettingsRepository,
                         Executor executor) {
-            super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+            super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor,
+                  postOffice, storageManager, addressSettingsRepository, executor, null);
          }
 
          @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
index f8094a1..63743ed 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
@@ -16,14 +16,15 @@
  */
 package org.apache.activemq.artemis.tests.integration.jms.client;
 
+import java.util.List;
+import java.util.Map;
+
 import javax.jms.Connection;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
@@ -81,7 +82,12 @@ public class TopicCleanupTest extends JMSTestBase {
          for (int i = 0; i < 100; i++) {
             long txid = storage.generateID();
 
-            final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"), SimpleString.toSimpleString("topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor());
+            final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("topic"),
+                                              SimpleString.toSimpleString("topic"),
+                                              FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null,
+                                              true, false, false, server.getScheduledPool(), server.getPostOffice(),
+                                              storage, server.getAddressSettingsRepository(),
+                                              server.getExecutorFactory().getExecutor(), server);
 
             LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
index 056891a..bd8cfd8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JmsResourceProvider.java
@@ -45,7 +45,7 @@ public class JmsResourceProvider {
    /**
     * Creates a connection.
     *
-    * @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory)
+    * @see org.apache.activemq.test.JmsResourceProvider#afterCreateConnection(javax.jms.ConnectionFactory)
     */
    public Connection createConnection(ConnectionFactory cf) throws JMSException {
       Connection connection = cf.createConnection();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
new file mode 100644
index 0000000..d918b27
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic send and receive scenarios using only AMQP sender and receiver links.
+ */
+public class AmqpPluginTest extends AmqpClientTestSupport {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AmqpPluginTest.class);
+
+   private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+   private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      super.addConfiguration(server);
+      server.registerBrokerPlugin(verifier);
+   }
+
+   @Test(timeout = 60000)
+   public void testQueueReceiverReadAndAckMessage() throws Exception {
+      sendMessages(getQueueName(), 1);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(message);
+      message.accept();
+      receiver.close();
+      connection.close();
+
+      verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
+            BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION,
+            BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+            BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND,
+            AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER);
+   }
+
+   @Override
+   public void sendMessages(String destinationName, int count) throws Exception {
+      sendMessages(destinationName, count, null);
+   }
+
+   @Override
+   public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(destinationName);
+
+         for (int i = 0; i < count; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("MessageID:" + i);
+            if (routingType != null) {
+               message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
+            }
+            sender.send(message);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+}


Mime
View raw message