From commits-return-15399-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Thu Jan 20 14:03:56 2011 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 64750 invoked from network); 20 Jan 2011 14:03:56 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 20 Jan 2011 14:03:56 -0000 Received: (qmail 925 invoked by uid 500); 20 Jan 2011 14:03:56 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 839 invoked by uid 500); 20 Jan 2011 14:03:54 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 831 invoked by uid 99); 20 Jan 2011 14:03:53 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Jan 2011 14:03:53 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Jan 2011 14:03:51 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8B3A223889E0; Thu, 20 Jan 2011 14:03:31 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1061304 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/streams/ Date: Thu, 20 Jan 2011 14:03:31 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110120140331.8B3A223889E0@eris.apache.org> Author: dejanb Date: Thu Jan 20 14:03:30 2011 New Revision: 1061304 URL: http://svn.apache.org/viewvc?rev=1061304&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3010 - timeout for ActiveMQInputStream Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/StreamConnection.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1061304&r1=1061303&r2=1061304&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Jan 20 14:03:30 2011 @@ -2040,9 +2040,15 @@ public class ActiveMQConnection implemen } public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { - return doCreateInputStream(dest, messageSelector, noLocal, null); + return createInputStream(dest, messageSelector, noLocal, -1); } + + + public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException { + return doCreateInputStream(dest, messageSelector, noLocal, null, timeout); + } + public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { return createInputStream(dest, null, false); } @@ -2052,13 +2058,17 @@ public class ActiveMQConnection implemen } public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException { - return doCreateInputStream(dest, messageSelector, noLocal, name); + return createDurableInputStream(dest, name, messageSelector, noLocal, -1); } - private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName) throws JMSException { + public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException { + return doCreateInputStream(dest, messageSelector, noLocal, name, timeout); + } + + private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); - return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch()); + return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout); } /** @@ -2367,4 +2377,5 @@ public class ActiveMQConnection implemen public void setCheckForDuplicates(boolean checkForDuplicates) { this.checkForDuplicates = checkForDuplicates; } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java?rev=1061304&r1=1061303&r2=1061304&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java Thu Jan 20 14:03:30 2011 @@ -57,8 +57,10 @@ public class ActiveMQInputStream extends private ProducerId producerId; private long nextSequenceId; + private long timeout; + private boolean firstReceived; - public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch) + public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch, long timeout) throws JMSException { this.connection = connection; @@ -82,6 +84,9 @@ public class ActiveMQInputStream extends } } + if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1"); + this.timeout = timeout; + this.info = new ConsumerInfo(consumerId); this.info.setSubscriptionName(name); @@ -150,11 +155,17 @@ public class ActiveMQInputStream extends return jmsProperties; } - public ActiveMQMessage receive() throws JMSException { + public ActiveMQMessage receive() throws JMSException, ReadTimeoutException { checkClosed(); MessageDispatch md; try { - md = unconsumedMessages.dequeue(-1); + if (firstReceived || timeout == -1) { + md = unconsumedMessages.dequeue(-1); + firstReceived = true; + } else { + md = unconsumedMessages.dequeue(timeout); + if (md == null) throw new ReadTimeoutException(); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw JMSExceptionSupport.create(e); @@ -186,6 +197,11 @@ public class ActiveMQInputStream extends } } + /** + * + * @see InputStream#read() + * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout + */ @Override public int read() throws IOException { fillBuffer(); @@ -195,7 +211,12 @@ public class ActiveMQInputStream extends return buffer[pos++] & 0xff; } - + + /** + * + * @see InputStream#read(byte[], int, int) + * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout + */ @Override public int read(byte[] b, int off, int len) throws IOException { fillBuffer(); @@ -273,4 +294,14 @@ public class ActiveMQInputStream extends return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }"; } + + /** + * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout + * + */ + public class ReadTimeoutException extends IOException { + public ReadTimeoutException() { + super(); + } + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/StreamConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/StreamConnection.java?rev=1061304&r1=1061303&r2=1061304&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/StreamConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/StreamConnection.java Thu Jan 20 14:03:30 2011 @@ -42,11 +42,15 @@ public interface StreamConnection extend InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException; + InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException; + InputStream createDurableInputStream(Topic dest, String name) throws JMSException; InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException; InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException; + + InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException; OutputStream createOutputStream(Destination dest) throws JMSException; Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java?rev=1061304&r1=1061303&r2=1061304&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java Thu Jan 20 14:03:30 2011 @@ -75,7 +75,7 @@ public class JMSInputStreamTest extends * @param props * @throws JMSException */ - private void setUpConnection(Map props) throws JMSException { + private void setUpConnection(Map props, long timeout) throws JMSException { connection2 = (ActiveMQConnection)factory.createConnection(userName, password); connections.add(connection2); OutputStream amqOut; @@ -85,7 +85,11 @@ public class JMSInputStreamTest extends amqOut = connection.createOutputStream(destination); } out = new DataOutputStream(amqOut); - amqIn = (ActiveMQInputStream) connection2.createInputStream(destination); + if (timeout == -1) { + amqIn = (ActiveMQInputStream) connection2.createInputStream(destination); + } else { + amqIn = (ActiveMQInputStream) connection2.createInputStream(destination, null, false, timeout); + } in = new DataInputStream(amqIn); } /* @@ -95,26 +99,21 @@ public class JMSInputStreamTest extends super.tearDown(); } - public void testStreams() throws Exception { - setUpConnection(null); - out.writeInt(4); - out.flush(); - assertTrue(in.readInt() == 4); - out.writeFloat(2.3f); - out.flush(); - assertTrue(in.readFloat() == 2.3f); - String str = "this is a test string"; - out.writeUTF(str); - out.flush(); - assertTrue(in.readUTF().equals(str)); - for (int i = 0; i < 100; i++) { - out.writeLong(i); - } - out.flush(); + /** + * Test for AMQ-3010 + */ + public void testInputStreamTimeout() throws Exception { + long timeout = 500; - for (int i = 0; i < 100; i++) { - assertTrue(in.readLong() == i); + setUpConnection(null, timeout); + try { + in.read(); + fail(); + } catch (ActiveMQInputStream.ReadTimeoutException e) { + // timeout reached, everything ok } + in.close(); + } // Test for AMQ-2988 @@ -126,7 +125,7 @@ public class JMSInputStreamTest extends Map jmsProperties = new HashMap(); jmsProperties.put(name1, value1); jmsProperties.put(name2, value2); - setUpConnection(jmsProperties); + setUpConnection(jmsProperties, -1); out.writeInt(4); out.flush(); @@ -173,7 +172,7 @@ public class JMSInputStreamTest extends } public void testLarge() throws Exception { - setUpConnection(null); + setUpConnection(null, -1); final int testData = 23; final int dataLength = 4096; @@ -214,4 +213,26 @@ public class JMSInputStreamTest extends } assertTrue(complete.get()); } + + public void testStreams() throws Exception { + setUpConnection(null, -1); + out.writeInt(4); + out.flush(); + assertTrue(in.readInt() == 4); + out.writeFloat(2.3f); + out.flush(); + assertTrue(in.readFloat() == 2.3f); + String str = "this is a test string"; + out.writeUTF(str); + out.flush(); + assertTrue(in.readUTF().equals(str)); + for (int i = 0; i < 100; i++) { + out.writeLong(i); + } + out.flush(); + + for (int i = 0; i < 100; i++) { + assertTrue(in.readLong() == i); + } + } }