hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nkey...@apache.org
Subject svn commit: r1458199 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-server/src/main/java/org/apache/hadoop/hbase/master/ hbase-server/src/test/java/org/apache/hadoop/hbase/master/
Date Tue, 19 Mar 2013 10:07:17 GMT
Author: nkeywal
Date: Tue Mar 19 10:07:17 2013
New Revision: 1458199

URL: http://svn.apache.org/r1458199
Log:
HBASE-7590 Add a costless notifications mechanism from master to regionservers & clients
- new files

Added:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterStatusPublisher.java

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java?rev=1458199&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
(added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
Tue Mar 19 10:07:17 2013
@@ -0,0 +1,246 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.util.Threads;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.DatagramChannel;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
+import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+
+/**
+ * A class that receives the cluster status, and provides it as a set of services to the client.
+ * Today, manages only the dead server list.
+ * The transport is pluggable through the Listener interface, to allow multiple implementations,
+ * from ZooKeeper to multicast based.
+ */
+@InterfaceAudience.Private
+class ClusterStatusListener implements Closeable {
+  private static final Log LOG = LogFactory.getLog(ClusterStatusListener.class);
+  private final List<ServerName> deadServers = new ArrayList<ServerName>();
+  private final DeadServerHandler deadServerHandler;
+  private final Listener listener;
+
+  /**
+   * The implementation class to use to read the status. Default is null.
+   */
+  public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
+  public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS = null;
+
+  /**
+   * Callback interface to implement in order to be notified of a new dead server.
+   */
+  public interface DeadServerHandler {
+
+    /**
+     * Called when a server is identified as dead. Called only once even if we receive the
+     * information multiple times.
+     *
+     * @param sn - the server name
+     */
+    public void newDead(ServerName sn);
+  }
+
+
+  /**
+   * The interface to be implemented by a listener of a cluster status event.
+   */
+  static interface Listener extends Closeable {
+    /**
+     * Called to close the resources, if any. Cannot throw an exception.
+     */
+    @Override
+    public void close();
+
+    /**
+     * Called to connect.
+     *
+     * @param conf Configuration to use.
+     * @throws IOException if the listener cannot connect
+     */
+    public void connect(Configuration conf) throws IOException;
+  }
+
+  /**
+   * Instantiates the listener implementation by reflection, then connects it.
+   *
+   * @param dsh handler notified of each newly dead server; when null nobody is notified
+   * @param conf configuration handed to the listener's connect method
+   * @param listenerClass the listener implementation; it must expose a public constructor
+   *                      taking a ClusterStatusListener
+   * @throws IOException if the listener cannot be instantiated or cannot connect
+   * @throws IllegalStateException if the expected constructor is missing or fails
+   */
+  public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
+                               Class<? extends Listener> listenerClass) throws IOException {
+    this.deadServerHandler = dsh;
+    try {
+      Constructor<? extends Listener> ctor =
+          listenerClass.getConstructor(ClusterStatusListener.class);
+      this.listener = ctor.newInstance(this);
+    } catch (InstantiationException e) {
+      throw new IOException("Can't create listener " + listenerClass.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Can't create listener " + listenerClass.getName(), e);
+    } catch (NoSuchMethodException e) {
+      // Keep the cause, otherwise the failure cannot be diagnosed from the stack trace.
+      throw new IllegalStateException(
+          "No suitable constructor for listener " + listenerClass.getName(), e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalStateException(
+          "The constructor of listener " + listenerClass.getName() + " failed", e);
+    }
+
+    // Release whatever the listener allocated if it fails to connect, whatever the failure
+    // is: the caller never gets a reference to this object and so cannot close it itself.
+    boolean connected = false;
+    try {
+      this.listener.connect(conf);
+      connected = true;
+    } finally {
+      if (!connected) {
+        this.listener.close();
+      }
+    }
+  }
+
+  /**
+   * Processes a newly received cluster status: each dead server it lists that is not already
+   * known as dead is logged, recorded and, when a handler was provided, reported to it.
+   *
+   * @param ncs the cluster status
+   */
+  public void receive(ClusterStatus ncs) {
+    if (ncs.getDeadServerNames() == null) {
+      return;
+    }
+
+    for (ServerName candidate : ncs.getDeadServerNames()) {
+      if (isDeadServer(candidate)) {
+        // Already known: nothing to record, nobody to notify.
+        continue;
+      }
+
+      LOG.info("There is a new dead server: " + candidate);
+      deadServers.add(candidate);
+      if (deadServerHandler != null) {
+        deadServerHandler.newDead(candidate);
+      }
+    }
+  }
+
+  /**
+   * Closes the underlying listener.
+   */
+  @Override
+  public void close() {
+    listener.close();
+  }
+
+  /**
+   * Tells whether a server is known to be dead. A server is considered dead when a recorded
+   * dead server has the same hostname and port, and a start code greater than or equal to
+   * the one of the server checked. A server with a start code lower than or equal to zero is
+   * never reported as dead.
+   *
+   * @param sn the server name to check.
+   * @return true if we know for sure that the server is dead, false otherwise.
+   */
+  public boolean isDeadServer(ServerName sn) {
+    if (sn.getStartcode() <= 0) {
+      return false;
+    }
+
+    for (ServerName known : deadServers) {
+      if (known.getStartcode() < sn.getStartcode()) {
+        continue;
+      }
+      if (known.getPort() != sn.getPort()) {
+        continue;
+      }
+      if (known.getHostname().equals(sn.getHostname())) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+
+  /**
+   * An implementation using a multicast message between the master & the client.
+   */
+  class MultiCastListener implements Listener {
+    private DatagramChannel channel;
+    private final ExecutorService service = Executors.newSingleThreadExecutor(
+        Threads.newDaemonThreadFactory("hbase-client-clusterStatus-multiCastListener"));
+
+
+    public MultiCastListener() {
+    }
+
+    /**
+     * Binds a datagram channel to the configured multicast address and port, then joins the
+     * multicast group. The messages received are decoded as protobuf cluster statuses and
+     * handed to the enclosing ClusterStatusListener.
+     *
+     * @param conf provides the multicast address and port
+     * @throws IOException if the multicast address cannot be resolved
+     */
+    @Override
+    public void connect(Configuration conf) throws IOException {
+      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
+          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
+      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
+          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
+
+      // Resolve the group first, so that a wrong address fails before any socket is bound.
+      InetAddress ina;
+      try {
+        ina = InetAddress.getByName(mcAddress);
+      } catch (UnknownHostException e) {
+        throw new IOException("Can't connect to " + mcAddress, e);
+      }
+
+      // Can't be NiO with Netty today => not implemented in Netty.
+      DatagramChannelFactory f = new OioDatagramChannelFactory(service);
+
+      ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
+      b.setPipeline(Channels.pipeline(
+          new ProtobufDecoder(ClusterStatusProtos.ClusterStatus.getDefaultInstance()),
+          new ClusterStatusHandler()));
+
+      // The address reuse must be requested before the bind: changing it on a socket that is
+      // already bound has no defined effect. Bootstrap options are applied before binding.
+      b.setOption("reuseAddress", true);
+
+      channel = (DatagramChannel) b.bind(new InetSocketAddress(mcAddress, port));
+      channel.joinGroup(ina);
+    }
+
+    /**
+     * Closes the channel, if any, and shuts down the executor service.
+     */
+    @Override
+    public void close() {
+      if (channel != null) {
+        channel.close();
+        channel = null;
+      }
+      service.shutdown();
+    }
+
+
+    /**
+     * Class, conforming to the Netty framework, that manages the message received.
+     */
+    private class ClusterStatusHandler extends SimpleChannelUpstreamHandler {
+
+      /**
+       * Converts the protobuf message received and forwards it to the enclosing listener.
+       */
+      @Override
+      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+        ClusterStatusProtos.ClusterStatus csp = (ClusterStatusProtos.ClusterStatus) e.getMessage();
+        ClusterStatus ncs = ClusterStatus.convert(csp);
+        receive(ncs);
+      }
+
+      /**
+       * Invoked when an exception was raised by an I/O thread or a
+       * {@link org.jboss.netty.channel.ChannelHandler}.
+       */
+      @Override
+      public void exceptionCaught(
+          ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+        LOG.error("Unexpected exception, continuing.", e.getCause());
+      }
+    }
+  }
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java?rev=1458199&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
(added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
Tue Mar 19 10:07:17 2013
@@ -0,0 +1,279 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hbase.master;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.DatagramChannel;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
+import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Class to publish the cluster status to the clients. This allows them to know immediately
+ *  which region servers are dead, hence to cut the connections they have with them and stop
+ *  waiting on the socket. This improves the mean time to recover, and also allows the
+ *  different timeouts to be increased on the client, as the dead servers will be detected
+ *  separately.
+ */
+@InterfaceAudience.Private
+public class ClusterStatusPublisher extends Chore {
+  /**
+   * The implementation class used to publish the status. Default is null (no publish).
+   * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to
+   * multicast the status.
+   */
+  public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
+  public static final Class<? extends ClusterStatusPublisher.Publisher>
+      DEFAULT_STATUS_PUBLISHER_CLASS = null;
+
+  /**
+   * The minimum time between two status messages, in milliseconds.
+   */
+  public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
+  public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
+
+  private long lastMessageTime = 0;
+  private final HMaster master;
+  private final int messagePeriod; // time between two message
+  private final ConcurrentMap<ServerName, Integer> lastSent =
+      new ConcurrentHashMap<ServerName, Integer>();
+  private Publisher publisher;
+  private boolean connected = false;
+
+  /**
+   * We want to limit the size of the protobuf message sent, to fit into a single packet.
+   * A reasonable size for IP / Ethernet is less than 1 KB.
+   */
+  public static int MAX_SERVER_PER_MESSAGE = 10;
+
+  /**
+   * If a server dies, we're sending the information multiple times in case a receiver
+   * misses the message.
+   */
+  public static int NB_SEND = 5;
+
+  /**
+   * Creates the chore, instantiates the publisher implementation and connects it.
+   *
+   * @param master the master owning this chore
+   * @param conf provides the publish period and is handed to the publisher's connect method
+   * @param publisherClass the publisher implementation; it needs an accessible no-argument
+   *                       constructor
+   * @throws IOException if the publisher cannot be instantiated or cannot connect
+   */
+  public ClusterStatusPublisher(HMaster master, Configuration conf,
+                                Class<? extends Publisher> publisherClass)
+      throws IOException {
+    super("HBase clusterStatusPublisher for " + master.getName(),
+        conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), master);
+    this.master = master;
+    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
+    try {
+      this.publisher = publisherClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
+    }
+
+    // Release whatever the publisher allocated if it fails to connect: the caller never
+    // gets a reference to this object and so cannot trigger the cleanup itself.
+    try {
+      this.publisher.connect(conf);
+      connected = true;
+    } finally {
+      if (!connected) {
+        this.publisher.close();
+      }
+    }
+  }
+
+  // For tests only. The publisher is left unset and 'connected' stays false, so chore()
+  // returns immediately. As the master is null, tests using this constructor have to
+  // override getDeadServers().
+  protected ClusterStatusPublisher() {
+    master = null;
+    messagePeriod = 0;
+  }
+
+  /**
+   * Publishes the list of dead servers, if the publisher is connected, if the previous
+   * message is at least messagePeriod milliseconds old and if there is something to send.
+   */
+  @Override
+  protected void chore() {
+    if (!connected) {
+      return;
+    }
+
+    // Check the rate limit before building the list: generateDeadServersListToSend()
+    // increments the per-server send counters, so it must be called only when the list it
+    // returns is actually published. Otherwise a notification is counted without being sent.
+    final long curTime = EnvironmentEdgeManager.currentTimeMillis();
+    if (lastMessageTime > curTime - messagePeriod) {
+      // We already sent something less than messagePeriod milliseconds ago. Done.
+      return;
+    }
+
+    List<ServerName> sns = generateDeadServersListToSend();
+    if (sns.isEmpty()) {
+      // Nothing to send. Done.
+      return;
+    }
+
+    // Ok, we're going to send something then.
+    lastMessageTime = curTime;
+
+    // We're reusing an existing protobuf message, but we don't send everything.
+    // This could be extended in the future, for example if we want to send stuff like the
+    //  META server name.
+    ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(),
+        master.getMasterFileSystem().getClusterId().toString(),
+        null,
+        sns,
+        master.getServerName(),
+        null,
+        null,
+        null,
+        null);
+
+
+    publisher.publish(cs);
+  }
+
+  /**
+   * Stops publishing and closes the publisher, if there is one.
+   */
+  protected void cleanup() {
+    connected = false;
+    // The publisher is not set when the instance was built with the test-only constructor.
+    if (publisher != null) {
+      publisher.close();
+    }
+  }
+
+  /**
+   * Create the dead server to send. A dead server is sent NB_SEND times. We send at max
+   * MAX_SERVER_PER_MESSAGE at a time. if there are too many dead servers, we send the newly
+   * dead first.
+   */
+  protected List<ServerName> generateDeadServersListToSend() {
+    // We're getting the message sent since last time, and add them to the list
+    long since = EnvironmentEdgeManager.currentTimeMillis() - messagePeriod * 2;
+    for (Pair<ServerName, Long> dead : getDeadServers(since)) {
+      lastSent.putIfAbsent(dead.getFirst(), 0);
+    }
+
+    // We're sending the new deads first.
+    List<Map.Entry<ServerName, Integer>> entries = new ArrayList<Map.Entry<ServerName,
Integer>>();
+    entries.addAll(lastSent.entrySet());
+    Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>()
{
+      @Override
+      public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName,
Integer> o2) {
+        return o1.getValue().compareTo(o2.getValue());
+      }
+    });
+
+    // With a limit of MAX_SERVER_PER_MESSAGE
+    int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
+    List<ServerName> res = new ArrayList<ServerName>(max);
+
+    for (int i = 0; i < max; i++) {
+      Map.Entry<ServerName, Integer> toSend = entries.get(i);
+      if (toSend.getValue() >= (NB_SEND - 1)) {
+        lastSent.remove(toSend.getKey());
+      } else {
+        lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
+      }
+
+      res.add(toSend.getKey());
+    }
+
+    return res;
+  }
+
+  /**
+   * Returns the servers which died since the given timestamp, as known by the server manager
+   * of the master, or an empty list when the master has no server manager.
+   * Protected so that the tests can override it.
+   */
+  protected List<Pair<ServerName, Long>> getDeadServers(long since) {
+    return master.getServerManager() == null
+        ? Collections.<Pair<ServerName, Long>>emptyList()
+        : master.getServerManager().getDeadServers().copyDeadServersSince(since);
+  }
+
+
+  /**
+   * The interface to be implemented by the classes publishing the cluster status.
+   */
+  public static interface Publisher extends Closeable {
+
+    /**
+     * Called once, before any call to publish.
+     *
+     * @param conf Configuration to use.
+     * @throws IOException if the publisher cannot connect
+     */
+    public void connect(Configuration conf) throws IOException;
+
+    /**
+     * Called to publish a cluster status.
+     *
+     * @param cs the cluster status to publish
+     */
+    public void publish(ClusterStatus cs);
+
+    /**
+     * Called to close the resources, if any. Does not declare any checked exception.
+     */
+    @Override
+    public void close();
+  }
+
+  /**
+   * A publisher sending the cluster status as a protobuf message to a multicast group.
+   */
+  public static class MulticastPublisher implements Publisher {
+    private DatagramChannel channel;
+    private final ExecutorService service = Executors.newSingleThreadExecutor(
+        Threads.newDaemonThreadFactory("hbase-master-clusterStatus-worker"));
+
+    public MulticastPublisher() {
+    }
+
+    /**
+     * Binds a datagram channel to an ephemeral port, joins the configured multicast group
+     * and connects the channel to the configured multicast address and port.
+     *
+     * @param conf provides the multicast address and port
+     * @throws IOException if the multicast address cannot be resolved
+     */
+    @Override
+    public void connect(Configuration conf) throws IOException {
+      String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
+          HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
+      int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
+          HConstants.DEFAULT_STATUS_MULTICAST_PORT);
+
+      // Resolve the group first, so that a wrong address fails before any socket is bound.
+      InetAddress ina;
+      try {
+        ina = InetAddress.getByName(mcAddress);
+      } catch (UnknownHostException e) {
+        throw new IOException("Can't connect to " + mcAddress, e);
+      }
+
+      // Can't be NiO with Netty today => not implemented in Netty.
+      DatagramChannelFactory f = new OioDatagramChannelFactory(service);
+
+      ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
+      b.setPipeline(Channels.pipeline(new ProtobufEncoder()));
+
+      // The address reuse must be requested before the bind: changing it on a socket that is
+      // already bound has no defined effect. Bootstrap options are applied before binding.
+      b.setOption("reuseAddress", true);
+
+      channel = (DatagramChannel) b.bind(new InetSocketAddress(0));
+      channel.joinGroup(ina);
+      channel.connect(new InetSocketAddress(mcAddress, port));
+    }
+
+    /**
+     * Converts the cluster status to its protobuf form and writes it to the channel.
+     *
+     * @param cs the cluster status to publish
+     */
+    @Override
+    public void publish(ClusterStatus cs) {
+      ClusterStatusProtos.ClusterStatus csp = cs.convert();
+      channel.write(csp);
+    }
+
+    /**
+     * Closes the channel, if any, and shuts down the executor service.
+     */
+    @Override
+    public void close() {
+      if (channel != null) {
+        channel.close();
+        // Drop the reference so that a second call does not close the channel again.
+        channel = null;
+      }
+      service.shutdown();
+    }
+  }
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterStatusPublisher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterStatusPublisher.java?rev=1458199&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterStatusPublisher.java
(added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClusterStatusPublisher.java
Tue Mar 19 10:07:17 2013
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master;
+
+
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Category(MediumTests.class) // Plays with the ManualEnvironmentEdge
+public class TestClusterStatusPublisher {
+  private ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
+
+  /**
+   * Installs a manual clock, set to 0, before each test.
+   */
+  @Before
+  public void before() {
+    mee.setValue(0);
+    EnvironmentEdgeManager.injectEdge(mee);
+  }
+
+  /**
+   * Restores the default clock, so that the manual one does not leak into the tests
+   * executed afterwards in the same JVM.
+   */
+  @After
+  public void after() {
+    EnvironmentEdgeManager.reset();
+  }
+
+  /**
+   * Without any dead server, there is nothing to send.
+   */
+  @Test
+  public void testEmpty() {
+    ClusterStatusPublisher publisher = new ClusterStatusPublisher() {
+      @Override
+      protected List<Pair<ServerName, Long>> getDeadServers(long since) {
+        List<Pair<ServerName, Long>> nobody = new ArrayList<Pair<ServerName, Long>>();
+        return nobody;
+      }
+    };
+
+    List<ServerName> toSend = publisher.generateDeadServersListToSend();
+    Assert.assertTrue(toSend.isEmpty());
+  }
+
+  @Test
+  public void testMaxSend() {
+    ClusterStatusPublisher csp = new ClusterStatusPublisher() {
+      @Override
+      protected List<Pair<ServerName, Long>> getDeadServers(long since) {
+        List<Pair<ServerName, Long>> res = new ArrayList<Pair<ServerName,
Long>>();
+        switch ((int) EnvironmentEdgeManager.currentTimeMillis()) {
+          case 2:
+            res.add(new Pair<ServerName, Long>(new ServerName("hn", 10, 10), 1L));
+            break;
+          case 1000:
+            break;
+        }
+
+        return res;
+      }
+    };
+
+    mee.setValue(2);
+    for (int i = 0; i < ClusterStatusPublisher.NB_SEND; i++) {
+      Assert.assertEquals("i=" + i, 1, csp.generateDeadServersListToSend().size());
+    }
+    mee.setValue(1000);
+    Assert.assertTrue(csp.generateDeadServersListToSend().isEmpty());
+  }
+
+  /**
+   * With 25 dead servers and messages limited to 10 servers, the servers sent the fewest
+   * times go first: all of them have been sent after three messages.
+   */
+  @Test
+  public void testOrder() {
+    ClusterStatusPublisher csp = new ClusterStatusPublisher() {
+      @Override
+      protected List<Pair<ServerName, Long>> getDeadServers(long since) {
+        List<Pair<ServerName, Long>> res = new ArrayList<Pair<ServerName, Long>>();
+        for (int i = 0; i < 25; i++) {
+          res.add(new Pair<ServerName, Long>(new ServerName("hn" + i, 10, 10), 20L));
+        }
+
+        return res;
+      }
+    };
+
+
+    mee.setValue(3);
+    List<ServerName> allSNS = csp.generateDeadServersListToSend();
+
+    Assert.assertEquals(10, ClusterStatusPublisher.MAX_SERVER_PER_MESSAGE);
+    Assert.assertEquals(10, allSNS.size());
+
+    // Number of distinct servers seen after the second, third and fourth messages.
+    final int[] expectedDistinct = {20, 25, 25};
+    for (int expected : expectedDistinct) {
+      List<ServerName> nextMes = csp.generateDeadServersListToSend();
+      Assert.assertEquals(10, nextMes.size());
+      for (ServerName sn : nextMes) {
+        if (!allSNS.contains(sn)) {
+          allSNS.add(sn);
+        }
+      }
+      Assert.assertEquals(expected, allSNS.size());
+    }
+  }
+}



Mime
View raw message