activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r929490 - in /activemq/activemq-systest/trunk/src/test: java/org/apache/activemq/activemq/ java/org/apache/activemq/systest/ resources/
Date Wed, 31 Mar 2010 12:08:39 GMT
Author: dejanb
Date: Wed Mar 31 12:08:39 2010
New Revision: 929490

URL: http://svn.apache.org/viewvc?rev=929490&view=rev
Log:
fixing package names

Added:
    activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/
    activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
    activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
    activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
    activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
    activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
    activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
    activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml
Removed:
    activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/
Modified:
    activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml

Added: activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
URL: http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java?rev=929490&view=auto
==============================================================================
--- activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
(added)
+++ activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.systest;
+
+
+import java.util.Random;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.connection.SingleConnectionFactory;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+/** Test thread that hosts a Spring listener container until setRun(false) is called. */
+public class ConsumerThread extends Thread {
+	private final Log log = LogFactory.getLog(ConsumerThread.class);
+	private DefaultMessageListenerContainer container;
+	private MessageDrivenPojo messageListener;
+	private String destination;
+	private ConnectionFactory connectionFactory;
+	// NOTE(review): stored by setDurable() but never applied to the container.
+	private boolean durable;
+	private int concurrentConsumers;
+	private boolean sessionTransacted;
+	private boolean pubSubDomain;
+	private int numberOfQueues;
+	private String consumerName;
+	// Written by one thread and read by another, hence volatile.
+	private volatile boolean run;
+	private volatile boolean running;
+
+	/**
+	 * Builds and starts the listener container, idles until asked to stop,
+	 * then shuts the container (and a SingleConnectionFactory, if used) down.
+	 */
+	@Override
+	public void run() {
+		run = true;
+		// createContainer() calls afterPropertiesSet(), which already initializes.
+		createContainer();
+		container.start();
+		running = true;
+
+		boolean interrupted = false;
+		while (run) {
+			try {
+				Thread.sleep(100);
+			} catch (InterruptedException e) {
+				// Treat an interrupt as a stop request instead of ignoring it.
+				log.warn("ConsumerThread interrupted, shutting down", e);
+				interrupted = true;
+				run = false;
+			}
+		}
+
+		container.stop();
+		container.destroy();
+		if (connectionFactory instanceof SingleConnectionFactory) {
+			((SingleConnectionFactory)connectionFactory).destroy();
+		}
+		log.info("ConsumerThread closing down");
+		if (interrupted) {
+			// Restore the flag only after cleanup so shutdown is not cut short.
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/**
+	 * Creates and configures the container; the queue suffix comes from a Random
+	 * seeded with the consumer name's hash code, so it is stable per name.
+	 */
+	private DefaultMessageListenerContainer createContainer() {
+		Random generator = new Random(consumerName.hashCode());
+		String destinationName = destination + generator.nextInt(numberOfQueues);
+
+		container = new DefaultMessageListenerContainer();
+		container.setPubSubDomain(pubSubDomain);
+		container.setDestinationName(destinationName);
+		container.setMessageListener(messageListener);
+		container.setConnectionFactory(connectionFactory);
+		container.setConcurrentConsumers(concurrentConsumers);
+		container.setSessionTransacted(sessionTransacted);
+
+		container.afterPropertiesSet();
+		log.info("subscribing to " + destinationName);
+		return container;
+	}
+
+	/** @param messageListener the listener handed to the container */
+	public void setMessageDrivenPojo(MessageDrivenPojo messageListener) {
+		this.messageListener = messageListener;
+	}
+
+	/** @param run false asks the run() loop to stop and shut the container down */
+	public void setRun(boolean run) {
+		this.run = run;
+	}
+
+	/** @param destination the destination name prefix; a queue index is appended */
+	public void setDestination(String destination) {
+		this.destination = destination;
+	}
+
+	/** @param no the number of queues; the suffix is drawn from [0, no) */
+	public void setNumberOfQueues(int no) {
+		this.numberOfQueues = no;
+	}
+
+	/** @return the number of queues the suffix is drawn from */
+	public int getNumberOfQueues() {
+		return this.numberOfQueues;
+	}
+
+	/** @param name the consumer name; its hash code seeds the queue choice */
+	public void setConsumerName(String name) {
+		this.consumerName = name;
+	}
+
+	/** @param connectionFactory the connection factory used by the container */
+	public void setConnectionFactory(ConnectionFactory connectionFactory) {
+		this.connectionFactory = connectionFactory;
+	}
+
+	/** @param durable stored only; NOTE(review): not applied to the container */
+	public void setDurable(boolean durable) {
+		this.durable = durable;
+	}
+
+	/** @param concurrentConsumers the concurrent consumer count for the container */
+	public void setConcurrentConsumers(int concurrentConsumers) {
+		this.concurrentConsumers = concurrentConsumers;
+	}
+
+	/** @param sessionTransacted whether the container uses transacted sessions */
+	public void setSessionTransacted(boolean sessionTransacted) {
+		this.sessionTransacted = sessionTransacted;
+	}
+
+	/** @param pubSubDomain the pub-sub domain flag passed to the container */
+	public void setPubSubDomain(boolean pubSubDomain) {
+		this.pubSubDomain = pubSubDomain;
+	}
+
+	/** @return the listener that receives this consumer's messages */
+	public MessageDrivenPojo getMessageDrivenPojo() {
+		return messageListener;
+	}
+
+	/** @return true once the container has been started; never reset afterwards */
+	public boolean isRunning() {
+		return running;
+	}
+}

Added: activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java?rev=929490&view=auto
==============================================================================
--- activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
(added)
+++ activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.systest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.ConnectionFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Load test: producers and consumers share a pooled connection factory against the broker from config. */
+public class JDBCSpringTest extends TestCase {
+
+	private static final Log log = LogFactory.getLog(JDBCSpringTest.class);
+
+	int numberOfConsumerThreads = 20;
+	int numberOfProducerThreads = 20;
+	int numberOfMessages = 50;
+	int numberOfQueues = 5;
+	String url = "tcp://localhost:61616";
+	String config = "xbean:activemq-spring-jdbc.xml";
+
+	BrokerService broker;
+
+	/** Starts the broker described by config and waits until it is up. */
+	public void setUp() throws Exception {
+		broker = BrokerFactory.createBroker(config);
+		broker.start();
+		broker.waitUntilStarted();
+	}
+
+	/** Stops the broker and waits until it is down. */
+	protected void tearDown() throws Exception {
+		broker.stop();
+		broker.waitUntilStopped();
+	}
+
+	/**
+	 * Starts the consumers and producers, then polls the total received count
+	 * once a second until every message has arrived. Fails when the count,
+	 * once non-zero, is unchanged between two consecutive polls.
+	 */
+	public void testJDBCSpringTest() throws Exception {
+		int expectedMessages = numberOfMessages * numberOfProducerThreads;
+		log.info("Using " + numberOfConsumerThreads + " consumers, " +
+				numberOfProducerThreads + " producers, " +
+				numberOfMessages + " messages per publisher, and " +
+				numberOfQueues + " queues.");
+
+		ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+		prefetch.setQueuePrefetch(1);
+		ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(url);
+		amq.setPrefetchPolicy(prefetch);
+
+		PooledConnectionFactory pooled = new PooledConnectionFactory(amq);
+		pooled.setMaxConnections(5);
+		ConnectionFactory connectionFactory = pooled;
+
+		StringBuilder buffer = new StringBuilder(2048);
+		for (int i = 0; i < 2048; i++) {
+			buffer.append(".");
+		}
+		String twoKbMessage = buffer.toString();
+
+		List<ProducerThread> producerThreads = new ArrayList<ProducerThread>();
+		for (int i = 0; i < numberOfProducerThreads; i++) {
+			ProducerThread thread = new ProducerThread();
+			thread.setMessage(twoKbMessage);
+			thread.setNumberOfMessagesToSend(numberOfMessages);
+			thread.setNumberOfQueues(numberOfQueues);
+			thread.setQueuePrefix("AMQ-2436.queue.");
+			thread.setConnectionFactory(connectionFactory);
+			producerThreads.add(thread);
+		}
+
+		List<ConsumerThread> consumerThreads = new ArrayList<ConsumerThread>();
+		for (int i = 0; i < numberOfConsumerThreads; i++) {
+			ConsumerThread thread = new ConsumerThread();
+			thread.setMessageDrivenPojo(new MessageDrivenPojo());
+			thread.setConcurrentConsumers(1);
+			thread.setConnectionFactory(connectionFactory);
+			thread.setDestination("AMQ-2436.queue.");
+			thread.setPubSubDomain(false);
+			thread.setSessionTransacted(true);
+			thread.setNumberOfQueues(numberOfQueues);
+			thread.setConsumerName("consumer" + i);
+			consumerThreads.add(thread);
+			thread.start();
+		}
+
+		for (ProducerThread thread : producerThreads) {
+			thread.start();
+		}
+
+		int previous = 0;
+		while (true) {
+			int totalMessages = 0;
+			for (ConsumerThread thread : consumerThreads) {
+				totalMessages += thread.getMessageDrivenPojo().getMessageCount();
+			}
+			log.info(totalMessages + " received so far...");
+			if (totalMessages != 0 && previous == totalMessages) {
+				// Bounded wait: let consumers shut down before tearDown() stops
+				// the broker, but never block the failure report indefinitely.
+				stopConsumers(consumerThreads, 5000);
+				fail("Received " + totalMessages + ", expected " + expectedMessages);
+			}
+			previous = totalMessages;
+			if (totalMessages >= expectedMessages) {
+				log.info("Received all " + totalMessages + " messages. Finishing.");
+				stopConsumers(consumerThreads, 0);
+				return;
+			}
+			Thread.sleep(1000);
+		}
+	}
+
+	/** Asks every consumer to stop, then joins each for up to joinMillis ms (0 = no limit). */
+	private static void stopConsumers(List<ConsumerThread> consumers, long joinMillis) throws InterruptedException {
+		for (ConsumerThread consumer : consumers) {
+			consumer.setRun(false);
+		}
+		for (ConsumerThread consumer : consumers) {
+			consumer.join(joinMillis);
+		}
+	}
+}

Added: activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java?rev=929490&view=auto
==============================================================================
--- activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
(added)
+++ activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.systest;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** JMS listener that counts the messages it receives and sleeps 200 ms per message. */
+public class MessageDrivenPojo implements MessageListener, Serializable {
+	// Static so the logger is not part of this Serializable class's instance state.
+	private static final Log log = LogFactory.getLog(MessageDrivenPojo.class);
+	private final AtomicInteger messageCount = new AtomicInteger();
+
+	/**
+	 * Counts the message, logs it when debug logging is enabled, then sleeps
+	 * 200 ms before returning.
+	 */
+	public void onMessage(Message message) {
+		messageCount.incrementAndGet();
+
+		if (log.isDebugEnabled()) {
+			try {
+				logMessage(message);
+			} catch (Exception e) {
+				log.error("Error:", e);
+			}
+		}
+		try {
+			Thread.sleep(200);
+		} catch (InterruptedException ex) {
+			log.error(ex);
+			// Keep the interrupt visible to the thread that delivered the message.
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/** Writes the message id, correlation id and contents to the debug log. */
+	private void logMessage(Message message) throws Exception {
+		StringBuilder buffer = new StringBuilder();
+		buffer.append("\nJMSMessageID:");
+		buffer.append(message.getJMSMessageID());
+		buffer.append("\nJMSCorrelationID:");
+		buffer.append(message.getJMSCorrelationID());
+		buffer.append("\nMessage Contents:\n");
+		if (message instanceof TextMessage) {
+			buffer.append(((TextMessage)message).getText());
+		} else {
+			buffer.append(message.toString());
+		}
+		log.debug(buffer.toString());
+	}
+
+	/** @return the number of messages received so far */
+	protected int getMessageCount() {
+		return messageCount.get();
+	}
+}

Added: activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java?rev=929490&view=auto
==============================================================================
--- activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
(added)
+++ activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,13 @@
+package org.apache.activemq.systest;
+
+/** Runs the inherited JDBCSpringTest scenario with the nio:// URL and the NIO broker config. */
+public class NIOSpringTest extends JDBCSpringTest {
+
+    /** Selects the NIO broker configuration and client URL, then runs the inherited set-up. */
+    public void setUp() throws Exception {
+        config = "xbean:activemq-spring-nio.xml";
+        url = "nio://localhost:61616";
+        super.setUp();
+    }
+
+}

Added: activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
URL: http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java?rev=929490&view=auto
==============================================================================
--- activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
(added)
+++ activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.systest;
+
+import java.util.Random;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Test thread that sends a fixed number of text messages to randomly chosen queues. */
+public class ProducerThread extends Thread {
+	private final Log log = LogFactory.getLog(ProducerThread.class);
+	private JmsTemplate jmsTemplate;
+	private int numberOfQueues;
+	private int numberOfMessagesToSend;
+	// Written only by this thread; volatile so getMessagesSent() sees updates.
+	private volatile int messagesSent;
+	private String queuePrefix;
+	private ConnectionFactory connectionFactory;
+	private String message;
+	private MessageCreator messageCreator;
+	private int sendDelay;
+
+	/**
+	 * Sends numberOfMessagesToSend messages, sleeping sendDelay ms after each. The queue
+	 * index comes from a Random seeded with the thread name. An interrupt ends the run early.
+	 */
+	@Override
+	public void run() {
+		initialize();
+		Random generator = new Random(Thread.currentThread().getName().hashCode());
+		while (messagesSent < numberOfMessagesToSend) {
+			int queueSuffix = generator.nextInt(numberOfQueues);
+			jmsTemplate.send(queuePrefix + queueSuffix, messageCreator);
+			messagesSent++;
+			log.debug(Thread.currentThread().getName() + ": sent msg #" + messagesSent);
+			try {
+				Thread.sleep(sendDelay);
+			} catch (InterruptedException e) {
+				log.warn("ProducerThread interrupted after " + messagesSent + " messages", e);
+				// Restore the flag and stop: every later sleep would fail as well.
+				Thread.currentThread().interrupt();
+				break;
+			}
+		}
+		log.info("ProducerThread shutting down.");
+	}
+
+	/** Creates the JmsTemplate (pub-sub disabled) and the text message creator. */
+	private void initialize() {
+		jmsTemplate = new JmsTemplate();
+		jmsTemplate.setPubSubDomain(false);
+		jmsTemplate.setConnectionFactory(connectionFactory);
+		messageCreator = new MessageCreator() {
+			public Message createMessage(Session session) throws JMSException {
+				return session.createTextMessage(message);
+			}
+		};
+	}
+
+	/** @param numberOfQueues the number of queues; the suffix is drawn from [0, numberOfQueues) */
+	protected void setNumberOfQueues(int numberOfQueues) {
+		this.numberOfQueues = numberOfQueues;
+	}
+
+	/** @param queuePrefix the destination name prefix; a queue index is appended */
+	protected void setQueuePrefix(String queuePrefix) {
+		this.queuePrefix = queuePrefix;
+	}
+
+	/** @param connectionFactory the connection factory used by the JmsTemplate */
+	protected void setConnectionFactory(ConnectionFactory connectionFactory) {
+		this.connectionFactory = connectionFactory;
+	}
+
+	/** @param message the text sent as the body of every message */
+	protected void setMessage(String message) {
+		this.message = message;
+	}
+
+	/** @param numberOfMessagesToSend how many messages run() sends before finishing */
+	protected void setNumberOfMessagesToSend(int numberOfMessagesToSend) {
+		this.numberOfMessagesToSend = numberOfMessagesToSend;
+	}
+
+	/** @param sendDelay the pause after each send, in milliseconds */
+	public void setSendDelay(int sendDelay) {
+		this.sendDelay = sendDelay;
+	}
+
+	/** @return the number of messages sent so far */
+	public int getMessagesSent() {
+		return messagesSent;
+	}
+}

Added: activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java?rev=929490&view=auto
==============================================================================
--- activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
(added)
+++ activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.systest;
+
+import java.net.Socket;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static java.lang.String.*;
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * 
+ * Simulates load on the Stomp connector. All producers/consumers open/close a
+ * connection on every command Configurable number of producers/consumers, their
+ * speed and duration of test
+ * 
+ * Start a broker with the desired configuration to test and then run this test
+ * 
+ */
+public class StompLoadTest extends TestCase {
+
+    private static final Log LOG = LogFactory.getLog(StompLoadTest.class);
+
+    final int producerSleep = 10;
+    final int consumerSleep = 10;
+    final int msgCount = 10000;
+    final int producerCount = 10;
+    final int consumerCount = 10;
+    final int testTime = 30 * 60 * 1000;
+    final int sampleInterval = 5 * 1000;
+    final String bindAddress = "stomp://0.0.0.0:61613";
+
+    AtomicLong producerCounter = new AtomicLong();
+    AtomicLong consumerCounter = new AtomicLong();
+
+    /** Starts all workers, then logs both rates every sampleInterval ms for testTime ms. */
+    public void testLoad() throws Exception {
+        for (int i = 0; i < producerCount; i++) {
+            new ProducerThread("producer" + i).start();
+        }
+        for (int i = 0; i < consumerCount; i++) {
+            new ConsumerThread("consumer" + i).start();
+        }
+        int samples = testTime / sampleInterval;
+        long start = System.nanoTime();
+        for (int i = 0; i < samples; i++) {
+            Thread.sleep(sampleInterval);
+            long end = System.nanoTime();
+            printRate("Producer", producerCounter, end - start);
+            printRate("Consumer", consumerCounter, end - start);
+            start = end;
+        }
+    }
+
+    static final long NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS);
+
+    /** Logs the counter's events per second over the given window and resets the counter. */
+    private void printRate(String name, AtomicLong counter, long nanos) {
+        long c = counter.getAndSet(0);
+        float rate_per_second = ((1.0f*c/nanos)*NANOS_PER_SECOND);
+        LOG.info(format("%s rate: %,.3f per second", name, rate_per_second));
+    }
+
+    /** Opens a socket to the host and port of bindAddress and connects with empty credentials. */
+    public void connect(StompConnection conn) throws Exception {
+        URI connectUri = new URI(bindAddress);
+        conn.open(new Socket(connectUri.getHost(), connectUri.getPort()));
+        conn.connect("", "");
+    }
+
+    /** Disconnects and closes the connection; failures are ignored because it may never have opened. */
+    private void closeQuietly(StompConnection conn) {
+        try {
+            conn.disconnect();
+        } catch (Exception ignored) {
+        }
+        try {
+            conn.close();
+        } catch (Exception ignored) {
+        }
+    }
+
+    /** Sends msgCount messages to /queue/test, each over a fresh connection. */
+    class ProducerThread extends Thread {
+        String name;
+
+        public ProducerThread(String name) {
+            this.name = name;
+        }
+
+        public void run() {
+            for (int i = 0; i < msgCount; i++) {
+                StompConnection conn = new StompConnection();
+                try {
+                    connect(conn);
+                    String msg = "Message #" + i+" from "+name;
+                    conn.send("/queue/test", msg);
+                    producerCounter.incrementAndGet();
+                    Thread.sleep(producerSleep);
+                } catch (Exception e) {
+                    LOG.error(name + " failed to send message #" + i, e);
+                } finally {
+                    // Also closes the connection so each iteration releases it.
+                    closeQuietly(conn);
+                }
+            }
+        }
+    }
+
+    /** Makes msgCount attempts to receive and ack one message from /queue/test, each over a fresh connection. */
+    class ConsumerThread extends Thread {
+        String name;
+
+        public ConsumerThread(String name) {
+            this.name = name;
+        }
+
+        public void run() {
+            for (int i = 0; i < msgCount; i++) {
+                StompConnection conn = new StompConnection();
+                try {
+                    connect(conn);
+                    HashMap<String, String> headers = new HashMap<String, String>();
+                    headers.put("activemq.prefetchSize", "1");
+                    conn.subscribe("/queue/test", "client", headers);
+                    StompFrame frame = conn.receive(1*1000);
+                    conn.ack(frame);
+                    consumerCounter.incrementAndGet();
+                    Thread.sleep(consumerSleep);
+                } catch (Exception e) {
+                    LOG.error(name + " failed on receive #" + i, e);
+                } finally {
+                    closeQuietly(conn);
+                }
+            }
+        }
+    }
+}

Added: activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml?rev=929490&view=auto
==============================================================================
--- activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml (added)
+++ activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml Wed Mar 31
12:08:39 2010
@@ -0,0 +1,54 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data">
+
+        <managementContext>
+            <managementContext createConnector="false"/>
+        </managementContext>
+
+        <persistenceAdapter>
+            <kahaDB directory="target/kahadb"/>
+        </persistenceAdapter>
+              
+        <destinationPolicy>
+            <policyMap>
+              <policyEntries>
+                <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
+                  <pendingSubscriberPolicy>
+                    <vmCursor />
+                  </pendingSubscriberPolicy>
+                </policyEntry>
+                <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
+                </policyEntry>
+              </policyEntries>
+            </policyMap>
+        </destinationPolicy> 
+        
+        <transportConnectors>
+            <transportConnector name="nio" uri="nio://0.0.0.0:61616"/>
+        </transportConnectors>
+
+    </broker>
+    
+</beans>

Modified: activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml?rev=929490&r1=929489&r2=929490&view=diff
==============================================================================
--- activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml (original)
+++ activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml Wed Mar 31 12:08:39
2010
@@ -45,7 +45,7 @@
 
 	<bean
 		id="testMDB"
-		class="org.apache.activemq.activemq.systest.camel.TestMDB"
+		class="org.apache.activemq.systest.camel.TestMDB"
 		init-method="init"
 		destroy-method="destroy">
 	</bean>



Mime
View raw message