Return-Path: Delivered-To: apmail-geronimo-scm-archive@www.apache.org Received: (qmail 76220 invoked from network); 20 Jul 2004 00:01:57 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur-2.apache.org with SMTP; 20 Jul 2004 00:01:57 -0000 Received: (qmail 24473 invoked by uid 500); 20 Jul 2004 00:01:56 -0000 Delivered-To: apmail-geronimo-scm-archive@geronimo.apache.org Received: (qmail 24454 invoked by uid 500); 20 Jul 2004 00:01:56 -0000 Mailing-List: contact scm-help@geronimo.apache.org; run by ezmlm Precedence: bulk list-help: list-unsubscribe: list-post: Reply-To: dev@geronimo.apache.org Delivered-To: mailing list scm@geronimo.apache.org Received: (qmail 24428 invoked by uid 500); 20 Jul 2004 00:01:55 -0000 Delivered-To: apmail-incubator-geronimo-cvs@apache.org Received: (qmail 24425 invoked by uid 99); 20 Jul 2004 00:01:55 -0000 X-ASF-Spam-Status: No, hits=0.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.27.1) with SMTP; Mon, 19 Jul 2004 17:01:55 -0700 Received: (qmail 76198 invoked by uid 1782); 20 Jul 2004 00:01:54 -0000 Date: 20 Jul 2004 00:01:54 -0000 Message-ID: <20040720000154.76197.qmail@minotaur.apache.org> From: gdamour@apache.org To: incubator-geronimo-cvs@apache.org Subject: cvs commit: incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster ClusterHBReceiver.java X-Virus-Checked: Checked X-Spam-Rating: minotaur-2.apache.org 1.6.2 0/1000/N gdamour 2004/07/19 17:01:54 Modified: sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster ClusterHBReceiver.java Log: o queue the operations (add/remove node) to be performed on a cluster in order to have accurate timestamps associated with node heartbeats; o prevent the underlying thread of ClockDaemon from failing in case of a reconfiguration problem (such as a node failing right in the middle of a topology 
reconfiguration). Revision Changes Path 1.2 +49 -7 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterHBReceiver.java Index: ClusterHBReceiver.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterHBReceiver.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- ClusterHBReceiver.java 17 Jul 2004 03:44:18 -0000 1.1 +++ ClusterHBReceiver.java 20 Jul 2004 00:01:54 -0000 1.2 @@ -40,13 +40,14 @@ import org.apache.geronimo.pool.ClockPool; import EDU.oswego.cs.dl.util.concurrent.ClockDaemon; +import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor; /** - * Heartbeats listeners. + * Heartbeats listener. *
* It joins the multicast group associated to the bound cluster and monitors * node heartbeats. When an heartbeat is received for the very first time, it - * adds it the underlying cluster. Conversely, when a configurable number of + * adds it to the underlying cluster. Conversely, when a configurable number of * heartbeats have been missed, it removes it from the underlying cluster. *
* This service must be executed by a single node of the cluster. If the node @@ -84,6 +85,11 @@ */ private final Map trackers; + /** + * Queue the cluster operations (add or remove node). + */ + private QueuedExecutor queuedExecutor; + private MulticastSocket socket; /** @@ -122,6 +128,7 @@ socket.joinGroup(info.getAddress()); running = true; new Thread(new HearbeatListener()).start(); + queuedExecutor = new QueuedExecutor(); } public void doStop() throws WaitingException, Exception { @@ -130,6 +137,7 @@ stopTrackers(); socket.leaveGroup(info.getAddress()); socket.close(); + queuedExecutor.shutdownAfterProcessingCurrentlyQueuedTasks(); } public void doFail() { @@ -142,6 +150,7 @@ log.error("Can not leave group", e); } socket.close(); + queuedExecutor.shutdownAfterProcessingCurrentlyQueuedTasks(); } /** @@ -181,20 +190,27 @@ log.error(e); break; } + long timestamp = System.currentTimeMillis(); ByteArrayInputStream memIn = new ByteArrayInputStream(buf, 0, packet.getLength()); ObjectInputStream in = new ObjectInputStream(memIn); NodeInfo nodeInfo = (NodeInfo) in.readObject(); - long tempo = in.readLong(); HeartbeatTracker tracker; synchronized(trackers) { tracker = (HeartbeatTracker) trackers.get(nodeInfo); if ( null == tracker ) { + long tempo = in.readLong(); tracker = new HeartbeatTracker(nodeInfo, tempo); - tracker.start(); + trackers.put(nodeInfo, tracker); + // Does not start the tracker in this thread + // as one wants this loop to reflect "correct" + // timestamps. When a tracker is started, it + // adds its associated node to the cluster, which + // can takes some time. + queuedExecutor.execute(new StartTracker(tracker)); } } - tracker.lastTimestamp = System.currentTimeMillis(); + tracker.lastTimestamp = timestamp; } catch (Exception e) { log.error("Error while listening heartbeat", e); } @@ -203,6 +219,26 @@ } /** + * Starts an heartbeat tracker. 
+ */ + private class StartTracker implements Runnable { + private final HeartbeatTracker tracker; + private StartTracker(HeartbeatTracker aTracker) { + tracker = aTracker; + } + public void run() { + try { + tracker.start(); + } catch (NodeException e) { + synchronized(trackers) { + trackers.remove(tracker.node); + } + log.error("Can not start tracker", e); + } + } + } + + /** * Tracks the heartbeat of a given node. */ private class HeartbeatTracker implements Runnable { @@ -237,6 +273,13 @@ stop(); } catch (NodeException e) { log.error(e); + } catch (Throwable e) { + // Ensures that the underlying thread of ClockDaemon + // is not interrupted. + // TODO as a matter of fact, this happen if a node + // fails during a reconfiguration. There is a bug in + // the way exceptions are unwrapped via GBeans. + log.error(e); } } } @@ -244,7 +287,6 @@ cluster.addMember(node); ticket = clockPool.getClockDaemon(). executePeriodically(delay, this, false); - trackers.put(node, this); } public void stop() throws NodeException { synchronized(trackers) {