Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CD8F311071 for ; Sun, 3 Aug 2014 07:36:04 +0000 (UTC) Received: (qmail 98454 invoked by uid 500); 3 Aug 2014 07:36:04 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 98396 invoked by uid 500); 3 Aug 2014 07:36:04 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 98375 invoked by uid 99); 3 Aug 2014 07:36:04 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Aug 2014 07:36:04 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 57EE5883323; Sun, 3 Aug 2014 07:36:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: busbey@apache.org To: commits@accumulo.apache.org Date: Sun, 03 Aug 2014 07:36:04 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/9] git commit: ACCUMULO-2694 Fix handling of tablet migrations for offline tables. Repository: accumulo Updated Branches: refs/heads/1.5.2-SNAPSHOT f848178e7 -> 6bbe12165 refs/heads/1.6.1-SNAPSHOT 39f405a3f -> 533983f10 refs/heads/master 3c11090ff -> f34f1d892 ACCUMULO-2694 Fix handling of tablet migrations for offline tables. * Adds a functional test that fails due to not rebalancing * Fix master to clear migrations when it learns that a table has gone offline * Update master to periodically clean up migrations for offline tables * Fix balancers to make sure they log if they can't balance. 
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6bbe1216 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6bbe1216 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6bbe1216 Branch: refs/heads/1.5.2-SNAPSHOT Commit: 6bbe12165d81067651cc2d8c1c545fb31580d1a3 Parents: f848178 Author: Sean Busbey Authored: Fri Apr 18 16:44:54 2014 -0500 Committer: Sean Busbey Committed: Fri Aug 1 02:46:55 2014 -0500 ---------------------------------------------------------------------- server/pom.xml | 4 + .../apache/accumulo/server/master/Master.java | 34 +++++-- .../master/balancer/ChaoticLoadBalancer.java | 10 +- .../master/balancer/DefaultLoadBalancer.java | 12 ++- .../server/master/balancer/TabletBalancer.java | 91 ++++++++++++++++++ test/system/auto/stress/migrations.py | 98 ++++++++++++++++++++ 6 files changed, 240 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index bd61fe6..fb526af 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -34,6 +34,10 @@ gson + com.google.guava + guava + + jline jline http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/Master.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java index a2ad2e6..12f8fed 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/Master.java +++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java @@ -1924,7 +1924,8 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt while (stillMaster()) { if 
(!migrations.isEmpty()) { try { - cleanupMutations(); + cleanupOfflineMigrations(); + cleanupNonexistentMigrations(getConnector()); } catch (Exception ex) { log.error("Error cleaning up migrations", ex); } @@ -1933,12 +1934,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } - // If a migrating tablet splits, and the tablet dies before sending the - // master a message, the migration will refer to a non-existing tablet, - // so it can never complete. Periodically scan the metadata table and - // remove any migrating tablets that no longer exist. - private void cleanupMutations() throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - Connector connector = getConnector(); + /** + * If a migrating tablet splits, and the tablet dies before sending the + * master a message, the migration will refer to a non-existing tablet, + * so it can never complete. Periodically scan the metadata table and + * remove any migrating tablets that no longer exist. + */ + private void cleanupNonexistentMigrations(final Connector connector) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { Scanner scanner = connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner); Set found = new HashSet(); @@ -1950,6 +1952,21 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } migrations.keySet().retainAll(found); } + + /** + * If migrating a tablet for a table that is offline, the migration + * can never succeed because no tablet server will load the tablet. + * check for offline tables and remove their migrations. 
+ */ + private void cleanupOfflineMigrations() { + TableManager manager = TableManager.getInstance(); + for (String tableId : Tables.getIdToNameMap(instance).keySet()) { + TableState state = manager.getTableState(tableId); + if (TableState.OFFLINE == state) { + clearMigrations(tableId); + } + } + } } private class StatusThread extends Daemon { @@ -2418,6 +2435,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt @Override public void stateChanged(String tableId, TableState state) { nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state); + if (TableState.OFFLINE == state) { + clearMigrations(tableId); + } } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java index e14008a..92eca13 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java +++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java @@ -33,6 +33,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.master.state.TabletMigration; +import org.apache.log4j.Logger; import org.apache.thrift.TException; /** @@ -40,6 +41,7 @@ import org.apache.thrift.TException; * designed for performance, do not use on production systems. I'm calling it the LokiLoadBalancer. 
*/ public class ChaoticLoadBalancer extends TabletBalancer { + private static final Logger log = Logger.getLogger(ChaoticLoadBalancer.class); Random r = new Random(); @Override @@ -75,6 +77,8 @@ public class ChaoticLoadBalancer extends TabletBalancer { } } + protected final OutstandingMigrations outstandingMigrations = new OutstandingMigrations(log); + /** * Will balance randomly, maintaining distribution */ @@ -83,8 +87,12 @@ public class ChaoticLoadBalancer extends TabletBalancer { Map numTablets = new HashMap(); List underCapacityTServer = new ArrayList(); - if (!migrations.isEmpty()) + if (!migrations.isEmpty()) { + outstandingMigrations.migrations = migrations; + constraintNotMet(outstandingMigrations); return 100; + } + resetBalancerErrors(); boolean moveMetadata = r.nextInt(4) == 0; long totalTablets = 0; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java index 1fcab46..9a970e7 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java +++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java @@ -302,16 +302,26 @@ public class DefaultLoadBalancer extends TabletBalancer { assignments.put(entry.getKey(), getAssignment(current, entry.getKey(), entry.getValue())); } } - + + private static final NoTservers NO_SERVERS = new NoTservers(log); + + protected final OutstandingMigrations outstandingMigrations = new OutstandingMigrations(log); + @Override public long balance(SortedMap current, Set migrations, List migrationsOut) { // do we have any servers? 
if (current.size() > 0) { // Don't migrate if we have migrations in progress if (migrations.size() == 0) { + resetBalancerErrors(); if (getMigrations(current, migrationsOut)) return 1 * 1000; + } else { + outstandingMigrations.migrations = migrations; + constraintNotMet(outstandingMigrations); } + } else { + constraintNotMet(NO_SERVERS); } return 5 * 1000; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java index 69387d3..16c5dbc 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java +++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java @@ -17,11 +17,14 @@ package org.apache.accumulo.server.master.balancer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; +import com.google.common.collect.Iterables; + import org.apache.accumulo.trace.instrument.Tracer; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.data.KeyExtent; @@ -67,6 +70,11 @@ public abstract class TabletBalancer { /** * Ask the balancer if any migrations are necessary. + * + * If the balancer is going to self-abort due to some environmental constraint (e.g. it requires some minimum number of tservers, or a maximum number + * of outstanding migrations), it should issue a log message to alert operators. The message should be at WARN normally and at ERROR if the balancer knows that the + * problem can not self correct. It should not issue these messages more than once a minute. 
Subclasses can use the convenience methods of {@link #constraintNotMet()} and + * {@link #balanceSuccessful()} to accomplish this logging. * * @param current * The current table-summary state of all the online tablet servers. Read-only. @@ -79,6 +87,89 @@ public abstract class TabletBalancer { * This method will not be called when there are unassigned tablets. */ public abstract long balance(SortedMap current, Set migrations, List migrationsOut); + + private static final long ONE_SECOND = 1000l; + private boolean stuck = false; + private long stuckNotificationTime = -1l; + + protected static final long TIME_BETWEEN_BALANCER_WARNINGS = 60 * ONE_SECOND; + + /** + * A deferred call descendent TabletBalancers use to log why they can't continue. + * The call is deferred so that TabletBalancer can limit how often messages happen. + * + * Implementations should be reused as much as possible. + * + * Be sure to pass in a properly scoped Logger instance so that messages indicate + * what part of the system is having trouble. + */ + protected static abstract class BalancerProblem implements Runnable { + protected final Logger balancerLog; + public BalancerProblem(Logger logger) { + balancerLog = logger; + } + } + + /** + * If a TabletBalancer requires active tservers, it should use this problem to indicate when there are none. + * NoTservers is safe to share with anyone who uses the same Logger. TabletBalancers should have a single + * static instance. + */ + protected static class NoTservers extends BalancerProblem { + public NoTservers(Logger logger) { + super(logger); + } + + @Override + public void run() { + balancerLog.warn("Not balancing because we don't have any tservers"); + } + } + + /** + * If a TabletBalancer only balances when there are no outstanding migrations, it should use this problem + * to indicate when they exist. + * + * Iff a TabletBalancer makes use of the migrations member to provide samples, then OutstandingMigrations + * is not thread safe. 
+ */ + protected static class OutstandingMigrations extends BalancerProblem { + public Set migrations = Collections.emptySet(); + + public OutstandingMigrations(Logger logger) { + super(logger); + } + + @Override + public void run() { + balancerLog.warn("Not balancing due to " + migrations.size() + " outstanding migrations."); + /* TODO ACCUMULO-2938 redact key extents in this output to avoid leaking protected information. */ + balancerLog.debug("Sample up to 10 outstanding migrations: " + Iterables.limit(migrations, 10)); + } + } + + /** + * Warn that a Balancer can't work because of some external restriction. + * Will not call the provided logging handler more often than TIME_BETWEEN_BALANCER_WARNINGS + */ + protected void constraintNotMet(BalancerProblem cause) { + if (!stuck) { + stuck = true; + stuckNotificationTime = System.currentTimeMillis(); + } else { + if ((System.currentTimeMillis() - stuckNotificationTime) > TIME_BETWEEN_BALANCER_WARNINGS) { + cause.run(); + stuckNotificationTime = System.currentTimeMillis(); + } + } + } + + /** + * Resets logging about problems meeting an external constraint on balancing. + */ + protected void resetBalancerErrors() { + stuck = false; + } /** * Fetch the tablets for the given table by asking the tablet server. 
Useful if your balance strategy needs details at the tablet level to decide what tablets http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bbe1216/test/system/auto/stress/migrations.py ---------------------------------------------------------------------- diff --git a/test/system/auto/stress/migrations.py b/test/system/auto/stress/migrations.py index d07d7a8..9c94b88 100755 --- a/test/system/auto/stress/migrations.py +++ b/test/system/auto/stress/migrations.py @@ -76,7 +76,105 @@ class ChaoticBalancerIntegrity(SunnyDayTest): self.shell(self.masterHost(), 'flush -t test_ingest') self.waitForStop(self.verify(self.masterHost(), self.options.rows), 60) +# Check for ACCUMULO-2694 +class BalanceInPresenceOfOfflineTable(SunnyDayTest): + """Start a new table, create many splits, and offline before they can rebalance. Then try to have a different table balance""" + + order = 98 + + settings = TestUtilsMixin.settings.copy() + settings.update({ + 'tserver.memory.maps.max':'10K', + 'tserver.compaction.major.delay': 0, + }) + tableSettings = SunnyDayTest.tableSettings.copy() + tableSettings['test_ingest'] = { + 'table.split.threshold': '10K', + } + def setUp(self): + # ensure we have two servers + if len(self.options.hosts) == 1: + self.options.hosts.append('localhost') + self.options.hosts = self.options.hosts[:2] + + TestUtilsMixin.setUp(self); + + # create a table with 200 splits + import tempfile + fileno, filename = tempfile.mkstemp() + fp = os.fdopen(fileno, "wb") + try: + for i in range(200): + fp.write("%08x\n" % (i * 1000)) + finally: + fp.close() + self.createTable('unused', filename) + out,err,code = self.shell(self.masterHost(), 'offline -t unused\n') + self.processResult(out,err,code); + + # create an empty table + self.createTable('test_ingest') + + def runTest(self): + + # start test ingestion + log.info("Starting Test Ingester") + self.ingester = self.ingest(self.masterHost(), + 200000, + size=self.options.size) + self.waitForStop(self.ingester, 
self.timeout_factor * 120) + self.shell(self.masterHost(), 'flush -t test_ingest\n') + self.waitForStop(self.verify(self.masterHost(), self.options.rows), 60) + + # let the server split tablets and move them around + # Keep retrying until the wait period for migration cleanup has passed + # which is hard coded to 5 minutes. :/ + startTime = time.clock() + currentWait = 10 + balancingWorked = False + while ((time.clock() - startTime) < 5*60+15): + self.sleep(currentWait) + # If we end up needing to sleep again, back off. + currentWait *= 2 + + # fetch the list of tablets from each server + h = self.runOn(self.masterHost(), + [self.accumulo_sh(), + 'org.apache.accumulo.test.GetMasterStats']) + out, err = h.communicate() + servers = {} + server = None + # if balanced based on ingest, the table that split due to ingest + # will be split evenly on both servers, not just one + table = '' + tableId = self.getTableId('test_ingest'); + for line in out.split('\n'): + if line.find(' Name: ') == 0: + server = line[7:] + servers.setdefault(server, 0) + if line.find('Table: ') >= 0: + table = line.split(' ')[-1] + if line.find(' Tablets: ') == 0: + if table == tableId: + servers[server] += int(line.split()[-1]) + log.info("Tablet counts " + repr(servers)) + + # we have two servers + if len(servers.values()) == 2: + servers = servers.values() + # a server has more than 10 splits + if servers[0] > 10: + # the ratio is roughly even + ratio = min(servers) / float(max(servers)) + if ratio > 0.5: + balancingWorked = True + break + log.debug("tablets not balanced, sleeping for %d seconds" % currentWait) + + self.assert_(balancingWorked) + def suite(): result = unittest.TestSuite() result.addTest(ChaoticBalancerIntegrity()) + result.addTest(BalanceInPresenceOfOfflineTable()) return result