activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r509566 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
Date Tue, 20 Feb 2007 14:01:55 GMT
Author: rajdavies
Date: Tue Feb 20 06:01:55 2007
New Revision: 509566

URL: http://svn.apache.org/viewvc?view=rev&rev=509566
Log:
set the prefetch to something sensible (this is actually a key bit!)

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java?view=diff&rev=509566&r1=509565&r2=509566
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java Tue Feb 20 06:01:55 2007
@@ -1,24 +1,21 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 
 package org.apache.activemq.bugs;
 
 import java.io.File;
+import java.util.Properties;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -37,164 +34,138 @@
 /**
  * @version $Revision: 1.5 $
  */
-public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest {
-	private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
-			.getLog(JmsDurableTopicSlowReceiveTest.class);
+public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest{
 
-	protected Connection connection2;
-
-	protected Session session2;
-
-	protected Session consumeSession2;
-
-	protected MessageConsumer consumer2;
-
-	protected MessageProducer producer2;
-
-	protected Destination consumerDestination2;
-
-
-	final int NMSG = 100;
-
-	final int MSIZE = 256000;
-    
-
-	private Connection connection3;
-
-	private Session consumeSession3;
+    private static final org.apache.commons.logging.Log log=org.apache.commons.logging.LogFactory
+            .getLog(JmsDurableTopicSlowReceiveTest.class);
+    protected Connection connection2;
+    protected Session session2;
+    protected Session consumeSession2;
+    protected MessageConsumer consumer2;
+    protected MessageProducer producer2;
+    protected Destination consumerDestination2;
+    BrokerService broker;
+    final int NMSG=100;
+    final int MSIZE=256000;
+    private Connection connection3;
+    private Session consumeSession3;
+    private TopicSubscriber consumer3;
+
+    /**
+     * Set up a durable suscriber test.
+     * 
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception{
+        this.durable=true;
+        broker=createBroker();
+        super.setUp();
+    }
 
-	private TopicSubscriber consumer3;
+    protected void tearDown() throws Exception{
+        super.tearDown();
+        broker.stop();
+    }
 
-	/**
-	 * Set up a durable suscriber test.
-	 * 
-	 * @see junit.framework.TestCase#setUp()
-	 */
-	protected void setUp() throws Exception {
-		this.durable = true;
-        createBroker();
-		super.setUp();
-	}
-    
-    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-        ActiveMQConnectionFactory result =  new ActiveMQConnectionFactory("vm://localhost");
-        
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
+        ActiveMQConnectionFactory result=new ActiveMQConnectionFactory("vm://localhost");
+        Properties props=new Properties();
+        props.put("prefetchPolicy.durableTopicPrefetch","5");
+        props.put("prefetchPolicy.optimizeDurableTopicPrefetch","5");
+        result.setProperties(props);
         return result;
     }
-    
+
     protected BrokerService createBroker() throws Exception{
         BrokerService answer=new BrokerService();
         configureBroker(answer);
         answer.start();
         return answer;
     }
-    
+
     protected void configureBroker(BrokerService answer) throws Exception{
-        //KahaPersistenceAdapter adapter = new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
-        //answer.setPersistenceAdapter(adapter);
+        //KahaPersistenceAdapter adapter=new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
         //JDBCPersistenceAdapter adapter = new JDBCPersistenceAdapter();
-        //answer.setPersistenceAdapter(adapter);
+       // answer.setPersistenceAdapter(adapter);
         answer.setDeleteAllMessagesOnStartup(true);
     }
-    
-   
-	/**
-	 * Test if all the messages sent are being received.
-	 * 
-	 * @throws Exception
-	 */
-	public void testSlowReceiver() throws Exception {
-		connection2 = createConnection();
-		connection2.setClientID("test");
-		connection2.start();
-
-		consumeSession2 = connection2.createSession(false,
-				Session.AUTO_ACKNOWLEDGE);
-		session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-		consumerDestination2 = session2.createTopic(getConsumerSubject() + "2");
-		consumer2 = consumeSession2.createDurableSubscriber(
-				(Topic) consumerDestination2, getName());
-		Thread.sleep(1000);
-		consumer2.close();
-		connection2.close();
-
-		new Thread(new Runnable() {
-			public void run() {
-				try {
-					for (int loop = 0; loop < 4; loop++) {
-						connection2 = createConnection();
-						connection2.start();
-						session2 = connection2.createSession(false,
-								Session.AUTO_ACKNOWLEDGE);
-						producer2 = session2.createProducer(null);
-						producer2.setDeliveryMode(deliveryMode);
-						
-						Thread.sleep(1000);
-
-						for (int i = 0; i < NMSG / 4; i++) {
-							BytesMessage message = session2
-									.createBytesMessage();
-							message.writeBytes(new byte[MSIZE]);
-							message.setStringProperty("test", "test");
-							message.setJMSType("test");
-							producer2.send(consumerDestination2, message);
-							Thread.sleep(50);
-							System.err.println("Sent(" + loop +"): " + i);
-                            
-						}
-						producer2.close();
-						connection2.stop();
-						connection2.close();
-                        
 
-					}
-				} catch (Throwable e) {
+    /**
+     * Test if all the messages sent are being received.
+     * 
+     * @throws Exception
+     */
+    public void testSlowReceiver() throws Exception{
+        connection2=createConnection();
+        connection2.setClientID("test");
+        connection2.start();
+        consumeSession2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        session2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        consumerDestination2=session2.createTopic(getConsumerSubject()+"2");
+        consumer2=consumeSession2.createDurableSubscriber((Topic)consumerDestination2,getName());
+        Thread.sleep(1000);
+        consumer2.close();
+        connection2.close();
+        new Thread(new Runnable(){
+
+            public void run(){
+                try{
+                    for(int loop=0;loop<4;loop++){
+                        connection2=createConnection();
+                        connection2.start();
+                        session2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE);
+                        producer2=session2.createProducer(null);
+                        producer2.setDeliveryMode(deliveryMode);
+                        Thread.sleep(1000);
+                        for(int i=0;i<NMSG/4;i++){
+                            BytesMessage message=session2.createBytesMessage();
+                            message.writeBytes(new byte[MSIZE]);
+                            message.setStringProperty("test","test");
+                            message.setJMSType("test");
+                            producer2.send(consumerDestination2,message);
+                            Thread.sleep(50);
+                            System.err.println("Sent("+loop+"): "+i);
+                        }
+                        producer2.close();
+                        connection2.stop();
+                        connection2.close();
+                    }
+                }catch(Throwable e){
                     e.printStackTrace();
-				}
-			}
-		},"SENDER Thread").start();
-        connection3 = createConnection();
+                }
+            }
+        },"SENDER Thread").start();
+        connection3=createConnection();
         connection3.setClientID("test");
         connection3.start();
-        consumeSession3 = connection3.createSession(false,
-                Session.CLIENT_ACKNOWLEDGE);
-        consumer3 = consumeSession3.createDurableSubscriber(
-                (Topic) consumerDestination2, getName());
+        consumeSession3=connection3.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+        consumer3=consumeSession3.createDurableSubscriber((Topic)consumerDestination2,getName());
         connection3.close();
-
-       
-		for (int loop= 0; loop < 4; ++loop) {
-			
-			connection3 = createConnection();
-			connection3.setClientID("test");
-			connection3.start();
-			consumeSession3 = connection3.createSession(false,
-					Session.CLIENT_ACKNOWLEDGE);
-			consumer3 = consumeSession3.createDurableSubscriber(
-					(Topic) consumerDestination2, getName());
-			Message msg = null;
-           
-			int i;
-			for (i = 0; i < NMSG / 4; i++) {
-				// System.err.println("Receive...");
-				msg = consumer3.receive(Integer.MAX_VALUE);
-				if (msg == null)
-					break;
-				System.err.println("Received(" + loop + "): " + i );
-				Thread.sleep(500);
-				msg.acknowledge();
-			}
-			consumer3.close();
-			assertEquals("Receiver " + loop , NMSG/4,i);
-			assertNotNull(msg);
-			// assertEquals(((BytesMessage) msg).getText(), "test");
-			assertEquals(msg.getJMSType(), "test");
-			assertEquals(msg.getStringProperty("test"), "test");
-
-			//connection3.stop();
-			connection3.close();
-
-		}
-	}
-
+        for(int loop=0;loop<4;++loop){
+            connection3=createConnection();
+            connection3.setClientID("test");
+            connection3.start();
+            consumeSession3=connection3.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+            consumer3=consumeSession3.createDurableSubscriber((Topic)consumerDestination2,getName());
+            Message msg=null;
+            int i;
+            for(i=0;i<NMSG/4;i++){
+                // System.err.println("Receive...");
+                msg=consumer3.receive(10000);
+                if(msg==null)
+                    break;
+                System.err.println("Received("+loop+"): "+i);
+                Thread.sleep(500);
+                msg.acknowledge();
+            }
+            consumer3.close();
+            assertEquals("Receiver "+loop,NMSG/4,i);
+            assertNotNull(msg);
+            // assertEquals(((BytesMessage) msg).getText(), "test");
+            assertEquals(msg.getJMSType(),"test");
+            assertEquals(msg.getStringProperty("test"),"test");
+            // connection3.stop();
+            connection3.close();
+        }
+    }
 }



Mime
View raw message