activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gaohow...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-857 Add JMX endpoints to view and reset groups
Date Tue, 17 Apr 2018 08:37:47 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 7156b6140 -> 1118bdb65


ARTEMIS-857 Add JMX endpoints to view and reset groups

Expose a method to return the current mappings of groups to consumers
Expose a method to reset (remove) a specific group mapping from groupID to consumer
Expose a method to reset (remove) all group mappings

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/af91d3ac
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/af91d3ac
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/af91d3ac

Branch: refs/heads/master
Commit: af91d3ac8263ea5075eb0eedc5aed971b9db1e28
Parents: 7156b61
Author: Michael André Pearce <michael.andre.pearce@me.com>
Authored: Mon Apr 16 17:11:44 2018 +0100
Committer: Howard Gao <howard.gao@gmail.com>
Committed: Tue Apr 17 15:54:33 2018 +0800

----------------------------------------------------------------------
 .../api/core/management/QueueControl.java       | 24 ++++++++
 .../core/management/impl/QueueControlImpl.java  | 64 ++++++++++++++++++++
 .../activemq/artemis/core/server/Queue.java     |  8 +++
 .../artemis/core/server/impl/QueueImpl.java     | 20 ++++++
 .../impl/ScheduledDeliveryHandlerTest.java      | 20 ++++++
 .../management/ManagementTestBase.java          |  6 ++
 .../management/QueueControlTest.java            | 42 +++++++++++++
 .../management/QueueControlUsingCoreTest.java   | 32 ++++++++++
 .../unit/core/postoffice/impl/FakeQueue.java    | 21 +++++++
 9 files changed, 237 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af91d3ac/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 2578684..2aafcb1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -576,4 +576,28 @@ public interface QueueControl {
    @Operation(desc = "Flush internal executors", impact = MBeanOperationInfo.ACTION)
    void flushExecutor();
 
+   /**
+    * Will reset all the groups.
+    * This is useful if you want a complete rebalance of the groups to consumers.
+    */
+   @Operation(desc = "Resets all groups", impact = MBeanOperationInfo.ACTION)
+   void resetAllGroups();
+
+   /**
+    * Will reset the group matching the given groupID.
+    * This is useful if you want the given group to be rebalanced to the consumers
+    */
+   @Operation(desc = "Reset the specified group", impact = MBeanOperationInfo.ACTION)
+   void resetGroup(@Parameter(name = "groupID", desc = "ID of group to reset") String groupID);
+
+   /**
+    * Will return the current number of active groups.
+    */
+   @Attribute(desc = "Get the current number of active groups")
+   int getGroupCount();
+
+
+   @Operation(desc = "List all the existent group to consumers mappings on the Queue")
+   String listGroupsAsJSON() throws Exception;
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af91d3ac/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index e678ab8..8963b24 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1190,6 +1190,70 @@ public class QueueControlImpl extends AbstractControl implements QueueControl
{
    }
 
    @Override
+   public void resetAllGroups() {
+      checkStarted();
+
+      clearIO();
+      try {
+         queue.resetAllGroups();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public void resetGroup(String groupID) {
+      checkStarted();
+
+      clearIO();
+      try {
+         queue.resetGroup(SimpleString.toSimpleString(groupID));
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public int getGroupCount() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getGroupCount();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public String listGroupsAsJSON() throws Exception {
+      checkStarted();
+
+      clearIO();
+      try {
+         Map<SimpleString, Consumer> groups = queue.getGroups();
+
+         JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
+
+         for (Map.Entry<SimpleString, Consumer> group : groups.entrySet()) {
+
+            if (group.getValue() instanceof ServerConsumer) {
+               ServerConsumer serverConsumer = (ServerConsumer) group.getValue();
+
+               JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID",
serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID",
serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime",
serverConsumer.getCreationTime());
+
+               jsonArray.add(obj);
+            }
+
+         }
+
+         return jsonArray.build().toString();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public String listConsumersAsJSON() throws Exception {
       checkStarted();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af91d3ac/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index a235352..c549be9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -255,6 +255,14 @@ public interface Queue extends Bindable,CriticalComponent {
 
    Collection<Consumer> getConsumers();
 
+   Map<SimpleString, Consumer> getGroups();
+
+   void resetGroup(SimpleString groupID);
+
+   void resetAllGroups();
+
+   int getGroupCount();
+
    boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay)
throws Exception;
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af91d3ac/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 957c2bb..bf86823 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1039,6 +1039,26 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
    }
 
    @Override
+   public synchronized Map<SimpleString, Consumer> getGroups() {
+      return new HashMap<>(groups);
+   }
+
+   @Override
+   public synchronized void resetGroup(SimpleString groupId) {
+      groups.remove(groupId);
+   }
+
+   @Override
+   public synchronized void resetAllGroups() {
+      groups.clear();
+   }
+
+   @Override
+   public synchronized int getGroupCount() {
+      return groups.size();
+   }
+
+   @Override
    public boolean hasMatchingConsumer(final Message message) {
       for (ConsumerHolder holder : consumerList) {
          Consumer consumer = holder.consumer;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af91d3ac/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index fdce2d0..9005898 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1243,6 +1243,26 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public Map<SimpleString, Consumer> getGroups() {
+         return null;
+      }
+
+      @Override
+      public void resetGroup(SimpleString groupID) {
+
+      }
+
+      @Override
+      public void resetAllGroups() {
+
+      }
+
+      @Override
+      public int getGroupCount() {
+         return 0;
+      }
+
+      @Override
       public boolean checkRedelivery(MessageReference ref,
                                      long timeBase,
                                      boolean ignoreRedeliveryDelay) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af91d3ac/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
index 9967e76..baaaab3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
@@ -122,6 +122,12 @@ public abstract class ManagementTestBase extends ActiveMQTestBase {
       return control.getMessageCount();
    }
 
+   protected int getGroupCount(QueueControl control) throws Exception {
+      control.flushExecutor();
+      return control.getGroupCount();
+   }
+
+
    protected long getDurableMessageCount(QueueControl control) throws Exception {
       control.flushExecutor();
       return control.getDurableMessageCount();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af91d3ac/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 5a49e2a..7bef025 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -2511,6 +2511,48 @@ public class QueueControlTest extends ManagementTestBase {
    }
 
    @Test
+   public void testResetGroups() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+
+      ClientConsumer consumer = session.createConsumer(queue);
+      Assert.assertEquals(1, queueControl.getConsumerCount());
+      consumer.setMessageHandler(new MessageHandler() {
+         @Override
+         public void onMessage(ClientMessage message) {
+            System.out.println(message);
+         }
+      });
+      session.start();
+
+      ClientProducer producer = session.createProducer(address);
+      producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID,
"group1"));
+      producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID,
"group2"));
+      producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID,
"group3"));
+
+      Wait.assertEquals(3, () -> getGroupCount(queueControl));
+
+      queueControl.resetGroup("group1");
+
+      Wait.assertEquals(2, () -> getGroupCount(queueControl));
+
+      producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID,
"group1"));
+
+      Wait.assertEquals(3, () -> getGroupCount(queueControl));
+
+      queueControl.resetAllGroups();
+
+      Wait.assertEquals(0, () -> getGroupCount(queueControl));
+
+      consumer.close();
+      session.deleteQueue(queue);
+   }
+
+   @Test
    public void testGetScheduledCountOnRemove() throws Exception {
       long delay = Integer.MAX_VALUE;
       SimpleString address = RandomUtil.randomSimpleString();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af91d3ac/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index aafbb5b..3060799 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -50,6 +50,38 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          }
 
          @Override
+         public void resetAllGroups() {
+            try {
+               proxy.invokeOperation("resetAllGroups");
+            } catch (Exception e) {
+               throw new RuntimeException(e.getMessage(), e);
+            }
+         }
+
+         @Override
+         public void resetGroup(String groupID) {
+            try {
+               proxy.invokeOperation("resetGroup", groupID); // forward groupID: the management operation is declared with one "groupID" parameter
+            } catch (Exception e) {
+               throw new RuntimeException(e.getMessage(), e);
+            }
+         }
+
+         @Override
+         public int getGroupCount() {
+            try {
+               return (Integer) proxy.invokeOperation("groupCount"); // NOTE(review): QueueControl declares getGroupCount() as an @Attribute, yet it is invoked here as an operation named "groupCount" - confirm the proxy resolves this
+            } catch (Exception e) {
+               throw new RuntimeException(e.getMessage(), e);
+            }
+         }
+
+         @Override
+         public String listGroupsAsJSON() throws Exception {
+            return (String) proxy.invokeOperation("listGroupsAsJSON");
+         }
+
+         @Override
          public boolean changeMessagePriority(final long messageID, final int newPriority)
throws Exception {
             return (Boolean) proxy.invokeOperation("changeMessagePriority", messageID, newPriority);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af91d3ac/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index bc628f5..5c37aec 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -342,6 +342,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue
{
    }
 
    @Override
+
    public int getConsumerCount() {
       // no-op
       return 0;
@@ -359,6 +360,26 @@ public class FakeQueue extends CriticalComponentImpl implements Queue
{
    }
 
    @Override
+   public Map<SimpleString, Consumer> getGroups() {
+      return null;
+   }
+
+   @Override
+   public void resetGroup(SimpleString groupID) {
+
+   }
+
+   @Override
+   public void resetAllGroups() {
+
+   }
+
+   @Override
+   public int getGroupCount() {
+      return 0;
+   }
+
+   @Override
    public int getDeliveringCount() {
       // no-op
       return 0;


Mime
View raw message