activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [activemq] branch activemq-5.15.x updated: [AMQ-7068] Advisory messages are empty when received with a AMQP subscription (#312)
Date Sun, 17 Nov 2019 16:41:08 GMT
This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
     new 614e55d  [AMQ-7068] Advisory messages are empty when received with a AMQP subscription (#312)
614e55d is described below

commit 614e55d04e8f3715a80a4027b622498a3f58f7ec
Author: Johannes Bäurle <johannes.baeurle@aitgmbh.de>
AuthorDate: Sun Nov 17 17:40:15 2019 +0100

    [AMQ-7068] Advisory messages are empty when received with a AMQP subscription (#312)
    
    (cherry picked from commit 6f338aa2817c221c16b6a97b7d3377daeaf42726)
---
 .../message/JMSMappingOutboundTransformer.java     | 48 ++++++++++++-
 .../message/JMSMappingOutboundTransformerTest.java | 80 ++++++++++++++++++++++
 2 files changed, 127 insertions(+), 1 deletion(-)

diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index ffe9ccc..67d0344 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -52,6 +52,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import javax.jms.JMSException;
@@ -67,7 +68,11 @@ import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.apache.activemq.command.ActiveMQStreamMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.TypeConversionSupport;
@@ -333,6 +338,15 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
                 apMap = new HashMap<>();
             }
             apMap.put(key, value);
+
+            int messageType = message.getDataStructureType();
+            if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
+                // Type of command to recognize advisory message
+                Object data = message.getDataStructure();
+                if(data != null) {
+                    apMap.put("ActiveMqDataStructureType", data.getClass().getSimpleName());
+                }
+            }
         }
 
         final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
@@ -376,7 +390,39 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
 
         int messageType = message.getDataStructureType();
 
-        if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
+        if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
+        	Object data = message.getDataStructure();
+            if (data instanceof ConnectionInfo) {
+    			ConnectionInfo connectionInfo = (ConnectionInfo)data;
+        		final HashMap<String, Object> connectionMap = new LinkedHashMap<String, Object>();
+        		
+        		connectionMap.put("ConnectionId", connectionInfo.getConnectionId().getValue());
+        		connectionMap.put("ClientId", connectionInfo.getClientId());
+        		connectionMap.put("ClientIp", connectionInfo.getClientIp());
+        		connectionMap.put("UserName", connectionInfo.getUserName());
+        		connectionMap.put("BrokerMasterConnector", connectionInfo.isBrokerMasterConnector());
+        		connectionMap.put("Manageable", connectionInfo.isManageable());
+        		connectionMap.put("ClientMaster", connectionInfo.isClientMaster());
+        		connectionMap.put("FaultTolerant", connectionInfo.isFaultTolerant());
+        		connectionMap.put("FailoverReconnect", connectionInfo.isFailoverReconnect());
+        		
+    			body = new AmqpValue(connectionMap);
+            } else if (data instanceof RemoveInfo) {
+    			RemoveInfo removeInfo = (RemoveInfo)message.getDataStructure();
+        		final HashMap<String, Object> removeMap = new LinkedHashMap<String, Object>();
+        		
+            	if (removeInfo.isConnectionRemove()) {
+            		removeMap.put(ConnectionId.class.getSimpleName(), ((ConnectionId)removeInfo.getObjectId()).getValue());
+            	} else if (removeInfo.isConsumerRemove()) {
+            		removeMap.put(ConsumerId.class.getSimpleName(), ((ConsumerId)removeInfo.getObjectId()).getValue());
+            		removeMap.put("SessionId", ((ConsumerId)removeInfo.getObjectId()).getSessionId());
+            		removeMap.put("ConnectionId", ((ConsumerId)removeInfo.getObjectId()).getConnectionId());
+            		removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId());
+            	}
+            	
+            	body = new AmqpValue(removeMap);
+            }
+        } else if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
             Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message);
 
             if (payload == null) {
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index e9da261..1d3adea 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -39,6 +39,7 @@ import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -57,6 +58,9 @@ import org.apache.activemq.command.ActiveMQTempQueue;
 import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -771,6 +775,82 @@ public class JMSMappingOutboundTransformerTest {
         String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
         assertEquals(contentString, contents);
     }
+    
+    @Test 
+    public void testConvertConnectionInfo() throws Exception {
+    	String connectionId = "myConnectionId";
+    	String clientId = "myClientId";
+
+    	ConnectionInfo dataStructure = new ConnectionInfo();
+    	dataStructure.setConnectionId(new ConnectionId(connectionId));
+    	dataStructure.setClientId(clientId);
+    	
+    	ActiveMQMessage outbound = createMessage();
+    	Map<String, String> properties = new HashMap<String, String>();
+    	properties.put("originUrl", "localhost");
+    	outbound.setProperties(properties);
+    	outbound.setDataStructure(dataStructure);
+    	outbound.onSend();
+    	outbound.storeContent();
+    	
+    	JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+        
+        Message amqp = encoded.decode();
+
+        assertNotNull(amqp.getApplicationProperties());
+        
+        Map<String, Object> apMap = amqp.getApplicationProperties().getValue();
+        assertEquals(ConnectionInfo.class.getSimpleName(), apMap.get("ActiveMqDataStructureType"));
+        
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+        @SuppressWarnings("unchecked")
+        Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
+
+        assertTrue(connectionId.equals(amqpMap.get("ConnectionId")));
+        assertTrue(clientId.equals(amqpMap.get("ClientId")));
+    }
+
+    @Test 
+    public void testConvertRemoveInfo() throws Exception {
+    	String connectionId = "myConnectionId";
+
+    	RemoveInfo dataStructure = new RemoveInfo(new ConnectionId(connectionId));
+    	
+    	ActiveMQMessage outbound = createMessage();
+    	Map<String, String> properties = new HashMap<String, String>();
+    	properties.put("originUrl", "localhost");
+    	outbound.setProperties(properties);
+    	outbound.setDataStructure(dataStructure);
+    	outbound.onSend();
+    	outbound.storeContent();
+    	
+    	JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+        
+        Message amqp = encoded.decode();
+
+        assertNotNull(amqp.getApplicationProperties());
+        
+        Map<String, Object> apMap = amqp.getApplicationProperties().getValue();
+        assertEquals(RemoveInfo.class.getSimpleName(), apMap.get("ActiveMqDataStructureType"));
+        
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+        @SuppressWarnings("unchecked")
+        Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
+
+        assertTrue(connectionId.equals(amqpMap.get("ConnectionId")));
+    }
 
     //----- Test JMSDestination Handling -------------------------------------//
 


Mime
View raw message