activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-346 - Add Management send text message functionality similar to ActiveMQ
Date Wed, 20 Jan 2016 15:36:49 GMT
ARTEMIS-346 - Add Management send text message functionality similar to ActiveMQ

https://issues.apache.org/jira/browse/ARTEMIS-346


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c1de710e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c1de710e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c1de710e

Branch: refs/heads/master
Commit: c1de710eb3d75e486379547669b48cf03942fcec
Parents: 0a9a6c9
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Fri Jan 15 13:42:47 2016 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Jan 20 10:35:36 2016 -0500

----------------------------------------------------------------------
 .../api/core/management/QueueControl.java       | 21 +++++++
 .../api/jms/management/JMSQueueControl.java     | 63 ++++++++++++++++++++
 .../management/impl/JMSQueueControlImpl.java    | 53 ++++++++++++++++
 .../core/management/impl/QueueControlImpl.java  | 51 ++++++++++++++++
 .../core/server/impl/ActiveMQServerImpl.java    |  2 +-
 .../server/management/ManagementService.java    |  2 +
 .../management/impl/ManagementServiceImpl.java  |  7 ++-
 .../security/ActiveMQJAASSecurityManager.java   |  2 +-
 .../group/impl/ClusteredResetMockTest.java      |  2 +
 .../server/management/JMSQueueControlTest.java  | 19 ++++++
 .../management/JMSQueueControlUsingJMSTest.java | 25 ++++++++
 .../management/QueueControlTest.java            | 55 +++++++++++++++++
 .../management/QueueControlUsingCoreTest.java   | 12 +++-
 13 files changed, 310 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 298f1e7..76ad6a5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -307,6 +307,27 @@ public interface QueueControl {
    int sendMessagesToDeadLetterAddress(@Parameter(name = "filter", desc = "A message filter
(can be empty)") String filterStr) throws Exception;
 
    /**
+    *
+    * @param headers the message headers and properties to set. Can only
+    *                container Strings maped to primitive types.
+    * @param body the text to send
+    * @param userID
+    * @param durable
+    *@param user
+    * @param password   @return
+    * @throws Exception
+    */
+   @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact =
MBeanOperationInfo.ACTION)
+   String sendMessage(@Parameter(name = "headers", desc = "The headers to add to the message")
Map<String, String> headers,
+                      @Parameter(name = "headers", desc = "A type for the message") final
int type,
+                      @Parameter(name = "body", desc = "The body (byte[]) of the message
encoded as a string using Base64") String body,
+                      @Parameter(name = "body", desc = "The user ID to set on the message")
String userID,
+                      @Parameter(name = "durable", desc = "Whether the message is durable")
boolean durable,
+                      @Parameter(name = "user", desc = "The user to authenticate with") String
user,
+                      @Parameter(name = "password", desc = "The users password to authenticate
with") String password) throws Exception;
+
+
+   /**
     * Changes the message's priority corresponding to the specified message ID to the specified
priority.
     *
     * @param newPriority between 0 and 9 inclusive.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
index d1251fa..941b5d0 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
@@ -166,6 +166,69 @@ public interface JMSQueueControl extends DestinationControl {
    int sendMessagesToDeadLetterAddress(@Parameter(name = "filter", desc = "A message filter
(can be empty)") String filterStr) throws Exception;
 
    /**
+    * Sends a TextMesage to the destination.
+    *
+    * @param body the text to send
+    * @return the message id of the message sent.
+    * @throws Exception
+    */
+   @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact =
MBeanOperationInfo.ACTION)
+   String sendTextMessage(@Parameter(name = "body") String body) throws Exception;
+
+   /**
+    * Sends a TextMessage to the destination.
+    *
+    * @param properties the message properties to set as a comma sep name=value list. Can
only
+    *                contain Strings maped to primitive types or JMS properties. eg: body=hi,JMSReplyTo=Queue2
+    * @return the message id of the message sent.
+    * @throws Exception
+    */
+   @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact =
MBeanOperationInfo.ACTION)
+   String sendTextMessageWithProperties(String properties) throws Exception;
+
+   /**
+    * Sends a TextMesage to the destination.
+    *
+    * @param headers the message headers and properties to set. Can only
+    *                container Strings maped to primitive types.
+    * @param body the text to send
+    * @return the message id of the message sent.
+    * @throws Exception
+    */
+   @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact =
MBeanOperationInfo.ACTION)
+   String sendTextMessage(@Parameter(name = "headers") Map<String,String> headers,
+                          @Parameter(name = "body") String body) throws Exception;
+
+   /**
+    * Sends a TextMesage to the destination.
+    * @param body the text to send
+    * @param user
+    * @param password
+    * @return
+    * @throws Exception
+    */
+   @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact =
MBeanOperationInfo.ACTION)
+   String sendTextMessage(@Parameter(name = "body") String body,
+                          @Parameter(name = "user") String user,
+                          @Parameter(name = "password") String password) throws Exception;
+
+   /**
+   *
+   * @param headers the message headers and properties to set. Can only
+   *                container Strings maped to primitive types.
+   * @param body the text to send
+   * @param user
+   * @param password
+   * @return
+   * @throws Exception
+   */
+   @Operation(desc = "Sends a TextMessage to a password-protected destination.", impact =
MBeanOperationInfo.ACTION)
+   String sendTextMessage(@Parameter(name = "headers") Map<String,String> headers,
+                         @Parameter(name = "body") String body,
+                         @Parameter(name = "user") String user,
+                         @Parameter(name = "password") String password) throws Exception;
+
+   /**
     * Changes the message's priority corresponding to the specified message ID to the specified
priority.
     *
     * @param newPriority between 0 and 9 inclusive.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
index 130f418..adbe488 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
@@ -22,12 +22,17 @@ import javax.management.StandardMBean;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException;
 import org.apache.activemq.artemis.api.core.FilterConstants;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
 import org.apache.activemq.artemis.api.core.management.Operation;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
@@ -40,6 +45,8 @@ import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
 import org.apache.activemq.artemis.jms.client.SelectorTranslator;
 import org.apache.activemq.artemis.jms.management.impl.openmbean.JMSOpenTypeSupport;
 import org.apache.activemq.artemis.jms.server.JMSServerManager;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.artemis.utils.json.JSONArray;
 import org.apache.activemq.artemis.utils.json.JSONObject;
 
@@ -295,6 +302,52 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
    }
 
    @Override
+   public String sendTextMessageWithProperties(String properties) throws Exception {
+      String[] kvs = properties.split(",");
+      Map<String, String> props = new HashMap<String, String>();
+      for (String kv : kvs) {
+         String[] it = kv.split("=");
+         if (it.length == 2) {
+            props.put(it[0],it[1]);
+         }
+      }
+      return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password"));
+   }
+
+   @Override
+   public String sendTextMessage(String body) throws Exception {
+      return sendTextMessage(Collections.EMPTY_MAP, body);
+   }
+
+   @Override
+   public String sendTextMessage(Map headers, String body) throws Exception {
+      return sendTextMessage(headers, body, null, null);
+   }
+
+   @Override
+   public String sendTextMessage(String body, String user, String password) throws Exception
{
+      return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
+   }
+
+   @Override
+   public String sendTextMessage(Map<String, String> headers, String body, String user,
String password) throws Exception {
+      boolean durable = false;
+      if (headers.containsKey("JMSDeliveryMode")) {
+         String jmsDeliveryMode = headers.remove("JMSDeliveryMode");
+         if (jmsDeliveryMode != null && (jmsDeliveryMode.equals("2") || jmsDeliveryMode.equalsIgnoreCase("PERSISTENT")))
{
+            durable = true;
+         }
+      }
+      String userID = UUIDGenerator.getInstance().generateStringUUID();
+      ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(56);
+      buffer.writeNullableSimpleString(new SimpleString(body));
+      byte[] bytes = new byte[buffer.readableBytes()];
+      buffer.readBytes(bytes);
+      coreQueueControl.sendMessage(headers, Message.TEXT_TYPE, Base64.encodeBytes(bytes),
userID, durable, user, password);
+      return userID;
+   }
+
+   @Override
    public boolean changeMessagePriority(final String messageID, final int newPriority) throws
Exception {
       String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
       int changed = coreQueueControl.changeMessagesPriority(filter, newPriority);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index a606a0d..b9f0964 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -38,15 +38,22 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.LinkedListIterator;
+import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.json.JSONArray;
 import org.apache.activemq.artemis.utils.json.JSONException;
 import org.apache.activemq.artemis.utils.json.JSONObject;
@@ -64,6 +71,8 @@ public class QueueControlImpl extends AbstractControl implements QueueControl
{
 
    private final PostOffice postOffice;
 
+   private final StorageManager storageManager;
+   private final SecurityStore securityStore;
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
    private MessageCounter counter;
@@ -106,11 +115,14 @@ public class QueueControlImpl extends AbstractControl implements QueueControl
{
                            final String address,
                            final PostOffice postOffice,
                            final StorageManager storageManager,
+                           final SecurityStore securityStore,
                            final HierarchicalRepository<AddressSettings> addressSettingsRepository)
throws Exception {
       super(QueueControl.class, storageManager);
       this.queue = queue;
       this.address = address;
       this.postOffice = postOffice;
+      this.storageManager = storageManager;
+      this.securityStore = securityStore;
       this.addressSettingsRepository = addressSettingsRepository;
    }
 
@@ -704,6 +716,45 @@ public class QueueControlImpl extends AbstractControl implements QueueControl
{
    }
 
    @Override
+   public String sendMessage(final Map<String, String> headers,
+                             final int type,
+                             final String body,
+                             final String userID,
+                             boolean durable, final String user,
+                             final String password) throws Exception {
+      securityStore.check(queue.getAddress(), CheckType.SEND, new SecurityAuth() {
+         @Override
+         public String getUsername() {
+            return user;
+         }
+
+         @Override
+         public String getPassword() {
+            return password;
+         }
+
+         @Override
+         public RemotingConnection getRemotingConnection() {
+            return null;
+         }
+      });
+      ServerMessageImpl message = new ServerMessageImpl(storageManager.generateID(), 50);
+      for (String header : headers.keySet()) {
+         message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
+      }
+      message.setType((byte) type);
+      message.setDurable(durable);
+      message.setTimestamp(System.currentTimeMillis());
+      message.setUserID(new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(userID)));
+      if (body != null) {
+         message.getBodyBuffer().writeBytes(Base64.decode(body));
+      }
+      message.setAddress(queue.getAddress());
+      postOffice.route(message, null, true);
+      return ""  + message.getMessageID();
+   }
+
+   @Override
    public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
       checkStarted();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index abdf428..4963ed8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1637,7 +1637,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService,
scheduledPool, protocolManagerFactories, executorFactory.getExecutor(), serviceRegistry);
 
-      messagingServerControl = managementService.registerServer(postOffice, storageManager,
configuration, addressSettingsRepository, securityRepository, resourceManager, remotingService,
this, queueFactory, scheduledPool, pagingManager, haPolicy.isBackup());
+      messagingServerControl = managementService.registerServer(postOffice, securityStore,
storageManager, configuration, addressSettingsRepository, securityRepository, resourceManager,
remotingService, this, queueFactory, scheduledPool, pagingManager, haPolicy.isBackup());
 
       // Address settings need to deployed initially, since they're require on paging manager.start()
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
index 3e77d15..c98c22e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/ManagementService.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.remoting.server.RemotingService;
 import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -66,6 +67,7 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
    void setStorageManager(StorageManager storageManager);
 
    ActiveMQServerControlImpl registerServer(final PostOffice postOffice,
+                                            final SecurityStore securityStore,
                                             final StorageManager storageManager,
                                             final Configuration configuration,
                                             final HierarchicalRepository<AddressSettings>
addressSettingsRepository,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index e780ce3..c4d0cd6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -64,6 +64,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.remoting.server.RemotingService;
 import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@@ -100,6 +101,8 @@ public class ManagementServiceImpl implements ManagementService {
 
    private PostOffice postOffice;
 
+   private SecurityStore securityStore;
+
    private PagingManager pagingManager;
 
    private StorageManager storageManager;
@@ -166,6 +169,7 @@ public class ManagementServiceImpl implements ManagementService {
 
    @Override
    public ActiveMQServerControlImpl registerServer(final PostOffice postOffice,
+                                                   final SecurityStore securityStore,
                                                    final StorageManager storageManager1,
                                                    final Configuration configuration,
                                                    final HierarchicalRepository<AddressSettings>
addressSettingsRepository,
@@ -178,6 +182,7 @@ public class ManagementServiceImpl implements ManagementService {
                                                    final PagingManager pagingManager,
                                                    final boolean backup) throws Exception
{
       this.postOffice = postOffice;
+      this.securityStore = securityStore;
       this.addressSettingsRepository = addressSettingsRepository;
       this.securityRepository = securityRepository;
       this.storageManager = storageManager1;
@@ -229,7 +234,7 @@ public class ManagementServiceImpl implements ManagementService {
    public synchronized void registerQueue(final Queue queue,
                                           final SimpleString address,
                                           final StorageManager storageManager) throws Exception
{
-      QueueControlImpl queueControl = new QueueControlImpl(queue, address.toString(), postOffice,
storageManager, addressSettingsRepository);
+      QueueControlImpl queueControl = new QueueControlImpl(queue, address.toString(), postOffice,
storageManager, securityStore, addressSettingsRepository);
       if (messageCounterManager != null) {
          MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue,
false, queue.isDurable(), messageCounterManager.getMaxDayCount());
          queueControl.setMessageCounter(counter);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java
index eebc6ab..c814600 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/ActiveMQJAASSecurityManager.java
@@ -90,7 +90,7 @@ public class ActiveMQJAASSecurityManager implements ActiveMQSecurityManager2
{
                                       final String address,
                                       final RemotingConnection connection) {
       X509Certificate[] certificates = null;
-      if (connection.getTransportConnection() instanceof NettyConnection) {
+      if (connection != null && connection.getTransportConnection() instanceof NettyConnection)
{
          certificates = CertificateUtil.getCertsFromChannel(((NettyConnection) connection.getTransportConnection()).getChannel());
       }
       Subject localSubject;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
index 3ccb311..aea4ae2 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/group/impl/ClusteredResetMockTest.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.remoting.server.RemotingService;
 import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -207,6 +208,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase {
 
       @Override
       public ActiveMQServerControlImpl registerServer(PostOffice postOffice,
+                                                      SecurityStore securityStore,
                                                       StorageManager storageManager,
                                                       Configuration configuration,
                                                       HierarchicalRepository<AddressSettings>
addressSettingsRepository,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
index d86e0bd..7636248 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
@@ -29,6 +29,7 @@ import javax.management.openmbean.CompositeData;
 import javax.naming.Context;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -156,6 +157,24 @@ public class JMSQueueControlTest extends ManagementTestBase {
    }
 
    @Test
+   public void testSendTextMessage() throws Exception {
+      JMSQueueControl queueControl = createManagementControl();
+
+      Assert.assertEquals(0, getMessageCount(queueControl));
+
+      String id = queueControl.sendTextMessage(new HashMap<String, String>(), "theBody",
"myUser", "myPassword");
+
+      Assert.assertEquals(1, getMessageCount(queueControl));
+
+      CompositeData[] data = queueControl.browse();
+      Assert.assertEquals(1, data.length);
+      Assert.assertEquals("ID:" + id, data[0].get("JMSMessageID"));
+      Assert.assertEquals("theBody", data[0].get("Text"));
+      System.out.println(data[0]);
+
+   }
+
+   @Test
    public void testBrowseMessagesWithNullFilter() throws Exception {
       JMSQueueControl queueControl = createManagementControl();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
index a4aec4a..2a966cf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
@@ -290,6 +290,31 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest
{
             return (Integer) proxy.invokeOperation("sendMessagesToDeadLetterAddress", filterStr);
          }
 
+         @Override
+         public String sendTextMessage(@Parameter(name = "body") String body) throws Exception
{
+            return null;
+         }
+
+         @Override
+         public String sendTextMessageWithProperties(String properties) throws Exception
{
+            return null;
+         }
+
+         @Override
+         public String sendTextMessage(Map<String, String> headers, String body) throws
Exception {
+            return null;
+         }
+
+         @Override
+         public String sendTextMessage(String body, String user, String password) throws
Exception {
+            return null;
+         }
+
+         @Override
+         public String sendTextMessage(Map<String, String> headers, String body, String
user, String password) throws Exception {
+            return (String) proxy.invokeOperation("sendTextMessage", headers, body, user,
password);
+         }
+
          public void setDeadLetterAddress(final String deadLetterAddress) throws Exception
{
             proxy.invokeOperation("setDeadLetterAddress", deadLetterAddress);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 52c5edb..12c63dd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.artemis.tests.integration.management;
 
 import javax.management.Notification;
+import javax.management.openmbean.CompositeData;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -39,12 +41,14 @@ import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
+import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.json.JSONArray;
 import org.apache.activemq.artemis.utils.json.JSONObject;
@@ -2017,6 +2021,57 @@ public class QueueControlTest extends ManagementTestBase {
       assertEquals(CoreNotificationType.BINDING_REMOVED.toString(), notif.getType());
    }
 
+   @Test
+   public void testSendMessage() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+
+      queueControl.sendMessage(new HashMap<String, String>(), MessageImpl.TEXT_TYPE,
Base64.encodeBytes("theBody".getBytes()), "myID", true, "myUser", "myPassword");
+
+      Assert.assertEquals(1, getMessageCount(queueControl));
+
+      // the message IDs are set on the server
+      CompositeData[] browse = queueControl.browse(null);
+
+      Assert.assertEquals(1, browse.length);
+
+      byte[] body = (byte[]) browse[0].get("body");
+
+      Assert.assertNotNull(body);
+
+      Assert.assertEquals(new String(body), "theBody");
+   }
+
+   @Test
+   public void testSendNullMessage() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+
+      queueControl.sendMessage(new HashMap<String, String>(), MessageImpl.TEXT_TYPE,
null, "myID", true, "myUser", "myPassword");
+
+      Assert.assertEquals(1, getMessageCount(queueControl));
+
+      // the message IDs are set on the server
+      CompositeData[] browse = queueControl.browse(null);
+
+      Assert.assertEquals(1, browse.length);
+
+      byte[] body = (byte[]) browse[0].get("body");
+
+      Assert.assertNotNull(body);
+
+      Assert.assertEquals(new String(body), "");
+   }
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1de710e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 1553255..68dfd48 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -307,6 +307,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
             return (Integer) proxy.invokeOperation("sendMessagesToDeadLetterAddress", filterStr);
          }
 
+         @Override
+         public String sendMessage(Map<String, String> headers, int type, String body,
String userID, boolean durable, String user, String password) throws Exception {
+            return (String) proxy.invokeOperation("sendMessage", headers, type, body, userID,
durable, user, password);
+         }
+
          public void setDeadLetterAddress(final String deadLetterAddress) throws Exception
{
             proxy.invokeOperation("setDeadLetterAddress", deadLetterAddress);
          }
@@ -332,7 +337,12 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
 
          @Override
          public CompositeData[] browse(String filter) throws Exception {
-            return null;
+            Map map = (Map) proxy.invokeOperation("browse", filter);
+            CompositeData[] compositeDatas = (CompositeData[]) map.get(CompositeData.class.getName());
+            if (compositeDatas == null) {
+               compositeDatas = new CompositeData[0];
+            }
+            return compositeDatas;
          }
 
          @Override


Mime
View raw message