Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 236AB1062B for ; Mon, 30 Dec 2013 19:11:39 +0000 (UTC) Received: (qmail 97486 invoked by uid 500); 30 Dec 2013 19:10:29 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 97106 invoked by uid 500); 30 Dec 2013 19:09:59 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 96997 invoked by uid 99); 30 Dec 2013 19:09:54 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Dec 2013 19:09:54 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 24F5788CD62; Mon, 30 Dec 2013 19:09:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ecn@apache.org To: commits@accumulo.apache.org Date: Mon, 30 Dec 2013 19:10:04 -0000 Message-Id: <9a17c0c034a7460d96323e29c81bce3a@git.apache.org> In-Reply-To: <122313e481d6456580bda73f405bc2fb@git.apache.org> References: <122313e481d6456580bda73f405bc2fb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/15] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT Conflicts: server/src/main/java/org/apache/accumulo/server/master/Master.java Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5e247981 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5e247981 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5e247981 Branch: refs/heads/1.6.0-SNAPSHOT Commit: 5e247981d13b7e2616ea05c05a2a44eeefc0d897 Parents: 063c88d 20cc9f4 Author: Eric Newton Authored: Mon Dec 30 14:08:18 2013 -0500 Committer: Eric Newton Committed: Mon Dec 30 14:08:18 2013 -0500 ---------------------------------------------------------------------- .../java/org/apache/accumulo/master/Master.java | 1 + .../functional/BalanceAfterCommsFailureIT.java | 90 ++++++++++++++++++++ 2 files changed, 91 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e247981/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java index 60ec46c,0000000..304366c mode 100644,000000..100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@@ -1,1215 -1,0 +1,1216 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.Namespaces; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.ThriftTransportPool; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.master.thrift.MasterClientService.Iface; +import org.apache.accumulo.core.master.thrift.MasterClientService.Processor; +import org.apache.accumulo.core.master.thrift.MasterGoalState; +import org.apache.accumulo.core.master.thrift.MasterState; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.security.NamespacePermission; +import org.apache.accumulo.core.security.SecurityUtil; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.AgeOffStore; +import org.apache.accumulo.fate.Fate; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.master.recovery.RecoveryManager; +import org.apache.accumulo.master.state.TableCounts; +import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManager.FileType; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.init.Initialize; +import org.apache.accumulo.server.master.LiveTServerSet; +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; +import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer; +import org.apache.accumulo.server.master.balancer.TabletBalancer; +import org.apache.accumulo.server.master.state.CurrentState; +import org.apache.accumulo.server.master.state.DeadServerList; +import org.apache.accumulo.server.master.state.MergeInfo; +import org.apache.accumulo.server.master.state.MergeState; +import org.apache.accumulo.server.master.state.MetaDataStateStore; +import org.apache.accumulo.server.master.state.RootTabletStateStore; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.TabletMigration; +import org.apache.accumulo.server.master.state.TabletState; +import org.apache.accumulo.server.master.state.ZooStore; +import org.apache.accumulo.server.master.state.ZooTabletStateStore; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.security.handler.ZKPermHandler; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.accumulo.server.tables.TableObserver; +import org.apache.accumulo.server.util.DefaultMap; +import org.apache.accumulo.server.util.Halt; +import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.accumulo.server.util.TServerUtils; +import org.apache.accumulo.server.util.TServerUtils.ServerAddress; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.trace.instrument.thrift.TraceWrap; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.server.TServer; +import org.apache.thrift.transport.TTransportException; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; + +/** + * The Master is responsible for assigning and balancing tablets to tablet servers. + * + * The master will also coordinate log recoveries and reports general status. + */ +public class Master implements LiveTServerSet.Listener, TableObserver, CurrentState { + + final static Logger log = Logger.getLogger(Master.class); + + final static int ONE_SECOND = 1000; + final private static Text METADATA_TABLE_ID = new Text(MetadataTable.ID); + final private static Text ROOT_TABLE_ID = new Text(RootTable.ID); + final static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND; + final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND; + final static long WAIT_BETWEEN_ERRORS = ONE_SECOND; + final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND; + final private static int MAX_CLEANUP_WAIT_TIME = ONE_SECOND; + final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND; + final static int MAX_TSERVER_WORK_CHUNK = 5000; + final private static int MAX_BAD_STATUS_COUNT = 3; + + final VolumeManager fs; + final private Instance instance; + final private String hostname; + final LiveTServerSet tserverSet; + final private List watchers = new ArrayList(); + final SecurityOperation security; + final Map badServers = Collections.synchronizedMap(new DefaultMap(new AtomicInteger())); + final Set serversToShutdown = Collections.synchronizedSet(new HashSet()); + final SortedMap migrations = Collections.synchronizedSortedMap(new TreeMap()); + final EventCoordinator nextEvent = new EventCoordinator(); + final private Object mergeLock = new Object(); + RecoveryManager recoveryManager = null; + + ZooLock masterLock = null; + private TServer clientService = null; + TabletBalancer tabletBalancer; + + private MasterState state = MasterState.INITIAL; + + Fate fate; + + volatile SortedMap tserverStatus = Collections.unmodifiableSortedMap(new TreeMap()); + + synchronized MasterState getMasterState() { + return state; + } + + public boolean stillMaster() { + return getMasterState() != MasterState.STOP; + } + + static final boolean X = true; + static final boolean _ = false; + // @formatter:off + static final boolean transitionOK[][] = { + // INITIAL HAVE_LOCK SAFE_MODE NORMAL UNLOAD_META UNLOAD_ROOT STOP + /* INITIAL */ {X, X, _, _, _, _, X}, + /* HAVE_LOCK */ {_, X, X, X, _, _, X}, + /* SAFE_MODE */ {_, _, X, X, X, _, X}, + /* NORMAL */ {_, _, X, X, X, _, X}, + /* UNLOAD_METADATA_TABLETS */ {_, _, X, X, X, X, X}, + /* UNLOAD_ROOT_TABLET */ {_, _, _, X, X, X, X}, + /* STOP */ {_, _, _, _, _, X, X}}; + //@formatter:on + synchronized void setMasterState(MasterState newState) { + if (state.equals(newState)) + return; + if (!transitionOK[state.ordinal()][newState.ordinal()]) { + log.error("Programmer error: master should not transition from " + state + " to " + newState); + } + MasterState oldState = state; + state = newState; + nextEvent.event("State changed from %s to %s", oldState, newState); + if (newState == MasterState.STOP) { + // Give the server a little time before shutdown so the client + // thread requesting the stop can return + SimpleTimer.getInstance().schedule(new Runnable() { + @Override + public void run() { + // This frees the main thread and will cause the master to exit + clientService.stop(); + Master.this.nextEvent.event("stopped event loop"); + } + + }, 100l, 1000l); + } + + if (oldState != newState && (newState == MasterState.HAVE_LOCK)) { + upgradeZookeeper(); + } + + if (oldState != newState && (newState == MasterState.NORMAL)) { + upgradeMetadata(); + } + } + + private void moveRootTabletToRootTable(IZooReaderWriter zoo) throws Exception { + String dirZPath = ZooUtil.getRoot(instance) + RootTable.ZROOT_TABLET_PATH; + + if (!zoo.exists(dirZPath)) { + Path oldPath = fs.getFullPath(FileType.TABLE, "/!0/root_tablet"); + if (fs.exists(oldPath)) { + String newPath = fs.choose(ServerConstants.getTablesDirs()) + "/" + RootTable.ID; + fs.mkdirs(new Path(newPath)); + if (!fs.rename(oldPath, new Path(newPath))) { + throw new IOException("Failed to move root tablet from " + oldPath + " to " + newPath); + } + + log.info("Upgrade renamed " + oldPath + " to " + newPath); + } + + Path location = null; + + for (String basePath : ServerConstants.getTablesDirs()) { + Path path = new Path(basePath + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION); + if (fs.exists(path)) { + if (location != null) { + throw new IllegalStateException("Root table at multiple locations " + location + " " + path); + } + + location = path; + } + } + + if (location == null) + throw new IllegalStateException("Failed to find root tablet"); + + log.info("Upgrade setting root table location in zookeeper " + location); + zoo.putPersistentData(dirZPath, location.toString().getBytes(), NodeExistsPolicy.FAIL); + } + } + + private void upgradeZookeeper() { + if (Accumulo.getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) { + try { + log.info("Upgrading zookeeper"); + + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + + // create initial namespaces + String namespaces = ZooUtil.getRoot(instance) + Constants.ZNAMESPACES; + zoo.putPersistentData(namespaces, new byte[0], NodeExistsPolicy.SKIP); + if (!Namespaces.exists(instance, Namespaces.ACCUMULO_NAMESPACE_ID)) + TableManager.prepareNewNamespaceState(instance.getInstanceID(), Namespaces.ACCUMULO_NAMESPACE_ID, Namespaces.ACCUMULO_NAMESPACE, + NodeExistsPolicy.SKIP); + if (!Namespaces.exists(instance, Namespaces.DEFAULT_NAMESPACE_ID)) + TableManager.prepareNewNamespaceState(instance.getInstanceID(), Namespaces.DEFAULT_NAMESPACE_ID, Namespaces.DEFAULT_NAMESPACE, NodeExistsPolicy.SKIP); + + // create root table + if (!Tables.exists(instance, RootTable.ID)) { + TableManager.prepareNewTableState(instance.getInstanceID(), RootTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, RootTable.NAME, TableState.ONLINE, + NodeExistsPolicy.SKIP); + Initialize.initMetadataConfig(RootTable.ID); + // ensure root user can flush root table + security.grantTablePermission(SystemCredentials.get().toThrift(instance), security.getRootUsername(), RootTable.ID, TablePermission.ALTER_TABLE); + } + + // put existing tables in the correct namespaces + String tables = ZooUtil.getRoot(instance) + Constants.ZTABLES; + for (Entry table : Tables.getIdToNameMap(instance).entrySet()) { + String targetNamespace = (MetadataTable.ID.equals(table.getKey()) || RootTable.ID.equals(table.getKey())) ? Namespaces.ACCUMULO_NAMESPACE_ID + : Namespaces.DEFAULT_NAMESPACE_ID; + zoo.putPersistentData(tables + "/" + table.getKey() + Constants.ZTABLE_NAMESPACE, targetNamespace.getBytes(Constants.UTF8), NodeExistsPolicy.SKIP); + } + + // rename metadata table + zoo.putPersistentData(tables + "/" + MetadataTable.ID + Constants.ZTABLE_NAME, Tables.qualify(MetadataTable.NAME).getSecond().getBytes(Constants.UTF8), + NodeExistsPolicy.OVERWRITE); + + moveRootTabletToRootTable(zoo); + + // add system namespace permissions to existing users + ZKPermHandler perm = new ZKPermHandler(); + perm.initialize(instance.getInstanceID(), true); + String users = ZooUtil.getRoot(instance) + "/users"; + for (String user : zoo.getChildren(users)) { + zoo.putPersistentData(users + "/" + user + "/Namespaces", new byte[0], NodeExistsPolicy.SKIP); + perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ); + } + perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE); + + } catch (Exception ex) { + log.fatal("Error performing upgrade", ex); + System.exit(1); + } + } + } + + private final AtomicBoolean upgradeMetadataRunning = new AtomicBoolean(false); + + private final ServerConfiguration serverConfig; + + private void upgradeMetadata() { + if (Accumulo.getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) { + if (upgradeMetadataRunning.compareAndSet(false, true)) { + Runnable upgradeTask = new Runnable() { + @Override + public void run() { + try { + MetadataTableUtil.moveMetaDeleteMarkers(instance, SystemCredentials.get()); + Accumulo.updateAccumuloVersion(fs); + + log.info("Upgrade complete"); + + } catch (Exception ex) { + log.fatal("Error performing upgrade", ex); + System.exit(1); + } + + } + }; + + // need to run this in a separate thread because a lock is held that prevents metadata tablets from being assigned and this task writes to the + // metadata table + new Thread(upgradeTask).start(); + } + } + } + + private int assignedOrHosted(Text tableId) { + int result = 0; + for (TabletGroupWatcher watcher : watchers) { + TableCounts count = watcher.getStats(tableId); + result += count.hosted() + count.assigned(); + } + return result; + } + + private int totalAssignedOrHosted() { + int result = 0; + for (TabletGroupWatcher watcher : watchers) { + for (TableCounts counts : watcher.getStats().values()) { + result += counts.assigned() + counts.hosted(); + } + } + return result; + } + + private int nonMetaDataTabletsAssignedOrHosted() { + return totalAssignedOrHosted() - assignedOrHosted(new Text(MetadataTable.ID)) - assignedOrHosted(new Text(RootTable.ID)); + } + + private int notHosted() { + int result = 0; + for (TabletGroupWatcher watcher : watchers) { + for (TableCounts counts : watcher.getStats().values()) { + result += counts.assigned() + counts.assignedToDeadServers(); + } + } + return result; + } + + // The number of unassigned tablets that should be assigned: displayed on the monitor page + int displayUnassigned() { + int result = 0; + switch (getMasterState()) { + case NORMAL: + // Count offline tablets for online tables + for (TabletGroupWatcher watcher : watchers) { + TableManager manager = TableManager.getInstance(); + for (Entry entry : watcher.getStats().entrySet()) { + Text tableId = entry.getKey(); + TableCounts counts = entry.getValue(); + TableState tableState = manager.getTableState(tableId.toString()); + if (tableState != null && tableState.equals(TableState.ONLINE)) { + result += counts.unassigned() + counts.assignedToDeadServers() + counts.assigned(); + } + } + } + break; + case SAFE_MODE: + // Count offline tablets for the metadata table + for (TabletGroupWatcher watcher : watchers) { + result += watcher.getStats(METADATA_TABLE_ID).unassigned(); + } + break; + case UNLOAD_METADATA_TABLETS: + case UNLOAD_ROOT_TABLET: + for (TabletGroupWatcher watcher : watchers) { + result += watcher.getStats(METADATA_TABLE_ID).unassigned(); + } + break; + default: + break; + } + return result; + } + + public void mustBeOnline(final String tableId) throws ThriftTableOperationException { + Tables.clearCache(instance); + if (!Tables.getTableState(instance, tableId).equals(TableState.ONLINE)) + throw new ThriftTableOperationException(tableId, null, TableOperation.MERGE, TableOperationExceptionType.OFFLINE, "table is not online"); + } + + public Connector getConnector() throws AccumuloException, AccumuloSecurityException { + return instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken()); + } + + private Master(ServerConfiguration config, VolumeManager fs, String hostname) throws IOException { + this.serverConfig = config; + this.instance = config.getInstance(); + this.fs = fs; + this.hostname = hostname; + + AccumuloConfiguration aconf = serverConfig.getConfiguration(); + + log.info("Version " + Constants.VERSION); + log.info("Instance " + instance.getInstanceID()); + ThriftTransportPool.getInstance().setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); + security = AuditedSecurityOperation.getInstance(); + tserverSet = new LiveTServerSet(instance, config.getConfiguration(), this); + this.tabletBalancer = aconf.instantiateClassProperty(Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer()); + this.tabletBalancer.init(serverConfig); + } + + public TServerConnection getConnection(TServerInstance server) { + return tserverSet.getConnection(server); + } + + public MergeInfo getMergeInfo(KeyExtent tablet) { + return getMergeInfo(tablet.getTableId()); + } + + public MergeInfo getMergeInfo(Text tableId) { + synchronized (mergeLock) { + try { + String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge"; + if (!ZooReaderWriter.getInstance().exists(path)) + return new MergeInfo(); + byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat()); + DataInputBuffer in = new DataInputBuffer(); + in.reset(data, data.length); + MergeInfo info = new MergeInfo(); + info.readFields(in); + return info; + } catch (KeeperException.NoNodeException ex) { + log.info("Error reading merge state, it probably just finished"); + return new MergeInfo(); + } catch (Exception ex) { + log.warn("Unexpected error reading merge state", ex); + return new MergeInfo(); + } + } + } + + public void setMergeState(MergeInfo info, MergeState state) throws IOException, KeeperException, InterruptedException { + synchronized (mergeLock) { + String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + info.getExtent().getTableId().toString() + "/merge"; + info.setState(state); + if (state.equals(MergeState.NONE)) { + ZooReaderWriter.getInstance().recursiveDelete(path, NodeMissingPolicy.SKIP); + } else { + DataOutputBuffer out = new DataOutputBuffer(); + try { + info.write(out); + } catch (IOException ex) { + throw new RuntimeException("Unlikely", ex); + } + ZooReaderWriter.getInstance().putPersistentData(path, out.getData(), + state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL : ZooUtil.NodeExistsPolicy.OVERWRITE); + } + mergeLock.notifyAll(); + } + nextEvent.event("Merge state of %s set to %s", info.getExtent(), state); + } + + public void clearMergeState(Text tableId) throws IOException, KeeperException, InterruptedException { + synchronized (mergeLock) { + String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge"; + ZooReaderWriter.getInstance().recursiveDelete(path, NodeMissingPolicy.SKIP); + mergeLock.notifyAll(); + } + nextEvent.event("Merge state of %s cleared", tableId); + } + + void setMasterGoalState(MasterGoalState state) { + try { + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, state.name().getBytes(), + NodeExistsPolicy.OVERWRITE); + } catch (Exception ex) { + log.error("Unable to set master goal state in zookeeper"); + } + } + + MasterGoalState getMasterGoalState() { + while (true) + try { + byte[] data = ZooReaderWriter.getInstance().getData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, null); + return MasterGoalState.valueOf(new String(data)); + } catch (Exception e) { + log.error("Problem getting real goal state: " + e); + UtilWaitThread.sleep(1000); + } + } + + public boolean hasCycled(long time) { + for (TabletGroupWatcher watcher : watchers) { + if (watcher.stats.lastScanFinished() < time) + return false; + } + + return true; + } + + public void clearMigrations(String tableId) { + synchronized (migrations) { + Iterator iterator = migrations.keySet().iterator(); + while (iterator.hasNext()) { + KeyExtent extent = iterator.next(); + if (extent.getTableId().toString().equals(tableId)) { + iterator.remove(); + } + } + } + } + + static enum TabletGoalState { + HOSTED, UNASSIGNED, DELETED + }; + + TabletGoalState getSystemGoalState(TabletLocationState tls) { + switch (getMasterState()) { + case NORMAL: + return TabletGoalState.HOSTED; + case HAVE_LOCK: // fall-through intended + case INITIAL: // fall-through intended + case SAFE_MODE: + if (tls.extent.isMeta()) + return TabletGoalState.HOSTED; + return TabletGoalState.UNASSIGNED; + case UNLOAD_METADATA_TABLETS: + if (tls.extent.isRootTablet()) + return TabletGoalState.HOSTED; + return TabletGoalState.UNASSIGNED; + case UNLOAD_ROOT_TABLET: + return TabletGoalState.UNASSIGNED; + case STOP: + return TabletGoalState.UNASSIGNED; + default: + throw new IllegalStateException("Unknown Master State"); + } + } + + TabletGoalState getTableGoalState(KeyExtent extent) { + TableState tableState = TableManager.getInstance().getTableState(extent.getTableId().toString()); + if (tableState == null) + return TabletGoalState.DELETED; + switch (tableState) { + case DELETING: + return TabletGoalState.DELETED; + case OFFLINE: + case NEW: + return TabletGoalState.UNASSIGNED; + default: + return TabletGoalState.HOSTED; + } + } + + TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo) { + KeyExtent extent = tls.extent; + // Shutting down? + TabletGoalState state = getSystemGoalState(tls); + if (state == TabletGoalState.HOSTED) { + if (tls.current != null && serversToShutdown.contains(tls.current)) { + return TabletGoalState.UNASSIGNED; + } + // Handle merge transitions + if (mergeInfo.getExtent() != null) { + log.debug("mergeInfo overlaps: " + extent + " " + mergeInfo.overlaps(extent)); + if (mergeInfo.overlaps(extent)) { + switch (mergeInfo.getState()) { + case NONE: + case COMPLETE: + break; + case STARTED: + case SPLITTING: + return TabletGoalState.HOSTED; + case WAITING_FOR_CHOPPED: + if (tls.getState(onlineTabletServers()).equals(TabletState.HOSTED)) { + if (tls.chopped) + return TabletGoalState.UNASSIGNED; + } else { + if (tls.chopped && tls.walogs.isEmpty()) + return TabletGoalState.UNASSIGNED; + } + + return TabletGoalState.HOSTED; + case WAITING_FOR_OFFLINE: + case MERGING: + return TabletGoalState.UNASSIGNED; + } + } + } + + // taking table offline? + state = getTableGoalState(extent); + if (state == TabletGoalState.HOSTED) { + // Maybe this tablet needs to be migrated + TServerInstance dest = migrations.get(extent); + if (dest != null && tls.current != null && !dest.equals(tls.current)) { + return TabletGoalState.UNASSIGNED; + } + } + } + return state; + } + + private class MigrationCleanupThread extends Daemon { + + @Override + public void run() { + setName("Migration Cleanup Thread"); + while (stillMaster()) { + if (!migrations.isEmpty()) { + try { + cleanupMutations(); + } catch (Exception ex) { + log.error("Error cleaning up migrations", ex); + } + } + UtilWaitThread.sleep(TIME_BETWEEN_MIGRATION_CLEANUPS); + } + } + + // If a migrating tablet splits, and the tablet dies before sending the + // master a message, the migration will refer to a non-existing tablet, + // so it can never complete. Periodically scan the metadata table and + // remove any migrating tablets that no longer exist. + private void cleanupMutations() throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + Connector connector = getConnector(); + Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + Set found = new HashSet(); + for (Entry entry : scanner) { + KeyExtent extent = new KeyExtent(entry.getKey().getRow(), entry.getValue()); + if (migrations.containsKey(extent)) { + found.add(extent); + } + } + migrations.keySet().retainAll(found); + } + } + + private class StatusThread extends Daemon { + + @Override + public void run() { + setName("Status Thread"); + EventCoordinator.Listener eventListener = nextEvent.getListener(); + while (stillMaster()) { + long wait = DEFAULT_WAIT_FOR_WATCHER; + try { + switch (getMasterGoalState()) { + case NORMAL: + setMasterState(MasterState.NORMAL); + break; + case SAFE_MODE: + if (getMasterState() == MasterState.NORMAL) { + setMasterState(MasterState.SAFE_MODE); + } + if (getMasterState() == MasterState.HAVE_LOCK) { + setMasterState(MasterState.SAFE_MODE); + } + break; + case CLEAN_STOP: + switch (getMasterState()) { + case NORMAL: + setMasterState(MasterState.SAFE_MODE); + break; + case SAFE_MODE: { + int count = nonMetaDataTabletsAssignedOrHosted(); + log.debug(String.format("There are %d non-metadata tablets assigned or hosted", count)); + if (count == 0) + setMasterState(MasterState.UNLOAD_METADATA_TABLETS); + } + break; + case UNLOAD_METADATA_TABLETS: { + int count = assignedOrHosted(METADATA_TABLE_ID); + log.debug(String.format("There are %d metadata tablets assigned or hosted", count)); + if (count == 0) + setMasterState(MasterState.UNLOAD_ROOT_TABLET); + } + break; + case UNLOAD_ROOT_TABLET: { + int count = assignedOrHosted(METADATA_TABLE_ID); + if (count > 0) { + log.debug(String.format("%d metadata tablets online", count)); + setMasterState(MasterState.UNLOAD_ROOT_TABLET); + } + int root_count = assignedOrHosted(ROOT_TABLE_ID); + if (root_count > 0) + log.debug("The root tablet is still assigned or hosted"); + if (count + root_count == 0) { + Set currentServers = tserverSet.getCurrentServers(); + log.debug("stopping " + currentServers.size() + " tablet servers"); + for (TServerInstance server : currentServers) { + try { + serversToShutdown.add(server); + tserverSet.getConnection(server).fastHalt(masterLock); + } catch (TException e) { + // its probably down, and we don't care + } finally { + tserverSet.remove(server); + } + } + if (currentServers.size() == 0) + setMasterState(MasterState.STOP); + } + } + break; + default: + break; + } + } + wait = updateStatus(); + eventListener.waitForEvents(wait); + } catch (Throwable t) { + log.error("Error balancing tablets", t); + UtilWaitThread.sleep(WAIT_BETWEEN_ERRORS); + } + } + } + + private long updateStatus() throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + tserverStatus = Collections.synchronizedSortedMap(gatherTableInformation()); + checkForHeldServer(tserverStatus); + + if (!badServers.isEmpty()) { + log.debug("not balancing because the balance information is out-of-date " + badServers.keySet()); + } else if (notHosted() > 0) { + log.debug("not balancing because there are unhosted tablets"); + } else if (getMasterGoalState() == MasterGoalState.CLEAN_STOP) { + log.debug("not balancing because the master is attempting to stop cleanly"); + } else if (!serversToShutdown.isEmpty()) { + log.debug("not balancing while shutting down servers " + serversToShutdown); + } else { + return balanceTablets(); + } + return DEFAULT_WAIT_FOR_WATCHER; + } + + private void checkForHeldServer(SortedMap tserverStatus) { + TServerInstance instance = null; + int crazyHoldTime = 0; + int someHoldTime = 0; + final long maxWait = getSystemConfiguration().getTimeInMillis(Property.TSERV_HOLD_TIME_SUICIDE); + for (Entry entry : tserverStatus.entrySet()) { + if (entry.getValue().getHoldTime() > 0) { + someHoldTime++; + if (entry.getValue().getHoldTime() > maxWait) { + instance = entry.getKey(); + crazyHoldTime++; + } + } + } + if (crazyHoldTime == 1 && someHoldTime == 1 && tserverStatus.size() > 1) { + log.warn("Tablet server " + instance + " exceeded maximum hold time: attempting to kill it"); + try { + TServerConnection connection = tserverSet.getConnection(instance); + if (connection != null) + connection.fastHalt(masterLock); + } catch (TException e) { + log.error(e, e); + } + tserverSet.remove(instance); + } + } + + private long balanceTablets() { + List migrationsOut = new ArrayList(); + Set migrationsCopy = new HashSet(); + synchronized (migrations) { + migrationsCopy.addAll(migrations.keySet()); + } + long wait = tabletBalancer.balance(Collections.unmodifiableSortedMap(tserverStatus), Collections.unmodifiableSet(migrationsCopy), migrationsOut); + + for (TabletMigration m : TabletBalancer.checkMigrationSanity(tserverStatus.keySet(), migrationsOut)) { + if (migrations.containsKey(m.tablet)) { + log.warn("balancer requested migration more than once, skipping " + m); + continue; + } + migrations.put(m.tablet, m.newServer); + log.debug("migration " + m); + } + if (migrationsOut.size() > 0) { + nextEvent.event("Migrating %d more tablets, %d total", migrationsOut.size(), migrations.size()); + } + return wait; + } + + } + + private SortedMap gatherTableInformation() { + long start = System.currentTimeMillis(); + SortedMap result = new TreeMap(); + Set currentServers = tserverSet.getCurrentServers(); + for (TServerInstance server : currentServers) { + try { + Thread t = Thread.currentThread(); + String oldName = t.getName(); + try { + t.setName("Getting status from " + server); + TServerConnection connection = tserverSet.getConnection(server); + if (connection == null) + throw new IOException("No connection to " + server); + TabletServerStatus status = connection.getTableMap(false); + result.put(server, status); + } finally { + t.setName(oldName); + } + } catch (Exception ex) { + log.error("unable to get tablet server status " + server + " " + ex.toString()); + log.debug("unable to get tablet server status " + server, ex); + if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) { + log.warn("attempting to stop " + server); + try { + TServerConnection connection = tserverSet.getConnection(server); + if (connection != null) + connection.halt(masterLock); + } catch (TTransportException e) { + // ignore: it's probably down + } catch (Exception e) { + log.info("error talking to troublesome tablet server ", e); + } + badServers.remove(server); + tserverSet.remove(server); + } + } + } + synchronized (badServers) { + badServers.keySet().retainAll(currentServers); ++ badServers.keySet().removeAll(result.keySet()); + } + log.debug(String.format("Finished gathering information from %d servers in %.2f seconds", result.size(), (System.currentTimeMillis() - start) / 1000.)); + return result; + } + + public void run() throws IOException, InterruptedException, KeeperException { + final String zroot = ZooUtil.getRoot(instance); + + getMasterLock(zroot + Constants.ZMASTER_LOCK); + + recoveryManager = new RecoveryManager(this); + + TableManager.getInstance().addObserver(this); + + StatusThread statusThread = new StatusThread(); + statusThread.start(); + + MigrationCleanupThread migrationCleanupThread = new MigrationCleanupThread(); + migrationCleanupThread.start(); + + tserverSet.startListeningForTabletServerChanges(); + + try { + final AgeOffStore store = new AgeOffStore(new org.apache.accumulo.fate.ZooStore(ZooUtil.getRoot(instance) + Constants.ZFATE, + ZooReaderWriter.getRetryingInstance()), 1000 * 60 * 60 * 8); + + int threads = this.getConfiguration().getConfiguration().getCount(Property.MASTER_FATE_THREADPOOL_SIZE); + + fate = new Fate(this, store, threads); + + SimpleTimer.getInstance().schedule(new Runnable() { + + @Override + public void run() { + store.ageOff(); + } + }, 63000, 63000); + } catch (KeeperException e) { + throw new IOException(e); + } catch (InterruptedException e) { + throw new IOException(e); + } + + ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, new Watcher() { + @Override + public void process(WatchedEvent event) { + nextEvent.event("Noticed recovery changes", event.getType()); + try { + // watcher only fires once, add it back + ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, this); + } catch (Exception e) { + log.error("Failed to add log recovery watcher back", e); + } + } + }); + + Credentials systemCreds = SystemCredentials.get(); + watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(instance, systemCreds, this), null)); + watchers.add(new TabletGroupWatcher(this, new RootTabletStateStore(instance, systemCreds, this), watchers.get(0))); + watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new ZooStore(zroot)), watchers.get(1))); + for (TabletGroupWatcher watcher : watchers) { + watcher.start(); + } + + Processor processor = new Processor(TraceWrap.service(new MasterClientServiceHandler(this))); + ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master", + "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); + clientService = sa.server; + String address = sa.address.toString(); + log.info("Setting master lock data to " + address); + masterLock.replaceLockData(address.getBytes()); + + while (!clientService.isServing()) { + UtilWaitThread.sleep(100); + } + while (clientService.isServing()) { + UtilWaitThread.sleep(500); + } + log.info("Shutting down fate."); + fate.shutdown(); + + final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; + statusThread.join(remaining(deadline)); + + // quit, even if the tablet servers somehow jam up and the watchers + // don't stop + for (TabletGroupWatcher watcher : watchers) { + watcher.join(remaining(deadline)); + } + log.info("exiting"); + } + + private long remaining(long deadline) { + return Math.max(1, deadline - System.currentTimeMillis()); + } + + public ZooLock getMasterLock() { + return masterLock; + } + + private static class MasterLockWatcher implements ZooLock.AsyncLockWatcher { + + boolean acquiredLock = false; + boolean failedToAcquireLock = false; + + @Override + public void lostLock(LockLossReason reason) { + Halt.halt("Master lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + } + + @Override + public void unableToMonitorLockNode(final Throwable e) { + Halt.halt(-1, new Runnable() { + @Override + public void run() { + log.fatal("No longer able to monitor master lock node", e); + } + }); + + } + + @Override + public synchronized void acquiredLock() { + + if (acquiredLock || failedToAcquireLock) { + Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + acquiredLock = true; + notifyAll(); + } + + @Override + public synchronized void failedToAcquireLock(Exception e) { + log.warn("Failed to get master lock " + e); + + if (acquiredLock) { + Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + failedToAcquireLock = true; + notifyAll(); + } + + public synchronized void waitForChange() { + while (!acquiredLock && !failedToAcquireLock) { + try { + wait(); + } catch (InterruptedException e) {} + } + } + } + + private void getMasterLock(final String zMasterLoc) throws KeeperException, InterruptedException { + log.info("trying to get master lock"); + + final String masterClientAddress = hostname + ":" + getSystemConfiguration().getPort(Property.MASTER_CLIENTPORT); + + while (true) { + + MasterLockWatcher masterLockWatcher = new MasterLockWatcher(); + masterLock = new ZooLock(zMasterLoc); + masterLock.lockAsync(masterLockWatcher, masterClientAddress.getBytes()); + + masterLockWatcher.waitForChange(); + + if (masterLockWatcher.acquiredLock) { + break; + } + + if (!masterLockWatcher.failedToAcquireLock) { + throw new IllegalStateException("master lock in unknown state"); + } + + masterLock.tryToCancelAsyncLockOrUnlock(); + + UtilWaitThread.sleep(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS); + } + + setMasterState(MasterState.HAVE_LOCK); + } + + public static void main(String[] args) throws Exception { + try { + SecurityUtil.serverLogin(); + + VolumeManager fs = VolumeManagerImpl.get(); + ServerOpts opts = new ServerOpts(); + opts.parseArgs("master", args); + String hostname = opts.getAddress(); + Instance instance = HdfsZooInstance.getInstance(); + ServerConfiguration conf = new ServerConfiguration(instance); + Accumulo.init(fs, conf, "master"); + Master master = new Master(conf, fs, hostname); + Accumulo.enableTracing(hostname, "master"); + master.run(); + } catch (Exception ex) { + log.error("Unexpected exception, exiting", ex); + System.exit(1); + } + } + + @Override + public void update(LiveTServerSet current, Set deleted, Set added) { + DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS); + if (added.size() > 0) { + log.info("New servers: " + added); + for (TServerInstance up : added) + obit.delete(up.hostPort()); + } + for (TServerInstance dead : deleted) { + String cause = "unexpected failure"; + if (serversToShutdown.contains(dead)) + cause = "clean shutdown"; // maybe an incorrect assumption + if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) + obit.post(dead.hostPort(), cause); + } + + Set unexpected = new HashSet(deleted); + unexpected.removeAll(this.serversToShutdown); + if (unexpected.size() > 0) { + if (stillMaster() && !getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) { + log.warn("Lost servers " + unexpected); + } + } + serversToShutdown.removeAll(deleted); + badServers.keySet().removeAll(deleted); + // clear out any bad server with the same host/port as a new server + synchronized (badServers) { + cleanListByHostAndPort(badServers.keySet(), deleted, added); + } + synchronized (serversToShutdown) { + cleanListByHostAndPort(serversToShutdown, deleted, added); + } + + synchronized (migrations) { + Iterator> iter = migrations.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = iter.next(); + if (deleted.contains(entry.getValue())) { + log.info("Canceling migration of " + entry.getKey() + " to " + entry.getValue()); + iter.remove(); + } + } + } + nextEvent.event("There are now %d tablet servers", current.size()); + } + + private static void cleanListByHostAndPort(Collection badServers, Set deleted, Set added) { + Iterator badIter = badServers.iterator(); + while (badIter.hasNext()) { + TServerInstance bad = badIter.next(); + for (TServerInstance add : added) { + if (bad.hostPort().equals(add.hostPort())) { + badIter.remove(); + break; + } + } + for (TServerInstance del : deleted) { + if (bad.hostPort().equals(del.hostPort())) { + badIter.remove(); + break; + } + } + } + } + + @Override + public void stateChanged(String tableId, TableState state) { + nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state); + } + + @Override + public void initialize(Map tableIdToStateMap) {} + + @Override + public void sessionExpired() {} + + @Override + public Set onlineTables() { + Set result = new HashSet(); + if (getMasterState() != MasterState.NORMAL) { + if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS) + result.add(MetadataTable.ID); + if (getMasterState() != MasterState.UNLOAD_ROOT_TABLET) + result.add(RootTable.ID); + return result; + } + TableManager manager = TableManager.getInstance(); + + for (String tableId : Tables.getIdToNameMap(instance).keySet()) { + TableState state = manager.getTableState(tableId); + if (state != null) { + if (state == TableState.ONLINE) + result.add(tableId); + } + } + return result; + } + + @Override + public Set onlineTabletServers() { + return tserverSet.getCurrentServers(); + } + + @Override + public Collection merges() { + List result = new ArrayList(); + for (String tableId : Tables.getIdToNameMap(instance).keySet()) { + result.add(getMergeInfo(new Text(tableId))); + } + return result; + } + + // recovers state from the persistent transaction to shutdown a server + public void shutdownTServer(TServerInstance server) { + nextEvent.event("Tablet Server shutdown requested for %s", server); + serversToShutdown.add(server); + } + + public EventCoordinator getEventCoordinator() { + return nextEvent; + } + + public Instance getInstance() { + return this.instance; + } + + public AccumuloConfiguration getSystemConfiguration() { + return serverConfig.getConfiguration(); + } + + public ServerConfiguration getConfiguration() { + return serverConfig; + } + + public VolumeManager getFileSystem() { + return this.fs; + } + + public void assignedTablet(KeyExtent extent) { + if (extent.isMeta()) { + if (getMasterState().equals(MasterState.UNLOAD_ROOT_TABLET)) { + setMasterState(MasterState.UNLOAD_METADATA_TABLETS); + } + } + if (extent.isRootTablet()) { + // probably too late, but try anyhow + if (getMasterState().equals(MasterState.STOP)) { + setMasterState(MasterState.UNLOAD_ROOT_TABLET); + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5e247981/test/src/test/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java index 0000000,0000000..bf2f6b6 new file mode 100644 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java @@@ -1,0 -1,0 +1,90 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.accumulo.test.functional; ++ ++import static org.junit.Assert.assertEquals; ++import static org.junit.Assert.assertTrue; ++ ++import java.util.ArrayList; ++import java.util.Collections; ++import java.util.List; ++import java.util.SortedSet; ++import java.util.TreeSet; ++ ++import org.apache.accumulo.core.client.Connector; ++import org.apache.accumulo.core.client.impl.MasterClient; ++import org.apache.accumulo.core.client.security.tokens.PasswordToken; ++import org.apache.accumulo.core.conf.Property; ++import org.apache.accumulo.core.master.thrift.MasterClientService; ++import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; ++import org.apache.accumulo.core.master.thrift.TableInfo; ++import org.apache.accumulo.core.master.thrift.TabletServerStatus; ++import org.apache.accumulo.core.security.Credentials; ++import org.apache.accumulo.fate.util.UtilWaitThread; ++import org.apache.accumulo.minicluster.MiniAccumuloConfig; ++import org.apache.accumulo.trace.instrument.Tracer; ++import org.apache.hadoop.io.Text; ++import org.junit.Test; ++ ++public class BalanceAfterCommsFailureIT extends ConfigurableMacIT { ++ ++ @Override ++ public void configure(MiniAccumuloConfig cfg) { ++ cfg.setSiteConfig(Collections.singletonMap(Property.GENERAL_RPC_TIMEOUT.getKey(), "2s")); ++ } ++ ++ @Test(timeout = 2 * 60 * 1000) ++ public void test() throws Exception { ++ Connector c = this.getConnector(); ++ c.tableOperations().create("test"); ++ assertEquals(0, Runtime.getRuntime().exec(new String[]{"pkill", "-SIGSTOP", "-f", "TabletServer"}).waitFor()); ++ UtilWaitThread.sleep(20 * 1000); ++ assertEquals(0, Runtime.getRuntime().exec(new String[]{"pkill", "-SIGCONT", "-f", "TabletServer"}).waitFor()); ++ SortedSet splits = new TreeSet(); ++ for (String split : "a b c d e f g h i j k l m n o p q r s t u v w x y z".split(" ")) { ++ splits.add(new Text(split)); ++ } ++ c.tableOperations().addSplits("test", splits); ++ UtilWaitThread.sleep(10 * 1000); ++ checkBalance(c); ++ } ++ ++ private void checkBalance(Connector c) throws Exception { ++ Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD)); ++ ++ MasterClientService.Iface client = null; ++ MasterMonitorInfo stats = null; ++ try { ++ client = MasterClient.getConnectionWithRetry(c.getInstance()); ++ stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(c.getInstance())); ++ } finally { ++ if (client != null) ++ MasterClient.close(client); ++ } ++ List counts = new ArrayList(); ++ for (TabletServerStatus server : stats.tServerInfo) { ++ int count = 0; ++ for (TableInfo table : server.tableMap.values()) { ++ count += table.onlineTablets; ++ } ++ counts.add(count); ++ } ++ assertTrue(counts.size() > 1); ++ for (int i = 1; i < counts.size(); i++) ++ assertTrue(Math.abs(counts.get(0) - counts.get(i)) <= counts.size()); ++ } ++}