activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1027282 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQInputStream.java test/java/org/apache/activemq/streams/JMSInputStreamTest.java
Date Mon, 25 Oct 2010 21:58:04 GMT
Author: tabish
Date: Mon Oct 25 21:58:04 2010
New Revision: 1027282

URL: http://svn.apache.org/viewvc?rev=1027282&view=rev
Log:
apply patch for: https://issues.apache.org/activemq/browse/AMQ-2988

with modifications to make the properties map unmodifiable and use the generic Collections.emptyMap
method instead of creating a new empty map.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java?rev=1027282&r1=1027281&r2=1027282&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java Mon Oct 25 21:58:04 2010
@@ -18,6 +18,7 @@ package org.apache.activemq;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import javax.jms.IllegalStateException;
@@ -52,6 +53,7 @@ public class ActiveMQInputStream extends
     private boolean eosReached;
     private byte buffer[];
     private int pos;
+    private Map<String, Object> jmsProperties;
 
     private ProducerId producerId;
     private long nextSequenceId;
@@ -135,6 +137,19 @@ public class ActiveMQInputStream extends
         }
     }
 
+    /**
+     * Return the JMS Properties which were used to send the InputStream
+     *
+     * @return jmsProperties
+     * @throws IOException
+     */
+    public Map<String, Object> getJMSProperties() throws IOException {
+        if (jmsProperties == null) {
+            fillBuffer();
+        }
+        return jmsProperties;
+    }
+
     public ActiveMQMessage receive() throws JMSException {
         checkClosed();
         MessageDispatch md;
@@ -227,13 +242,24 @@ public class ActiveMQInputStream extends
                     buffer = new byte[(int)bm.getBodyLength()];
                     bm.readBytes(buffer);
                     pos = 0;
+                    if (jmsProperties == null) {
+                        jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties()));
+                    }
                 } else {
                     eosReached = true;
+                    if (jmsProperties == null) {
+                        // no properties found
+                        jmsProperties = Collections.emptyMap();
+                    }
                 }
                 return;
             }
         } catch (JMSException e) {
             eosReached = true;
+            if (jmsProperties == null) {
+                // no properties found
+                jmsProperties = Collections.emptyMap();
+            }
             throw IOExceptionSupport.create(e);
         }
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java?rev=1027282&r1=1027281&r2=1027282&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java Mon Oct 25 21:58:04 2010
@@ -18,12 +18,20 @@ package org.apache.activemq.streams;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
 
 import junit.framework.Test;
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQInputStream;
 import org.apache.activemq.JmsTestSupport;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -38,7 +46,9 @@ public class JMSInputStreamTest extends 
     protected DataInputStream in;
     private ActiveMQConnection connection2;
 
+    private ActiveMQInputStream amqIn;
 
+    
     public static Test suite() {
         return suite(JMSInputStreamTest.class);
     }
@@ -57,12 +67,27 @@ public class JMSInputStreamTest extends 
     protected void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();
+    }
+
+    /**
+     * Setup connection and streams
+     * 
+     * @param props
+     * @throws JMSException
+     */
+    private void setUpConnection(Map<String, Object> props) throws JMSException {
         connection2 = (ActiveMQConnection)factory.createConnection(userName, password);
         connections.add(connection2);
-        out = new DataOutputStream(connection.createOutputStream(destination));
-        in = new DataInputStream(connection2.createInputStream(destination));
+        OutputStream amqOut;
+        if (props != null) {
+            amqOut = connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+        } else {
+            amqOut = connection.createOutputStream(destination);
+        }
+        out = new DataOutputStream(amqOut);
+        amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
+        in = new DataInputStream(amqIn);
     }
-
     /*
      * @see TestCase#tearDown()
      */
@@ -71,6 +96,7 @@ public class JMSInputStreamTest extends 
     }
 
     public void testStreams() throws Exception {
+        setUpConnection(null);
         out.writeInt(4);
         out.flush();
         assertTrue(in.readInt() == 4);
@@ -85,12 +111,70 @@ public class JMSInputStreamTest extends 
             out.writeLong(i);
         }
         out.flush();
+        
         for (int i = 0; i < 100; i++) {
             assertTrue(in.readLong() == i);
         }
     }
 
+    // Test for AMQ-2988
+    public void testStreamsWithProperties() throws Exception {
+        String name1 = "PROPERTY_1";
+        String name2 = "PROPERTY_2";
+        String value1 = "VALUE_1";
+        String value2 = "VALUE_2";
+        Map<String,Object> jmsProperties = new HashMap<String, Object>();
+        jmsProperties.put(name1, value1);
+        jmsProperties.put(name2, value2);
+        setUpConnection(jmsProperties);
+        
+        out.writeInt(4);
+        out.flush();
+        assertTrue(in.readInt() == 4);
+        out.writeFloat(2.3f);
+        out.flush();
+        assertTrue(in.readFloat() == 2.3f);
+        String str = "this is a test string";
+        out.writeUTF(str);
+        out.flush();
+        assertTrue(in.readUTF().equals(str));
+        for (int i = 0; i < 100; i++) {
+            out.writeLong(i);
+        }
+        out.flush();
+     
+        // check properties before we try to read the stream
+        checkProperties(jmsProperties);
+        
+        for (int i = 0; i < 100; i++) {
+            assertTrue(in.readLong() == i);
+        }
+        
+        // check again after read was done
+        checkProperties(jmsProperties);
+    }
+    
+    // check if the received stream has the properties set
+    // Test for AMQ-2988
+    private void checkProperties(Map<String, Object> jmsProperties) throws IOException {
+        Map<String, Object> receivedJmsProps = amqIn.getJMSProperties();
+        
+        // we should at least have the same amount or more properties
+        assertTrue(jmsProperties.size() <= receivedJmsProps.size());
+        
+
+        // check the properties to see if we have everything in there
+        Iterator<String>  propsIt = jmsProperties.keySet().iterator();
+        while(propsIt.hasNext()) {
+            String key = propsIt.next();
+            assertTrue(receivedJmsProps.containsKey(key));
+            assertEquals(jmsProperties.get(key), receivedJmsProps.get(key));
+        }
+    }
+    
     public void testLarge() throws Exception {
+        setUpConnection(null);
+        
         final int testData = 23;
         final int dataLength = 4096;
         final int count = 1024;



Mime
View raw message