activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r634488 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/AdvisoryBroker.java advisory/AdvisorySupport.java broker/BrokerService.java broker/cluster/ broker/cluster/ConnectionSplitBroker.java
Date Fri, 07 Mar 2008 00:59:04 GMT
Author: rajdavies
Date: Thu Mar  6 16:59:02 2008
New Revision: 634488

URL: http://svn.apache.org/viewvc?rev=634488&view=rev
Log:
Add foundation for monitoring split brokers from a clients view point

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=634488&r1=634487&r2=634488&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Thu Mar  6 16:59:02 2008
@@ -83,10 +83,9 @@
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
-            consumers.put(info.getConsumerId(), info);
+            //consumers.put(info.getConsumerId(), info);
             fireConsumerAdvisory(context,info.getDestination(), topic, info);
         } else {
-
             // We need to replay all the previously collected state objects
             // for this newly added consumer.
             if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
@@ -128,6 +127,7 @@
                 }
             }
         }
+        consumers.put(info.getConsumerId(), info);
         return answer;
     }
 
@@ -251,6 +251,9 @@
 
     protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command,
ConsumerId targetConsumerId) throws Exception {
         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+        advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME,
getBrokerName());
+        String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
+        advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID,
id);
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }
 
@@ -268,6 +271,7 @@
             }
         }
         advisoryMessage.setIntProperty("consumerCount", count);
+        
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=634488&r1=634487&r2=634488&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
Thu Mar  6 16:59:02 2008
@@ -40,8 +40,9 @@
     public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX +
"NoConsumer.Topic.";
     public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX +
"NoConsumer.Queue.";
     public static final String AGENT_TOPIC = "ActiveMQ.Agent";
-
     public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
+    public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
+    public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME="originBrokerName";
     public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC
+ "," + TEMP_TOPIC_ADVISORY_TOPIC);
     private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=634488&r1=634487&r2=634488&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Thu Mar  6 16:59:02 2008
@@ -38,6 +38,7 @@
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisoryBroker;
+import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
 import org.apache.activemq.broker.ft.MasterConnector;
 import org.apache.activemq.broker.jmx.BrokerView;
 import org.apache.activemq.broker.jmx.ConnectorView;
@@ -159,11 +160,13 @@
     private boolean useLocalHostBrokerName;
     private CountDownLatch stoppedLatch = new CountDownLatch(1);
     private boolean supportFailOver;
-    private boolean clustered;
     private Broker regionBroker;
     private int producerSystemUsagePortion = 60;
     private int consumerSystemUsagePortion = 40;
     private boolean splitSystemUsageForProducersConsumers;
+    private boolean monitorConnectionSplits;
+    private int taskRunnerPriority = Thread.NORM_PRIORITY;
+    private boolean dedicatedTaskRunner;
    
 
     static {
@@ -757,7 +760,7 @@
 
     public TaskRunnerFactory getTaskRunnerFactory() {
         if (taskRunnerFactory == null) {
-            taskRunnerFactory = new TaskRunnerFactory();
+            taskRunnerFactory = new TaskRunnerFactory("BrokerService",getTaskRunnerPriority(),true,1000,isDedicatedTaskRunner());
         }
         return taskRunnerFactory;
     }
@@ -1217,20 +1220,6 @@
     }
 
     /**
-     * @return the clustered
-     */
-    public boolean isClustered() {
-        return this.clustered;
-    }
-
-    /**
-     * @param clustered the clustered to set
-     */
-    public void setClustered(boolean clustered) {
-        this.clustered = clustered;
-    }
-
-    /**
      * Looks up and lazily creates if necessary the destination for the given JMS name
      */
     public Destination getDestination(ActiveMQDestination destination) throws Exception {
@@ -1261,7 +1250,30 @@
             boolean splitSystemUsageForProducersConsumers) {
         this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
     }
-
+    
+    public boolean isMonitorConnectionSplits() {
+		return monitorConnectionSplits;
+	}
+
+	public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
+		this.monitorConnectionSplits = monitorConnectionSplits;
+	}
+	public int getTaskRunnerPriority() {
+		return taskRunnerPriority;
+	}
+
+	public void setTaskRunnerPriority(int taskRunnerPriority) {
+		this.taskRunnerPriority = taskRunnerPriority;
+	}
+
+	public boolean isDedicatedTaskRunner() {
+		return dedicatedTaskRunner;
+	}
+
+	public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
+		this.dedicatedTaskRunner = dedicatedTaskRunner;
+	}
+	//
     // Implementation methods
     // -------------------------------------------------------------------------
     /**
@@ -1579,6 +1591,9 @@
         if (isPopulateJMSXUserID()) {
             broker = new UserIDBroker(broker);
         }
+        if (isMonitorConnectionSplits()){
+        	broker = new ConnectionSplitBroker(broker);
+        }
         if (plugins != null) {
             for (int i = 0; i < plugins.length; i++) {
                 BrokerPlugin plugin = plugins[i];
@@ -1844,4 +1859,4 @@
     public void setRegionBroker(Broker regionBroker) {
         this.regionBroker = regionBroker;
     }
-}
+}
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java?rev=634488&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
Thu Mar  6 16:59:02 2008
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.cluster;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisoryBroker;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Monitors for client connections that may fail to another
+ * broker - but this broker isn't aware they've gone.
+ * Can occur with network glitches or client error
+ * 
+ * @version $Revision$
+ */
+public class ConnectionSplitBroker extends BrokerFilter implements MessageListener{
+	private static final Log LOG = LogFactory.getLog(ConnectionSplitBroker.class);
+    private Connection connection;
+    private Map <ConnectionId,ConnectionInfo>clientMap = new ConcurrentHashMap<ConnectionId,ConnectionInfo>();
+    public ConnectionSplitBroker(Broker next) {
+        super(next);
+    }
+
+    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception
{
+    	if (info != null){
+    		clientMap.put(info.getConnectionId(),info);
+    	}
+        super.addConnection(context, info);
+    }
+
+    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable
error)
+        throws Exception {
+    	if (info != null){
+    		clientMap.remove(info.getConnectionId());
+    	}
+    	super.removeConnection(context, info, error);
+    }
+    
+    public void start() throws Exception{
+    	super.start();
+    	ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(getBrokerService().getVmConnectorURI());
+    	fac.setCloseTimeout(1);
+    	fac.setWarnAboutUnstartedConnectionTimeout(10000);
+    	fac.setWatchTopicAdvisories(false);
+    	fac.setAlwaysSessionAsync(true);
+    	connection = fac.createConnection();
+    	connection.start();
+    	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    	MessageConsumer consumer = session.createConsumer(AdvisorySupport.getConnectionAdvisoryTopic());
+    	consumer.setMessageListener(this);
+    }
+    
+    public synchronized void stop() throws Exception{
+    	if (connection != null){
+    		connection.stop();
+    		connection = null;
+    	}
+    	super.stop();
+    }
+
+	public void onMessage(javax.jms.Message m) {
+		ActiveMQMessage message = (ActiveMQMessage) m;
+		
+		DataStructure o = message.getDataStructure();
+        if (o != null && o.getClass() == ConnectionInfo.class) {
+        	ConnectionInfo info = (ConnectionInfo) o;
+        	 String brokerId=null;
+			try {
+				brokerId = message.getStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID);
+				if (brokerId != null && brokerId.equals(getBrokerId().getValue())) {
+	        		 
+	        	 }
+			} catch (JMSException e) {
+				LOG.warn("Failed to get message property "+ AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID,e);
+			}
+        	 
+        }
+		
+	}
+        
+        protected  boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
+            if (brokerPath != null) {
+                for (int i = 0; i < brokerPath.length; i++) {
+                    if (brokerId.equals(brokerPath[i])) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
+
+
+}



Mime
View raw message