activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r472237 - in /incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback: DelegatingTransactionalMessageListener.java RollbacksWhileConsumingLargeQueueTest.java
Date Tue, 07 Nov 2006 20:30:38 GMT
Author: chirino
Date: Tue Nov  7 12:30:37 2006
New Revision: 472237

URL: http://svn.apache.org/viewvc?view=rev&rev=472237
Log:
Added message ordering assertions and also a test case that uses consumer.receive() instead
of a messsage listener

Modified:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java?view=diff&rev=472237&r1=472236&r2=472237
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
Tue Nov  7 12:30:37 2006
@@ -55,7 +55,7 @@
             underlyingListener.onMessage(message);
             session.commit();
         }
-        catch (Exception e) {
+        catch (Throwable e) {
             rollback();
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java?view=diff&rev=472237&r1=472236&r2=472237
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
Tue Nov  7 12:30:37 2006
@@ -17,79 +17,151 @@
  */
 package org.apache.activemq.test.rollback;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.springframework.jms.core.MessageCreator;
-
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.springframework.jms.core.MessageCreator;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * @version $Revision$
  */
-public class RollbacksWhileConsumingLargeQueueTest extends EmbeddedBrokerTestSupport implements
MessageListener {
-
-    protected int numberOfMessagesOnQueue = 6500;
-    private Connection connection;
-    private DelegatingTransactionalMessageListener messageListener;
-    private AtomicInteger counter = new AtomicInteger(0);
-    private CountDownLatch latch;
-
-    public void testConsumeOnFullQueue() throws Exception {
-        boolean answer = latch.await(1000, TimeUnit.SECONDS);
-
-        System.out.println("Received: " + counter.get() + "  message(s)");
-        assertTrue("Did not receive the latch!", answer);
-    }
-
-
-    protected void setUp() throws Exception {
-        super.setUp();
-
-        connection = createConnection();
-        connection.start();
-
-        // lets fill the queue up
-        for (int i = 0; i < numberOfMessagesOnQueue; i++) {
-            template.send(createMessageCreator(i));
-        }
-
-        latch = new CountDownLatch(numberOfMessagesOnQueue);
-        messageListener = new DelegatingTransactionalMessageListener(this, connection, destination);
-    }
-
-
-    protected void tearDown() throws Exception {
-        if (connection != null) {
-            connection.close();
-        }
-        super.tearDown();
-    }
-
-    protected MessageCreator createMessageCreator(final int i) {
-        return new MessageCreator() {
-            public Message createMessage(Session session) throws JMSException {
-                TextMessage answer = session.createTextMessage("Message: " + i);
-                answer.setIntProperty("Counter", i);
-                return answer;
-            }
-        };
-    }
-
-    public void onMessage(Message message) {
-        int value = counter.incrementAndGet();
-        if (value % 10 == 0) {
-            throw new RuntimeException("Dummy exception on message: " + value);
-        }
-
-        log.info("Received message: " + value + " content: " + message);
+public class RollbacksWhileConsumingLargeQueueTest extends
+		EmbeddedBrokerTestSupport implements MessageListener {
 
-        latch.countDown();
-    }
+	protected int numberOfMessagesOnQueue = 6500;
+	private Connection connection;
+	private AtomicInteger deliveryCounter = new AtomicInteger(0);
+	private AtomicInteger ackCounter = new AtomicInteger(0);
+	private CountDownLatch latch;
+	private Throwable failure;
+
+	public void xtestWithReciever() throws Throwable {
+		latch = new CountDownLatch(numberOfMessagesOnQueue);
+		Session session = connection.createSession(true, 0);
+		MessageConsumer consumer = session.createConsumer(destination);
+
+		long start = System.currentTimeMillis();
+		while ((System.currentTimeMillis() - start) < 1000*1000) {
+			if (getFailure() != null) {
+				throw getFailure();
+			}
+			
+			// Are we done receiving all the messages.
+			if( ackCounter.get() == numberOfMessagesOnQueue )
+				return;
+
+			Message message = consumer.receive(1000);
+			if (message == null)
+				continue;
+
+			try {
+				onMessage(message);
+				session.commit();
+			} catch (Throwable e) {
+				session.rollback();
+			}
+		}
+
+		fail("Did not receive all the messages.");
+	}
+
+	public void testWithMessageListener() throws Throwable {
+		latch = new CountDownLatch(numberOfMessagesOnQueue);
+		new DelegatingTransactionalMessageListener(this, connection,
+				destination);
+
+		long start = System.currentTimeMillis();
+		while ((System.currentTimeMillis() - start) < 1000*1000) {
+
+			if (getFailure() != null) {
+				throw getFailure();
+			}
+
+			if (latch.await(1, TimeUnit.SECONDS)) {
+				System.out.println("Received: " + deliveryCounter.get()
+						+ "  message(s)");
+				return;
+			}
+
+		}
+
+		fail("Did not receive all the messages.");
+	}
+
+
+	protected void setUp() throws Exception {
+		super.setUp();
+
+		connection = createConnection();
+		connection.start();
+
+		// lets fill the queue up
+		for (int i = 0; i < numberOfMessagesOnQueue; i++) {
+			template.send(createMessageCreator(i));
+		}
+
+	}
+
+	protected void tearDown() throws Exception {
+		if (connection != null) {
+			connection.close();
+		}
+		super.tearDown();
+	}
+
+	protected MessageCreator createMessageCreator(final int i) {
+		return new MessageCreator() {
+			public Message createMessage(Session session) throws JMSException {
+				TextMessage answer = session.createTextMessage("Message: " + i);
+				answer.setIntProperty("Counter", i);
+				return answer;
+			}
+		};
+	}
+
+	public void onMessage(Message message) {
+		String msgId = null;
+		String msgText = null;
+
+		try {
+			msgId = message.getJMSMessageID();
+			msgText = ((TextMessage) message).getText();
+		} catch (JMSException e) {
+			setFailure(e);
+		}
+
+		try {
+			assertEquals("Message: " + ackCounter.get(), msgText);
+		} catch (Throwable e) {
+			setFailure(e);
+		}
+
+		int value = deliveryCounter.incrementAndGet();
+		if (value % 2 == 0) {
+			log.info("Rolling Back message: " + value + " id: " + msgId + ", content: " + msgText);
+			throw new RuntimeException("Dummy exception on message: " + value);
+		}
+
+		log.info("Received message: " + value + " id: " + msgId + ", content: " + msgText);
+		ackCounter.incrementAndGet();
+		latch.countDown();
+	}
+
+	public synchronized Throwable getFailure() {
+		return failure;
+	}
+
+	public synchronized void setFailure(Throwable failure) {
+		this.failure = failure;
+	}
 }



Mime
View raw message