activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From foco...@apache.org
Subject svn commit: r411085 - /incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
Date Fri, 02 Jun 2006 08:24:49 GMT
Author: foconer
Date: Fri Jun  2 01:24:47 2006
New Revision: 411085

URL: http://svn.apache.org/viewvc?rev=411085&view=rev
Log:
Added support for sync and async.

Modified:
    incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java

Modified: incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java?rev=411085&r1=411084&r2=411085&view=diff
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
(original)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
Fri Jun  2 01:24:47 2006
@@ -29,6 +29,7 @@
     private Destination destination = null;
 
     private boolean isDurable = false;
+    private boolean isAsync = true;
 
     public JmsConsumerClient(ConnectionFactory factory) {
         this.factory = factory;
@@ -66,48 +67,53 @@
             setDestination(getDestinationName());
         }
 
-        System.out.println("Connecting to URL: " + brokerUrl);
-        System.out.println("Consuming: " + destination);
-        System.out.println("Using " + (isDurable ? "durable" : "non-durable") + " subscription");
-
-
         if (isDurable) {
             createDurableSubscriber((Topic) getDestination(), getClass().getName());
         } else {
             createMessageConsumer(getDestination());
         }
 
-        getMessageConsumer().setMessageListener(this);
-        getConnection().start();
-
-        try {
-            Thread.sleep(duration);
-        } catch (InterruptedException e) {
-            throw new JMSException("Error while consumer is sleeping " + e.getMessage());
+        if (isAsync) {
+            getMessageConsumer().setMessageListener(this);
+            getConnection().start();
+
+            try {
+                Thread.sleep(duration);
+            } catch (InterruptedException e) {
+                throw new JMSException("Error while consumer is sleeping " + e.getMessage());
+            }
+        } else {
+            getConnection().start();
+            consumeMessages(getMessageConsumer(), duration);
         }
 
-        getMessageConsumer().close();
-        getConnection().close();
-
-        System.out.println("Throughput : " + this.getThroughput());
-
+        close(); //close consumer, session, and connection.
         listener.onConfigEnd(this);
     }
 
+    //Increments throughput
     public void onMessage(Message message) {
-        try {
-            TextMessage textMessage = (TextMessage) message;
+        System.out.println(message.toString());
+        this.incThroughput();
+    }
 
-            // lets force the content to be deserialized
-            String text = textMessage.getText();
-            System.out.println("message: " + text + ":" + this.getThroughput());
-            this.incThroughput();
-        } catch (JMSException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+    protected void consumeMessages(MessageConsumer consumer, long duration) throws JMSException
{
+
+        long currentTime = System.currentTimeMillis();
+        long endTime = currentTime + duration;
+
+        while (System.currentTimeMillis() <= endTime) {
+            Message message = consumer.receive();
+            onMessage(message);
         }
     }
 
+    protected void close() throws JMSException {
+        getMessageConsumer().close();
+        getSession().close();
+        getConnection().close();
+    }
+
     public static void main(String[] args) throws Exception {
         JmsConsumerClient cons = new JmsConsumerClient("org.apache.activemq.ActiveMQConnectionFactory",
"tcp://localhost:61616", "topic://TEST.FOO");
         cons.setPerfEventListener(new PerfEventAdapter());
@@ -115,6 +121,22 @@
     }
 
     // Helper Methods
+
+    public boolean isDurable() {
+        return isDurable;
+    }
+
+    public void setDurable(boolean durable) {
+        isDurable = durable;
+    }
+
+    public boolean isAsync() {
+        return isAsync;
+    }
+
+    public void setAsync(boolean async) {
+        isAsync = async;
+    }
 
     public String getDestinationName() {
         return this.destinationName;



Mime
View raw message