activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1061304 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/streams/
Date Thu, 20 Jan 2011 14:03:31 GMT
Author: dejanb
Date: Thu Jan 20 14:03:30 2011
New Revision: 1061304

URL: http://svn.apache.org/viewvc?rev=1061304&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3010 - timeout for ActiveMQInputStream

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/StreamConnection.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1061304&r1=1061303&r2=1061304&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Thu Jan 20 14:03:30 2011
@@ -2040,9 +2040,15 @@ public class ActiveMQConnection implemen
     }
 
     public InputStream createInputStream(Destination dest, String messageSelector, boolean
noLocal) throws JMSException {
-        return doCreateInputStream(dest, messageSelector, noLocal, null);
+        return createInputStream(dest, messageSelector, noLocal,  -1);
     }
 
+
+
+    public InputStream createInputStream(Destination dest, String messageSelector, boolean
noLocal, long timeout) throws JMSException {
+        return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
+    }
+    
     public InputStream createDurableInputStream(Topic dest, String name) throws JMSException
{
         return createInputStream(dest, null, false);
     }
@@ -2052,13 +2058,17 @@ public class ActiveMQConnection implemen
     }
 
     public InputStream createDurableInputStream(Topic dest, String name, String messageSelector,
boolean noLocal) throws JMSException {
-        return doCreateInputStream(dest, messageSelector, noLocal, name);
+        return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
     }
 
-    private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean
noLocal, String subName) throws JMSException {
+    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector,
boolean noLocal, long timeout) throws JMSException {
+        return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
+    }
+    
+    private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean
noLocal, String subName, long timeout) throws JMSException {
         checkClosedOrFailed();
         ensureConnectionInfoSent();
-        return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest),
messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch());
+        return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest),
messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
     }
 
     /**
@@ -2367,4 +2377,5 @@ public class ActiveMQConnection implemen
     public void setCheckForDuplicates(boolean checkForDuplicates) {
         this.checkForDuplicates = checkForDuplicates;
     }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java?rev=1061304&r1=1061303&r2=1061304&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
Thu Jan 20 14:03:30 2011
@@ -57,8 +57,10 @@ public class ActiveMQInputStream extends
 
     private ProducerId producerId;
     private long nextSequenceId;
+    private long timeout;
+    private boolean firstReceived;
 
-    public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination
dest, String selector, boolean noLocal, String name, int prefetch)
+    public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination
dest, String selector, boolean noLocal, String name, int prefetch,  long timeout)
         throws JMSException {
         this.connection = connection;
 
@@ -82,6 +84,9 @@ public class ActiveMQInputStream extends
             }
         }
 
+        if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
+        this.timeout = timeout;
+        
         this.info = new ConsumerInfo(consumerId);
         this.info.setSubscriptionName(name);
 
@@ -150,11 +155,17 @@ public class ActiveMQInputStream extends
         return jmsProperties;
     }
 
-    public ActiveMQMessage receive() throws JMSException {
+    public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
         checkClosed();
         MessageDispatch md;
         try {
-            md = unconsumedMessages.dequeue(-1);
+            if (firstReceived || timeout == -1) {
+                md = unconsumedMessages.dequeue(-1);
+                firstReceived = true;
+            } else {
+                md = unconsumedMessages.dequeue(timeout);
+                if (md == null) throw new ReadTimeoutException();
+            }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw JMSExceptionSupport.create(e);
@@ -186,6 +197,11 @@ public class ActiveMQInputStream extends
         }
     }
 
+    /**
+     * 
+     * @see InputStream#read()
+     * @throws ReadTimeoutException if a timeout was given and the first chunk of the message
could not read within the timeout
+     */
     @Override
     public int read() throws IOException {
         fillBuffer();
@@ -195,7 +211,12 @@ public class ActiveMQInputStream extends
 
         return buffer[pos++] & 0xff;
     }
-
+    
+    /**
+     * 
+     * @see InputStream#read(byte[], int, int)
+     * @throws ReadTimeoutException if a timeout was given and the first chunk of the message
could not read within the timeout
+     */
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
         fillBuffer();
@@ -273,4 +294,14 @@ public class ActiveMQInputStream extends
         return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" +
producerId + " }";
     }
 
+
+    /**
+     * Exception which should get thrown if the first chunk of the stream could not read
within the configured timeout
+     *
+     */
+    public class ReadTimeoutException extends IOException {
+        public ReadTimeoutException() {
+            super();
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/StreamConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/StreamConnection.java?rev=1061304&r1=1061303&r2=1061304&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/StreamConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/StreamConnection.java Thu
Jan 20 14:03:30 2011
@@ -42,11 +42,15 @@ public interface StreamConnection extend
 
     InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal)
throws JMSException;
 
+    InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal,
long timeout) throws JMSException;
+
     InputStream createDurableInputStream(Topic dest, String name) throws JMSException;
 
     InputStream createDurableInputStream(Topic dest, String name, String messageSelector)
throws JMSException;
 
     InputStream createDurableInputStream(Topic dest, String name, String messageSelector,
boolean noLocal) throws JMSException;
+    
+    InputStream createDurableInputStream(Topic dest, String name, String messageSelector,
boolean noLocal, long timeout) throws JMSException;
 
     OutputStream createOutputStream(Destination dest) throws JMSException;
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java?rev=1061304&r1=1061303&r2=1061304&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
Thu Jan 20 14:03:30 2011
@@ -75,7 +75,7 @@ public class JMSInputStreamTest extends 
      * @param props
      * @throws JMSException
      */
-    private void setUpConnection(Map<String, Object> props) throws JMSException {
+    private void setUpConnection(Map<String, Object> props, long timeout) throws JMSException
{
         connection2 = (ActiveMQConnection)factory.createConnection(userName, password);
         connections.add(connection2);
         OutputStream amqOut;
@@ -85,7 +85,11 @@ public class JMSInputStreamTest extends 
             amqOut = connection.createOutputStream(destination);
         }
         out = new DataOutputStream(amqOut);
-        amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
+        if (timeout == -1) {
+            amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
+        } else {
+            amqIn = (ActiveMQInputStream) connection2.createInputStream(destination, null,
false, timeout);
+        }
         in = new DataInputStream(amqIn);
     }
     /*
@@ -95,26 +99,21 @@ public class JMSInputStreamTest extends 
         super.tearDown();
     }
 
-    public void testStreams() throws Exception {
-        setUpConnection(null);
-        out.writeInt(4);
-        out.flush();
-        assertTrue(in.readInt() == 4);
-        out.writeFloat(2.3f);
-        out.flush();
-        assertTrue(in.readFloat() == 2.3f);
-        String str = "this is a test string";
-        out.writeUTF(str);
-        out.flush();
-        assertTrue(in.readUTF().equals(str));
-        for (int i = 0; i < 100; i++) {
-            out.writeLong(i);
-        }
-        out.flush();
+    /**
+     * Test for AMQ-3010
+     */
+    public void testInputStreamTimeout() throws Exception {
+        long timeout = 500;
         
-        for (int i = 0; i < 100; i++) {
-            assertTrue(in.readLong() == i);
+        setUpConnection(null, timeout);
+        try {
+            in.read();
+            fail();
+        } catch (ActiveMQInputStream.ReadTimeoutException e) {
+            // timeout reached, everything ok
         }
+        in.close();
+        
     }
 
     // Test for AMQ-2988
@@ -126,7 +125,7 @@ public class JMSInputStreamTest extends 
         Map<String,Object> jmsProperties = new HashMap<String, Object>();
         jmsProperties.put(name1, value1);
         jmsProperties.put(name2, value2);
-        setUpConnection(jmsProperties);
+        setUpConnection(jmsProperties, -1);
         
         out.writeInt(4);
         out.flush();
@@ -173,7 +172,7 @@ public class JMSInputStreamTest extends 
     }
     
     public void testLarge() throws Exception {
-        setUpConnection(null);
+        setUpConnection(null, -1);
         
         final int testData = 23;
         final int dataLength = 4096;
@@ -214,4 +213,26 @@ public class JMSInputStreamTest extends 
         }
         assertTrue(complete.get());
     }
+    
+    public void testStreams() throws Exception {
+        setUpConnection(null, -1);
+        out.writeInt(4);
+        out.flush();
+        assertTrue(in.readInt() == 4);
+        out.writeFloat(2.3f);
+        out.flush();
+        assertTrue(in.readFloat() == 2.3f);
+        String str = "this is a test string";
+        out.writeUTF(str);
+        out.flush();
+        assertTrue(in.readUTF().equals(str));
+        for (int i = 0; i < 100; i++) {
+            out.writeLong(i);
+        }
+        out.flush();
+        
+        for (int i = 0; i < 100; i++) {
+            assertTrue(in.readLong() == i);
+        }
+    }
 }



Mime
View raw message