activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r386506 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
Date Fri, 17 Mar 2006 02:22:05 GMT
Author: chirino
Date: Thu Mar 16 18:22:04 2006
New Revision: 386506

URL: http://svn.apache.org/viewcvs?rev=386506&view=rev
Log:
Deliver the events to the DiscoveryListener asynchronously from the multicast/heartbeating thread so
that if the
DiscoveryListener blocks for a while (e.g. while it is being debugged) then we don't erroneously
assume that members
in the group have timed out.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=386506&r1=386505&r2=386506&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
Thu Mar 16 18:22:04 2006
@@ -32,7 +32,13 @@
 import org.apache.activemq.transport.discovery.DiscoveryListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
 /**
@@ -67,6 +73,14 @@
     private long keepAliveInterval=DEFAULT_IDLE_TIME;
     private long lastAdvertizeTime=0;
     private AtomicBoolean started=new AtomicBoolean(false);
+    
+    private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue(), new ThreadFactory() {
+        public Thread newThread(Runnable runable) {
+            Thread t = new Thread(runable, "Multicast Discovery Agent Notifier");
+            t.setDaemon(true);
+            return t;
+        }            
+    });
 
     /**
      * Set the discovery listener
@@ -301,14 +315,25 @@
         if(selfService == null || !service.equals(selfService)){
             AtomicLong lastKeepAlive=(AtomicLong) services.get(service);
             if(lastKeepAlive==null){
-                lastKeepAlive=new AtomicLong(System.currentTimeMillis());
-                services.put(service,lastKeepAlive);
                 brokers.put(service, brokerName);
                 if(discoveryListener!=null){
-                    DiscoveryEvent event=new DiscoveryEvent(service);
+                    final DiscoveryEvent event=new DiscoveryEvent(service);
                     event.setBrokerName(brokerName);
-                    discoveryListener.onServiceAdd(event);
+                    
+                    // Have the listener process the event async so that 
+                    // he does not block this thread since we are doing time sensitive
+                    // processing of events.
+                    executor.execute(new Runnable() {
+                        public void run() {
+                            DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
+                            if(discoveryListener!=null){
+                                discoveryListener.onServiceAdd(event);
+                            }
+                        }
+                    });
                 }
+                lastKeepAlive=new AtomicLong(System.currentTimeMillis());
+                services.put(service,lastKeepAlive);
                 doAdvertizeSelf();
                 
             }
@@ -321,9 +346,20 @@
             if(services.remove(service)!=null){
                 brokers.remove(service);
                 if(discoveryListener!=null){
-                    DiscoveryEvent event=new DiscoveryEvent(service);
+                    final DiscoveryEvent event=new DiscoveryEvent(service);
                     event.setBrokerName(brokerName);
-                    discoveryListener.onServiceRemove(event);
+                    
+                    // Have the listener process the event async so that 
+                    // he does not block this thread since we are doing time sensitive
+                    // processing of events.
+                    executor.execute(new Runnable() {
+                        public void run() {
+                            DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
+                            if(discoveryListener!=null){
+                                discoveryListener.onServiceRemove(event);
+                            }
+                        }
+                    });
                 }
             }
         }



Mime
View raw message