Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 59F811012F for ; Fri, 7 Mar 2014 17:40:07 +0000 (UTC) Received: (qmail 27895 invoked by uid 500); 7 Mar 2014 17:40:07 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 27799 invoked by uid 500); 7 Mar 2014 17:40:05 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 27791 invoked by uid 99); 7 Mar 2014 17:40:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Mar 2014 17:40:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Mar 2014 17:39:58 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 8DF2723888E2; Fri, 7 Mar 2014 17:39:38 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1575338 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/server/common/ hedwig-server/src/main/java/org/apache/hedwig/server/topics/ hedwig-server/src/test/java/org/apache/hedwig/server/topics/ Date: Fri, 07 Mar 2014 17:39:38 -0000 To: commits@zookeeper.apache.org From: ivank@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140307173938.8DF2723888E2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ivank Date: Fri Mar 7 17:39:37 2014 New Revision: 1575338 URL: http://svn.apache.org/r1575338 Log: BOOKKEEPER-363: Re-distributing topics among newly added hubs. (aniruddha via ivank) Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1575338&r1=1575337&r2=1575338&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/CHANGES.txt (original) +++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Mar 7 17:39:37 2014 @@ -182,6 +182,8 @@ Trunk (unreleased changes) BOOKKEEPER-683: TestSubAfterCloseSub fails on 4.2 (jiannan via ivank) + BOOKKEEPER-363: Re-distributing topics among newly added hubs. (aniruddha via ivank) + hedwig-client: BOOKKEEPER-598: Fails to compile - RESUBSCRIBE_EXCEPTION conflict (Matthew Farrellee via sijie) Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1575338&r1=1575337&r2=1575338&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Fri Mar 7 17:39:37 2014 @@ -27,15 +27,16 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import org.apache.bookkeeper.util.ReflectionUtils; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.lang.StringUtils; - -import com.google.protobuf.ByteString; -import org.apache.bookkeeper.util.ReflectionUtils; import org.apache.hedwig.conf.AbstractConfiguration; import org.apache.hedwig.server.meta.MetadataManagerFactory; +import org.apache.hedwig.server.topics.HubLoad; import org.apache.hedwig.util.HedwigSocketAddress; +import com.google.protobuf.ByteString; + public class ServerConfiguration extends AbstractConfiguration { public final static String REGION = "region"; protected final static String MAX_MESSAGE_SIZE = "max_message_size"; @@ -75,6 +76,9 @@ public class ServerConfiguration extends protected final static String NUM_DELIVERY_THREADS = "num_delivery_threads"; protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger"; + protected final static String REBALANCE_TOLERANCE_PERCENTAGE = "rebalance_tolerance"; + protected final static String REBALANCE_MAX_SHED = "rebalance_max_shed"; + protected final static String REBALANCE_INTERVAL_SEC = "rebalance_interval_sec"; // manager related settings protected final static String METADATA_MANAGER_BASED_TOPIC_MANAGER_ENABLED = "metadata_manager_based_topic_manager_enabled"; @@ -153,7 +157,7 @@ public class ServerConfiguration extends /** * Maximum number of messages to read ahead. Default is 10. - * + * * @return int */ public int getReadAheadCount() { @@ -162,7 +166,7 @@ public class ServerConfiguration extends /** * Maximum number of bytes to read ahead. Default is 4MB. - * + * * @return long */ public long getReadAheadSizeBytes() { @@ -172,7 +176,7 @@ public class ServerConfiguration extends /** * Maximum cache size. By default is the smallest of 2G or * half the heap size. - * + * * @return long */ public long getMaximumCacheSize() { @@ -193,16 +197,16 @@ public class ServerConfiguration extends /** * After a scan of a log fails, how long before we retry (in msec) - * + * * @return long */ public long getScanBackoffPeriodMs() { return conf.getLong(SCAN_BACKOFF_MSEC, 1000); } - + /** * Returns server port. - * + * * @return int */ public int getServerPort() { @@ -211,7 +215,7 @@ public class ServerConfiguration extends /** * Returns SSL server port. - * + * * @return int */ public int getSSLServerPort() { @@ -220,7 +224,7 @@ public class ServerConfiguration extends /** * Returns ZooKeeper path prefix. - * + * * @return string */ public String getZkPrefix() { @@ -263,7 +267,7 @@ public class ServerConfiguration extends /** * Return ZooKeeper list of servers. Default is localhost. - * + * * @return String */ public String getZkHost() { @@ -276,16 +280,16 @@ public class ServerConfiguration extends /** * Return ZooKeeper session timeout. Default is 2s. - * + * * @return int */ public int getZkTimeout() { return conf.getInt(ZK_TIMEOUT, 2000); } - /** + /** * Returns true if read-ahead enabled. Default is true. - * + * * @return boolean */ public boolean getReadAheadEnabled() { @@ -296,7 +300,7 @@ public class ServerConfiguration extends /** * Returns true if standalone. Default is false. - * + * * @return boolean */ public boolean isStandalone() { @@ -304,8 +308,8 @@ public class ServerConfiguration extends } /** - * Returns list of regions. - * + * Returns list of regions. + * * @return List */ public List getRegions() { @@ -317,7 +321,7 @@ public class ServerConfiguration extends /** * Returns the name of the SSL certificate if available as a resource. - * + * * @return String */ public String getCertName() { @@ -326,7 +330,7 @@ public class ServerConfiguration extends /** * This is the path to the SSL certificate if it is available as a file. - * + * * @return String */ public String getCertPath() { @@ -351,7 +355,7 @@ public class ServerConfiguration extends /** * Returns the password used for BookKeeper ledgers. Default * is the empty string. - * + * * @return */ public String getPassword() { @@ -360,7 +364,7 @@ public class ServerConfiguration extends /** * Returns true if SSL is enabled. Default is false. - * + * * @return boolean */ public boolean isSSLEnabled() { @@ -372,7 +376,7 @@ public class ServerConfiguration extends * information about consumed messages. A value greater than * one avoids persisting information about consumed messages * upon every consumed message. Default is 50. - * + * * @return int */ public int getConsumeInterval() { @@ -383,7 +387,7 @@ public class ServerConfiguration extends * Returns the interval to release a topic. If this * parameter is greater than zero, then schedule a * task to release an owned topic. Default is 0 (never released). - * + * * @return int */ public int getRetentionSecs() { @@ -422,7 +426,7 @@ public class ServerConfiguration extends /** * True if SSL is enabled across regions. - * + * * @return boolean */ public boolean isInterRegionSSLEnabled() { @@ -430,10 +434,10 @@ public class ServerConfiguration extends } /** - * This parameter is used to determine how often we run the - * SubscriptionManager's Messages Consumed timer task thread + * This parameter is used to determine how often we run the + * SubscriptionManager's Messages Consumed timer task thread * (in milliseconds). - * + * * @return int */ public int getMessagesConsumedThreadRunInterval() { @@ -444,7 +448,7 @@ public class ServerConfiguration extends * This parameter is used to determine how often we run a thread * to retry those failed remote subscriptions in asynchronous mode * (in milliseconds). - * + * * @return int */ public int getRetryRemoteSubscribeThreadRunInterval() { @@ -455,7 +459,7 @@ public class ServerConfiguration extends * This parameter is for setting the default maximum number of messages which * can be delivered to a subscriber without being consumed. * we pause messages delivery to a subscriber when reaching the window size - * + * * @return int */ public int getDefaultMessageWindowSize() { @@ -466,7 +470,7 @@ public class ServerConfiguration extends * This parameter is used when Bookkeeper is the persistence * store and indicates what the ensemble size is (i.e. how * many bookie servers to stripe the ledger entries across). - * + * * @return int */ public int getBkEnsembleSize() { @@ -478,7 +482,7 @@ public class ServerConfiguration extends * This parameter is used when Bookkeeper is the persistence store * and indicates what the quorum size is (i.e. how many redundant * copies of each ledger entry is written). - * + * * @return int * @deprecated please use #getBkWriteQuorumSize() and #getBkAckQuorumSize() */ @@ -525,6 +529,33 @@ public class ServerConfiguration extends return conf.getLong(MAX_ENTRIES_PER_LEDGER, 0L); } + /** + * Get the tolerance percentage for the rebalancer. The rebalancer will not + * shed load if it's current load is less than average + average*tolerancePercentage/100.0 + * + * @return the tolerance percentage for the rebalancer. + */ + public double getRebalanceTolerance() { + return conf.getDouble(REBALANCE_TOLERANCE_PERCENTAGE, 10.0); + } + + /** + * Get the maximum load the rebalancer can shed at once. Default is 50. + * @return + */ + public HubLoad getRebalanceMaxShed() { + return new HubLoad(conf.getLong(REBALANCE_MAX_SHED, 50)); + } + + /** + * Get the interval(in seconds) between rebalancing attempts. The default is + * 5 minutes. + * @return + */ + public long getRebalanceInterval() { + return conf.getLong(REBALANCE_INTERVAL_SEC, 300); + } + /* * Is this a valid configuration that we can run with? This code might grow * over time. @@ -553,7 +584,14 @@ public class ServerConfiguration extends throw new ConfigurationException("BK write quorum size (" + getBkWriteQuorumSize() + ") is less than the ack quorum size (" + getBkAckQuorumSize() + ")"); } - + // Validate that the rebalance tolerance percentage is not negative. + if (getRebalanceTolerance() < 0.0) { + throw new ConfigurationException("The rebalance tolerance percentage cannot be negative."); + } + // Validate that the maximum load to shed during a rebalance is not negative. + if (getRebalanceMaxShed().getNumTopics() < 0L) { + throw new ConfigurationException("The maximum load to shed during a rebalance cannot be negative."); + } // add other checks here } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java Fri Mar 7 17:39:37 2014 @@ -20,25 +20,25 @@ package org.apache.hedwig.server.topics; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.hedwig.exceptions.PubSubException; +import org.apache.hedwig.server.common.ServerConfiguration; +import org.apache.hedwig.server.common.TopicOpQueuer; +import org.apache.hedwig.util.Callback; +import org.apache.hedwig.util.CallbackUtils; +import org.apache.hedwig.util.HedwigSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.protobuf.ByteString; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.common.TopicOpQueuer; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.CallbackUtils; -import org.apache.hedwig.util.HedwigSocketAddress; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; public abstract class AbstractTopicManager implements TopicManager { @@ -204,15 +204,17 @@ public abstract class AbstractTopicManag public void operationFailed(final Object ctx, final PubSubException exception) { // TODO: optimization: we can release this as soon as we experience the first error. Callback cb = new Callback() { + @Override public void operationFinished(Object _ctx, Void _resultOfOperation) { originalCallback.operationFailed(ctx, exception); } + @Override public void operationFailed(Object _ctx, PubSubException _exception) { logger.error("Exception releasing topic", _exception); originalCallback.operationFailed(ctx, exception); } }; - + realReleaseTopic(topic, cb, originalContext); } }; @@ -241,6 +243,52 @@ public abstract class AbstractTopicManag queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, cb, ctx)); } + @Override + public final void releaseTopics(int numTopics, final Callback callback, final Object ctx) { + // This is a best effort function. We sacrifice accuracy to not hold a lock on the topics set. + List topicList = getTopicList(); + // Make sure we release only as many topics as we own. + final long numTopicsToRelease = Math.min(topicList.size(), numTopics); + // Shuffle the list of topics we own, so that we release a random subset. + Collections.shuffle(topicList); + Callback mcb = CallbackUtils.multiCallback((int)numTopicsToRelease, new Callback() { + @Override + public void operationFinished(Object ctx, Void ignoreVal) { + callback.operationFinished(ctx, numTopicsToRelease); + } + + @Override + public void operationFailed(Object ctx, PubSubException e) { + long notReleased = 0; + if (e instanceof PubSubException.CompositeException) { + notReleased = ((PubSubException.CompositeException)e).getExceptions().size(); + } + callback.operationFinished(ctx, numTopicsToRelease - notReleased); + } + }, ctx); + + // Try to release "numTopicsToRelease" topics. It's okay if we're not + // able to release some topics. We signal that we tried by invoking the callback's + // operationFinished() with the actual number of topics released. + logger.info("This hub is releasing {} topics", numTopicsToRelease); + long releaseCount = 0; + for (ByteString topic : topicList) { + if (++releaseCount > numTopicsToRelease) { + break; + } + releaseTopic(topic, mcb, ctx); + } + } + + @Override + public List getTopicList() { + List topicList; + synchronized (this.topics) { + topicList = Lists.newArrayList(this.topics.asMap().keySet()); + } + return topicList; + } + /** * This method should "return" the owner of the topic if one has been chosen * already. If there is no pre-chosen owner, either this hub or some other Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java?rev=1575338&r1=1575337&r2=1575338&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java Fri Mar 7 17:39:37 2014 @@ -38,6 +38,8 @@ public class HubLoad implements Comparab public static final HubLoad MIN_LOAD = new HubLoad(0); public static class InvalidHubLoadException extends Exception { + private static final long serialVersionUID = 5870487176956413387L; + public InvalidHubLoadException(String msg) { super(msg); } @@ -48,7 +50,7 @@ public class HubLoad implements Comparab } // how many topics that a hub server serves - long numTopics; + long numTopics; public HubLoad(long num) { this.numTopics = num; @@ -58,11 +60,16 @@ public class HubLoad implements Comparab this.numTopics = data.getNumTopics(); } + // TODO: Make this threadsafe (BOOKKEEPER-379) public HubLoad setNumTopics(long numTopics) { this.numTopics = numTopics; return this; } + public long getNumTopics() { + return this.numTopics; + } + public HubLoadData toHubLoadData() { return HubLoadData.newBuilder().setNumTopics(numTopics).build(); } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java Fri Mar 7 17:39:37 2014 @@ -105,4 +105,20 @@ interface HubServerManager { * Callback context. */ public void chooseLeastLoadedHub(Callback callback, Object ctx); + + /** + * Try to rebalance the load within the cluster. This function will get + * the {@link HubLoad} from all available hubs within the cluster, and then + * shed additional load. + * + * @param tolerancePercentage + * the percentage of load above average that is permissible. + * @param maxLoadToShed + * the maximum amount of load to shed per call. + * @param callback + * Callback indicating whether we reduced load or not. + * @param ctx + */ + public void rebalanceCluster(double tolerancePercentage, HubLoad maxLoadToShed, + Callback callback, Object ctx); } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java Fri Mar 7 17:39:37 2014 @@ -60,14 +60,14 @@ public class MMTopicManager extends Abst // all of the Ops put into the queuer will fail automatically. protected volatile boolean isSuspended = false; - public MMTopicManager(ServerConfiguration cfg, ZooKeeper zk, + public MMTopicManager(ServerConfiguration cfg, ZooKeeper zk, MetadataManagerFactory mmFactory, ScheduledExecutorService scheduler) throws UnknownHostException, PubSubException { super(cfg, scheduler); // initialize topic ownership manager this.mm = mmFactory.newTopicOwnershipManager(); - this.hubManager = new ZkHubServerManager(cfg, zk, addr); + this.hubManager = new ZkHubServerManager(cfg, zk, addr, this); final SynchronousQueue> queue = new SynchronousQueue>(); @@ -289,6 +289,11 @@ public class MMTopicManager extends Abst @Override protected void postReleaseCleanup(final ByteString topic, final Callback cb, final Object ctx) { + + // Reduce load. We've removed the topic from our topic set, so do this as well. + // When we reclaim the topic, we will increment the load again. + hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size())); + mm.readOwnerInfo(topic, new Callback>() { @Override public void operationFinished(Object ctx, Versioned owner) { Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java?rev=1575338&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java (added) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java Fri Mar 7 17:39:37 2014 @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hedwig.server.topics; + +import java.util.List; +import java.util.Map; + +import org.apache.hedwig.exceptions.PubSubException; +import org.apache.hedwig.util.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.ByteString; + +/** + * Shed load by releasing topics. + */ +public class TopicBasedLoadShedder { + private static final Logger logger = LoggerFactory.getLogger(TopicBasedLoadShedder.class); + private final double tolerancePercentage; + private final long maxLoadToShed; + private final TopicManager tm; + private final List topicList; + + /** + * @param tm The topic manager used to handle load shedding + * @param tolerancePercentage The tolerance percentage for shedding load + * @param maxLoadToShed The maximum amoung of load to shed in one call. + */ + public TopicBasedLoadShedder(TopicManager tm, double tolerancePercentage, + HubLoad maxLoadToShed) { + // Make sure that all functions in this class have a consistent view + // of the load. So, we use the same topic list throughout. + this(tm, tm.getTopicList(), tolerancePercentage, maxLoadToShed); + } + + /** + * This is public because it makes testing easier. + * @param tm The topic manager used to handle load shedding + * @param topicList The topic list representing topics owned by this hub. + * @param tolerancePercentage The tolerance percentage for shedding load + * @param maxLoadToShed The maximum amoung of load to shed in one call. + */ + TopicBasedLoadShedder(TopicManager tm, List topicList, + double tolerancePercentage, + HubLoad maxLoadToShed) { + this.tolerancePercentage = tolerancePercentage; + this.maxLoadToShed = maxLoadToShed.getNumTopics(); + this.tm = tm; + this.topicList = topicList; + } + + /** + * Reduce the load on the current hub so that it reaches the target load. + * We reduce load by releasing topics using the {@link TopicManager} passed + * to the constructor. We use {@link TopicManager#releaseTopics(int, org.apache.hedwig.util.Callback, Object)} + * to actually release topics. + * + * @param targetLoad + * @param callback + * a Callback that indicates how many topics we tried to release. + * @param ctx + */ + public void reduceLoadTo(HubLoad targetLoad, final Callback callback, final Object ctx) { + int targetTopics = (int)targetLoad.toHubLoadData().getNumTopics(); + int numTopicsToRelease = topicList.size() - targetTopics; + + // The number of topics we own is less than the target topic size. We don't release + // any topics in this case. + if (numTopicsToRelease <= 0) { + callback.operationFinished(ctx, 0L); + return; + } + // Call releaseTopics() on the topic manager to do this. We let the manager handle the release + // policy. + tm.releaseTopics(numTopicsToRelease, callback, ctx); + } + + /** + * Calculate the average number of topics on the currently active hubs and release topics + * if required. + * We shed topics if we currently hold topics greater than average + average * tolerancePercentage/100.0 + * We shed a maximum of maxLoadToShed topics + * We also hold on to at least one topic. + * @param loadMap + * @param callback + * A return value of true means we tried to rebalance. False means that there was + * no need to rebalance. + * @param ctx + */ + public void shedLoad(final Map loadMap, final Callback callback, + final Object ctx) { + + long totalTopics = 0L; + long myTopics = topicList.size(); + for (Map.Entry entry : loadMap.entrySet()) { + if (null == entry.getKey() || null == entry.getValue()) { + continue; + } + totalTopics += entry.getValue().toHubLoadData().getNumTopics(); + } + + double averageTopics = (double)totalTopics/loadMap.size(); + logger.info("Total topics in the cluster : {}. Average : {}.", totalTopics, averageTopics); + + // Handle the case when averageTopics == 0. We hold on to at least 1 topic. + long permissibleTopics = + Math.max(1L, (long) Math.ceil(averageTopics + averageTopics * tolerancePercentage / 100.0)); + logger.info("Permissible topics : {}. Number of topics this hub holds : {}.", permissibleTopics, myTopics); + if (myTopics <= permissibleTopics) { + // My owned topics are less than those permitted by the current tolerance level. No need to release + // any topics. + callback.operationFinished(ctx, false); + return; + } + + // The number of topics I own is more than what I should be holding. We shall now attempt to shed some load. + // We shed at most maxLoadToShed number of topics. We also hold on to at least 1 topic. + long targetNumTopics = Math.max(1L, Math.max((long)Math.ceil(averageTopics), myTopics - maxLoadToShed)); + + // Reduce the load on the current hub to the target load we calculated above. + logger.info("Reducing load on this hub to {} topics.", targetNumTopics); + reduceLoadTo(new HubLoad(targetNumTopics), new Callback() { + @Override + public void operationFinished(Object ctx, Long numReleased) { + logger.info("Released {} topics to shed load.", numReleased); + callback.operationFinished(ctx, true); + } + + @Override + public void operationFailed(Object ctx, PubSubException e) { + callback.operationFailed(ctx, e); + } + }, ctx); + } +} Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java Fri Mar 7 17:39:37 2014 @@ -23,6 +23,8 @@ import org.apache.hedwig.server.persiste import org.apache.hedwig.util.Callback; import org.apache.hedwig.util.HedwigSocketAddress; +import java.util.List; + /** * An implementor of this interface is basically responsible for ensuring that * there is at most a single host responsible for a given topic at a given time. @@ -81,6 +83,23 @@ public interface TopicManager { public void releaseTopic(ByteString topic, Callback cb, Object ctx); /** + * Release numTopics topics. If you hold fewer, release all. + * @param numTopics + * Number of topics to release. + * @param callback + * The callback should be invoked with the number of topics the hub + * released successfully. + * @param ctx + */ + public void releaseTopics(int numTopics, Callback callback, Object ctx); + + /** + * Get the list of topics this hub believes it is responsible for. + * @return + */ + public List getTopicList(); + + /** * Stop topic manager */ public void stop(); Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java Fri Mar 7 17:39:37 2014 @@ -17,11 +17,16 @@ */ package org.apache.hedwig.server.topics; +import static com.google.common.base.Charsets.UTF_8; + import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; - -import static com.google.common.base.Charsets.UTF_8; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.hedwig.exceptions.PubSubException; import org.apache.hedwig.server.common.ServerConfiguration; @@ -38,7 +43,6 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +58,7 @@ class ZkHubServerManager implements HubS private final ServerConfiguration conf; private final ZooKeeper zk; private final HedwigSocketAddress addr; + private final TopicManager tm; private final String ephemeralNodePath; private final String hubNodesPath; @@ -61,6 +66,7 @@ class ZkHubServerManager implements HubS protected HubInfo myHubInfo; protected volatile boolean isSuspended = false; protected ManagerListener listener = null; + protected final ScheduledExecutorService executor; // upload hub server load to zookeeper StatCallback loadReportingStatCallback = new StatCallback() { @@ -100,25 +106,90 @@ class ZkHubServerManager implements HubS if (event.getState().equals(Watcher.Event.KeeperState.Expired)) { logger.error("ZK client connection to the ZK server has expired.!"); if (null != listener) { + // Shutdown our executor NOW! + executor.shutdownNow(); listener.onShutdown(); } } } } + class RebalanceRunnable implements Runnable { + private final double tolerancePercentage; + private final HubLoad maxLoadToShed; + private final long delaySeconds; + + public RebalanceRunnable(double tolerancePercentage, + HubLoad maxLoadToShed, + long delaySeconds) { + this.tolerancePercentage = tolerancePercentage; + this.maxLoadToShed = maxLoadToShed; + this.delaySeconds = delaySeconds; + } + + @Override + public void run() { + // If we are in suspended state, don't attempt a rebalance. + if (isSuspended) { + executor.schedule(this, delaySeconds, TimeUnit.SECONDS); + return; + } + // We should attempt a rebalance. We reschedule the job at the tail so that + // two rebalances don't happen simultaneously. + rebalanceCluster(tolerancePercentage, maxLoadToShed, new Callback() { + private void reschedule(Runnable task) { + executor.schedule(task, delaySeconds, TimeUnit.SECONDS); + } + + @Override + public void operationFinished(Object ctx, Boolean didRebalance) { + if (didRebalance == true) { + logger.info("The attempt to rebalance the cluster was successful"); + } else { + logger.info("There was no need to rebalance."); + } + // Our original runnable was passed as the context. + reschedule((Runnable)ctx); + } + + @Override + public void operationFailed(Object ctx, PubSubException e) { + logger.error("The attempt to rebalance the cluster did not succeed.", e); + // Reschedule the job + reschedule((Runnable)ctx); + } + }, this); + } + + public void start() { + // Initiate only if delaySeconds > 0 + if (delaySeconds > 0) { + logger.info("Starting the rebalancer thread with tolerance={}, maxLoadToShed={} and delay={}", + new Object[] { tolerancePercentage, maxLoadToShed.getNumTopics(), delaySeconds }); + executor.schedule(this, delaySeconds, TimeUnit.SECONDS); + } + } + } + public ZkHubServerManager(ServerConfiguration conf, ZooKeeper zk, - HedwigSocketAddress addr) { + HedwigSocketAddress addr, + TopicManager tm) { this.conf = conf; this.zk = zk; this.addr = addr; - + this.tm = tm; // znode path to store all available hub servers this.hubNodesPath = this.conf.getZkHostsPrefix(new StringBuilder()).toString(); // the node's ephemeral node path this.ephemeralNodePath = getHubZkNodePath(addr); + this.executor = Executors.newSingleThreadScheduledExecutor(); // register available hub servers list watcher zk.register(new ZkHubsWatcher()); + + // Start the rebalancer here. + new RebalanceRunnable(conf.getRebalanceTolerance(), conf.getRebalanceMaxShed(), + conf.getRebalanceInterval()).start(); } @Override @@ -157,7 +228,7 @@ class ZkHubServerManager implements HubS return; } else { callback.operationFailed(ctx, - new PubSubException.ServiceDownException( + new PubSubException.ServiceDownException( "I can't state my hub node after I created it : " + ephemeralNodePath)); return; @@ -167,7 +238,7 @@ class ZkHubServerManager implements HubS return; } if (rc != Code.NODEEXISTS.intValue()) { - KeeperException ke = ZkUtils .logErrorAndCreateZKException( + KeeperException ke = ZkUtils.logErrorAndCreateZKException( "Could not create ephemeral node to register hub", ephemeralNodePath, rc); callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke)); return; @@ -283,7 +354,7 @@ class ZkHubServerManager implements HubS if (numResponses == children.size()) { if (leastLoaded == null) { - callback.operationFailed(ctx, + callback.operationFailed(ctx, new PubSubException.ServiceDownException("No hub available")); return; } @@ -305,4 +376,95 @@ class ZkHubServerManager implements HubS dataCallback, child); } } + + /** + * Get a map of all currently active hubs with their advertised load. + * @param callback + * @param originalCtx + */ + private void getActiveHubsInfoWithLoad(final Callback> callback, + final Object originalCtx) { + // Get the list of children and then for each child, get the data. All asynchronously. + zk.getChildren(hubNodesPath, false, new SafeAsyncZKCallback.ChildrenCallback() { + @Override + public void safeProcessResult(int rc, String path, Object ctx, final List children) { + if (rc != Code.OK.intValue()) { + KeeperException e = ZkUtils.logErrorAndCreateZKException( + "Could not get children for given path", path, rc); + callback.operationFailed(ctx, new PubSubException.ServiceDownException(e)); + return; + } + + // The data callback for every child node + SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() { + Map loadMap = new HashMap(); + int numResponse = 0; + @Override + public void safeProcessResult(int rc, String path, Object dataCtx, + byte[] data, Stat stat) { + synchronized (this) { + if (rc == Code.OK.intValue()) { + // Put this load in the map. dataCtx is actually the child string which is the + // IP:PORT:SSL representation of the hub. + try { + HubInfo hubInfo = + new HubInfo(new HedwigSocketAddress((String)dataCtx), stat.getCzxid()); + HubLoad hubLoad = HubLoad.parse(new String(data, UTF_8)); + this.loadMap.put(hubInfo, hubLoad); + } catch (HubLoad.InvalidHubLoadException e) { + logger.warn("Corrupt data found for a hub. Ignoring."); + } + } + numResponse++; + if (numResponse == children.size()) { + // We got less number of valid responses than the hubs we saw previously. + // Signal an error. + if (loadMap.size() != numResponse) { + callback.operationFailed(originalCtx, + new PubSubException.UnexpectedConditionException( + "Fewer OK responses than the number of active hubs seen previously.")); + return; + } + // We've seen all responses. All OK. + callback.operationFinished(originalCtx, loadMap); + } + } + } + }; + + for (String child : children) { + String znode = conf.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString(); + zk.getData(znode, false, dataCallback, child); + } + } + }, originalCtx); + } + + @Override + public void rebalanceCluster(final double tolerancePercentage, final HubLoad maxLoadToShed, + final Callback callback, final Object ctx) { + // Get the load on all active hubs and then shed load if required. + getActiveHubsInfoWithLoad(new Callback>() { + @Override + public void operationFinished(Object ctx, Map loadMap) { + if (null == tm) { + // No topic manager, so no load to shed. + callback.operationFinished(ctx, false); + return; + } + TopicBasedLoadShedder tbls = new TopicBasedLoadShedder(tm, + tolerancePercentage, maxLoadToShed); + tbls.shedLoad(loadMap, callback, ctx); + } + + @Override + public void operationFailed(Object ctx, PubSubException e) { + // Rebalance failed. Log this and signal failure on the callback. + logger.error("Failed to get active hubs. Cannot attempt a rebalance."); + callback.operationFailed(ctx, e); + } + }, ctx); + } + + } Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (original) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Fri Mar 7 17:39:37 2014 @@ -77,7 +77,7 @@ public class ZkTopicManager extends Abst super(cfg, scheduler); this.zk = zk; - this.hubManager = new ZkHubServerManager(cfg, zk, addr); + this.hubManager = new ZkHubServerManager(cfg, zk, addr, this); myHubLoad = new HubLoad(topics.size()); this.hubManager.registerListener(new HubServerManager.ManagerListener() { @@ -275,6 +275,10 @@ public class ZkTopicManager extends Abst @Override protected void postReleaseCleanup(final ByteString topic, final Callback cb, Object ctx) { + // Reduce load. We've removed the topic from our topic set, so do this as well. + // When we reclaim the topic, we will increment the load again. + hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size())); + zk.getData(hubPath(topic), false, new SafeAsyncZKCallback.DataCallback() { @Override public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) { Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java?rev=1575338&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java (added) +++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java Fri Mar 7 17:39:37 2014 @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hedwig.server.topics; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.SynchronousQueue; + +import junit.framework.Assert; + +import org.apache.hedwig.exceptions.PubSubException; +import org.apache.hedwig.util.Callback; +import org.apache.hedwig.util.ConcurrencyUtils; +import org.apache.hedwig.util.HedwigSocketAddress; +import org.junit.Test; + +import com.google.protobuf.ByteString; + +public class TestTopicBasedLoadShedder { + + final protected SynchronousQueue statusQueue = new SynchronousQueue(); + private int myTopics = 10; + private int numHubs = 10; + private List mockTopicList; + private final HubLoad infiniteMaxLoad = new HubLoad(10000000); + Map mockLoadMap = new HashMap(); + + class MockTopicBasedLoadShedder extends TopicBasedLoadShedder { + // This is set by the reduceLoadTo function. + public HubLoad targetLoad; + public MockTopicBasedLoadShedder(TopicManager tm, List topicList, + Double tolerancePercentage, HubLoad maxLoadToShed) { + super(tm, topicList, tolerancePercentage, maxLoadToShed); + } + @Override + public void reduceLoadTo(HubLoad targetLoad, final Callback callback, final Object ctx) { + this.targetLoad = targetLoad; + // Indicates that we released these many topics. + callback.operationFinished(ctx, targetLoad.toHubLoadData().getNumTopics()); + } + } + public Callback getShedLoadCallback(final MockTopicBasedLoadShedder ls, final HubLoad expected, + final Boolean shouldRelease, final Boolean shouldFail) { + return new Callback() { + @Override + public void operationFinished(Object o, Boolean aBoolean) { + Boolean status = false; + status = (aBoolean == shouldRelease); + if (shouldRelease) { + status &= (ls.targetLoad != null); + status &= (expected.numTopics == ls.targetLoad.numTopics); + } + final Boolean statusToPut = status; + new Thread(new Runnable() { + @Override + public void run() { + ConcurrencyUtils.put(statusQueue, statusToPut); + } + }).start(); + } + + @Override + public void operationFailed(Object o, PubSubException e) { + new Thread(new Runnable() { + @Override + public void run() { + ConcurrencyUtils.put(statusQueue, shouldFail); + } + }).start(); + } + }; + } + + private List getMockTopicList(int numTopics) { + List topics = new ArrayList(); + for (int i = 0; i < numTopics; i++) { + topics.add(ByteString.copyFromUtf8("MyTopic_" + i)); + } + return topics; + } + + private HubInfo getHubInfo(int hubNum) { + return new HubInfo(new HedwigSocketAddress("myhub.testdomain.foo"+hubNum+":4080:4080"), 0); + } + + private synchronized void initialize(int myTopics, int numHubs, int[] otherHubsLoad) { + if (null != otherHubsLoad) { + Assert.assertTrue(otherHubsLoad.length == numHubs - 1); + } + this.myTopics = myTopics; + mockTopicList = getMockTopicList(this.myTopics); + this.numHubs = numHubs; + this.mockLoadMap.clear(); + this.mockLoadMap.put(getHubInfo(0), new HubLoad(this.myTopics)); + for (int i = 1; i < this.numHubs; i++) { + this.mockLoadMap.put(getHubInfo(i), new HubLoad(otherHubsLoad[i-1])); + } + } + + private int[] getEqualLoadDistributionArray(int n, int load) { + if (n == 0) { + return null; + } + int[] retLoad = new int[n]; + Arrays.fill(retLoad, load); + return retLoad; + } + + @Test(timeout = 60000) + public synchronized void testAllHubsSameTopics() throws Exception { + // All hubs have the same number of topics. We should not release any topics even with a + // tolerance of 0.0. + initialize(10, 10, getEqualLoadDistributionArray(9, 10)); + MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad); + tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null); + Assert.assertTrue(statusQueue.take()); + } + + @Test(timeout = 60000) + public synchronized void testOneHubUnequalTopics() throws Exception { + // The hub has 20 topics while the average is 11. Should reduce the load to 11. + initialize(20, 10, getEqualLoadDistributionArray(9, 10)); + MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad); + tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null); + Assert.assertTrue(statusQueue.take()); + } + + @Test(timeout = 60000) + public synchronized void testOneHubUnequalTopicsWithTolerance() throws Exception { + // The hub has 20 topics and average is 11. Should still release as tolerance level of 50.0 is + // breached. Should get down to average. + initialize(20, 10, getEqualLoadDistributionArray(9, 10)); + MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 50.0, infiniteMaxLoad); + tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null); + Assert.assertTrue(statusQueue.take()); + + // A tolerance level of 100.0 should result in the hub not releasing topics. + tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 100.0, infiniteMaxLoad); + tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null); + Assert.assertTrue(statusQueue.take()); + } + + @Test(timeout = 60000) + public synchronized void testMaxLoadShed() throws Exception { + // The hub should not shed more than maxLoadShed topics. + initialize(20, 10, getEqualLoadDistributionArray(9, 10)); + MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, new HubLoad(5)); + // Our load should reduce to 15. + tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(15), true, false), null); + Assert.assertTrue(statusQueue.take()); + + // We should reduce to 11 even when maxLoadShed and average result in the same + // values + tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, new HubLoad(9)); + tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null); + Assert.assertTrue(statusQueue.take()); + } + + @Test(timeout = 60000) + public synchronized void testSingleHubLoadShed() throws Exception { + // If this is the only hub in the cluster, it should not release any topics. + initialize(20, 1, null); + MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad); + tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null); + Assert.assertTrue(statusQueue.take()); + } + + @Test(timeout = 60000) + public synchronized void testUnderloadedClusterLoadShed() throws Exception { + // Hold on to at least one topic while shedding load (if cluster is underloaded) + initialize(5, 10, getEqualLoadDistributionArray(9, 0)); + MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad); + tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(1), true, false), null); + Assert.assertTrue(statusQueue.take()); + } +}