geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdam...@apache.org
Subject cvs commit: incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/cluster ClusterHBTest.java
Date Sat, 17 Jul 2004 03:44:19 GMT
gdamour     2004/07/16 20:44:19

  Added:       sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster
                        ClusterHBSender.java ClusterHBReceiver.java
               sandbox/messaging/src/test/org/apache/geronimo/messaging/cluster
                        ClusterHBTest.java
  Log:
  Adds two new services, namely ClusterHBReceiver and ClusterHBSender, which are used to detect
and notify
  the availability or failure of nodes:
  Each node sends periodicaly heartbeats to a given multicast group. A leader in this cluster
monitors these
  heartbeats. When an heartbeat is received from a new node, it is added to the cluster. Conversely,
when
  more than a configurable number of heartbeats are missed for a given node, it is removed
from the cluster.
  
  Revision  Changes    Path
  1.1                  incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterHBSender.java
  
  Index: ClusterHBSender.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.messaging.cluster;
  
  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import java.io.ObjectOutputStream;
  import java.net.DatagramPacket;
  import java.net.MulticastSocket;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.geronimo.gbean.GBeanInfo;
  import org.apache.geronimo.gbean.GBeanInfoFactory;
  import org.apache.geronimo.gbean.GBeanLifecycle;
  import org.apache.geronimo.gbean.GBeanLifecycleController;
  import org.apache.geronimo.gbean.WaitingException;
  import org.apache.geronimo.kernel.management.State;
  import org.apache.geronimo.messaging.Node;
  import org.apache.geronimo.pool.ClockPool;
  
  import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
  
  /**
   * Node heartbeat sender.
   * <BR>
   * Sends heartbeats for a given node, which is a member of a given cluster. 
   *
   * @version $Revision: 1.1 $ $Date: 2004/07/17 03:44:18 $
   */
  public class ClusterHBSender
      implements GBeanLifecycle
  {
  
      private static final Log log = LogFactory.getLog(ClusterHBSender.class);
      
      /**
       * Heartbeats are sent for this node.
       */
      private final Node node;
      
      private byte[] infoAsBytes;
  
      /**
       * To execure periodical operations.
       */
      private final ClockPool clockPool;
  
      /**
       * Heartbeats are sent to this cluster.
       */
      private final Cluster cluster;
  
      /**
       * Number of milliseconds between two heartbeats.
       */
      private final long delay;
      
      /**
       * To manage the state of this component when it is not possible to send
       * an heartbeat.
       */
      private final GBeanLifecycleController controller;
      
      private MulticastSocket socket;
      
      /**
       * Opaque ClockPool ticket.
       */
      private Object ticket;
  
      /**
       * Creates a node heartbeat sender.
       * 
       * @param aNode Node.
       * @param aCluster Cluster to which this node is a member.
       * @param aClockPool To execute periodical tasks.
       * @param aDelay Number of milliseconds between two heartbeats.
       * @param aController To control the lifecycle of this component.
       */
      public ClusterHBSender(Node aNode, Cluster aCluster, ClockPool aClockPool, 
          long aDelay, GBeanLifecycleController aController) {
          if ( null == aNode ) {
              throw new IllegalArgumentException("Node is required.");
          } else if ( null == aClockPool ) {
              throw new IllegalArgumentException("ClockPool is required.");
          } else if ( null == aCluster ) {
              throw new IllegalArgumentException("Cluster is required.");
          } else if ( null == aController ) {
              throw new IllegalArgumentException("Controller is required.");
          }
          node = aNode;
          clockPool = aClockPool;
          cluster = aCluster;
          delay = aDelay;
          controller = aController;
      }
  
      public void doStart() throws WaitingException, Exception {
          ByteArrayOutputStream memOut = new ByteArrayOutputStream();
          ObjectOutputStream out = new ObjectOutputStream(memOut);
          out.writeObject(node.getNodeInfo());
          out.writeLong(delay);
          out.flush();
          out.close();
          infoAsBytes = memOut.toByteArray();
          
          socket = new MulticastSocket(cluster.getClusterInfo().getPort());
          ticket = clockPool.getClockDaemon().
              executePeriodically(delay, new HeartBeatTask(), true);
      }
  
      public void doStop() throws WaitingException, Exception {
          ClockDaemon.cancel(ticket);
          socket.close();
      }
  
      public void doFail() {
          ClockDaemon.cancel(ticket);
          socket.close();
      }
  
      private class HeartBeatTask implements Runnable {
  
          public void run() {
              ClusterInfo info = cluster.getClusterInfo();
              DatagramPacket packet = new DatagramPacket(infoAsBytes,
                  infoAsBytes.length, info.getAddress(),info.getPort());
              try {
                  socket.send(packet);
              } catch (IOException e) {
                  if ( State.RUNNING.toInt() != controller.getState() ) {
                      log.error("Can not send heartbeat packet", e);
                      controller.fail();
                  }
              }
          }
          
      }
      
      
      public static final GBeanInfo GBEAN_INFO;
  
      static {
          GBeanInfoFactory factory = new GBeanInfoFactory(ClusterHBSender.class);
          factory.setConstructor(new String[] {"Node", "Cluster", "ClockPool",
              "delay", "gbeanLifecycleController"});
          factory.addReference("Node", Node.class);
          factory.addReference("Cluster", Cluster.class);
          factory.addReference("ClockPool", ClockPool.class);
          factory.addAttribute("delay", long.class, true);
          factory.addAttribute("gbeanLifecycleController", GBeanLifecycleController.class,
false);
          GBEAN_INFO = factory.getBeanInfo();
      }
  
      public static GBeanInfo getGBeanInfo() {
          return GBEAN_INFO;
      }
      
      
  }
  
  
  
  1.1                  incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterHBReceiver.java
  
  Index: ClusterHBReceiver.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.messaging.cluster;
  
  import java.io.ByteArrayInputStream;
  import java.io.IOException;
  import java.io.ObjectInputStream;
  import java.net.DatagramPacket;
  import java.net.MulticastSocket;
  import java.net.SocketException;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.Map;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.geronimo.gbean.GBeanInfo;
  import org.apache.geronimo.gbean.GBeanInfoFactory;
  import org.apache.geronimo.gbean.GBeanLifecycle;
  import org.apache.geronimo.gbean.WaitingException;
  import org.apache.geronimo.messaging.NodeException;
  import org.apache.geronimo.messaging.NodeInfo;
  import org.apache.geronimo.pool.ClockPool;
  
  import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
  
  /**
   * Heartbeats listeners.
   * <BR>
   * It joins the multicast group associated to the bound cluster and monitors
   * node heartbeats. When an heartbeat is received for the very first time, it
   * adds it the underlying cluster. Conversely, when a configurable number of
   * heartbeats have been missed, it removes it from the underlying cluster.
   * <BR>
   * This service must be executed by a single node of the cluster. If the node
   * running this service fails, then another node of the cluster should start
   * automatically this service.
   *
   * @version $Revision: 1.1 $ $Date: 2004/07/17 03:44:18 $
   */
  public class ClusterHBReceiver
      implements GBeanLifecycle
  {
  
      private static final Log log = LogFactory.getLog(ClusterHBReceiver.class);
      
      /**
       * Cluster to be notified when new nodes have been detected or have not
       * sent their heartbeats.
       */
      private final Cluster cluster;
      
      /**
       * To execute periodical tasks.
       */
      private final ClockPool clockPool;
      
      /**
       * Number of heartbeats which can be missed safely. If more than this 
       * number of heartbeat is missed for a given node, then the underlying
       * cluster is notified of the failure of a node.
       */
      private final int nbMissed; 
  
      /**
       * NodeInfo to HeartbeatTracker map.
       */
      private final Map trackers;
  
      private MulticastSocket socket;
  
      /**
       * Is this service running?
       */
      private boolean running;
      
      /**
       * Creates an heartbeat listener.
       * 
       * @param aCluster Cluster to be notified when a new node has been detected
       * or when aNbMissed heartbeats have been missed and the associated node
       * needs to be drop from the cluster.
       * @param aClockPool To execute periodical operations.
       * @param aNbMissed Number of heartbeats which can be missed safely. If more
       * than this number of heartbeats are missed, then the associated node
       * is dropped.
       */
      public ClusterHBReceiver(Cluster aCluster, ClockPool aClockPool,
          int aNbMissed) {
          if ( null == aCluster ) {
              throw new IllegalArgumentException("Cluster is required.");
          } else if ( null == aClockPool ) {
              throw new IllegalArgumentException("ClockPool is required.");
          }
          cluster = aCluster;
          clockPool = aClockPool;
          nbMissed = aNbMissed;
          
          trackers = new HashMap();
      }
      
      public void doStart() throws WaitingException, Exception {
          ClusterInfo info = cluster.getClusterInfo();
          socket = new MulticastSocket(info.getPort());
          socket.joinGroup(info.getAddress());
          running = true;
          new Thread(new HearbeatListener()).start();
      }
  
      public void doStop() throws WaitingException, Exception {
          ClusterInfo info = cluster.getClusterInfo();
          running = false;
          stopTrackers();
          socket.leaveGroup(info.getAddress());
          socket.close();
      }
  
      public void doFail() {
          ClusterInfo info = cluster.getClusterInfo();
          running = false;
          stopTrackers();
          try {
              socket.leaveGroup(info.getAddress());
          } catch (IOException e) {
              log.error("Can not leave group", e);
          }
          socket.close();
      }
  
      /**
       * Stops the node trackers.
       */
      private void stopTrackers() {
          Collection tmpTrackers;
          synchronized(trackers) {
              tmpTrackers = new ArrayList(trackers.values());
          }
          for (Iterator iter = tmpTrackers.iterator(); iter.hasNext();) {
              HeartbeatTracker tracker = (HeartbeatTracker) iter.next();
              try {
                  tracker.stop();
              } catch (NodeException e) {
                  log.error(e);
              }
          }
      }
      
      /**
       * Listens to the heartbeat sent to this service.
       */
      private class HearbeatListener implements Runnable {
          public void run() {
              while ( running ) {
                  try {
                      byte[] buf = new byte[32768];
                      DatagramPacket packet = new DatagramPacket(buf, buf.length);
                      try {
                          socket.receive(packet);
                      } catch (SocketException e) {
                          // This can happen when the socket is closed. So we 
                          // simply break.
                          // TODO check if the state is really stopping. If it
                          // is not, then one needs to stop the service.
                          log.error(e);
                          break;
                      }
                      ByteArrayInputStream memIn =
                          new ByteArrayInputStream(buf, 0, packet.getLength());
                      ObjectInputStream in = new ObjectInputStream(memIn);
                      NodeInfo nodeInfo = (NodeInfo) in.readObject();
                      long tempo = in.readLong();
                      HeartbeatTracker tracker;
                      synchronized(trackers) {
                          tracker = (HeartbeatTracker) trackers.get(nodeInfo);
                          if ( null == tracker ) {
                              tracker = new HeartbeatTracker(nodeInfo, tempo);
                              tracker.start();
                          }
                      }
                      tracker.lastTimestamp = System.currentTimeMillis();
                  } catch (Exception e) {
                      log.error("Error while listening heartbeat", e);
                  }
              }
          }
      }
      
      /**
       * Tracks the heartbeat of a given node.
       */
      private class HeartbeatTracker implements Runnable {
          /**
           * Node to be tracked.
           */
          private final NodeInfo node;
          /**
           * Delay between two heartbeats sent by the node.
           */
          private final long delay;
          /**
           * Last time that an heartbeat has been received.
           */
          private long lastTimestamp;
          /**
           * Current number of missed heartbeats.
           */
          private int missed;
          /**
           * Opaque ClockPool ticket.
           */
          private Object ticket;
          private HeartbeatTracker(NodeInfo aNode, long aDelay) {
              node = aNode;
              delay = aDelay;
          }
          public void run() {
              long inactivePeriod = System.currentTimeMillis() - lastTimestamp;
              if ( nbMissed < inactivePeriod / delay ) {
                  try {
                      stop();
                  } catch (NodeException e) {
                      log.error(e);
                  }
              }
          }
          public void start() throws NodeException {
              cluster.addMember(node);
              ticket = clockPool.getClockDaemon().
                  executePeriodically(delay, this, false);
              trackers.put(node, this);
          }
          public void stop() throws NodeException {
              synchronized(trackers) {
                  trackers.remove(node);
              }
              ClockDaemon.cancel(ticket);
              cluster.removeMember(node);
          }
      }
      
  
      public static final GBeanInfo GBEAN_INFO;
  
      static {
          GBeanInfoFactory factory = new GBeanInfoFactory(ClusterHBReceiver.class);
          factory.setConstructor(new String[] {"Cluster", "ClockPool", "nbMissed"});
          factory.addReference("Cluster", Cluster.class);
          factory.addReference("ClockPool", ClockPool.class);
          factory.addAttribute("nbMissed", int.class, true);
          GBEAN_INFO = factory.getBeanInfo();
      }
  
      public static GBeanInfo getGBeanInfo() {
          return GBEAN_INFO;
      }
  
  }
  
  
  
  1.1                  incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/cluster/ClusterHBTest.java
  
  Index: ClusterHBTest.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.messaging.cluster;
  
  import java.net.InetAddress;
  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
  
  import org.apache.geronimo.gbean.GBeanLifecycleController;
  import org.apache.geronimo.messaging.MockNode;
  import org.apache.geronimo.messaging.NodeInfo;
  import org.apache.geronimo.pool.ClockPool;
  
  import junit.framework.TestCase;
  
  /**
   *
   * @version $Revision: 1.1 $ $Date: 2004/07/17 03:44:19 $
   */
  public class ClusterHBTest
      extends TestCase
  {
  
      private ClockPool cp;
      private long delay;
      private int nbMissed;
      
      private MockNode node1;
      private ClusterHBSender sender1;
  
      private MockNode node2;
      private ClusterHBSender sender2;
      
      private MockCluster cluster;
      
      private ClusterHBReceiver receiver;
      
      protected void setUp() throws Exception {
          cp = new ClockPool();
          cp.setPoolName("CP");
          cp.doStart();
  
          delay = 500;
          nbMissed = 2;
  
          InetAddress groupAddress = InetAddress.getByName("235.0.0.1");
          int port = 6667;
  
          InetAddress localhost = InetAddress.getLocalHost();
          
          ClusterInfo clusterInfo = new ClusterInfo(groupAddress, port);
          node1 = new MockNode();
          node1.setNodeInfo(new NodeInfo("node1", localhost, 1234));
          sender1 =
              new ClusterHBSender(node1, new MockCluster(clusterInfo), cp, delay,
                  new MyMockController());
          
          node2 = new MockNode();
          node2.setNodeInfo(new NodeInfo("node2", localhost, 1234));
          sender2 =
              new ClusterHBSender(node2, new MockCluster(clusterInfo), cp, delay,
                  new MyMockController());
          
          cluster = new MockCluster(clusterInfo);
          receiver = new ClusterHBReceiver(cluster, cp, nbMissed);
      }
  
      protected void tearDown() throws Exception {
          receiver.doStop();
          cp.doStop();
      }
      
      public void testRegister() throws Exception {
          sender1.doStart();
          receiver.doStart();
  
          Thread.sleep(delay * 2);
          assertEquals(1, cluster.nodes.size());
          assertEquals(node1.getNodeInfo(), cluster.nodes.get(0));
  
          sender2.doStart();
  
          Thread.sleep(delay * 2);
          assertEquals(2, cluster.nodes.size());
          assertEquals(node2.getNodeInfo(), cluster.nodes.get(1));
  
          sender2.doStop();
          sender1.doStop();
      }
      
      public void testUnregister() throws Exception {
          sender1.doStart();
          sender2.doStart();
          receiver.doStart();
  
          Thread.sleep(delay * 2);
  
          assertEquals(2, cluster.nodes.size());
  
          sender1.doStop();
  
          Thread.sleep(delay * (nbMissed + 1) );
          assertEquals(1, cluster.nodes.size());
          
          sender2.doStop();
          
          Thread.sleep(delay * (nbMissed + 1) );
          assertEquals(0, cluster.nodes.size());
      }
      
      private class MyMockController implements GBeanLifecycleController {
          public int getState() {
              return 0;
          }
          public void start() throws Exception {
          }
          public void stop() throws Exception {
          }
          public void fail() {
          }
      }
      
      private class MockCluster implements Cluster {
          private List nodes = Collections.synchronizedList(new ArrayList());
          private final ClusterInfo info;
          private MockCluster(ClusterInfo anInfo) {
              info = anInfo;
          }
          public Set getMembers() {
              return new HashSet(nodes);
          }
          public void addMember(NodeInfo aNode) {
              nodes.add(aNode);
          }
          public void removeMember(NodeInfo aNode) {
              nodes.remove(aNode);
          }
          public void addListener(ClusterEventListener aListener) {
          }
          public void removeListener(ClusterEventListener aListener) {
          }
          public ClusterInfo getClusterInfo() {
              return info;
          }
      }
      
  }
  
  
  

Mime
View raw message