activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r378483 - /incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageInTransactionTest.java
Date Fri, 17 Feb 2006 11:05:14 GMT
Author: jlim
Date: Fri Feb 17 03:05:10 2006
New Revision: 378483

URL: http://svn.apache.org/viewcvs?rev=378483&view=rev
Log:
test case to check sending and receiving of messages inside a transaction (http://forums.activemq.org/posts/list/364.page)

Added:
    incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageInTransactionTest.java

Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageInTransactionTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageInTransactionTest.java?rev=378483&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageInTransactionTest.java (added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageInTransactionTest.java Fri Feb 17 03:05:10 2006
@@ -0,0 +1,180 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+
+import javax.jms.*;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.io.File;
+
+
+public final class PublishOnQueueConsumedMessageInTransactionTest extends TestCase implements
MessageListener {
+
+    private Session producerSession;
+    private Session consumerSession;
+    private Destination queue;
+    private ActiveMQConnectionFactory factory;
+    private MessageProducer producer;
+    private MessageConsumer consumer;
+    private Connection connection;
+    private ObjectMessage objectMessage = null;
+    private List messages = createConcurrentList();
+    private final Object lock = new Object();
+    private String[] data;
+    private String DATAFILE_ROOT =  "activemq-data";
+    private int messageCount = 3;
+    private String url = "vm://localhost";
+
+    // Invalid acknowledgment warning can be viewed on the console of  a remote broker
+    // The warning message is not thrown back to the client
+    //private String url = "tcp://localhost:61616";
+
+
+    protected void setUp() throws Exception {
+        File dataFile = new File(DATAFILE_ROOT);
+        recursiveDelete(dataFile);
+        try {
+            factory = new ActiveMQConnectionFactory(url);
+            connection = factory.createConnection();
+            producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            queue = new ActiveMQQueue("FOO.BAR");
+            data = new String[messageCount];
+
+            for (int i = 0; i < messageCount; i++) {
+                data[i] = "Message : " + i;
+            }
+        } catch (JMSException je) {
+            fail("Error setting up connection : " + je.toString());
+        }
+    }
+
+
+    public void testSendReceive() throws Exception {
+        sendMessage();
+
+        connection.start();
+        consumer = consumerSession.createConsumer(queue);
+        consumer.setMessageListener(this);
+        waitForMessagesToBeDelivered();
+        assertEquals("Messages received doesn't equal messages sent", messages.size(),data.length);
+
+    }
+
+
+    protected void sendMessage() throws JMSException {
+        messages.clear();
+        try {
+            for (int i = 0; i < data.length; ++i) {
+                producer = producerSession.createProducer(queue);
+                objectMessage = producerSession.createObjectMessage(data[i]);
+                producer.send(objectMessage);
+                producerSession.commit();
+                System.out.println("sending message :" + objectMessage);
+            }
+        } catch (Exception e) {
+            if (producerSession != null) {
+                producerSession.rollback();
+                System.out.println("rollback");
+                producerSession.close();
+            }
+
+            e.printStackTrace();
+        }
+    }
+
+
+    public synchronized void onMessage(Message m) {
+        try {
+            objectMessage = (ObjectMessage) m;
+            consumeMessage(objectMessage,messages);
+
+            System.out.println("consumer received message :" + objectMessage);
+            consumerSession.commit();
+
+        } catch (Exception e) {
+            try {
+                consumerSession.rollback();
+                System.out.println("rolled back transaction");
+            } catch (JMSException e1) {
+                System.out.println(e1);
+                e1.printStackTrace();
+            }
+            System.out.println(e);
+            e.printStackTrace();
+        }
+    }
+
+
+    protected void consumeMessage(Message message, List messageList) {
+        messageList.add(message);
+        if (messageList.size() >= data.length) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        }
+
+    }
+
+
+    protected List createConcurrentList() {
+        return Collections.synchronizedList(new ArrayList());
+    }
+
+
+    protected void waitForMessagesToBeDelivered() {
+        long maxWaitTime = 5000;
+        long waitTime = maxWaitTime;
+        long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
+
+        synchronized (lock) {
+            while (messages.size() <= data.length && waitTime >= 0) {
+                try {
+                    lock.wait(200);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+
+                waitTime = maxWaitTime - (System.currentTimeMillis() - start);
+            }
+        }
+    }
+
+
+    protected static void recursiveDelete(File file) {
+        if( file.isDirectory() ) {
+            File[] files = file.listFiles();
+            for (int i = 0; i < files.length; i++) {
+                recursiveDelete(files[i]);
+            }
+        }
+        file.delete();
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+
+        super.tearDown();
+    }
+}
\ No newline at end of file



Mime
View raw message