qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1162120 - /qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
Date Fri, 26 Aug 2011 14:01:53 GMT
Author: kwall
Date: Fri Aug 26 14:01:52 2011
New Revision: 1162120

URL: http://svn.apache.org/viewvc?rev=1162120&view=rev
Log:
QPID-3454: System test StreamMessageTest would not detect regression in StreamMessage buffer expansion

Modified:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?rev=1162120&r1=1162119&r2=1162120&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java	(original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java	Fri Aug 26 14:01:52 2011
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.test.unit.message;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQHeadersExchange;
 import org.apache.qpid.client.AMQQueue;
@@ -50,21 +54,8 @@ import javax.jms.StreamMessage;
  */
 public class StreamMessageTest extends QpidBrokerTestCase
 {
-
     private static final Logger _logger = LoggerFactory.getLogger(StreamMessageTest.class);
 
-    public String _connectionString = "vm://:1";
-
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-    }
-
-    protected void tearDown() throws Exception
-    {
-        super.tearDown();
-    }
-
     public void testStreamMessageEOF() throws Exception
     {
         Connection con = (AMQConnection) getConnection("guest", "guest");
@@ -114,6 +105,7 @@ public class StreamMessageTest extends Q
         try
         {
             msg2.readByte();
+            fail("Expected exception not thrown");
         }
         catch (Exception e)
         {
@@ -125,6 +117,9 @@ public class StreamMessageTest extends Q
 
     public void testModifyReceivedMessageExpandsBuffer() throws Exception
     {
+        final CountDownLatch awaitMessages = new CountDownLatch(1);
+        final AtomicReference<Throwable> listenerCaughtException = new AtomicReference<Throwable>();
+
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
         AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         AMQQueue queue = new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("testQ"));
@@ -134,28 +129,38 @@ public class StreamMessageTest extends Q
 
                 public void onMessage(Message message)
                 {
-                    StreamMessage sm = (StreamMessage) message;
+                    final StreamMessage sm = (StreamMessage) message;
                     try
                     {
                         sm.clearBody();
+                        // it is legal to extend a stream message's content
                         sm.writeString("dfgjshfslfjshflsjfdlsjfhdsljkfhdsljkfhsd");
                     }
-                    catch (JMSException e)
+                    catch (Throwable t)
+                    {
+                        listenerCaughtException.set(t);
+                    }
+                    finally
                     {
-                        _logger.error("Error when writing large string to received msg: " + e, e);
-                        fail("Error when writing large string to received msg" + e);
+                        awaitMessages.countDown();
                     }
                 }
             });
 
         Connection con2 = (AMQConnection) getConnection("guest", "guest");
         AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+        MessageProducer producer = producerSession.createProducer(queue);
         con.start();
         StreamMessage sm = producerSession.createStreamMessage();
         sm.writeInt(42);
-        mandatoryProducer.send(sm);
-        Thread.sleep(2000);
+        producer.send(sm);
+
+        // Allow up to five seconds for the message to arrive with the consumer
+        final boolean completed = awaitMessages.await(5, TimeUnit.SECONDS);
+        assertTrue("Message did not arrive with consumer within a reasonable time", completed);
+        final Throwable listenerException = listenerCaughtException.get();
+        assertNull("No exception should be caught by listener : " + listenerException, listenerException);
+
         con.close();
         con2.close();
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message