activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-758 - List/Object message sent by OpenWire JMS client can't be consumed with other clients
Date Fri, 30 Sep 2016 00:32:16 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master f10752b68 -> 04925bbfc


ARTEMIS-758 - List/Object message sent by OpenWire JMS client can't be consumed with other clients

https://issues.apache.org/jira/browse/ARTEMIS-758


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bd9b2057
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bd9b2057
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bd9b2057

Branch: refs/heads/master
Commit: bd9b2057fab907647db69f4954bcba5111d5273c
Parents: f10752b
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Wed Sep 28 12:29:25 2016 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Sep 29 20:28:31 2016 -0400

----------------------------------------------------------------------
 .../converter/jms/ServerJMSObjectMessage.java   |   6 +-
 .../message/JMSMappingOutboundTransformer.java  |   9 +-
 .../amqp/converter/TestConversions.java         |   4 +-
 .../tests/integration/amqp/ProtonTest.java      |  23 ++++
 .../crossprotocol/AMQPToOpenwireTest.java       | 120 +++++++++++++++++++
 .../crossprotocol/OpenWireToAMQPTest.java       | 109 +++++++++++++++++
 6 files changed, 267 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bd9b2057/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
index 39e0df5..c8fb003 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
@@ -62,13 +62,15 @@ public class ServerJMSObjectMessage  extends ServerJMSMessage implements ObjectM
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       ObjectOutputStream ous = new ObjectOutputStream(out);
       ous.writeObject(object);
-      getInnerMessage().getBodyBuffer().writeBytes(out.toByteArray());
+      byte[] src = out.toByteArray();
+      getInnerMessage().getBodyBuffer().writeInt(src.length);
+      getInnerMessage().getBodyBuffer().writeBytes(src);
    }
 
    @Override
    public void decode() throws Exception {
       super.decode();
-      int size = getInnerMessage().getBodyBuffer().readableBytes();
+      int size = getInnerMessage().getBodyBuffer().readInt();
       byte[] bytes = new byte[size];
       getInnerMessage().getBodyBuffer().readBytes(bytes);
       ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bytes));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bd9b2057/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
index 42ee9f2..40cbf79 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
@@ -279,7 +279,14 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
             if (apMap == null) {
                apMap = new HashMap();
             }
-            apMap.put(key, msg.getObjectProperty(key));
+            Object objectProperty = msg.getObjectProperty(key);
+            if (objectProperty instanceof byte[]) {
+               Binary binary = new Binary((byte[]) objectProperty);
+               apMap.put(key, binary);
+            }
+            else {
+               apMap.put(key, objectProperty);
+            }
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bd9b2057/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 19524b0..27a533a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -97,7 +97,9 @@ public class TestConversions extends Assert {
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       ObjectOutputStream ois = new ObjectOutputStream(out);
       ois.writeObject(new ABadClass());
-      serverMessage.getInnerMessage().getBodyBuffer().writeBytes(out.toByteArray());
+      byte[] src = out.toByteArray();
+      serverMessage.getInnerMessage().getBodyBuffer().writeInt(src.length);
+      serverMessage.getInnerMessage().getBodyBuffer().writeBytes(src);
 
       try {
          converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bd9b2057/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 53e676f..9ed8aaa 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -496,6 +496,29 @@ public class ProtonTest extends ProtonTestBase {
    }
 
    @Test
+   public void testObjectMessage() throws Throwable {
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue queue = createQueue(address);
+      MessageProducer p = session.createProducer(queue);
+      ArrayList list = new ArrayList();
+      list.add("aString");
+      ObjectMessage objectMessage = session.createObjectMessage(list);
+      p.send(objectMessage);
+      session.close();
+
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer cons = session.createConsumer(queue);
+      connection.start();
+
+      objectMessage = (ObjectMessage) cons.receive(5000);
+      assertNotNull(objectMessage);
+      list = (ArrayList) objectMessage.getObject();
+      assertEquals(list.get(0), "aString");
+      connection.close();
+   }
+
+   @Test
    public void testResourceLimitExceptionOnAddressFull() throws Exception {
       setAddressFullBlockPolicy();
       String destinationAddress = address + 1;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bd9b2057/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java
new file mode 100644
index 0000000..624c89c
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/AMQPToOpenwireTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.crossprotocol;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.JMSServerManager;
+import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+
+
+public class AMQPToOpenwireTest extends ActiveMQTestBase {
+
+   public static final String OWHOST = "localhost";
+   public static final int OWPORT = 61616;
+   protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
+
+   JMSServerManager serverManager;
+   private ActiveMQServer server;
+   protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+   protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString);
+   private JmsConnectionFactory qpidfactory;
+   protected String queueName = "amqTestQueue1";
+   private SimpleString coreQueue;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, true);
+      serverManager = new JMSServerManagerImpl(server);
+      Configuration serverConfig = server.getConfiguration();
+      serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(false).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ")));
+      serverConfig.setSecurityEnabled(false);
+      serverManager.start();
+      coreQueue = new SimpleString("jms.queue." + queueName);
+      this.server.createQueue(coreQueue, coreQueue, null, false, false);
+      qpidfactory = new JmsConnectionFactory("amqp://localhost:61616");
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      if (serverManager != null) {
+         serverManager.stop();
+         serverManager = null;
+      }
+   }
+
+   @Test
+   public void testObjectMessage() throws Exception {
+      Connection connection = null;
+      try {
+         connection = qpidfactory.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(coreQueue.toString());
+         MessageProducer producer = session.createProducer(queue);
+         ArrayList list = new ArrayList();
+         list.add("aString");
+         ObjectMessage objectMessage = session.createObjectMessage(list);
+         producer.send(objectMessage);
+         connection.close();
+
+         connection = factory.createConnection();
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         queue = session.createQueue(queueName);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+         BytesMessage receive = (BytesMessage) consumer.receive(5000);
+         assertNotNull(receive);
+         byte[] bytes = new byte[(int) receive.getBodyLength()];
+         receive.readBytes(bytes);
+         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+         list = (ArrayList) ois.readObject();
+         assertEquals(list.get(0), "aString");
+         connection.close();
+      }
+      catch (Exception e) {
+         e.printStackTrace();
+      }
+      finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bd9b2057/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenWireToAMQPTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenWireToAMQPTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenWireToAMQPTest.java
new file mode 100644
index 0000000..6276203
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/OpenWireToAMQPTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.crossprotocol;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+
+
+public class OpenWireToAMQPTest extends ActiveMQTestBase {
+
+   public static final String OWHOST = "localhost";
+   public static final int OWPORT = 61616;
+   protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
+
+   private ActiveMQServer server;
+   protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+   protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString);
+   private JmsConnectionFactory qpidfactory;
+   protected String queueName = "amqTestQueue1";
+   private SimpleString coreQueue;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, true);
+      Configuration serverConfig = server.getConfiguration();
+      serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(false).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ")));
+      serverConfig.setSecurityEnabled(false);
+      server.start();
+      coreQueue = new SimpleString("jms.queue." + queueName);
+      this.server.createQueue(coreQueue, coreQueue, null, false, false);
+      qpidfactory = new JmsConnectionFactory("amqp://localhost:61616");
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      if (server != null) {
+         server.stop();
+         server = null;
+      }
+   }
+
+   @Test
+   public void testObjectMessage() throws Exception {
+      Connection connection = null;
+      try {
+         connection = factory.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageProducer producer = session.createProducer(queue);
+         ArrayList list = new ArrayList();
+         list.add("aString");
+         ObjectMessage objectMessage = session.createObjectMessage(list);
+         producer.send(objectMessage);
+         connection.close();
+
+         connection = qpidfactory.createConnection();
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         queue = session.createQueue(coreQueue.toString());
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+         ObjectMessage receive = (ObjectMessage) consumer.receive(5000);
+         list = (ArrayList) receive.getObject();
+         assertEquals(list.get(0), "aString");
+         connection.close();
+      }
+      catch (Exception e) {
+         e.printStackTrace();
+      }
+      finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+}


Mime
View raw message