geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dblev...@apache.org
Subject svn commit: r331867 - /geronimo/gbuild/trunk/src/main/java/org/apache/geronimo/gbuild/App.java
Date Tue, 08 Nov 2005 19:21:03 GMT
Author: dblevins
Date: Tue Nov  8 11:21:00 2005
New Revision: 331867

URL: http://svn.apache.org/viewcvs?rev=331867&view=rev
Log:
Prototyping the queue consumer/producers

Modified:
    geronimo/gbuild/trunk/src/main/java/org/apache/geronimo/gbuild/App.java

Modified: geronimo/gbuild/trunk/src/main/java/org/apache/geronimo/gbuild/App.java
URL: http://svn.apache.org/viewcvs/geronimo/gbuild/trunk/src/main/java/org/apache/geronimo/gbuild/App.java?rev=331867&r1=331866&r2=331867&view=diff
==============================================================================
--- geronimo/gbuild/trunk/src/main/java/org/apache/geronimo/gbuild/App.java (original)
+++ geronimo/gbuild/trunk/src/main/java/org/apache/geronimo/gbuild/App.java Tue Nov  8 11:21:00 2005
@@ -1,8 +1,6 @@
 package org.apache.geronimo.gbuild;
 
-import org.activemq.ActiveMQConnection;
 import org.activemq.ActiveMQConnectionFactory;
-import org.activemq.util.IndentPrinter;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -11,31 +9,47 @@
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.jms.Topic;
-import java.io.IOException;
 
 /**
  * Hello world!
  */
 public class App {
 
-    public static void main(final String[] args) {
-        Runnable broker = new Runnable() {
-            public void run() {
-                org.activemq.broker.impl.Main.main(args);
-            }
-        };
-        thread(broker, true);
+    public static void main(String[] args) throws Exception {
+        thread(new HelloWorldBroker(args), true);
 
-        thread(new HelloWorldProducer(), false);
+        Thread.sleep(1000);
 
+        thread(new HelloWorldProducer(), false);
+        thread(new HelloWorldProducer(), false);
         thread(new HelloWorldConsumer(), false);
-
-        System.out.println("Hello World!");
+        Thread.sleep(1000);
+        thread(new HelloWorldConsumer(), false);
+        thread(new HelloWorldProducer(), false);
+        thread(new HelloWorldConsumer(), false);
+        thread(new HelloWorldProducer(), false);
+        Thread.sleep(1000);
+        thread(new HelloWorldConsumer(), false);
+        thread(new HelloWorldProducer(), false);
+        thread(new HelloWorldConsumer(), false);
+        thread(new HelloWorldConsumer(), false);
+        thread(new HelloWorldProducer(), false);
+        thread(new HelloWorldProducer(), false);
+        Thread.sleep(1000);
+        thread(new HelloWorldProducer(), false);
+        thread(new HelloWorldConsumer(), false);
+        thread(new HelloWorldConsumer(), false);
+        thread(new HelloWorldProducer(), false);
+        thread(new HelloWorldConsumer(), false);
+        thread(new HelloWorldProducer(), false);
+        thread(new HelloWorldConsumer(), false);
+        thread(new HelloWorldProducer(), false);
+        thread(new HelloWorldConsumer(), false);
+        thread(new HelloWorldConsumer(), false);
+        thread(new HelloWorldProducer(), false);
     }
 
     public static void thread(Runnable runnable, boolean daemon) {
@@ -44,65 +58,45 @@
         brokerThread.start();
     }
 
-    public static class HelloWorldProducer implements Runnable {
+    public static class HelloWorldBroker implements Runnable {
+        private final String[] args;
+
+        public HelloWorldBroker(String[] args) {
+            this.args = args;
+        }
 
         public void run() {
-            try {
-                String url = "tcp://localhost:61616";
-                String subject = "EXAMPLES";
-                String clientID = "superConsumer";
-                boolean topic = false;
-                boolean durable = false;
-                boolean transacted = false;
-                int ackMode = Session.AUTO_ACKNOWLEDGE;
-                int timeToLive = 0;
+            org.activemq.broker.impl.Main.main(args);
+        }
+    }
 
+    public static class HelloWorldProducer implements Runnable {
+        public void run() {
+            try {
                 // Create a ConnectionFactory
-                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
 
                 // Create a Connection
                 Connection connection = connectionFactory.createConnection();
-                if (durable && clientID != null) {
-                    connection.setClientID(clientID);
-                }
                 connection.start();
 
                 // Create a Session
-                Session session = connection.createSession(transacted, ackMode);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
                 // Create the destination (Topic or Queue)
-                Destination destination = null;
-                if (topic) {
-                    destination = session.createTopic(subject);
-                } else {
-                    destination = session.createQueue(subject);
-                }
+                Destination destination = session.createQueue("TEST.FOO");
 
                 // Create a MessageProducer from the Session to the Topic or Queue
                 MessageProducer producer = session.createProducer(destination);
-                if (durable) {
-                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-                } else {
-                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-                }
-                if (timeToLive != 0) {
-                    producer.setTimeToLive(timeToLive);
-                }
+                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
                 // Create a messages
-                TextMessage message = session.createTextMessage("Hello world! From: " + this.hashCode());
+                TextMessage message = session.createTextMessage("Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode());
 
                 // Tell the producer to send the message
+                System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
                 producer.send(message);
 
-                // If we are running with transaction support, commit
-                if (transacted) {
-                    session.commit();
-                }
-
-                // lets dump the stats for fun, optional
-                dumpStats(connection);
-
                 // Clean up
                 session.close();
                 connection.close();
@@ -112,144 +106,52 @@
                 e.printStackTrace();
             }
         }
-
-        protected void dumpStats(Connection connection) {
-            ActiveMQConnection c = (ActiveMQConnection) connection;
-            c.getConnectionStats().dump(new IndentPrinter());
-        }
     }
 
-    public static class HelloWorldConsumer implements Runnable, MessageListener, ExceptionListener {
-
-        String url = "tcp://localhost:61616";
-        String subject = "EXAMPLES";
-        String consumerName = "james";
-        String clientID = "superConsumer";
-        boolean topic = false;
-        boolean durable = false;
-        boolean transacted = false;
-        int ackMode = Session.AUTO_ACKNOWLEDGE;
-        int timeToLive = 0;
-
-        protected int count = 0;
-        protected int dumpCount = 10;
-        protected boolean verbose = true;
-        protected int maxiumMessages = 1;
-        private boolean running;
-        private Session session;
-        private long sleepTime = 0;
-
+    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
         public void run() {
             try {
-                running = true;
 
                 // Create a ConnectionFactory
-                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
 
                 // Create a Connection
                 Connection connection = connectionFactory.createConnection();
-                if (durable && clientID != null) {
-                    connection.setClientID(clientID);
-                }
+                connection.start();
+
                 connection.setExceptionListener(this);
 
                 // Create a Session
-                session = connection.createSession(transacted, ackMode);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
                 // Create the destination (Topic or Queue)
-                Destination destination = null;
-                if (topic) {
-                    destination = session.createTopic(subject);
-                } else {
-                    destination = session.createQueue(subject);
-                }
+                Destination destination = session.createQueue("TEST.FOO");
 
-                MessageConsumer consumer = null;
-                if (durable && topic) {
-                    consumer = session.createDurableSubscriber((Topic) destination, consumerName);
-                } else {
-                    consumer = session.createConsumer(destination);
-                }
-
-                if (maxiumMessages <= 0) {
-                    consumer.setMessageListener(this);
-                }
+                // Create a MessageConsumer from the Session to the Topic or Queue
+                MessageConsumer consumer = session.createConsumer(destination);
 
-                if (maxiumMessages > 0) {
-                    consumeMessagesAndClose(connection, session, consumer);
-                }
-            }
-            catch (Exception e) {
-                System.out.println("Caught: " + e);
-                e.printStackTrace();
-            }
-        }
+                // Wait for a message
+                Message message = consumer.receive(1000);
 
-        public void onMessage(Message message) {
-            try {
                 if (message instanceof TextMessage) {
-                    TextMessage txtMsg = (TextMessage) message;
-                    if (verbose) {
-
-                        String msg = txtMsg.getText();
-                        if (msg.length() > 50) {
-                            msg = msg.substring(0, 50) + "...";
-                        }
-
-                        System.out.println("Received: " + msg);
-                    }
+                    TextMessage textMessage = (TextMessage) message;
+                    String text = textMessage.getText();
+                    System.out.println("Received: " + text);
                 } else {
-                    if (verbose) {
-                        System.out.println("Received: " + message);
-                    }
-                }
-                if (transacted) {
-                    session.commit();
-                }
-                /*
-                if (++count % dumpCount == 0) {
-                    dumpStats(connection);
+                    System.out.println("Received: " + message);
                 }
-                */
-            }
-            catch (JMSException e) {
+
+                consumer.close();
+                session.close();
+                connection.close();
+            } catch (Exception e) {
                 System.out.println("Caught: " + e);
                 e.printStackTrace();
-            } finally {
-                if (sleepTime > 0) {
-                    try {
-                        Thread.sleep(sleepTime);
-                    } catch (InterruptedException e) {
-                    }
-                }
             }
         }
 
-        synchronized public void onException(JMSException ex) {
+        public synchronized void onException(JMSException ex) {
             System.out.println("JMS Exception occured.  Shutting down client.");
-            running = false;
         }
-
-        synchronized boolean isRunning() {
-            return running;
-        }
-
-        protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
-            System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
-
-            for (int i = 0; i < maxiumMessages && isRunning();) {
-                Message message = consumer.receive(1000);
-                if (message != null) {
-                    i++;
-                    onMessage(message);
-                }
-            }
-            System.out.println("Closing connection");
-            consumer.close();
-            session.close();
-            connection.close();
-        }
-
-
     }
 }



Mime
View raw message