activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [15/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:45 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSConnectionFactoryControlImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSConnectionFactoryControlImpl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSConnectionFactoryControlImpl.java
new file mode 100644
index 0000000..74fa619
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSConnectionFactoryControlImpl.java
@@ -0,0 +1,471 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.management.impl;
+
+import javax.management.MBeanInfo;
+import javax.management.NotCompliantMBeanException;
+import javax.management.StandardMBean;
+
+import org.apache.activemq6.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.api.core.management.Parameter;
+import org.apache.activemq6.api.jms.management.ConnectionFactoryControl;
+import org.apache.activemq6.core.management.impl.MBeanInfoHelper;
+import org.apache.activemq6.jms.client.HornetQConnectionFactory;
+import org.apache.activemq6.jms.server.JMSServerManager;
+import org.apache.activemq6.jms.server.config.ConnectionFactoryConfiguration;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class JMSConnectionFactoryControlImpl extends StandardMBean implements ConnectionFactoryControl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final ConnectionFactoryConfiguration cfConfig;
+
+   private HornetQConnectionFactory cf;
+
+   private final String name;
+
+   private final JMSServerManager jmsManager;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public JMSConnectionFactoryControlImpl(final ConnectionFactoryConfiguration cfConfig,
+                                          final HornetQConnectionFactory cf,
+                                          final JMSServerManager jmsManager,
+                                          final String name) throws NotCompliantMBeanException
+   {
+      super(ConnectionFactoryControl.class);
+      this.cfConfig = cfConfig;
+      this.cf = cf;
+      this.name = name;
+      this.jmsManager = jmsManager;
+   }
+
+   // Public --------------------------------------------------------
+
+   // ManagedConnectionFactoryMBean implementation ------------------
+
+   public String[] getJNDIBindings()
+   {
+      return jmsManager.getJNDIOnConnectionFactory(name);
+   }
+
+   public boolean isCompressLargeMessages()
+   {
+      return cf.isCompressLargeMessage();
+   }
+
+   public void setCompressLargeMessages(final boolean compress)
+   {
+      cfConfig.setCompressLargeMessages(compress);
+      recreateCF();
+   }
+
+   public boolean isHA()
+   {
+      return cfConfig.isHA();
+   }
+
+   public int getFactoryType()
+   {
+      return cfConfig.getFactoryType().intValue();
+   }
+
+   public String getClientID()
+   {
+      return cfConfig.getClientID();
+   }
+
+   public long getClientFailureCheckPeriod()
+   {
+      return cfConfig.getClientFailureCheckPeriod();
+   }
+
+   public void setClientID(String clientID)
+   {
+      cfConfig.setClientID(clientID);
+      recreateCF();
+   }
+
+   public void setDupsOKBatchSize(int dupsOKBatchSize)
+   {
+      cfConfig.setDupsOKBatchSize(dupsOKBatchSize);
+      recreateCF();
+   }
+
+   public void setTransactionBatchSize(int transactionBatchSize)
+   {
+      cfConfig.setTransactionBatchSize(transactionBatchSize);
+      recreateCF();
+   }
+
+   public void setClientFailureCheckPeriod(long clientFailureCheckPeriod)
+   {
+      cfConfig.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+      recreateCF();
+   }
+
+   public void setConnectionTTL(long connectionTTL)
+   {
+      cfConfig.setConnectionTTL(connectionTTL);
+      recreateCF();
+   }
+
+   public void setCallTimeout(long callTimeout)
+   {
+      cfConfig.setCallTimeout(callTimeout);
+      recreateCF();
+   }
+
+   public void setCallFailoverTimeout(long callTimeout)
+   {
+      cfConfig.setCallFailoverTimeout(callTimeout);
+      recreateCF();
+   }
+
+   public void setConsumerWindowSize(int consumerWindowSize)
+   {
+      cfConfig.setConsumerWindowSize(consumerWindowSize);
+      recreateCF();
+   }
+
+   public void setConsumerMaxRate(int consumerMaxRate)
+   {
+      cfConfig.setConsumerMaxRate(consumerMaxRate);
+      recreateCF();
+   }
+
+   public void setConfirmationWindowSize(int confirmationWindowSize)
+   {
+      cfConfig.setConfirmationWindowSize(confirmationWindowSize);
+      recreateCF();
+   }
+
+   public void setProducerMaxRate(int producerMaxRate)
+   {
+      cfConfig.setProducerMaxRate(producerMaxRate);
+      recreateCF();
+   }
+
+   public int getProducerWindowSize()
+   {
+      return cfConfig.getProducerWindowSize();
+   }
+
+   public void setProducerWindowSize(int producerWindowSize)
+   {
+      cfConfig.setProducerWindowSize(producerWindowSize);
+      recreateCF();
+   }
+
+   public void setCacheLargeMessagesClient(boolean cacheLargeMessagesClient)
+   {
+      cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient);
+      recreateCF();
+   }
+
+   public boolean isCacheLargeMessagesClient()
+   {
+      return cfConfig.isCacheLargeMessagesClient();
+   }
+
+   public void setMinLargeMessageSize(int minLargeMessageSize)
+   {
+      cfConfig.setMinLargeMessageSize(minLargeMessageSize);
+      recreateCF();
+   }
+
+   public void setBlockOnNonDurableSend(boolean blockOnNonDurableSend)
+   {
+      cfConfig.setBlockOnNonDurableSend(blockOnNonDurableSend);
+      recreateCF();
+   }
+
+   public void setBlockOnAcknowledge(boolean blockOnAcknowledge)
+   {
+      cfConfig.setBlockOnAcknowledge(blockOnAcknowledge);
+      recreateCF();
+   }
+
+   public void setBlockOnDurableSend(boolean blockOnDurableSend)
+   {
+      cfConfig.setBlockOnDurableSend(blockOnDurableSend);
+      recreateCF();
+   }
+
+   public void setAutoGroup(boolean autoGroup)
+   {
+      cfConfig.setAutoGroup(autoGroup);
+      recreateCF();
+   }
+
+   public void setPreAcknowledge(boolean preAcknowledge)
+   {
+      cfConfig.setPreAcknowledge(preAcknowledge);
+      recreateCF();
+   }
+
+   public void setMaxRetryInterval(long retryInterval)
+   {
+      cfConfig.setMaxRetryInterval(retryInterval);
+      recreateCF();
+   }
+
+   public void setRetryIntervalMultiplier(double retryIntervalMultiplier)
+   {
+      cfConfig.setRetryIntervalMultiplier(retryIntervalMultiplier);
+      recreateCF();
+   }
+
+   public void setReconnectAttempts(int reconnectAttempts)
+   {
+      cfConfig.setReconnectAttempts(reconnectAttempts);
+      recreateCF();
+   }
+
+   public void setFailoverOnInitialConnection(boolean failover)
+   {
+      cfConfig.setFailoverOnInitialConnection(failover);
+      recreateCF();
+   }
+
+   public boolean isUseGlobalPools()
+   {
+      return cfConfig.isUseGlobalPools();
+   }
+
+   public void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize)
+   {
+      cfConfig.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
+      recreateCF();
+   }
+
+   public int getThreadPoolMaxSize()
+   {
+      return cfConfig.getThreadPoolMaxSize();
+   }
+
+   public void setThreadPoolMaxSize(int threadPoolMaxSize)
+   {
+      cfConfig.setThreadPoolMaxSize(threadPoolMaxSize);
+      recreateCF();
+   }
+
+   public int getInitialMessagePacketSize()
+   {
+      return cf.getInitialMessagePacketSize();
+   }
+
+   public void setGroupID(String groupID)
+   {
+      cfConfig.setGroupID(groupID);
+      recreateCF();
+   }
+
+   public String getGroupID()
+   {
+      return cfConfig.getGroupID();
+   }
+
+   public void setUseGlobalPools(boolean useGlobalPools)
+   {
+      cfConfig.setUseGlobalPools(useGlobalPools);
+      recreateCF();
+   }
+
+   public int getScheduledThreadPoolMaxSize()
+   {
+      return cfConfig.getScheduledThreadPoolMaxSize();
+   }
+
+   public void setRetryInterval(long retryInterval)
+   {
+      cfConfig.setRetryInterval(retryInterval);
+      recreateCF();
+   }
+
+   public long getMaxRetryInterval()
+   {
+      return cfConfig.getMaxRetryInterval();
+   }
+
+   public String getConnectionLoadBalancingPolicyClassName()
+   {
+      return cfConfig.getLoadBalancingPolicyClassName();
+   }
+
+   public void setConnectionLoadBalancingPolicyClassName(String name)
+   {
+      cfConfig.setLoadBalancingPolicyClassName(name);
+      recreateCF();
+   }
+
+   public TransportConfiguration[] getStaticConnectors()
+   {
+      return cf.getStaticConnectors();
+   }
+
+   public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
+   {
+      return cf.getDiscoveryGroupConfiguration();
+   }
+
+   public void addJNDI(@Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndi) throws Exception
+   {
+      jmsManager.addConnectionFactoryToJNDI(name, jndi);
+   }
+
+   public void removeJNDI(@Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndi) throws Exception
+   {
+      jmsManager.removeConnectionFactoryFromJNDI(name, jndi);
+   }
+
+   public long getCallTimeout()
+   {
+      return cfConfig.getCallTimeout();
+   }
+
+   public long getCallFailoverTimeout()
+   {
+      return cfConfig.getCallFailoverTimeout();
+   }
+
+   public int getConsumerMaxRate()
+   {
+      return cfConfig.getConsumerMaxRate();
+   }
+
+   public int getConsumerWindowSize()
+   {
+      return cfConfig.getConsumerWindowSize();
+   }
+
+   public int getProducerMaxRate()
+   {
+      return cfConfig.getProducerMaxRate();
+   }
+
+   public int getConfirmationWindowSize()
+   {
+      return cfConfig.getConfirmationWindowSize();
+   }
+
+   public int getDupsOKBatchSize()
+   {
+      return cfConfig.getDupsOKBatchSize();
+   }
+
+   public boolean isBlockOnAcknowledge()
+   {
+      return cfConfig.isBlockOnAcknowledge();
+   }
+
+   public boolean isBlockOnNonDurableSend()
+   {
+      return cfConfig.isBlockOnNonDurableSend();
+   }
+
+   public boolean isBlockOnDurableSend()
+   {
+      return cfConfig.isBlockOnDurableSend();
+   }
+
+   public boolean isPreAcknowledge()
+   {
+      return cfConfig.isPreAcknowledge();
+   }
+
+   public String getName()
+   {
+      return name;
+   }
+
+   public long getConnectionTTL()
+   {
+      return cfConfig.getConnectionTTL();
+   }
+
+   public int getReconnectAttempts()
+   {
+      return cfConfig.getReconnectAttempts();
+   }
+
+   public boolean isFailoverOnInitialConnection()
+   {
+      return cfConfig.isFailoverOnInitialConnection();
+   }
+
+   public int getMinLargeMessageSize()
+   {
+      return cfConfig.getMinLargeMessageSize();
+   }
+
+   public long getRetryInterval()
+   {
+      return cfConfig.getRetryInterval();
+   }
+
+   public double getRetryIntervalMultiplier()
+   {
+      return cfConfig.getRetryIntervalMultiplier();
+   }
+
+   public int getTransactionBatchSize()
+   {
+      return cfConfig.getTransactionBatchSize();
+   }
+
+   public boolean isAutoGroup()
+   {
+      return cfConfig.isAutoGroup();
+   }
+
+   @Override
+   public MBeanInfo getMBeanInfo()
+   {
+      MBeanInfo info = super.getMBeanInfo();
+      return new MBeanInfo(info.getClassName(),
+                           info.getDescription(),
+                           info.getAttributes(),
+                           info.getConstructors(),
+                           MBeanInfoHelper.getMBeanOperationsInfo(ConnectionFactoryControl.class),
+                           info.getNotifications());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   private void recreateCF()
+   {
+      try
+      {
+         this.cf = jmsManager.recreateCF(this.name, this.cfConfig);
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e.getMessage(), e);
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSQueueControlImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSQueueControlImpl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSQueueControlImpl.java
new file mode 100644
index 0000000..f15eaff
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSQueueControlImpl.java
@@ -0,0 +1,389 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.management.impl;
+
+import javax.management.MBeanInfo;
+import javax.management.StandardMBean;
+import java.util.Map;
+
+import org.apache.activemq6.api.core.FilterConstants;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.management.MessageCounterInfo;
+import org.apache.activemq6.api.core.management.Operation;
+import org.apache.activemq6.api.core.management.QueueControl;
+import org.apache.activemq6.api.jms.management.JMSQueueControl;
+import org.apache.activemq6.core.management.impl.MBeanInfoHelper;
+import org.apache.activemq6.core.messagecounter.MessageCounter;
+import org.apache.activemq6.core.messagecounter.impl.MessageCounterHelper;
+import org.apache.activemq6.jms.client.HornetQDestination;
+import org.apache.activemq6.jms.client.HornetQMessage;
+import org.apache.activemq6.jms.client.SelectorTranslator;
+import org.apache.activemq6.jms.server.JMSServerManager;
+import org.apache.activemq6.utils.json.JSONArray;
+import org.apache.activemq6.utils.json.JSONObject;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class JMSQueueControlImpl extends StandardMBean implements JMSQueueControl
+{
+   private final HornetQDestination managedQueue;
+
+   private final JMSServerManager jmsServerManager;
+
+   private final QueueControl coreQueueControl;
+
+   private final MessageCounter counter;
+
+   // Static --------------------------------------------------------
+
+   /**
+    * Returns null if the string is null or empty
+    */
+   public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
+   {
+      return selectorStr == null || selectorStr.trim().length() == 0 ? null
+         : SelectorTranslator.convertToHornetQFilterString(selectorStr);
+   }
+
+   private static String createFilterForJMSMessageID(final String jmsMessageID) throws Exception
+   {
+      return FilterConstants.HORNETQ_USERID + " = '" + jmsMessageID + "'";
+   }
+
+   static String toJSON(final Map<String, Object>[] messages)
+   {
+      JSONArray array = new JSONArray();
+      for (Map<String, Object> message : messages)
+      {
+         array.put(new JSONObject(message));
+      }
+      return array.toString();
+   }
+
+   // Constructors --------------------------------------------------
+
+   public JMSQueueControlImpl(final HornetQDestination managedQueue,
+                              final QueueControl coreQueueControl,
+                              final JMSServerManager jmsServerManager,
+                              final MessageCounter counter) throws Exception
+   {
+      super(JMSQueueControl.class);
+      this.managedQueue = managedQueue;
+      this.jmsServerManager = jmsServerManager;
+      this.coreQueueControl = coreQueueControl;
+      this.counter = counter;
+   }
+
+   // Public --------------------------------------------------------
+
+   // ManagedJMSQueueMBean implementation ---------------------------
+
+   public String getName()
+   {
+      return managedQueue.getName();
+   }
+
+   public String getAddress()
+   {
+      return managedQueue.getAddress();
+   }
+
+   public boolean isTemporary()
+   {
+      return managedQueue.isTemporary();
+   }
+
+   public long getMessageCount()
+   {
+      return coreQueueControl.getMessageCount();
+   }
+
+   public long getMessagesAdded()
+   {
+      return coreQueueControl.getMessagesAdded();
+   }
+
+   public int getConsumerCount()
+   {
+      return coreQueueControl.getConsumerCount();
+   }
+
+   public int getDeliveringCount()
+   {
+      return coreQueueControl.getDeliveringCount();
+   }
+
+   public long getScheduledCount()
+   {
+      return coreQueueControl.getScheduledCount();
+   }
+
+   public boolean isDurable()
+   {
+      return coreQueueControl.isDurable();
+   }
+
+   public String getDeadLetterAddress()
+   {
+      return coreQueueControl.getDeadLetterAddress();
+   }
+
+   public void setDeadLetterAddress(final String deadLetterAddress) throws Exception
+   {
+      coreQueueControl.setDeadLetterAddress(deadLetterAddress);
+   }
+
+   public String getExpiryAddress()
+   {
+      return coreQueueControl.getExpiryAddress();
+   }
+
+   public void setExpiryAddress(final String expiryAddress) throws Exception
+   {
+      coreQueueControl.setExpiryAddress(expiryAddress);
+   }
+
+   @Override
+   public void addJNDI(String jndi) throws Exception
+   {
+      jmsServerManager.addQueueToJndi(managedQueue.getName(), jndi);
+   }
+
+   public void removeJNDI(String jndi) throws Exception
+   {
+      jmsServerManager.removeQueueFromJNDI(managedQueue.getName(), jndi);
+   }
+
+   public String[] getJNDIBindings()
+   {
+      return jmsServerManager.getJNDIOnQueue(managedQueue.getName());
+   }
+
+   public boolean removeMessage(final String messageID) throws Exception
+   {
+      String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
+      int removed = coreQueueControl.removeMessages(filter);
+      if (removed != 1)
+      {
+         throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
+      }
+      return true;
+   }
+
+   public int removeMessages(final String filterStr) throws Exception
+   {
+      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
+      return coreQueueControl.removeMessages(filter);
+   }
+
+   public Map<String, Object>[] listMessages(final String filterStr) throws Exception
+   {
+      try
+      {
+         String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
+         Map<String, Object>[] coreMessages = coreQueueControl.listMessages(filter);
+
+         Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
+
+         int i = 0;
+
+         for (Map<String, Object> coreMessage : coreMessages)
+         {
+            Map<String, Object> jmsMessage = HornetQMessage.coreMaptoJMSMap(coreMessage);
+            jmsMessages[i++] = jmsMessage;
+         }
+         return jmsMessages;
+      }
+      catch (HornetQException e)
+      {
+         throw new IllegalStateException(e.getMessage());
+      }
+   }
+
+   public String listMessagesAsJSON(final String filter) throws Exception
+   {
+      return JMSQueueControlImpl.toJSON(listMessages(filter));
+   }
+
+   public long countMessages(final String filterStr) throws Exception
+   {
+      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
+      return coreQueueControl.countMessages(filter);
+   }
+
+   public boolean expireMessage(final String messageID) throws Exception
+   {
+      String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
+      int expired = coreQueueControl.expireMessages(filter);
+      if (expired != 1)
+      {
+         throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
+      }
+      return true;
+   }
+
+   public int expireMessages(final String filterStr) throws Exception
+   {
+      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
+      return coreQueueControl.expireMessages(filter);
+   }
+
+   public boolean sendMessageToDeadLetterAddress(final String messageID) throws Exception
+   {
+      String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
+      int dead = coreQueueControl.sendMessagesToDeadLetterAddress(filter);
+      if (dead != 1)
+      {
+         throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
+      }
+      return true;
+   }
+
+   public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception
+   {
+      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
+      return coreQueueControl.sendMessagesToDeadLetterAddress(filter);
+   }
+
+   public boolean changeMessagePriority(final String messageID, final int newPriority) throws Exception
+   {
+      String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
+      int changed = coreQueueControl.changeMessagesPriority(filter, newPriority);
+      if (changed != 1)
+      {
+         throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
+      }
+      return true;
+   }
+
+   public int changeMessagesPriority(final String filterStr, final int newPriority) throws Exception
+   {
+      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
+      return coreQueueControl.changeMessagesPriority(filter, newPriority);
+   }
+
+   public boolean moveMessage(final String messageID, final String otherQueueName) throws Exception
+   {
+      return moveMessage(messageID, otherQueueName, false);
+   }
+
+   public boolean moveMessage(final String messageID, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+   {
+      String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
+      HornetQDestination otherQueue = HornetQDestination.createQueue(otherQueueName);
+      int moved = coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates);
+      if (moved != 1)
+      {
+         throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
+      }
+
+      return true;
+   }
+
+   public int moveMessages(final String filterStr, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+   {
+      String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
+      HornetQDestination otherQueue = HornetQDestination.createQueue(otherQueueName);
+      return coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates);
+   }
+
+
+   public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
+   {
+      return moveMessages(filterStr, otherQueueName, false);
+   }
+
+   @Operation(desc = "List all the existent consumers on the Queue")
+   public String listConsumersAsJSON() throws Exception
+   {
+      return coreQueueControl.listConsumersAsJSON();
+   }
+
+   public String listMessageCounter()
+   {
+      try
+      {
+         return MessageCounterInfo.toJSon(counter);
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(e);
+      }
+   }
+
+   public void resetMessageCounter() throws Exception
+   {
+      coreQueueControl.resetMessageCounter();
+   }
+
+   public String listMessageCounterAsHTML()
+   {
+      return MessageCounterHelper.listMessageCounterAsHTML(new MessageCounter[]{counter});
+   }
+
+   public String listMessageCounterHistory() throws Exception
+   {
+      return MessageCounterHelper.listMessageCounterHistory(counter);
+   }
+
+   public String listMessageCounterHistoryAsHTML()
+   {
+      return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[]{counter});
+   }
+
+   public boolean isPaused() throws Exception
+   {
+      return coreQueueControl.isPaused();
+   }
+
+   public void pause() throws Exception
+   {
+      coreQueueControl.pause();
+   }
+
+   public void resume() throws Exception
+   {
+      coreQueueControl.resume();
+   }
+
+   public String getSelector()
+   {
+      return coreQueueControl.getFilter();
+   }
+
+   public void flushExecutor()
+   {
+      coreQueueControl.flushExecutor();
+   }
+
+   @Override
+   public MBeanInfo getMBeanInfo()
+   {
+      MBeanInfo info = super.getMBeanInfo();
+      return new MBeanInfo(info.getClassName(),
+                           info.getDescription(),
+                           info.getAttributes(),
+                           info.getConstructors(),
+                           MBeanInfoHelper.getMBeanOperationsInfo(JMSQueueControl.class),
+                           info.getNotifications());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSServerControlImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSServerControlImpl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSServerControlImpl.java
new file mode 100644
index 0000000..aa8da34
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSServerControlImpl.java
@@ -0,0 +1,1049 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.management.impl;
+
+import javax.jms.JMSRuntimeException;
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.NotificationEmitter;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq6.api.core.management.Parameter;
+import org.apache.activemq6.api.jms.JMSFactoryType;
+import org.apache.activemq6.api.jms.management.ConnectionFactoryControl;
+import org.apache.activemq6.api.jms.management.DestinationControl;
+import org.apache.activemq6.api.jms.management.JMSQueueControl;
+import org.apache.activemq6.api.jms.management.JMSServerControl;
+import org.apache.activemq6.api.jms.management.TopicControl;
+import org.apache.activemq6.core.filter.Filter;
+import org.apache.activemq6.core.management.impl.AbstractControl;
+import org.apache.activemq6.core.management.impl.MBeanInfoHelper;
+import org.apache.activemq6.core.server.ServerConsumer;
+import org.apache.activemq6.core.server.ServerSession;
+import org.apache.activemq6.jms.client.HornetQDestination;
+import org.apache.activemq6.jms.server.HornetQJMSServerLogger;
+import org.apache.activemq6.jms.server.JMSServerManager;
+import org.apache.activemq6.jms.server.config.ConnectionFactoryConfiguration;
+import org.apache.activemq6.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+import org.apache.activemq6.jms.server.management.JMSNotificationType;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+import org.apache.activemq6.utils.TypedProperties;
+import org.apache.activemq6.utils.json.JSONArray;
+import org.apache.activemq6.utils.json.JSONObject;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class JMSServerControlImpl extends AbstractControl implements JMSServerControl, NotificationEmitter,
+                                                                     org.apache.activemq6.core.server.management.NotificationListener
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final JMSServerManager server;
+
+   private final NotificationBroadcasterSupport broadcaster;
+
+   private final AtomicLong notifSeq = new AtomicLong(0);
+
+   // Static --------------------------------------------------------
+
+   private static String[] convert(final Object[] jndiBindings)
+   {
+      String[] bindings = new String[jndiBindings.length];
+      for (int i = 0, jndiBindingsLength = jndiBindings.length; i < jndiBindingsLength; i++)
+      {
+         bindings[i] = jndiBindings[i].toString().trim();
+      }
+      return bindings;
+   }
+
+   private static String[] toArray(final String commaSeparatedString)
+   {
+      if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0)
+      {
+         return new String[0];
+      }
+      String[] values = commaSeparatedString.split(",");
+      String[] trimmed = new String[values.length];
+      for (int i = 0; i < values.length; i++)
+      {
+         trimmed[i] = values[i].trim();
+         trimmed[i] = trimmed[i].replace("&comma;", ",");
+      }
+      return trimmed;
+   }
+
+   private static String[] determineJMSDestination(String coreAddress)
+   {
+      String[] result = new String[2]; // destination name & type
+      if (coreAddress.startsWith(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX))
+      {
+         result[0] = coreAddress.substring(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX.length());
+         result[1] = "queue";
+      }
+      else if (coreAddress.startsWith(HornetQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+      {
+         result[0] = coreAddress.substring(HornetQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length());
+         result[1] = "tempqueue";
+      }
+      else if (coreAddress.startsWith(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX))
+      {
+         result[0] = coreAddress.substring(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX.length());
+         result[1] = "topic";
+      }
+      else if (coreAddress.startsWith(HornetQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+      {
+         result[0] = coreAddress.substring(HornetQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length());
+         result[1] = "temptopic";
+      }
+      else
+      {
+         HornetQJMSServerLogger.LOGGER.debug("JMSServerControlImpl.determineJMSDestination()" + coreAddress);
+         // not related to JMS
+         return null;
+      }
+      return result;
+   }
+
+   public static MBeanNotificationInfo[] getNotificationInfos()
+   {
+      JMSNotificationType[] values = JMSNotificationType.values();
+      String[] names = new String[values.length];
+      for (int i = 0; i < values.length; i++)
+      {
+         names[i] = values[i].toString();
+      }
+      return new MBeanNotificationInfo[]{new MBeanNotificationInfo(names,
+                                                                   JMSServerControl.class.getName(),
+                                                                   "Notifications emitted by a JMS Server")};
+   }
+
+   // Constructors --------------------------------------------------
+
+   public JMSServerControlImpl(final JMSServerManager server) throws Exception
+   {
+      super(JMSServerControl.class, server.getHornetQServer().getStorageManager());
+      this.server = server;
+      broadcaster = new NotificationBroadcasterSupport();
+      server.getHornetQServer().getManagementService().addNotificationListener(this);
+   }
+
+   // Public --------------------------------------------------------
+
+   // JMSServerControlMBean implementation --------------------------
+
+   /**
+    * See the interface definition for the javadoc.
+    */
+   public void createConnectionFactory(String name,
+                                       boolean ha,
+                                       boolean useDiscovery,
+                                       int cfType,
+                                       String[] connectorNames,
+                                       Object[] bindings) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         if (useDiscovery)
+         {
+            if (connectorNames == null || connectorNames.length == 0)
+            {
+               throw new IllegalArgumentException("no discovery group name supplied");
+            }
+            server.createConnectionFactory(name,
+                                           ha,
+                                           JMSFactoryType.valueOf(cfType),
+                                           connectorNames[0],
+                                           JMSServerControlImpl.convert(bindings));
+         }
+         else
+         {
+            List<String> connectorList = new ArrayList<String>(connectorNames.length);
+
+            for (String str : connectorNames)
+            {
+               connectorList.add(str);
+            }
+
+            server.createConnectionFactory(name,
+                                           ha,
+                                           JMSFactoryType.valueOf(cfType),
+                                           connectorList,
+                                           JMSServerControlImpl.convert(bindings));
+         }
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public void createConnectionFactory(String name,
+                                       boolean ha,
+                                       boolean useDiscovery,
+                                       int cfType,
+                                       String connectors,
+                                       String jndiBindings,
+                                       String clientID,
+                                       long clientFailureCheckPeriod,
+                                       long connectionTTL,
+                                       long callTimeout,
+                                       long callFailoverTimeout,
+                                       int minLargeMessageSize,
+                                       boolean compressLargeMessages,
+                                       int consumerWindowSize,
+                                       int consumerMaxRate,
+                                       int confirmationWindowSize,
+                                       int producerWindowSize,
+                                       int producerMaxRate,
+                                       boolean blockOnAcknowledge,
+                                       boolean blockOnDurableSend,
+                                       boolean blockOnNonDurableSend,
+                                       boolean autoGroup,
+                                       boolean preAcknowledge,
+                                       String loadBalancingPolicyClassName,
+                                       int transactionBatchSize,
+                                       int dupsOKBatchSize,
+                                       boolean useGlobalPools,
+                                       int scheduledThreadPoolMaxSize,
+                                       int threadPoolMaxSize,
+                                       long retryInterval,
+                                       double retryIntervalMultiplier,
+                                       long maxRetryInterval,
+                                       int reconnectAttempts,
+                                       boolean failoverOnInitialConnection,
+                                       String groupId) throws Exception
+   {
+      createConnectionFactory(name,
+                              ha,
+                              useDiscovery,
+                              cfType,
+                              toArray(connectors),
+                              toArray(jndiBindings),
+                              clientID,
+                              clientFailureCheckPeriod,
+                              connectionTTL,
+                              callTimeout,
+                              callFailoverTimeout,
+                              minLargeMessageSize,
+                              compressLargeMessages,
+                              consumerWindowSize,
+                              consumerMaxRate,
+                              confirmationWindowSize,
+                              producerWindowSize,
+                              producerMaxRate,
+                              blockOnAcknowledge,
+                              blockOnDurableSend,
+                              blockOnNonDurableSend,
+                              autoGroup,
+                              preAcknowledge,
+                              loadBalancingPolicyClassName,
+                              transactionBatchSize,
+                              dupsOKBatchSize,
+                              useGlobalPools,
+                              scheduledThreadPoolMaxSize,
+                              threadPoolMaxSize,
+                              retryInterval,
+                              retryIntervalMultiplier,
+                              maxRetryInterval,
+                              reconnectAttempts,
+                              failoverOnInitialConnection,
+                              groupId);
+   }
+
+   @Override
+   public void createConnectionFactory(String name,
+                                       boolean ha,
+                                       boolean useDiscovery,
+                                       int cfType,
+                                       String[] connectorNames,
+                                       String[] bindings,
+                                       String clientID,
+                                       long clientFailureCheckPeriod,
+                                       long connectionTTL,
+                                       long callTimeout,
+                                       long callFailoverTimeout,
+                                       int minLargeMessageSize,
+                                       boolean compressLargeMessages,
+                                       int consumerWindowSize,
+                                       int consumerMaxRate,
+                                       int confirmationWindowSize,
+                                       int producerWindowSize,
+                                       int producerMaxRate,
+                                       boolean blockOnAcknowledge,
+                                       boolean blockOnDurableSend,
+                                       boolean blockOnNonDurableSend,
+                                       boolean autoGroup,
+                                       boolean preAcknowledge,
+                                       String loadBalancingPolicyClassName,
+                                       int transactionBatchSize,
+                                       int dupsOKBatchSize,
+                                       boolean useGlobalPools,
+                                       int scheduledThreadPoolMaxSize,
+                                       int threadPoolMaxSize,
+                                       long retryInterval,
+                                       double retryIntervalMultiplier,
+                                       long maxRetryInterval,
+                                       int reconnectAttempts,
+                                       boolean failoverOnInitialConnection,
+                                       String groupId) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl()
+            .setName(name)
+            .setHA(ha)
+            .setBindings(bindings)
+            .setFactoryType(JMSFactoryType.valueOf(cfType))
+            .setClientID(clientID)
+            .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+            .setConnectionTTL(connectionTTL)
+            .setCallTimeout(callTimeout)
+            .setCallFailoverTimeout(callFailoverTimeout)
+            .setMinLargeMessageSize(minLargeMessageSize)
+            .setCompressLargeMessages(compressLargeMessages)
+            .setConsumerWindowSize(consumerWindowSize)
+            .setConsumerMaxRate(consumerMaxRate)
+            .setConfirmationWindowSize(confirmationWindowSize)
+            .setProducerWindowSize(producerWindowSize)
+            .setProducerMaxRate(producerMaxRate)
+            .setBlockOnAcknowledge(blockOnAcknowledge)
+            .setBlockOnDurableSend(blockOnDurableSend)
+            .setBlockOnNonDurableSend(blockOnNonDurableSend)
+            .setAutoGroup(autoGroup)
+            .setPreAcknowledge(preAcknowledge)
+            .setTransactionBatchSize(transactionBatchSize)
+            .setDupsOKBatchSize(dupsOKBatchSize)
+            .setUseGlobalPools(useGlobalPools)
+            .setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize)
+            .setThreadPoolMaxSize(threadPoolMaxSize)
+            .setRetryInterval(retryInterval)
+            .setRetryIntervalMultiplier(retryIntervalMultiplier)
+            .setMaxRetryInterval(maxRetryInterval)
+            .setReconnectAttempts(reconnectAttempts)
+            .setFailoverOnInitialConnection(failoverOnInitialConnection)
+            .setGroupID(groupId);
+
+         if (useDiscovery)
+         {
+            configuration.setDiscoveryGroupName(connectorNames[0]);
+         }
+         else
+         {
+            ArrayList<String> connectorNamesList = new ArrayList<String>();
+            for (String nameC : connectorNames)
+            {
+               connectorNamesList.add(nameC);
+            }
+            configuration.setConnectorNames(connectorNamesList);
+         }
+
+         if (loadBalancingPolicyClassName != null && !loadBalancingPolicyClassName.trim().equals(""))
+         {
+            configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
+         }
+
+         server.createConnectionFactory(true, configuration, bindings);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   /**
+    * Create a JMS ConnectionFactory with the specified name connected to a single live-backup pair of servers.
+    * <br>
+    * The ConnectionFactory is bound to JNDI for all the specified bindings Strings.
+    */
+   public void createConnectionFactory(String name,
+                                       boolean ha,
+                                       boolean useDiscovery,
+                                       int cfType,
+                                       String connectors,
+                                       String jndiBindings) throws Exception
+   {
+      createConnectionFactory(name, ha, useDiscovery, cfType, toArray(connectors), toArray(jndiBindings));
+   }
+
+   public boolean createQueue(final String name) throws Exception
+   {
+      return createQueue(name, null, null, true);
+   }
+
+   public boolean createQueue(final String name, final String jndiBindings) throws Exception
+   {
+      return createQueue(name, jndiBindings, null, true);
+   }
+
+   @Override
+   public boolean createQueue(String name, String jndiBindings, String selector) throws Exception
+   {
+      return createQueue(name, jndiBindings, selector, true);
+   }
+
+   public boolean createQueue(@Parameter(name = "name", desc = "Name of the queue to create") String name,
+                              @Parameter(name = "jndiBindings", desc = "comma-separated list of JNDI bindings (use '&comma;' if u need to use commas in your jndi name)") String jndiBindings,
+                              @Parameter(name = "selector", desc = "the jms selector") String selector,
+                              @Parameter(name = "durable", desc = "is the queue persistent and resilient to restart") boolean durable) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.createQueue(true, name, selector, durable,
+               JMSServerControlImpl.toArray(jndiBindings));
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public boolean destroyQueue(final String name) throws Exception
+   {
+      return destroyQueue(name, false);
+   }
+
+   public boolean destroyQueue(final String name, final boolean removeConsumers) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.destroyQueue(name, removeConsumers);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public boolean createTopic(String name) throws Exception
+   {
+      return createTopic(name, null);
+   }
+
+   public boolean createTopic(final String topicName, final String jndiBindings) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.createTopic(true, topicName, JMSServerControlImpl.toArray(jndiBindings));
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public boolean destroyTopic(final String name) throws Exception
+   {
+      return destroyTopic(name, true);
+   }
+
+
+   public boolean destroyTopic(final String name, final boolean removeConsumers) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.destroyTopic(name, removeConsumers);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public void destroyConnectionFactory(final String name) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         server.destroyConnectionFactory(name);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public boolean isStarted()
+   {
+      return server.isStarted();
+   }
+
+   public String getVersion()
+   {
+      checkStarted();
+
+      return server.getVersion();
+   }
+
+   public String[] getQueueNames()
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         Object[] queueControls = server.getHornetQServer().getManagementService().getResources(JMSQueueControl.class);
+         String[] names = new String[queueControls.length];
+         for (int i = 0; i < queueControls.length; i++)
+         {
+            JMSQueueControl queueControl = (JMSQueueControl) queueControls[i];
+            names[i] = queueControl.getName();
+         }
+         return names;
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public String[] getTopicNames()
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         Object[] topicControls = server.getHornetQServer().getManagementService().getResources(TopicControl.class);
+         String[] names = new String[topicControls.length];
+         for (int i = 0; i < topicControls.length; i++)
+         {
+            TopicControl topicControl = (TopicControl) topicControls[i];
+            names[i] = topicControl.getName();
+         }
+         return names;
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public String[] getConnectionFactoryNames()
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         Object[] cfControls = server.getHornetQServer()
+            .getManagementService()
+            .getResources(ConnectionFactoryControl.class);
+         String[] names = new String[cfControls.length];
+         for (int i = 0; i < cfControls.length; i++)
+         {
+            ConnectionFactoryControl cfControl = (ConnectionFactoryControl) cfControls[i];
+            names[i] = cfControl.getName();
+         }
+         return names;
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   // NotificationEmitter implementation ----------------------------
+
+   public void removeNotificationListener(final NotificationListener listener,
+                                          final NotificationFilter filter,
+                                          final Object handback) throws ListenerNotFoundException
+   {
+      broadcaster.removeNotificationListener(listener, filter, handback);
+   }
+
+   public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException
+   {
+      broadcaster.removeNotificationListener(listener);
+   }
+
+   public void addNotificationListener(final NotificationListener listener,
+                                       final NotificationFilter filter,
+                                       final Object handback) throws IllegalArgumentException
+   {
+      broadcaster.addNotificationListener(listener, filter, handback);
+   }
+
+   public MBeanNotificationInfo[] getNotificationInfo()
+   {
+      return JMSServerControlImpl.getNotificationInfos();
+   }
+
+   public String[] listRemoteAddresses() throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.listRemoteAddresses();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public String[] listRemoteAddresses(final String ipAddress) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.listRemoteAddresses(ipAddress);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public boolean closeConnectionsForAddress(final String ipAddress) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.closeConnectionsForAddress(ipAddress);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public boolean closeConsumerConnectionsForAddress(final String address) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.closeConsumerConnectionsForAddress(address);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public boolean closeConnectionsForUser(final String userName) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.closeConnectionsForUser(userName);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public String[] listConnectionIDs() throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.listConnectionIDs();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public String listConnectionsAsJSON() throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         JSONArray array = new JSONArray();
+
+         Set<RemotingConnection> connections = server.getHornetQServer().getRemotingService().getConnections();
+
+         Set<ServerSession> sessions = server.getHornetQServer().getSessions();
+
+         Map<Object, ServerSession> jmsSessions = new HashMap<Object, ServerSession>();
+
+         for (ServerSession session : sessions)
+         {
+            if (session.getMetaData("jms-session") != null)
+            {
+               jmsSessions.put(session.getConnectionID(), session);
+            }
+         }
+
+         for (RemotingConnection connection : connections)
+         {
+            ServerSession session = jmsSessions.get(connection.getID());
+            if (session != null)
+            {
+               JSONObject obj = new JSONObject();
+               obj.put("connectionID", connection.getID().toString());
+               obj.put("clientAddress", connection.getRemoteAddress());
+               obj.put("creationTime", connection.getCreationTime());
+               obj.put("clientID", session.getMetaData("jms-client-id"));
+               obj.put("principal", session.getUsername());
+               array.put(obj);
+            }
+         }
+         return array.toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public String listConsumersAsJSON(String connectionID) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         JSONArray array = new JSONArray();
+
+         Set<RemotingConnection> connections = server.getHornetQServer().getRemotingService().getConnections();
+         for (RemotingConnection connection : connections)
+         {
+            if (connectionID.equals(connection.getID().toString()))
+            {
+               List<ServerSession> sessions = server.getHornetQServer().getSessions(connectionID);
+               for (ServerSession session : sessions)
+               {
+                  Set<ServerConsumer> consumers = session.getServerConsumers();
+                  for (ServerConsumer consumer : consumers)
+                  {
+                     JSONObject obj = toJSONObject(consumer);
+                     if (obj != null)
+                     {
+                        array.put(obj);
+                     }
+                  }
+               }
+            }
+         }
+         return array.toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public String listAllConsumersAsJSON() throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         JSONArray array = new JSONArray();
+
+         Set<ServerSession> sessions = server.getHornetQServer().getSessions();
+         for (ServerSession session : sessions)
+         {
+            Set<ServerConsumer> consumers = session.getServerConsumers();
+            for (ServerConsumer consumer : consumers)
+            {
+               JSONObject obj = toJSONObject(consumer);
+               if (obj != null)
+               {
+                  array.put(obj);
+               }
+            }
+         }
+         return array.toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public String[] listSessions(final String connectionID) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.listSessions(connectionID);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public String listPreparedTransactionDetailsAsJSON() throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.listPreparedTransactionDetailsAsJSON();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   public String listPreparedTransactionDetailsAsHTML() throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      try
+      {
+         return server.listPreparedTransactionDetailsAsHTML();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   /* (non-Javadoc)
+    * @see org.apache.activemq6.core.management.impl.AbstractControl#fillMBeanOperationInfo()
+    */
+   @Override
+   protected MBeanOperationInfo[] fillMBeanOperationInfo()
+   {
+      return MBeanInfoHelper.getMBeanOperationsInfo(JMSServerControl.class);
+   }
+
+   // Private -------------------------------------------------------
+
+   private void checkStarted()
+   {
+      if (!server.isStarted())
+      {
+         throw new IllegalStateException("HornetQ JMS Server is not started. it can not be managed yet");
+      }
+   }
+
+   // Inner classes -------------------------------------------------
+
+   public String[] listTargetDestinations(String sessionID) throws Exception
+   {
+      String[] addresses = server.getHornetQServer().getHornetQServerControl().listTargetAddresses(sessionID);
+      Map<String, DestinationControl> allDests = new HashMap<String, DestinationControl>();
+
+      Object[] queueControls = server.getHornetQServer().getManagementService().getResources(JMSQueueControl.class);
+      for (Object queueControl2 : queueControls)
+      {
+         JMSQueueControl queueControl = (JMSQueueControl) queueControl2;
+         allDests.put(queueControl.getAddress(), queueControl);
+      }
+
+      Object[] topicControls = server.getHornetQServer().getManagementService().getResources(TopicControl.class);
+      for (Object topicControl2 : topicControls)
+      {
+         TopicControl topicControl = (TopicControl) topicControl2;
+         allDests.put(topicControl.getAddress(), topicControl);
+      }
+
+      List<String> destinations = new ArrayList<String>();
+      for (String addresse : addresses)
+      {
+         DestinationControl control = allDests.get(addresse);
+         if (control != null)
+         {
+            destinations.add(control.getAddress());
+         }
+      }
+      return destinations.toArray(new String[0]);
+   }
+
+   public String getLastSentMessageID(String sessionID, String address) throws Exception
+   {
+      ServerSession session = server.getHornetQServer().getSessionByID(sessionID);
+      if (session != null)
+      {
+         return session.getLastSentMessageID(address);
+      }
+      return null;
+   }
+
+   public String getSessionCreationTime(String sessionID) throws Exception
+   {
+      ServerSession session = server.getHornetQServer().getSessionByID(sessionID);
+      if (session != null)
+      {
+         return String.valueOf(session.getCreationTime());
+      }
+      return null;
+   }
+
+   public String listSessionsAsJSON(final String connectionID) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+
+      JSONArray array = new JSONArray();
+      try
+      {
+         List<ServerSession> sessions = server.getHornetQServer().getSessions(connectionID);
+         for (ServerSession sess : sessions)
+         {
+            JSONObject obj = new JSONObject();
+            obj.put("sessionID", sess.getName());
+            obj.put("creationTime", sess.getCreationTime());
+            array.put(obj);
+         }
+      }
+      finally
+      {
+         blockOnIO();
+      }
+      return array.toString();
+   }
+
+   public String closeConnectionWithClientID(final String clientID) throws Exception
+   {
+      return server.getHornetQServer().destroyConnectionWithSessionMetadata("jms-client-id", clientID);
+   }
+
+   private JSONObject toJSONObject(ServerConsumer consumer) throws Exception
+   {
+      JSONObject obj = new JSONObject();
+      obj.put("consumerID", consumer.getID());
+      obj.put("connectionID", consumer.getConnectionID());
+      obj.put("sessionID", consumer.getSessionID());
+      obj.put("queueName", consumer.getQueue().getName().toString());
+      obj.put("browseOnly", consumer.isBrowseOnly());
+      obj.put("creationTime", consumer.getCreationTime());
+      // JMS consumer with message filter use the queue's filter
+      Filter queueFilter = consumer.getQueue().getFilter();
+      if (queueFilter != null)
+      {
+         obj.put("filter", queueFilter.getFilterString().toString());
+      }
+      String[] destinationInfo = determineJMSDestination(consumer.getQueue().getAddress().toString());
+      if (destinationInfo == null)
+      {
+         return null;
+      }
+      obj.put("destinationName", destinationInfo[0]);
+      obj.put("destinationType", destinationInfo[1]);
+      if (destinationInfo[1].equals("topic"))
+      {
+         try
+         {
+            HornetQDestination.decomposeQueueNameForDurableSubscription(consumer.getQueue().getName().toString());
+            obj.put("durable", true);
+         }
+         catch (IllegalArgumentException e)
+         {
+            obj.put("durable", false);
+         }
+         catch (JMSRuntimeException e)
+         {
+            obj.put("durable", false);
+         }
+      }
+      else
+      {
+         obj.put("durable", false);
+      }
+
+      return obj;
+   }
+
+   @Override
+   public void onNotification(org.apache.activemq6.core.server.management.Notification notification)
+   {
+      if (!(notification.getType() instanceof JMSNotificationType)) return;
+      JMSNotificationType type = (JMSNotificationType) notification.getType();
+      TypedProperties prop = notification.getProperties();
+
+      this.broadcaster.sendNotification(new Notification(type.toString(), this,
+            notifSeq.incrementAndGet(), prop.getSimpleStringProperty(JMSNotificationType.MESSAGE).toString()));
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSTopicControlImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSTopicControlImpl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSTopicControlImpl.java
new file mode 100644
index 0000000..53730cc
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/management/impl/JMSTopicControlImpl.java
@@ -0,0 +1,417 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.management.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.MBeanInfo;
+import javax.management.StandardMBean;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.api.core.management.AddressControl;
+import org.apache.activemq6.api.core.management.HornetQServerControl;
+import org.apache.activemq6.api.core.management.QueueControl;
+import org.apache.activemq6.api.core.management.ResourceNames;
+import org.apache.activemq6.api.jms.management.TopicControl;
+import org.apache.activemq6.core.management.impl.MBeanInfoHelper;
+import org.apache.activemq6.core.server.management.ManagementService;
+import org.apache.activemq6.jms.client.HornetQDestination;
+import org.apache.activemq6.jms.client.HornetQMessage;
+import org.apache.activemq6.jms.client.SelectorTranslator;
+import org.apache.activemq6.jms.server.JMSServerManager;
+import org.apache.activemq6.utils.json.JSONArray;
+import org.apache.activemq6.utils.json.JSONObject;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class JMSTopicControlImpl extends StandardMBean implements TopicControl
+{
+   private final HornetQDestination managedTopic;
+
+   private final AddressControl addressControl;
+
+   private final ManagementService managementService;
+
+   private final JMSServerManager jmsServerManager;
+
+   // Static --------------------------------------------------------
+
+   public static String createFilterFromJMSSelector(final String selectorStr) throws HornetQException
+   {
+      return selectorStr == null || selectorStr.trim().length() == 0 ? null
+                                                                    : SelectorTranslator.convertToHornetQFilterString(selectorStr);
+   }
+
+   // Constructors --------------------------------------------------
+
+   public JMSTopicControlImpl(final HornetQDestination topic,
+                              final JMSServerManager jmsServerManager,
+                              final AddressControl addressControl,
+                              final ManagementService managementService) throws Exception
+   {
+      super(TopicControl.class);
+      this.jmsServerManager = jmsServerManager;
+      managedTopic = topic;
+      this.addressControl = addressControl;
+      this.managementService = managementService;
+   }
+
+   // TopicControlMBean implementation ------------------------------
+
+   @Override
+   public void addJNDI(String jndi) throws Exception
+   {
+      jmsServerManager.addTopicToJndi(managedTopic.getName(), jndi);
+   }
+
+   public String[] getJNDIBindings()
+   {
+      return jmsServerManager.getJNDIOnTopic(managedTopic.getName());
+   }
+
+   public String getName()
+   {
+      return managedTopic.getName();
+   }
+
+   public boolean isTemporary()
+   {
+      return managedTopic.isTemporary();
+   }
+
+   public String getAddress()
+   {
+      return managedTopic.getAddress();
+   }
+
+   public long getMessageCount()
+   {
+      return getMessageCount(DurabilityType.ALL);
+   }
+
+   public int getDeliveringCount()
+   {
+      List<QueueControl> queues = getQueues(DurabilityType.ALL);
+      int count = 0;
+      for (QueueControl queue : queues)
+      {
+         count += queue.getDeliveringCount();
+      }
+      return count;
+   }
+
+   public long getMessagesAdded()
+   {
+      List<QueueControl> queues = getQueues(DurabilityType.ALL);
+      int count = 0;
+      for (QueueControl queue : queues)
+      {
+         count += queue.getMessagesAdded();
+      }
+      return count;
+   }
+
+   public int getDurableMessageCount()
+   {
+      return getMessageCount(DurabilityType.DURABLE);
+   }
+
+   public int getNonDurableMessageCount()
+   {
+      return getMessageCount(DurabilityType.NON_DURABLE);
+   }
+
+   public int getSubscriptionCount()
+   {
+      return getQueues(DurabilityType.ALL).size();
+   }
+
+   public int getDurableSubscriptionCount()
+   {
+      return getQueues(DurabilityType.DURABLE).size();
+   }
+
+   public int getNonDurableSubscriptionCount()
+   {
+      return getQueues(DurabilityType.NON_DURABLE).size();
+   }
+
+   public Object[] listAllSubscriptions()
+   {
+      return listSubscribersInfos(DurabilityType.ALL);
+   }
+
+   public String listAllSubscriptionsAsJSON() throws Exception
+   {
+      return listSubscribersInfosAsJSON(DurabilityType.ALL);
+   }
+
+   public Object[] listDurableSubscriptions()
+   {
+      return listSubscribersInfos(DurabilityType.DURABLE);
+   }
+
+   public String listDurableSubscriptionsAsJSON() throws Exception
+   {
+      return listSubscribersInfosAsJSON(DurabilityType.DURABLE);
+   }
+
+   public Object[] listNonDurableSubscriptions()
+   {
+      return listSubscribersInfos(DurabilityType.NON_DURABLE);
+   }
+
+   public String listNonDurableSubscriptionsAsJSON() throws Exception
+   {
+      return listSubscribersInfosAsJSON(DurabilityType.NON_DURABLE);
+   }
+
+   public Map<String, Object>[] listMessagesForSubscription(final String queueName) throws Exception
+   {
+      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+      if (coreQueueControl == null)
+      {
+         throw new IllegalArgumentException("No subscriptions with name " + queueName);
+      }
+
+      Map<String, Object>[] coreMessages = coreQueueControl.listMessages(null);
+
+      Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
+
+      int i = 0;
+
+      for (Map<String, Object> coreMessage : coreMessages)
+      {
+         jmsMessages[i++] = HornetQMessage.coreMaptoJMSMap(coreMessage);
+      }
+      return jmsMessages;
+   }
+
+   public String listMessagesForSubscriptionAsJSON(final String queueName) throws Exception
+   {
+      return JMSQueueControlImpl.toJSON(listMessagesForSubscription(queueName));
+   }
+
+   public int countMessagesForSubscription(final String clientID, final String subscriptionName, final String filterStr) throws Exception
+   {
+      String queueName = HornetQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName);
+      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+      if (coreQueueControl == null)
+      {
+         throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
+      }
+      String filter = JMSTopicControlImpl.createFilterFromJMSSelector(filterStr);
+      return coreQueueControl.listMessages(filter).length;
+   }
+
+   public int removeMessages(final String filterStr) throws Exception
+   {
+      String filter = JMSTopicControlImpl.createFilterFromJMSSelector(filterStr);
+      int count = 0;
+      String[] queues = addressControl.getQueueNames();
+      for (String queue : queues)
+      {
+         QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
+         if (coreQueueControl != null)
+         {
+            count += coreQueueControl.removeMessages(filter);
+         }
+      }
+
+      return count;
+   }
+
+   public void dropDurableSubscription(final String clientID, final String subscriptionName) throws Exception
+   {
+      String queueName = HornetQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName);
+      QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queueName);
+      if (coreQueueControl == null)
+      {
+         throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID);
+      }
+      HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
+      serverControl.destroyQueue(queueName);
+   }
+
+   public void dropAllSubscriptions() throws Exception
+   {
+      HornetQServerControl serverControl = (HornetQServerControl)managementService.getResource(ResourceNames.CORE_SERVER);
+      String[] queues = addressControl.getQueueNames();
+      for (String queue : queues)
+      {
+         // Drop all subscription shouldn't delete the dummy queue used to identify if the topic exists on the core queues.
+         // we will just ignore this queue
+         if (!queue.equals(managedTopic.getAddress()))
+         {
+            serverControl.destroyQueue(queue);
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private Object[] listSubscribersInfos(final DurabilityType durability)
+   {
+      List<QueueControl> queues = getQueues(durability);
+      List<Object[]> subInfos = new ArrayList<Object[]>(queues.size());
+
+      for (QueueControl queue : queues)
+      {
+         String clientID = null;
+         String subName = null;
+
+         if (queue.isDurable())
+         {
+            Pair<String, String> pair = HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName()
+                                                                                                         .toString());
+            clientID = pair.getA();
+            subName = pair.getB();
+         }
+
+         String filter = queue.getFilter() != null ? queue.getFilter() : null;
+
+         Object[] subscriptionInfo = new Object[6];
+         subscriptionInfo[0] = queue.getName();
+         subscriptionInfo[1] = clientID;
+         subscriptionInfo[2] = subName;
+         subscriptionInfo[3] = queue.isDurable();
+         subscriptionInfo[4] = queue.getMessageCount();
+         subscriptionInfo[5] = filter;
+         subInfos.add(subscriptionInfo);
+      }
+      return subInfos.toArray(new Object[subInfos.size()]);
+   }
+
+   private String listSubscribersInfosAsJSON(final DurabilityType durability) throws Exception
+   {
+      try
+      {
+         List<QueueControl> queues = getQueues(durability);
+         JSONArray array = new JSONArray();
+
+         for (QueueControl queue : queues)
+         {
+            String clientID = null;
+            String subName = null;
+
+            if (queue.isDurable() && !queue.getName().startsWith(ResourceNames.JMS_TOPIC))
+            {
+               Pair<String, String> pair = HornetQDestination.decomposeQueueNameForDurableSubscription(queue.getName()
+                                                                                                            .toString());
+               clientID = pair.getA();
+               subName = pair.getB();
+            }
+            else if (queue.getName().startsWith(ResourceNames.JMS_TOPIC))
+            {
+               // in the case of heirarchical topics the queue name will not follow the <part>.<part> pattern of normal
+               // durable subscribers so skip decomposing the name for the client ID and subscription name and just
+               // hard-code it
+               clientID = "HornetQ";
+               subName = "HornetQ";
+            }
+
+            String filter = queue.getFilter() != null ? queue.getFilter() : null;
+
+            JSONObject info = new JSONObject();
+
+            info.put("queueName", queue.getName());
+            info.put("clientID", clientID);
+            info.put("selector", filter);
+            info.put("name", subName);
+            info.put("durable", queue.isDurable());
+            info.put("messageCount", queue.getMessageCount());
+            info.put("deliveringCount", queue.getDeliveringCount());
+            info.put("consumers", new JSONArray(queue.listConsumersAsJSON()) );
+            array.put(info);
+         }
+
+         return array.toString();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         return e.toString();
+      }
+   }
+
+   private int getMessageCount(final DurabilityType durability)
+   {
+      List<QueueControl> queues = getQueues(durability);
+      int count = 0;
+      for (QueueControl queue : queues)
+      {
+         count += queue.getMessageCount();
+      }
+      return count;
+   }
+
+   private List<QueueControl> getQueues(final DurabilityType durability)
+   {
+      try
+      {
+         List<QueueControl> matchingQueues = new ArrayList<QueueControl>();
+         String[] queues = addressControl.getQueueNames();
+         for (String queue : queues)
+         {
+            QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
+
+            // Ignore the "special" subscription
+            if (coreQueueControl != null && !coreQueueControl.getName().equals(addressControl.getAddress()))
+            {
+               if (durability == DurabilityType.ALL || durability == DurabilityType.DURABLE &&
+                   coreQueueControl.isDurable() ||
+                   durability == DurabilityType.NON_DURABLE &&
+                   !coreQueueControl.isDurable())
+               {
+                  matchingQueues.add(coreQueueControl);
+               }
+            }
+         }
+         return matchingQueues;
+      }
+      catch (Exception e)
+      {
+         return Collections.emptyList();
+      }
+   }
+
+   @Override
+   public MBeanInfo getMBeanInfo()
+   {
+      MBeanInfo info = super.getMBeanInfo();
+      return new MBeanInfo(info.getClassName(),
+                           info.getDescription(),
+                           info.getAttributes(),
+                           info.getConstructors(),
+                           MBeanInfoHelper.getMBeanOperationsInfo(TopicControl.class),
+                           info.getNotifications());
+   }
+
+   // Inner classes -------------------------------------------------
+
+   private enum DurabilityType
+   {
+      ALL, DURABLE, NON_DURABLE
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/JMSStorageManager.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/JMSStorageManager.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/JMSStorageManager.java
new file mode 100644
index 0000000..921c352
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/JMSStorageManager.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.persistence;
+
+import java.util.List;
+
+import org.apache.activemq6.core.server.HornetQComponent;
+import org.apache.activemq6.jms.persistence.config.PersistedConnectionFactory;
+import org.apache.activemq6.jms.persistence.config.PersistedDestination;
+import org.apache.activemq6.jms.persistence.config.PersistedJNDI;
+import org.apache.activemq6.jms.persistence.config.PersistedType;
+
+/**
+ * A JMSPersistence
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface JMSStorageManager extends HornetQComponent
+{
+
+   void load() throws Exception;
+
+   void storeDestination(PersistedDestination destination) throws Exception;
+
+   void deleteDestination(PersistedType type, String name) throws Exception;
+
+   List<PersistedDestination> recoverDestinations();
+
+   void deleteConnectionFactory(String connectionFactory) throws Exception;
+
+   void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception;
+
+   List<PersistedConnectionFactory> recoverConnectionFactories();
+
+   void addJNDI(PersistedType type, String name, String ... address) throws Exception;
+
+   List<PersistedJNDI> recoverPersistedJNDI() throws Exception;
+
+   void deleteJNDI(PersistedType type, String name, String address) throws Exception;
+
+   void deleteJNDI(PersistedType type, String name) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedConnectionFactory.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedConnectionFactory.java
new file mode 100644
index 0000000..33e4bdb
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedConnectionFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.persistence.config;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.jms.server.config.ConnectionFactoryConfiguration;
+import org.apache.activemq6.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+
+/**
+ * A PersistedConnectionFactory
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PersistedConnectionFactory implements EncodingSupport
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long id;
+
+   private ConnectionFactoryConfiguration config;
+
+   public PersistedConnectionFactory()
+   {
+      super();
+   }
+
+   /**
+    * @param config
+    */
+   public PersistedConnectionFactory(final ConnectionFactoryConfiguration config)
+   {
+      super();
+      this.config = config;
+   }
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the id
+    */
+   public long getId()
+   {
+      return id;
+   }
+
+   public void setId(final long id)
+   {
+      this.id = id;
+   }
+
+   public String getName()
+   {
+      return config.getName();
+   }
+
+   /**
+    * @return the config
+    */
+   public ConnectionFactoryConfiguration getConfig()
+   {
+      return config;
+   }
+
+   @Override
+   public void decode(final HornetQBuffer buffer)
+   {
+      config = new ConnectionFactoryConfigurationImpl();
+      config.decode(buffer);
+   }
+
+   @Override
+   public void encode(final HornetQBuffer buffer)
+   {
+      config.encode(buffer);
+   }
+
+   @Override
+   public int getEncodeSize()
+   {
+      return config.getEncodeSize();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedDestination.java
----------------------------------------------------------------------
diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedDestination.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedDestination.java
new file mode 100644
index 0000000..3653773
--- /dev/null
+++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/persistence/config/PersistedDestination.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.jms.persistence.config;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.utils.BufferHelper;
+import org.apache.activemq6.utils.DataConstants;
+
+/**
+ * A PersistedDestination
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class PersistedDestination implements EncodingSupport
+{
+
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long id;
+
+   private PersistedType type;
+
+   private String name;
+
+   private String selector;
+
+   private boolean durable;
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public PersistedDestination()
+   {
+   }
+
+   public PersistedDestination(final PersistedType type, final String name)
+   {
+      this(type, name, null, true);
+   }
+
+   public PersistedDestination(final PersistedType type, final String name, final String selector, final boolean durable)
+   {
+      this.type = type;
+      this.name = name;
+      this.selector = selector;
+      this.durable = durable;
+   }
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+
+   public long getId()
+   {
+      return id;
+   }
+
+   public void setId(final long id)
+   {
+      this.id = id;
+   }
+
+   public String getName()
+   {
+      return name;
+   }
+
+   public PersistedType getType()
+   {
+      return type;
+   }
+
+   public String getSelector()
+   {
+      return selector;
+   }
+
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   public int getEncodeSize()
+   {
+      return DataConstants.SIZE_BYTE +
+            BufferHelper.sizeOfSimpleString(name) +
+            BufferHelper.sizeOfNullableSimpleString(selector) +
+            DataConstants.SIZE_BOOLEAN;
+   }
+
+   public void encode(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(type.getType());
+      buffer.writeSimpleString(SimpleString.toSimpleString(name));
+      buffer.writeNullableSimpleString(SimpleString.toSimpleString(selector));
+      buffer.writeBoolean(durable);
+   }
+
+   public void decode(final HornetQBuffer buffer)
+   {
+      type = PersistedType.getType(buffer.readByte());
+      name = buffer.readSimpleString().toString();
+      SimpleString selectorStr = buffer.readNullableSimpleString();
+      selector = (selectorStr == null) ? null : selectorStr.toString();
+      durable = buffer.readBoolean();
+   }
+}


Mime
View raw message