activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1429909 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/activemq/streams/
Date Mon, 07 Jan 2013 17:22:30 GMT
Author: tabish
Date: Mon Jan  7 17:22:30 2013
New Revision: 1429909

URL: http://svn.apache.org/viewvc?rev=1429909&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4241

Add a configuration option to control whether properties are set on every output message or
only the first one. 

Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java?rev=1429909&r1=1429908&r2=1429909&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
(original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
Mon Jan  7 17:22:30 2013
@@ -21,9 +21,11 @@ import java.io.InputStream;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
+
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -39,7 +41,7 @@ import org.apache.activemq.util.Introspe
 import org.apache.activemq.util.JMSExceptionSupport;
 
 /**
- * 
+ *
  */
 public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
 
@@ -57,7 +59,7 @@ public class ActiveMQInputStream extends
 
     private ProducerId producerId;
     private long nextSequenceId;
-    private long timeout;
+    private final long timeout;
     private boolean firstReceived;
 
     public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination
dest, String selector, boolean noLocal, String name, int prefetch,  long timeout)
@@ -86,7 +88,7 @@ public class ActiveMQInputStream extends
 
         if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
         this.timeout = timeout;
-        
+
         this.info = new ConsumerInfo(consumerId);
         this.info.setSubscriptionName(name);
 
@@ -155,6 +157,21 @@ public class ActiveMQInputStream extends
         return jmsProperties;
     }
 
+    /**
+     * This method allows the client to receive the Stream data as an unaltered ActiveMQMessage
+     * object, which is how the split stream data is sent.  Each message will contain one
+     * chunk of the written bytes as well as a valid message group sequence id.  The EOS
+     * message will have a message group sequence id of -1.
+     *
+     * This method is useful for testing, but should never be mixed with calls to the
+     * normal stream receive methods as it will break the normal stream processing flow
+     * and can lead to loss of data.
+     *
+     * @return an ActiveMQMessage object that either contains byte data or an end of stream
+     *         marker.
+     * @throws JMSException
+     * @throws ReadTimeoutException
+     */
     public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
         checkClosed();
         MessageDispatch md;
@@ -198,7 +215,7 @@ public class ActiveMQInputStream extends
     }
 
     /**
-     * 
+     *
      * @see InputStream#read()
      * @throws ReadTimeoutException if a timeout was given and the first chunk of the message
could not read within the timeout
      */
@@ -211,9 +228,9 @@ public class ActiveMQInputStream extends
 
         return buffer[pos++] & 0xff;
     }
-    
+
     /**
-     * 
+     *
      * @see InputStream#read(byte[], int, int)
      * @throws ReadTimeoutException if a timeout was given and the first chunk of the message
could not read within the timeout
      */
@@ -285,6 +302,7 @@ public class ActiveMQInputStream extends
         }
     }
 
+    @Override
     public void dispatch(MessageDispatch md) {
         unconsumedMessages.enqueue(md);
     }
@@ -294,12 +312,12 @@ public class ActiveMQInputStream extends
         return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" +
producerId + " }";
     }
 
-
     /**
      * Exception which should get thrown if the first chunk of the stream could not read
within the configured timeout
-     *
      */
     public class ReadTimeoutException extends IOException {
+        private static final long serialVersionUID = -3217758894326719909L;
+
         public ReadTimeoutException() {
             super();
         }

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java?rev=1429909&r1=1429908&r2=1429909&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
(original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
Mon Jan  7 17:22:30 2013
@@ -33,12 +33,13 @@ import org.apache.activemq.command.Produ
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-/**
- *
- */
 public class ActiveMQOutputStream extends OutputStream implements Disposable {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class);
+
     protected int count;
 
     final byte buffer[];
@@ -53,6 +54,7 @@ public class ActiveMQOutputStream extend
     private final int priority;
     private final long timeToLive;
     private boolean alwaysSyncSend = false;
+    private boolean addPropertiesOnFirstMsgOnly = false;
 
     /**
      * JMS Property which is used to specify the size (in kb) which is used as chunk size
when splitting the stream. Default is 64kb
@@ -91,6 +93,15 @@ public class ActiveMQOutputStream extend
             Map<String, String> options = new HashMap<String, String>(destination.getOptions());
             IntrospectionSupport.setProperties(this, options, "producer.");
             IntrospectionSupport.setProperties(this.info, options, "producer.");
+            if (options.size() > 0) {
+                String msg = "There are " + options.size()
+                    + " producer options that couldn't be set on the producer."
+                    + " Check the options are spelled correctly."
+                    + " Unknown parameters=[" + options + "]."
+                    + " This producer cannot be started.";
+                LOG.warn(msg);
+                throw new ConfigurationException(msg);
+            }
         }
 
         this.info.setDestination(destination);
@@ -99,6 +110,7 @@ public class ActiveMQOutputStream extend
         this.connection.asyncSendPacket(info);
     }
 
+    @Override
     public void close() throws IOException {
         if (!closed) {
             flushBuffer();
@@ -113,6 +125,7 @@ public class ActiveMQOutputStream extend
         }
     }
 
+    @Override
     public void dispose() {
         if (!closed) {
             this.connection.removeOutputStream(this);
@@ -120,6 +133,7 @@ public class ActiveMQOutputStream extend
         }
     }
 
+    @Override
     public synchronized void write(int b) throws IOException {
         buffer[count++] = (byte) b;
         if (count == buffer.length) {
@@ -127,6 +141,7 @@ public class ActiveMQOutputStream extend
         }
     }
 
+    @Override
     public synchronized void write(byte b[], int off, int len) throws IOException {
         while (len > 0) {
             int max = Math.min(len, buffer.length - count);
@@ -142,6 +157,7 @@ public class ActiveMQOutputStream extend
         }
     }
 
+    @Override
     public synchronized void flush() throws IOException {
         flushBuffer();
     }
@@ -164,7 +180,7 @@ public class ActiveMQOutputStream extend
      * @throws JMSException
      */
     private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
-        if (properties != null) {
+        if (properties != null && (messageSequence == 0 || !addPropertiesOnFirstMsgOnly))
{
             for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();)
{
                 String key = iter.next();
                 Object value = properties.get(key);
@@ -182,6 +198,7 @@ public class ActiveMQOutputStream extend
         connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive,
!eosMessage && !isAlwaysSyncSend());
     }
 
+    @Override
     public String toString() {
         return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
     }
@@ -194,4 +211,11 @@ public class ActiveMQOutputStream extend
         this.alwaysSyncSend = alwaysSyncSend;
     }
 
+    public boolean isAddPropertiesOnFirstMsgOnly() {
+        return addPropertiesOnFirstMsgOnly;
+    }
+
+    public void setAddPropertiesOnFirstMsgOnly(boolean propertiesOnFirstMsgOnly) {
+        this.addPropertiesOnFirstMsgOnly = propertiesOnFirstMsgOnly;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java?rev=1429909&r1=1429908&r2=1429909&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
Mon Jan  7 17:22:30 2013
@@ -19,7 +19,6 @@ package org.apache.activemq.streams;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -30,9 +29,12 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 
 import junit.framework.Test;
+
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQInputStream;
+import org.apache.activemq.ActiveMQOutputStream;
 import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 
@@ -47,8 +49,8 @@ public class JMSInputStreamTest extends 
     private ActiveMQConnection connection2;
 
     private ActiveMQInputStream amqIn;
+    private ActiveMQOutputStream amqOut;
 
-    
     public static Test suite() {
         return suite(JMSInputStreamTest.class);
     }
@@ -58,32 +60,24 @@ public class JMSInputStreamTest extends 
     }
 
     public void initCombos() {
-        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST.QUEUE"),
new ActiveMQTopic("TEST.TOPIC")});
+        addCombinationValues("destination", new Object[] { new ActiveMQQueue("TEST.QUEUE"),
new ActiveMQTopic("TEST.TOPIC") });
     }
 
-    /*
-     * @see TestCase#setUp()
-     */
+    @Override
     protected void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();
     }
 
-    /**
-     * Setup connection and streams
-     * 
-     * @param props
-     * @throws JMSException
-     */
     private void setUpConnection(Map<String, Object> props, long timeout) throws JMSException
{
-        connection2 = (ActiveMQConnection)factory.createConnection(userName, password);
+        connection2 = (ActiveMQConnection) factory.createConnection(userName, password);
         connections.add(connection2);
-        OutputStream amqOut;
         if (props != null) {
-            amqOut = connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE,
Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+            amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination, props,
Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
         } else {
-            amqOut = connection.createOutputStream(destination);
+            amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination);
         }
+
         out = new DataOutputStream(amqOut);
         if (timeout == -1) {
             amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
@@ -92,9 +86,11 @@ public class JMSInputStreamTest extends 
         }
         in = new DataInputStream(amqIn);
     }
+
     /*
      * @see TestCase#tearDown()
      */
+    @Override
     protected void tearDown() throws Exception {
         super.tearDown();
     }
@@ -104,7 +100,7 @@ public class JMSInputStreamTest extends 
      */
     public void testInputStreamTimeout() throws Exception {
         long timeout = 500;
-        
+
         setUpConnection(null, timeout);
         try {
             in.read();
@@ -113,7 +109,6 @@ public class JMSInputStreamTest extends 
             // timeout reached, everything ok
         }
         in.close();
-        
     }
 
     // Test for AMQ-2988
@@ -122,11 +117,58 @@ public class JMSInputStreamTest extends 
         String name2 = "PROPERTY_2";
         String value1 = "VALUE_1";
         String value2 = "VALUE_2";
-        Map<String,Object> jmsProperties = new HashMap<String, Object>();
+        Map<String, Object> jmsProperties = new HashMap<String, Object>();
+        jmsProperties.put(name1, value1);
+        jmsProperties.put(name2, value2);
+        setUpConnection(jmsProperties, -1);
+
+        out.writeInt(4);
+        out.flush();
+        assertTrue(in.readInt() == 4);
+        out.writeFloat(2.3f);
+        out.flush();
+        assertTrue(in.readFloat() == 2.3f);
+        String str = "this is a test string";
+        out.writeUTF(str);
+        out.flush();
+        assertTrue(in.readUTF().equals(str));
+        for (int i = 0; i < 100; i++) {
+            out.writeLong(i);
+        }
+        out.flush();
+
+        // check properties before we try to read the stream
+        checkProperties(jmsProperties);
+
+        for (int i = 0; i < 100; i++) {
+            assertTrue(in.readLong() == i);
+        }
+
+        // check again after read was done
+        checkProperties(jmsProperties);
+    }
+
+    public void testStreamsWithPropertiesOnlyOnFirstMessage() throws Exception {
+        String name1 = "PROPERTY_1";
+        String name2 = "PROPERTY_2";
+        String value1 = "VALUE_1";
+        String value2 = "VALUE_2";
+        Map<String, Object> jmsProperties = new HashMap<String, Object>();
         jmsProperties.put(name1, value1);
         jmsProperties.put(name2, value2);
+
+        ActiveMQDestination dest = (ActiveMQDestination) destination;
+
+        if (dest.isQueue()) {
+            destination = new ActiveMQQueue(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
+        } else {
+            destination = new ActiveMQTopic(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
+        }
+
         setUpConnection(jmsProperties, -1);
-        
+
+        assertTrue(amqOut.isAddPropertiesOnFirstMsgOnly());
+
         out.writeInt(4);
         out.flush();
         assertTrue(in.readInt() == 4);
@@ -141,39 +183,38 @@ public class JMSInputStreamTest extends 
             out.writeLong(i);
         }
         out.flush();
-     
+
         // check properties before we try to read the stream
         checkProperties(jmsProperties);
-        
+
         for (int i = 0; i < 100; i++) {
             assertTrue(in.readLong() == i);
         }
-        
+
         // check again after read was done
         checkProperties(jmsProperties);
     }
-    
+
     // check if the received stream has the properties set
     // Test for AMQ-2988
     private void checkProperties(Map<String, Object> jmsProperties) throws IOException
{
         Map<String, Object> receivedJmsProps = amqIn.getJMSProperties();
-        
+
         // we should at least have the same amount or more properties
         assertTrue(jmsProperties.size() <= receivedJmsProps.size());
-        
 
         // check the properties to see if we have everything in there
-        Iterator<String>  propsIt = jmsProperties.keySet().iterator();
-        while(propsIt.hasNext()) {
+        Iterator<String> propsIt = jmsProperties.keySet().iterator();
+        while (propsIt.hasNext()) {
             String key = propsIt.next();
             assertTrue(receivedJmsProps.containsKey(key));
             assertEquals(jmsProperties.get(key), receivedJmsProps.get(key));
         }
     }
-    
+
     public void testLarge() throws Exception {
         setUpConnection(null, -1);
-        
+
         final int testData = 23;
         final int dataLength = 4096;
         final int count = 1024;
@@ -183,6 +224,7 @@ public class JMSInputStreamTest extends 
         }
         final AtomicBoolean complete = new AtomicBoolean(false);
         Thread runner = new Thread(new Runnable() {
+            @Override
             public void run() {
                 try {
                     for (int x = 0; x < count; x++) {
@@ -213,7 +255,7 @@ public class JMSInputStreamTest extends 
         }
         assertTrue(complete.get());
     }
-    
+
     public void testStreams() throws Exception {
         setUpConnection(null, -1);
         out.writeInt(4);
@@ -230,7 +272,7 @@ public class JMSInputStreamTest extends 
             out.writeLong(i);
         }
         out.flush();
-        
+
         for (int i = 0; i < 100; i++) {
             assertTrue(in.readLong() == i);
         }



Mime
View raw message