activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r475210 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/AdvisorySupport.java broker/util/CommandAgent.java broker/util/CommandMessageListener.java
Date Wed, 15 Nov 2006 12:02:35 GMT
Author: jstrachan
Date: Wed Nov 15 04:02:34 2006
New Revision: 475210

URL: http://svn.apache.org/viewvc?view=rev&rev=475210
Log:
Added a simple command agent that can be used to send messages to a destination (ActiveMQ.Agent)
to interact with the broker via the activemq-console commands (list, query, browse, etc.).

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?view=diff&rev=475210&r1=475209&r2=475210
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Wed Nov 15 04:02:34 2006
@@ -40,10 +40,11 @@
     public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Expired.Queue.";
     public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Topic.";
     public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Queue.";
-    public static final String AGENT_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Agent.";
+    public static final String AGENT_TOPIC = "ActiveMQ.Agent";
 
     public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
     public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC+","+TEMP_TOPIC_ADVISORY_TOPIC);
+    private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
 
     public static ActiveMQTopic getConnectionAdvisoryTopic() {
         return CONNECTION_ADVISORY_TOPIC;
@@ -171,7 +172,10 @@
         }
     }
 
-    public static Destination getAgentDestination(String brokerName) {
-        return new ActiveMQTopic(AGENT_TOPIC_PREFIX + brokerName);
+    /**
+     * Returns the agent topic which is used to send commands to the broker
+     */
+    public static Destination getAgentDestination() {
+        return AGENT_TOPIC_DESTINATION;
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java?view=auto&rev=475210
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java Wed Nov 15 04:02:34 2006
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.Service;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.FactoryBean;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+/**
+ * An agent which listens to commands on a JMS destination
+ *
+ * @version $Revision$
+ * @org.apache.xbean.XBean
+ */
+public class CommandAgent implements Service, InitializingBean, DisposableBean, FactoryBean {
+    private static final Log log = LogFactory.getLog(CommandAgent.class);
+
+    private String brokerUrl = "vm://localhost";
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private Destination commandDestination;
+    private CommandMessageListener listener;
+    private Session session;
+    private MessageConsumer consumer;
+
+
+    public void start() throws Exception {
+        session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+        listener = new CommandMessageListener(session);
+        Destination destination = getCommandDestination();
+        if (log.isDebugEnabled()) {
+            log.debug("Agent subscribing to control destination: " + destination);
+        }
+        consumer = session.createConsumer(destination);
+        consumer.setMessageListener(listener);
+    }
+
+    public void stop() throws Exception {
+        ServiceStopper stopper = new ServiceStopper();
+        if (consumer != null) {
+            try {
+                consumer.close();
+                consumer = null;
+            }
+            catch (JMSException e) {
+                stopper.onException(this, e);
+            }
+        }
+        if (session != null) {
+            try {
+                session.close();
+                session = null;
+            }
+            catch (JMSException e) {
+                stopper.onException(this, e);
+            }
+        }
+        if (connection != null) {
+            try {
+                connection.close();
+                connection = null;
+            }
+            catch (JMSException e) {
+                stopper.onException(this, e);
+            }
+        }
+        stopper.throwFirstException();
+    }
+
+    // the following methods ensure that we are created on startup and the lifecycles respected
+    // TODO there must be a simpler way?
+    public void afterPropertiesSet() throws Exception {
+        start();
+    }
+
+    public void destroy() throws Exception {
+        stop();
+    }
+
+    public Object getObject() throws Exception {
+        return this;
+    }
+
+    public Class getObjectType() {
+        return getClass();
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    public void setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        if (connectionFactory == null) {
+            connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+        }
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public Connection getConnection() throws JMSException {
+        if (connection == null) {
+            connection = createConnection();
+            connection.start();
+        }
+        return connection;
+    }
+
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    public Destination getCommandDestination() {
+        if (commandDestination == null) {
+            commandDestination = createCommandDestination();
+        }
+        return commandDestination;
+    }
+
+    public void setCommandDestination(Destination commandDestination) {
+        this.commandDestination = commandDestination;
+    }
+
+    protected Connection createConnection() throws JMSException {
+        return getConnectionFactory().createConnection();
+    }
+
+    protected Destination createCommandDestination() {
+        return AdvisorySupport.getAgentDestination();
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java?view=diff&rev=475210&r1=475209&r2=475210
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java Wed Nov 15 04:02:34 2006
@@ -16,12 +16,21 @@
  */
 package org.apache.activemq.broker.util;
 
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import javax.jms.*;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 
 /**
  * @version $Revision: $
@@ -38,6 +47,9 @@
     }
 
     public void onMessage(Message message) {
+        if (log.isDebugEnabled()) {
+            log.debug("Received command: " + message);
+        }
         if (message instanceof TextMessage) {
             TextMessage request = (TextMessage) message;
             try {
@@ -48,7 +60,7 @@
                 }
                 Message response = processCommand(request);
                 addReplyHeaders(request, response);
-
+                getProducer().send(replyTo, response);
             }
             catch (Exception e) {
                 log.error("Failed to process message due to: " + e + ". Message: " + message, e);
@@ -66,10 +78,24 @@
         }
     }
 
-    protected Message processCommand(TextMessage request) throws Exception {
+    /**
+     * Processes an incoming JMS message returning the response message
+     */
+    public Message processCommand(TextMessage request) throws Exception {
         TextMessage response = session.createTextMessage();
         getHandler().processCommand(request, response);
         return response;
+    }
+
+    /**
+     * Processes an incoming command from a console and returning the text to output
+     */
+    public String processCommandText(String line) throws Exception {
+        TextMessage request = new ActiveMQTextMessage();
+        request.setText(line);
+        TextMessage response = new ActiveMQTextMessage();
+        getHandler().processCommand(request, response);
+        return response.getText();
     }
 
     public Session getSession() {



Mime
View raw message