activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r509534 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
Date Tue, 20 Feb 2007 12:31:16 GMT
Author: rajdavies
Date: Tue Feb 20 04:31:15 2007
New Revision: 509534

URL: http://svn.apache.org/viewvc?view=rev&rev=509534
Log:
Add test case for slow durable consumer

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
  (with props)

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java?view=auto&rev=509534
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
Tue Feb 20 04:31:15 2007
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+
+/**
+ * @version $Revision: 1.5 $
+ */
+public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest {
+	private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
+			.getLog(JmsDurableTopicSlowReceiveTest.class);
+
+	protected Connection connection2;
+
+	protected Session session2;
+
+	protected Session consumeSession2;
+
+	protected MessageConsumer consumer2;
+
+	protected MessageProducer producer2;
+
+	protected Destination consumerDestination2;
+
+
+	final int NMSG = 100;
+
+	final int MSIZE = 256000;
+    
+
+	private Connection connection3;
+
+	private Session consumeSession3;
+
+	private TopicSubscriber consumer3;
+
+	/**
+	 * Set up a durable suscriber test.
+	 * 
+	 * @see junit.framework.TestCase#setUp()
+	 */
+	protected void setUp() throws Exception {
+		this.durable = true;
+        createBroker();
+		super.setUp();
+	}
+    
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory result =  new ActiveMQConnectionFactory("vm://localhost");
+        
+        return result;
+    }
+    
+    protected BrokerService createBroker() throws Exception{
+        BrokerService answer=new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+    
+    protected void configureBroker(BrokerService answer) throws Exception{
+        //KahaPersistenceAdapter adapter = new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
+        //answer.setPersistenceAdapter(adapter);
+        //JDBCPersistenceAdapter adapter = new JDBCPersistenceAdapter();
+        //answer.setPersistenceAdapter(adapter);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+    
+   
+	/**
+	 * Test if all the messages sent are being received.
+	 * 
+	 * @throws Exception
+	 */
+	public void testSlowReceiver() throws Exception {
+		connection2 = createConnection();
+		connection2.setClientID("test");
+		connection2.start();
+
+		consumeSession2 = connection2.createSession(false,
+				Session.AUTO_ACKNOWLEDGE);
+		session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		consumerDestination2 = session2.createTopic(getConsumerSubject() + "2");
+		consumer2 = consumeSession2.createDurableSubscriber(
+				(Topic) consumerDestination2, getName());
+		Thread.sleep(1000);
+		consumer2.close();
+		connection2.close();
+
+		new Thread(new Runnable() {
+			public void run() {
+				try {
+					for (int loop = 0; loop < 4; loop++) {
+						connection2 = createConnection();
+						connection2.start();
+						session2 = connection2.createSession(false,
+								Session.AUTO_ACKNOWLEDGE);
+						producer2 = session2.createProducer(null);
+						producer2.setDeliveryMode(deliveryMode);
+						
+						Thread.sleep(1000);
+
+						for (int i = 0; i < NMSG / 4; i++) {
+							BytesMessage message = session2
+									.createBytesMessage();
+							message.writeBytes(new byte[MSIZE]);
+							message.setStringProperty("test", "test");
+							message.setJMSType("test");
+							producer2.send(consumerDestination2, message);
+							Thread.sleep(50);
+							System.err.println("Sent(" + loop +"): " + i);
+                            
+						}
+						producer2.close();
+						connection2.stop();
+						connection2.close();
+                        
+
+					}
+				} catch (Throwable e) {
+                    e.printStackTrace();
+				}
+			}
+		},"SENDER Thread").start();
+        connection3 = createConnection();
+        connection3.setClientID("test");
+        connection3.start();
+        consumeSession3 = connection3.createSession(false,
+                Session.CLIENT_ACKNOWLEDGE);
+        consumer3 = consumeSession3.createDurableSubscriber(
+                (Topic) consumerDestination2, getName());
+        connection3.close();
+
+       
+		for (int loop= 0; loop < 4; ++loop) {
+			
+			connection3 = createConnection();
+			connection3.setClientID("test");
+			connection3.start();
+			consumeSession3 = connection3.createSession(false,
+					Session.CLIENT_ACKNOWLEDGE);
+			consumer3 = consumeSession3.createDurableSubscriber(
+					(Topic) consumerDestination2, getName());
+			Message msg = null;
+           
+			int i;
+			for (i = 0; i < NMSG / 4; i++) {
+				// System.err.println("Receive...");
+				msg = consumer3.receive(Integer.MAX_VALUE);
+				if (msg == null)
+					break;
+				System.err.println("Received(" + loop + "): " + i );
+				Thread.sleep(500);
+				msg.acknowledge();
+			}
+			consumer3.close();
+			assertEquals("Receiver " + loop , NMSG/4,i);
+			assertNotNull(msg);
+			// assertEquals(((BytesMessage) msg).getText(), "test");
+			assertEquals(msg.getJMSType(), "test");
+			assertEquals(msg.getStringProperty("test"), "test");
+
+			//connection3.stop();
+			connection3.close();
+
+		}
+	}
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message