activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r371636 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: AbstractConnection.java TransportConnection.java TransportConnector.java TransportStatusDetector.java jmx/ManagedTransportConnector.java
Date Mon, 23 Jan 2006 19:40:47 GMT
Author: chirino
Date: Mon Jan 23 11:40:42 2006
New Revision: 371636

URL: http://svn.apache.org/viewcvs?rev=371636&view=rev
Log:
The Transport Connection now notifies its Connector of lifecycle events so that the Connector
does not have to wrap the broker in yet another filter to notice the connection's lifecycle
events.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?rev=371636&r1=371635&r2=371636&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Mon Jan 23 11:40:42 2006
@@ -77,7 +77,7 @@
     
     protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
     protected final TaskRunner taskRunner;
-    protected final Connector connector;
+    protected final TransportConnector connector;
     protected BrokerInfo brokerInfo;
     private ConnectionStatistics statistics = new ConnectionStatistics();
     private boolean inServiceException=false;
@@ -107,7 +107,7 @@
      * @param broker
      * @param taskRunnerFactory - can be null if you want direct dispatch to the transport else commands are sent async.
      */
-    public AbstractConnection(Connector connector, Broker broker, TaskRunnerFactory taskRunnerFactory) {
+    public AbstractConnection(TransportConnector connector, Broker broker, TaskRunnerFactory taskRunnerFactory) {
         
         this.connector = connector;
         this.broker = broker;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=371636&r1=371635&r2=371636&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Jan 23 11:40:42 2006
@@ -73,9 +73,11 @@
         transport.start();
         active = true;
         super.start();
+        connector.onStarted(this);
     }
 
     public void stop() throws Exception {
+        connector.onStopped(this);
         try {
             if (masterBroker != null){
                 masterBroker.stop();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=371636&r1=371635&r2=371636&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Mon Jan 23 11:40:42 2006
@@ -27,7 +27,6 @@
 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
 import org.apache.activemq.broker.region.ConnectorStatistics;
 import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
@@ -51,7 +50,6 @@
     private static final Log log = LogFactory.getLog(TransportConnector.class);
 
     private Broker broker;
-    private BrokerFilter brokerFilter;
     private TransportServer server;
     private URI uri;
     private BrokerInfo brokerInfo = new BrokerInfo();
@@ -195,8 +193,8 @@
         }
         this.statusDector.stop();
         for (Iterator iter = connections.iterator(); iter.hasNext();) {
-            ConnectionContext context = (ConnectionContext) iter.next();
-            ss.stop(context.getConnection());
+            TransportConnection c = (TransportConnection) iter.next();
+            ss.stop(c);
         }
         ss.throwFirstException();
     }
@@ -204,28 +202,7 @@
     // Implementation methods
     // -------------------------------------------------------------------------
     protected Connection createConnection(Transport transport) throws IOException {
-        return new TransportConnection(this, transport, getBrokerFilter(), taskRunnerFactory);
-    }
-
-    protected BrokerFilter getBrokerFilter() {
-        if (brokerFilter == null) {
-            if (broker == null) {
-                throw new IllegalArgumentException("You must specify the broker property. Maybe this connector should be added to a broker?");
-            }
-            this.brokerFilter = new BrokerFilter(broker) {
-                public void addConnection(ConnectionContext context, ConnectionInfo info) throws Throwable {
-                    connections.add(context);
-                    super.addConnection(context, info);
-                }
-
-                public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Throwable {
-                    connections.remove(context);
-                    super.removeConnection(context, info, error);
-                }
-            };
-
-        }
-        return brokerFilter;
+        return new TransportConnection(this, transport, broker, taskRunnerFactory);
     }
 
     protected TransportServer createTransportServer() throws IOException, URISyntaxException {
@@ -276,6 +253,14 @@
 
     public void setConnectUri(URI transportUri) {
         this.connectUri = transportUri;
+    }
+
+    public void onStarted(TransportConnection connection) {
+        connections.add(connection);
+    }
+
+    public void onStopped(TransportConnection connection) {
+        connections.remove(connection);
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java?rev=371636&r1=371635&r2=371636&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java Mon Jan 23 11:40:42 2006
@@ -75,14 +75,10 @@
     }
     protected void doSweep(){
         for(Iterator i=connector.getConnections().iterator();i.hasNext();){
-            ConnectionContext cc=(ConnectionContext) i.next();
-            Connection connection=cc.getConnection();
-            if(connection instanceof TransportConnection){
-                TransportConnection tc=(TransportConnection) connection;
-                if(tc.isMarkedCandidate()){
-                    tc.doMark();
-                    collectionCandidates.add(tc);
-                }
+            TransportConnection connection=(TransportConnection) i.next();
+            if(connection.isMarkedCandidate()){
+                connection.doMark();
+                collectionCandidates.add(connection);
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java?rev=371636&r1=371635&r2=371636&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java Mon Jan 23 11:40:42 2006
@@ -57,7 +57,7 @@
             connectionId = "" + (nextConnectionId++);
         }
 
-        return new ManagedTransportConnection(this, transport, getBrokerFilter(), getTaskRunnerFactory(), mbeanServer, connectorName, connectionId);
+        return new ManagedTransportConnection(this, transport, getBroker(), getTaskRunnerFactory(), mbeanServer, connectorName, connectionId);
     }
 
 }



Mime
View raw message