From: ecn@apache.org
To: commits@accumulo.apache.org
Date: Wed, 05 Mar 2014 21:13:16 -0000
Subject: [4/4] git commit: ACCUMULO-2412 use only pre-existing merge requests before processing the metadata table

ACCUMULO-2412 use only pre-existing merge requests before processing the metadata table

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1392e07f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1392e07f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1392e07f

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 1392e07fb5a4d5efe803810bcd888568cc57d9b9
Parents: 4fd8686 b135111
Author: Eric Newton
Authored: Wed Mar 5 16:12:07 2014 -0500
Committer: Eric Newton
Committed: Wed Mar 5 16:12:07 2014 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/accumulo/master/Master.java    | 4 ----
 .../java/org/apache/accumulo/master/TabletGroupWatcher.java | 7 ++++++-
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1392e07f/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 6a75872,0000000..e123b49
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -1,1231 -1,0 +1,1227 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.Namespaces; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.ThriftTransportPool; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.master.thrift.MasterClientService.Iface; +import org.apache.accumulo.core.master.thrift.MasterClientService.Processor; +import org.apache.accumulo.core.master.thrift.MasterGoalState; +import org.apache.accumulo.core.master.thrift.MasterState; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.security.NamespacePermission; +import org.apache.accumulo.core.security.SecurityUtil; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.AgeOffStore; +import org.apache.accumulo.fate.Fate; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.master.recovery.RecoveryManager; +import org.apache.accumulo.master.state.TableCounts; +import org.apache.accumulo.server.Accumulo; +import 
org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManager.FileType; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.init.Initialize; +import org.apache.accumulo.server.master.LiveTServerSet; +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; +import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer; +import org.apache.accumulo.server.master.balancer.TabletBalancer; +import org.apache.accumulo.server.master.state.CurrentState; +import org.apache.accumulo.server.master.state.DeadServerList; +import org.apache.accumulo.server.master.state.MergeInfo; +import org.apache.accumulo.server.master.state.MergeState; +import org.apache.accumulo.server.master.state.MetaDataStateStore; +import org.apache.accumulo.server.master.state.RootTabletStateStore; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.TabletMigration; +import org.apache.accumulo.server.master.state.TabletState; +import org.apache.accumulo.server.master.state.ZooStore; +import org.apache.accumulo.server.master.state.ZooTabletStateStore; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.security.handler.ZKPermHandler; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.accumulo.server.tables.TableObserver; +import org.apache.accumulo.server.util.DefaultMap; +import org.apache.accumulo.server.util.Halt; +import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.accumulo.server.util.TServerUtils; +import org.apache.accumulo.server.util.TServerUtils.ServerAddress; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.trace.instrument.thrift.TraceWrap; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.server.TServer; +import org.apache.thrift.transport.TTransportException; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; + +import com.google.common.collect.Iterables; + +/** + * The Master is responsible for assigning and balancing tablets to tablet servers. + * + * The master will also coordinate log recoveries and reports general status. 
+ */ +public class Master implements LiveTServerSet.Listener, TableObserver, CurrentState { + + final static Logger log = Logger.getLogger(Master.class); + + final static int ONE_SECOND = 1000; + final private static Text METADATA_TABLE_ID = new Text(MetadataTable.ID); + final private static Text ROOT_TABLE_ID = new Text(RootTable.ID); + final static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND; + final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND; + final static long WAIT_BETWEEN_ERRORS = ONE_SECOND; + final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND; + final private static int MAX_CLEANUP_WAIT_TIME = ONE_SECOND; + final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND; + final static int MAX_TSERVER_WORK_CHUNK = 5000; + final private static int MAX_BAD_STATUS_COUNT = 3; + + final VolumeManager fs; + final private Instance instance; + final private String hostname; + final LiveTServerSet tserverSet; + final private List watchers = new ArrayList(); + final SecurityOperation security; + final Map badServers = Collections.synchronizedMap(new DefaultMap(new AtomicInteger())); + final Set serversToShutdown = Collections.synchronizedSet(new HashSet()); + final SortedMap migrations = Collections.synchronizedSortedMap(new TreeMap()); + final EventCoordinator nextEvent = new EventCoordinator(); + final private Object mergeLock = new Object(); + RecoveryManager recoveryManager = null; + + ZooLock masterLock = null; + private TServer clientService = null; + TabletBalancer tabletBalancer; + + private MasterState state = MasterState.INITIAL; + + Fate fate; + + volatile SortedMap tserverStatus = Collections.unmodifiableSortedMap(new TreeMap()); + + synchronized MasterState getMasterState() { + return state; + } + + public boolean stillMaster() { + return getMasterState() != MasterState.STOP; + } + + static final boolean X = true; + static final boolean _ = false; + // @formatter:off + static final boolean transitionOK[][] = { + // INITIAL HAVE_LOCK SAFE_MODE NORMAL UNLOAD_META UNLOAD_ROOT STOP + /* INITIAL */ {X, X, _, _, _, _, X}, + /* HAVE_LOCK */ {_, X, X, X, _, _, X}, + /* SAFE_MODE */ {_, _, X, X, X, _, X}, + /* NORMAL */ {_, _, X, X, X, _, X}, + /* UNLOAD_METADATA_TABLETS */ {_, _, X, X, X, X, X}, + /* UNLOAD_ROOT_TABLET */ {_, _, _, X, X, X, X}, + /* STOP */ {_, _, _, _, _, X, X}}; + //@formatter:on + synchronized void setMasterState(MasterState newState) { + if (state.equals(newState)) + return; + if (!transitionOK[state.ordinal()][newState.ordinal()]) { + log.error("Programmer error: master should not transition from " + state + " to " + newState); + } + MasterState oldState = state; + state = newState; + nextEvent.event("State changed from %s to %s", oldState, newState); + if (newState == MasterState.STOP) { + // Give the server a little time before shutdown so the client + // thread requesting the stop can return + SimpleTimer.getInstance().schedule(new Runnable() { + @Override + public void run() { + // This frees the main thread and will cause the master to exit + clientService.stop(); + Master.this.nextEvent.event("stopped event loop"); + } + + }, 100l, 1000l); + } + + if (oldState != newState && (newState == MasterState.HAVE_LOCK)) { + upgradeZookeeper(); + } + + if (oldState != newState && (newState == MasterState.NORMAL)) { + upgradeMetadata(); + } + } + + private void moveRootTabletToRootTable(IZooReaderWriter zoo) throws Exception { + String dirZPath = ZooUtil.getRoot(instance) + RootTable.ZROOT_TABLET_PATH; + + if 
(!zoo.exists(dirZPath)) { + Path oldPath = fs.getFullPath(FileType.TABLE, "/" + MetadataTable.ID + "/root_tablet"); + if (fs.exists(oldPath)) { + String newPath = fs.choose(ServerConstants.getTablesDirs()) + "/" + RootTable.ID; + fs.mkdirs(new Path(newPath)); + if (!fs.rename(oldPath, new Path(newPath))) { + throw new IOException("Failed to move root tablet from " + oldPath + " to " + newPath); + } + + log.info("Upgrade renamed " + oldPath + " to " + newPath); + } + + Path location = null; + + for (String basePath : ServerConstants.getTablesDirs()) { + Path path = new Path(basePath + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION); + if (fs.exists(path)) { + if (location != null) { + throw new IllegalStateException("Root table at multiple locations " + location + " " + path); + } + + location = path; + } + } + + if (location == null) + throw new IllegalStateException("Failed to find root tablet"); + + log.info("Upgrade setting root table location in zookeeper " + location); + zoo.putPersistentData(dirZPath, location.toString().getBytes(), NodeExistsPolicy.FAIL); + } + } + + private void upgradeZookeeper() { + // 1.5.1 and 1.6.0 both do some state checking after obtaining the zoolock for the + // monitor and before starting up. It's not tied to the data version at all (and would + // introduce unnecessary complexity to try to make the master do it), but be aware + // that the master is not the only thing that may alter zookeeper before starting. + + if (Accumulo.getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) { + try { + log.info("Upgrading zookeeper"); + + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + + // create initial namespaces + String namespaces = ZooUtil.getRoot(instance) + Constants.ZNAMESPACES; + zoo.putPersistentData(namespaces, new byte[0], NodeExistsPolicy.SKIP); + for (Pair namespace : Iterables.concat( + Collections.singleton(new Pair(Namespaces.ACCUMULO_NAMESPACE, Namespaces.ACCUMULO_NAMESPACE_ID)), + Collections.singleton(new Pair(Namespaces.DEFAULT_NAMESPACE, Namespaces.DEFAULT_NAMESPACE_ID)))) { + String ns = namespace.getFirst(); + String id = namespace.getSecond(); + log.debug("Upgrade creating namespace \"" + ns + "\" (ID: " + id + ")"); + if (!Namespaces.exists(instance, id)) + TableManager.prepareNewNamespaceState(instance.getInstanceID(), id, ns, NodeExistsPolicy.SKIP); + } + + // create root table + log.debug("Upgrade creating table " + RootTable.NAME + " (ID: " + RootTable.ID + ")"); + TableManager.prepareNewTableState(instance.getInstanceID(), RootTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, RootTable.NAME, TableState.ONLINE, + NodeExistsPolicy.SKIP); + Initialize.initMetadataConfig(RootTable.ID); + // ensure root user can flush root table + security.grantTablePermission(SystemCredentials.get().toThrift(instance), security.getRootUsername(), RootTable.ID, TablePermission.ALTER_TABLE, Namespaces.ACCUMULO_NAMESPACE_ID); + + // put existing tables in the correct namespaces + String tables = ZooUtil.getRoot(instance) + Constants.ZTABLES; + for (String tableId : zoo.getChildren(tables)) { + String targetNamespace = (MetadataTable.ID.equals(tableId) || RootTable.ID.equals(tableId)) ? 
Namespaces.ACCUMULO_NAMESPACE_ID + : Namespaces.DEFAULT_NAMESPACE_ID; + log.debug("Upgrade moving table " + new String(zoo.getData(tables + "/" + tableId + Constants.ZTABLE_NAME, null), Constants.UTF8) + " (ID: " + + tableId + ") into namespace with ID " + targetNamespace); + zoo.putPersistentData(tables + "/" + tableId + Constants.ZTABLE_NAMESPACE, targetNamespace.getBytes(Constants.UTF8), NodeExistsPolicy.SKIP); + } + + // rename metadata table + log.debug("Upgrade renaming table " + MetadataTable.OLD_NAME + " (ID: " + MetadataTable.ID + ") to " + MetadataTable.NAME); + zoo.putPersistentData(tables + "/" + MetadataTable.ID + Constants.ZTABLE_NAME, Tables.qualify(MetadataTable.NAME).getSecond().getBytes(Constants.UTF8), + NodeExistsPolicy.OVERWRITE); + + moveRootTabletToRootTable(zoo); + + // add system namespace permissions to existing users + ZKPermHandler perm = new ZKPermHandler(); + perm.initialize(instance.getInstanceID(), true); + String users = ZooUtil.getRoot(instance) + "/users"; + for (String user : zoo.getChildren(users)) { + zoo.putPersistentData(users + "/" + user + "/Namespaces", new byte[0], NodeExistsPolicy.SKIP); + perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ); + } + perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE); + + } catch (Exception ex) { + log.fatal("Error performing upgrade", ex); + System.exit(1); + } + } + } + + private final AtomicBoolean upgradeMetadataRunning = new AtomicBoolean(false); + + private final ServerConfiguration serverConfig; + + private void upgradeMetadata() { + if (Accumulo.getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) { + if (upgradeMetadataRunning.compareAndSet(false, true)) { + Runnable upgradeTask = new Runnable() { + @Override + public void run() { + try { + MetadataTableUtil.moveMetaDeleteMarkers(instance, SystemCredentials.get()); + Accumulo.updateAccumuloVersion(fs); + + log.info("Upgrade complete"); + + } catch (Exception ex) { + log.fatal("Error performing upgrade", ex); + System.exit(1); + } + + } + }; + + // need to run this in a separate thread because a lock is held that prevents metadata tablets from being assigned and this task writes to the + // metadata table + new Thread(upgradeTask).start(); + } + } + } + + private int assignedOrHosted(Text tableId) { + int result = 0; + for (TabletGroupWatcher watcher : watchers) { + TableCounts count = watcher.getStats(tableId); + result += count.hosted() + count.assigned(); + } + return result; + } + + private int totalAssignedOrHosted() { + int result = 0; + for (TabletGroupWatcher watcher : watchers) { + for (TableCounts counts : watcher.getStats().values()) { + result += counts.assigned() + counts.hosted(); + } + } + return result; + } + + private int nonMetaDataTabletsAssignedOrHosted() { + return totalAssignedOrHosted() - assignedOrHosted(new Text(MetadataTable.ID)) - assignedOrHosted(new Text(RootTable.ID)); + } + + private int notHosted() { + int result = 0; + for (TabletGroupWatcher watcher : watchers) { + for (TableCounts counts : watcher.getStats().values()) { + result += counts.assigned() + counts.assignedToDeadServers(); + } + } + return result; + } + + // The number of unassigned tablets that should be assigned: displayed on the monitor page + int displayUnassigned() { + int result = 0; + switch (getMasterState()) { + case NORMAL: + // Count offline tablets for online tables + for (TabletGroupWatcher watcher : watchers) { + TableManager manager = 
TableManager.getInstance(); + for (Entry entry : watcher.getStats().entrySet()) { + Text tableId = entry.getKey(); + TableCounts counts = entry.getValue(); + TableState tableState = manager.getTableState(tableId.toString()); + if (tableState != null && tableState.equals(TableState.ONLINE)) { + result += counts.unassigned() + counts.assignedToDeadServers() + counts.assigned(); + } + } + } + break; + case SAFE_MODE: + // Count offline tablets for the metadata table + for (TabletGroupWatcher watcher : watchers) { + result += watcher.getStats(METADATA_TABLE_ID).unassigned(); + } + break; + case UNLOAD_METADATA_TABLETS: + case UNLOAD_ROOT_TABLET: + for (TabletGroupWatcher watcher : watchers) { + result += watcher.getStats(METADATA_TABLE_ID).unassigned(); + } + break; + default: + break; + } + return result; + } + + public void mustBeOnline(final String tableId) throws ThriftTableOperationException { + Tables.clearCache(instance); + if (!Tables.getTableState(instance, tableId).equals(TableState.ONLINE)) + throw new ThriftTableOperationException(tableId, null, TableOperation.MERGE, TableOperationExceptionType.OFFLINE, "table is not online"); + } + + public Connector getConnector() throws AccumuloException, AccumuloSecurityException { + return instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken()); + } + + private Master(ServerConfiguration config, VolumeManager fs, String hostname) throws IOException { + this.serverConfig = config; + this.instance = config.getInstance(); + this.fs = fs; + this.hostname = hostname; + + AccumuloConfiguration aconf = serverConfig.getConfiguration(); + + log.info("Version " + Constants.VERSION); + log.info("Instance " + instance.getInstanceID()); + ThriftTransportPool.getInstance().setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); + security = AuditedSecurityOperation.getInstance(); + tserverSet = new LiveTServerSet(instance, config.getConfiguration(), this); + this.tabletBalancer = aconf.instantiateClassProperty(Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer()); + this.tabletBalancer.init(serverConfig); + } + + public TServerConnection getConnection(TServerInstance server) { + return tserverSet.getConnection(server); + } + - public MergeInfo getMergeInfo(KeyExtent tablet) { - return getMergeInfo(tablet.getTableId()); - } - + public MergeInfo getMergeInfo(Text tableId) { + synchronized (mergeLock) { + try { + String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge"; + if (!ZooReaderWriter.getInstance().exists(path)) + return new MergeInfo(); + byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat()); + DataInputBuffer in = new DataInputBuffer(); + in.reset(data, data.length); + MergeInfo info = new MergeInfo(); + info.readFields(in); + return info; + } catch (KeeperException.NoNodeException ex) { + log.info("Error reading merge state, it probably just finished"); + return new MergeInfo(); + } catch (Exception ex) { + log.warn("Unexpected error reading merge state", ex); + return new MergeInfo(); + } + } + } + + public void setMergeState(MergeInfo info, MergeState state) throws IOException, KeeperException, InterruptedException { + synchronized (mergeLock) { + String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + info.getExtent().getTableId().toString() + "/merge"; + info.setState(state); + if (state.equals(MergeState.NONE)) { + ZooReaderWriter.getInstance().recursiveDelete(path, 
NodeMissingPolicy.SKIP); + } else { + DataOutputBuffer out = new DataOutputBuffer(); + try { + info.write(out); + } catch (IOException ex) { + throw new RuntimeException("Unlikely", ex); + } + ZooReaderWriter.getInstance().putPersistentData(path, out.getData(), + state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL : ZooUtil.NodeExistsPolicy.OVERWRITE); + } + mergeLock.notifyAll(); + } + nextEvent.event("Merge state of %s set to %s", info.getExtent(), state); + } + + public void clearMergeState(Text tableId) throws IOException, KeeperException, InterruptedException { + synchronized (mergeLock) { + String path = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge"; + ZooReaderWriter.getInstance().recursiveDelete(path, NodeMissingPolicy.SKIP); + mergeLock.notifyAll(); + } + nextEvent.event("Merge state of %s cleared", tableId); + } + + void setMasterGoalState(MasterGoalState state) { + try { + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, state.name().getBytes(), + NodeExistsPolicy.OVERWRITE); + } catch (Exception ex) { + log.error("Unable to set master goal state in zookeeper"); + } + } + + MasterGoalState getMasterGoalState() { + while (true) + try { + byte[] data = ZooReaderWriter.getInstance().getData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, null); + return MasterGoalState.valueOf(new String(data)); + } catch (Exception e) { + log.error("Problem getting real goal state: " + e); + UtilWaitThread.sleep(1000); + } + } + + public boolean hasCycled(long time) { + for (TabletGroupWatcher watcher : watchers) { + if (watcher.stats.lastScanFinished() < time) + return false; + } + + return true; + } + + public void clearMigrations(String tableId) { + synchronized (migrations) { + Iterator iterator = migrations.keySet().iterator(); + while (iterator.hasNext()) { + KeyExtent extent = iterator.next(); + if (extent.getTableId().toString().equals(tableId)) { + iterator.remove(); + } + } + } + } + + static enum TabletGoalState { + HOSTED, UNASSIGNED, DELETED + }; + + TabletGoalState getSystemGoalState(TabletLocationState tls) { + switch (getMasterState()) { + case NORMAL: + return TabletGoalState.HOSTED; + case HAVE_LOCK: // fall-through intended + case INITIAL: // fall-through intended + case SAFE_MODE: + if (tls.extent.isMeta()) + return TabletGoalState.HOSTED; + return TabletGoalState.UNASSIGNED; + case UNLOAD_METADATA_TABLETS: + if (tls.extent.isRootTablet()) + return TabletGoalState.HOSTED; + return TabletGoalState.UNASSIGNED; + case UNLOAD_ROOT_TABLET: + return TabletGoalState.UNASSIGNED; + case STOP: + return TabletGoalState.UNASSIGNED; + default: + throw new IllegalStateException("Unknown Master State"); + } + } + + TabletGoalState getTableGoalState(KeyExtent extent) { + TableState tableState = TableManager.getInstance().getTableState(extent.getTableId().toString()); + if (tableState == null) + return TabletGoalState.DELETED; + switch (tableState) { + case DELETING: + return TabletGoalState.DELETED; + case OFFLINE: + case NEW: + return TabletGoalState.UNASSIGNED; + default: + return TabletGoalState.HOSTED; + } + } + + TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo) { + KeyExtent extent = tls.extent; + // Shutting down? 
+ TabletGoalState state = getSystemGoalState(tls); + if (state == TabletGoalState.HOSTED) { + if (tls.current != null && serversToShutdown.contains(tls.current)) { + return TabletGoalState.UNASSIGNED; + } + // Handle merge transitions + if (mergeInfo.getExtent() != null) { + log.debug("mergeInfo overlaps: " + extent + " " + mergeInfo.overlaps(extent)); + if (mergeInfo.overlaps(extent)) { + switch (mergeInfo.getState()) { + case NONE: + case COMPLETE: + break; + case STARTED: + case SPLITTING: + return TabletGoalState.HOSTED; + case WAITING_FOR_CHOPPED: + if (tls.getState(onlineTabletServers()).equals(TabletState.HOSTED)) { + if (tls.chopped) + return TabletGoalState.UNASSIGNED; + } else { + if (tls.chopped && tls.walogs.isEmpty()) + return TabletGoalState.UNASSIGNED; + } + + return TabletGoalState.HOSTED; + case WAITING_FOR_OFFLINE: + case MERGING: + return TabletGoalState.UNASSIGNED; + } + } + } + + // taking table offline? + state = getTableGoalState(extent); + if (state == TabletGoalState.HOSTED) { + // Maybe this tablet needs to be migrated + TServerInstance dest = migrations.get(extent); + if (dest != null && tls.current != null && !dest.equals(tls.current)) { + return TabletGoalState.UNASSIGNED; + } + } + } + return state; + } + + private class MigrationCleanupThread extends Daemon { + + @Override + public void run() { + setName("Migration Cleanup Thread"); + while (stillMaster()) { + if (!migrations.isEmpty()) { + try { + cleanupMutations(); + } catch (Exception ex) { + log.error("Error cleaning up migrations", ex); + } + } + UtilWaitThread.sleep(TIME_BETWEEN_MIGRATION_CLEANUPS); + } + } + + // If a migrating tablet splits, and the tablet dies before sending the + // master a message, the migration will refer to a non-existing tablet, + // so it can never complete. Periodically scan the metadata table and + // remove any migrating tablets that no longer exist. 
+ private void cleanupMutations() throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + Connector connector = getConnector(); + Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + Set found = new HashSet(); + for (Entry entry : scanner) { + KeyExtent extent = new KeyExtent(entry.getKey().getRow(), entry.getValue()); + if (migrations.containsKey(extent)) { + found.add(extent); + } + } + migrations.keySet().retainAll(found); + } + } + + private class StatusThread extends Daemon { + + @Override + public void run() { + setName("Status Thread"); + EventCoordinator.Listener eventListener = nextEvent.getListener(); + while (stillMaster()) { + long wait = DEFAULT_WAIT_FOR_WATCHER; + try { + switch (getMasterGoalState()) { + case NORMAL: + setMasterState(MasterState.NORMAL); + break; + case SAFE_MODE: + if (getMasterState() == MasterState.NORMAL) { + setMasterState(MasterState.SAFE_MODE); + } + if (getMasterState() == MasterState.HAVE_LOCK) { + setMasterState(MasterState.SAFE_MODE); + } + break; + case CLEAN_STOP: + switch (getMasterState()) { + case NORMAL: + setMasterState(MasterState.SAFE_MODE); + break; + case SAFE_MODE: { + int count = nonMetaDataTabletsAssignedOrHosted(); + log.debug(String.format("There are %d non-metadata tablets assigned or hosted", count)); + if (count == 0) + setMasterState(MasterState.UNLOAD_METADATA_TABLETS); + } + break; + case UNLOAD_METADATA_TABLETS: { + int count = assignedOrHosted(METADATA_TABLE_ID); + log.debug(String.format("There are %d metadata tablets assigned or hosted", count)); + if (count == 0) + setMasterState(MasterState.UNLOAD_ROOT_TABLET); + } + break; + case UNLOAD_ROOT_TABLET: { + int count = assignedOrHosted(METADATA_TABLE_ID); + if (count > 0) { + log.debug(String.format("%d metadata tablets online", count)); + setMasterState(MasterState.UNLOAD_ROOT_TABLET); + } + int root_count = assignedOrHosted(ROOT_TABLE_ID); + if (root_count > 0) + log.debug("The root tablet is still assigned or hosted"); + if (count + root_count == 0) { + Set currentServers = tserverSet.getCurrentServers(); + log.debug("stopping " + currentServers.size() + " tablet servers"); + for (TServerInstance server : currentServers) { + try { + serversToShutdown.add(server); + tserverSet.getConnection(server).fastHalt(masterLock); + } catch (TException e) { + // its probably down, and we don't care + } finally { + tserverSet.remove(server); + } + } + if (currentServers.size() == 0) + setMasterState(MasterState.STOP); + } + } + break; + default: + break; + } + } + wait = updateStatus(); + eventListener.waitForEvents(wait); + } catch (Throwable t) { + log.error("Error balancing tablets", t); + UtilWaitThread.sleep(WAIT_BETWEEN_ERRORS); + } + } + } + + private long updateStatus() throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + tserverStatus = Collections.synchronizedSortedMap(gatherTableInformation()); + checkForHeldServer(tserverStatus); + + if (!badServers.isEmpty()) { + log.debug("not balancing because the balance information is out-of-date " + badServers.keySet()); + } else if (notHosted() > 0) { + log.debug("not balancing because there are unhosted tablets"); + } else if (getMasterGoalState() == MasterGoalState.CLEAN_STOP) { + log.debug("not balancing because the master is attempting to stop cleanly"); + } else if (!serversToShutdown.isEmpty()) { + log.debug("not balancing while shutting down servers " + serversToShutdown); 
+ } else { + return balanceTablets(); + } + return DEFAULT_WAIT_FOR_WATCHER; + } + + private void checkForHeldServer(SortedMap tserverStatus) { + TServerInstance instance = null; + int crazyHoldTime = 0; + int someHoldTime = 0; + final long maxWait = getSystemConfiguration().getTimeInMillis(Property.TSERV_HOLD_TIME_SUICIDE); + for (Entry entry : tserverStatus.entrySet()) { + if (entry.getValue().getHoldTime() > 0) { + someHoldTime++; + if (entry.getValue().getHoldTime() > maxWait) { + instance = entry.getKey(); + crazyHoldTime++; + } + } + } + if (crazyHoldTime == 1 && someHoldTime == 1 && tserverStatus.size() > 1) { + log.warn("Tablet server " + instance + " exceeded maximum hold time: attempting to kill it"); + try { + TServerConnection connection = tserverSet.getConnection(instance); + if (connection != null) + connection.fastHalt(masterLock); + } catch (TException e) { + log.error(e, e); + } + tserverSet.remove(instance); + } + } + + private long balanceTablets() { + List migrationsOut = new ArrayList(); + Set migrationsCopy = new HashSet(); + synchronized (migrations) { + migrationsCopy.addAll(migrations.keySet()); + } + long wait = tabletBalancer.balance(Collections.unmodifiableSortedMap(tserverStatus), Collections.unmodifiableSet(migrationsCopy), migrationsOut); + + for (TabletMigration m : TabletBalancer.checkMigrationSanity(tserverStatus.keySet(), migrationsOut)) { + if (migrations.containsKey(m.tablet)) { + log.warn("balancer requested migration more than once, skipping " + m); + continue; + } + migrations.put(m.tablet, m.newServer); + log.debug("migration " + m); + } + if (migrationsOut.size() > 0) { + nextEvent.event("Migrating %d more tablets, %d total", migrationsOut.size(), migrations.size()); + } + return wait; + } + + } + + private SortedMap gatherTableInformation() { + long start = System.currentTimeMillis(); + SortedMap result = new TreeMap(); + Set currentServers = tserverSet.getCurrentServers(); + for (TServerInstance server : currentServers) { + try { + Thread t = Thread.currentThread(); + String oldName = t.getName(); + try { + t.setName("Getting status from " + server); + TServerConnection connection = tserverSet.getConnection(server); + if (connection == null) + throw new IOException("No connection to " + server); + TabletServerStatus status = connection.getTableMap(false); + result.put(server, status); + } finally { + t.setName(oldName); + } + } catch (Exception ex) { + log.error("unable to get tablet server status " + server + " " + ex.toString()); + log.debug("unable to get tablet server status " + server, ex); + if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) { + log.warn("attempting to stop " + server); + try { + TServerConnection connection = tserverSet.getConnection(server); + if (connection != null) + connection.halt(masterLock); + } catch (TTransportException e) { + // ignore: it's probably down + } catch (Exception e) { + log.info("error talking to troublesome tablet server ", e); + } + badServers.remove(server); + tserverSet.remove(server); + } + } + } + synchronized (badServers) { + badServers.keySet().retainAll(currentServers); + badServers.keySet().removeAll(result.keySet()); + } + log.debug(String.format("Finished gathering information from %d servers in %.2f seconds", result.size(), (System.currentTimeMillis() - start) / 1000.)); + return result; + } + + public void run() throws IOException, InterruptedException, KeeperException { + final String zroot = ZooUtil.getRoot(instance); + + getMasterLock(zroot + 
Constants.ZMASTER_LOCK); + + recoveryManager = new RecoveryManager(this); + + TableManager.getInstance().addObserver(this); + + StatusThread statusThread = new StatusThread(); + statusThread.start(); + + MigrationCleanupThread migrationCleanupThread = new MigrationCleanupThread(); + migrationCleanupThread.start(); + + tserverSet.startListeningForTabletServerChanges(); + + try { + final AgeOffStore store = new AgeOffStore(new org.apache.accumulo.fate.ZooStore(ZooUtil.getRoot(instance) + Constants.ZFATE, + ZooReaderWriter.getRetryingInstance()), 1000 * 60 * 60 * 8); + + int threads = this.getConfiguration().getConfiguration().getCount(Property.MASTER_FATE_THREADPOOL_SIZE); + + fate = new Fate(this, store, threads); + + SimpleTimer.getInstance().schedule(new Runnable() { + + @Override + public void run() { + store.ageOff(); + } + }, 63000, 63000); + } catch (KeeperException e) { + throw new IOException(e); + } catch (InterruptedException e) { + throw new IOException(e); + } + + ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, new Watcher() { + @Override + public void process(WatchedEvent event) { + nextEvent.event("Noticed recovery changes", event.getType()); + try { + // watcher only fires once, add it back + ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, this); + } catch (Exception e) { + log.error("Failed to add log recovery watcher back", e); + } + } + }); + + Credentials systemCreds = SystemCredentials.get(); + watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(instance, systemCreds, this), null)); + watchers.add(new TabletGroupWatcher(this, new RootTabletStateStore(instance, systemCreds, this), watchers.get(0))); + watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new ZooStore(zroot)), watchers.get(1))); + for (TabletGroupWatcher watcher : watchers) { + watcher.start(); + } + + Processor processor = new Processor(TraceWrap.service(new MasterClientServiceHandler(this))); + ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master", + "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); + clientService = sa.server; + String address = sa.address.toString(); + log.info("Setting master lock data to " + address); + masterLock.replaceLockData(address.getBytes()); + + while (!clientService.isServing()) { + UtilWaitThread.sleep(100); + } + while (clientService.isServing()) { + UtilWaitThread.sleep(500); + } + log.info("Shutting down fate."); + fate.shutdown(); + + final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; + statusThread.join(remaining(deadline)); + + // quit, even if the tablet servers somehow jam up and the watchers + // don't stop + for (TabletGroupWatcher watcher : watchers) { + watcher.join(remaining(deadline)); + } + log.info("exiting"); + } + + private long remaining(long deadline) { + return Math.max(1, deadline - System.currentTimeMillis()); + } + + public ZooLock getMasterLock() { + return masterLock; + } + + private static class MasterLockWatcher implements ZooLock.AsyncLockWatcher { + + boolean acquiredLock = false; + boolean failedToAcquireLock = false; + + @Override + public void lostLock(LockLossReason reason) { + Halt.halt("Master lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + } + + @Override + public void unableToMonitorLockNode(final Throwable e) { + Halt.halt(-1, new Runnable() { + @Override + public void 
run() { + log.fatal("No longer able to monitor master lock node", e); + } + }); + + } + + @Override + public synchronized void acquiredLock() { + log.debug("Acquired master lock"); + + if (acquiredLock || failedToAcquireLock) { + Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + acquiredLock = true; + notifyAll(); + } + + @Override + public synchronized void failedToAcquireLock(Exception e) { + log.warn("Failed to get master lock " + e); + + if (acquiredLock) { + Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + failedToAcquireLock = true; + notifyAll(); + } + + public synchronized void waitForChange() { + while (!acquiredLock && !failedToAcquireLock) { + try { + wait(); + } catch (InterruptedException e) {} + } + } + } + + private void getMasterLock(final String zMasterLoc) throws KeeperException, InterruptedException { + log.info("trying to get master lock"); + + final String masterClientAddress = hostname + ":" + getSystemConfiguration().getPort(Property.MASTER_CLIENTPORT); + + while (true) { + + MasterLockWatcher masterLockWatcher = new MasterLockWatcher(); + masterLock = new ZooLock(zMasterLoc); + masterLock.lockAsync(masterLockWatcher, masterClientAddress.getBytes()); + + masterLockWatcher.waitForChange(); + + if (masterLockWatcher.acquiredLock) { + break; + } + + if (!masterLockWatcher.failedToAcquireLock) { + throw new IllegalStateException("master lock in unknown state"); + } + + masterLock.tryToCancelAsyncLockOrUnlock(); + + UtilWaitThread.sleep(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS); + } + + setMasterState(MasterState.HAVE_LOCK); + } + + public static void main(String[] args) throws Exception { + try { + SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration()); + + VolumeManager fs = VolumeManagerImpl.get(); + ServerOpts opts = new ServerOpts(); + opts.parseArgs("master", args); + String hostname = opts.getAddress(); + Instance instance = HdfsZooInstance.getInstance(); + ServerConfiguration conf = new ServerConfiguration(instance); + Accumulo.init(fs, conf, "master"); + Master master = new Master(conf, fs, hostname); + Accumulo.enableTracing(hostname, "master"); + master.run(); + } catch (Exception ex) { + log.error("Unexpected exception, exiting", ex); + System.exit(1); + } + } + + @Override + public void update(LiveTServerSet current, Set deleted, Set added) { + DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS); + if (added.size() > 0) { + log.info("New servers: " + added); + for (TServerInstance up : added) + obit.delete(up.hostPort()); + } + for (TServerInstance dead : deleted) { + String cause = "unexpected failure"; + if (serversToShutdown.contains(dead)) + cause = "clean shutdown"; // maybe an incorrect assumption + if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) + obit.post(dead.hostPort(), cause); + } + + Set unexpected = new HashSet(deleted); + unexpected.removeAll(this.serversToShutdown); + if (unexpected.size() > 0) { + if (stillMaster() && !getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) { + log.warn("Lost servers " + unexpected); + } + } + serversToShutdown.removeAll(deleted); + badServers.keySet().removeAll(deleted); + // clear out any bad server with the same host/port as a new server + synchronized (badServers) { + cleanListByHostAndPort(badServers.keySet(), deleted, added); + } + synchronized (serversToShutdown) { + cleanListByHostAndPort(serversToShutdown, deleted, added); + } + + 
synchronized (migrations) { + Iterator> iter = migrations.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = iter.next(); + if (deleted.contains(entry.getValue())) { + log.info("Canceling migration of " + entry.getKey() + " to " + entry.getValue()); + iter.remove(); + } + } + } + nextEvent.event("There are now %d tablet servers", current.size()); + } + + private static void cleanListByHostAndPort(Collection badServers, Set deleted, Set added) { + Iterator badIter = badServers.iterator(); + while (badIter.hasNext()) { + TServerInstance bad = badIter.next(); + for (TServerInstance add : added) { + if (bad.hostPort().equals(add.hostPort())) { + badIter.remove(); + break; + } + } + for (TServerInstance del : deleted) { + if (bad.hostPort().equals(del.hostPort())) { + badIter.remove(); + break; + } + } + } + } + + @Override + public void stateChanged(String tableId, TableState state) { + nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state); + } + + @Override + public void initialize(Map tableIdToStateMap) {} + + @Override + public void sessionExpired() {} + + @Override + public Set onlineTables() { + Set result = new HashSet(); + if (getMasterState() != MasterState.NORMAL) { + if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS) + result.add(MetadataTable.ID); + if (getMasterState() != MasterState.UNLOAD_ROOT_TABLET) + result.add(RootTable.ID); + return result; + } + TableManager manager = TableManager.getInstance(); + + for (String tableId : Tables.getIdToNameMap(instance).keySet()) { + TableState state = manager.getTableState(tableId); + if (state != null) { + if (state == TableState.ONLINE) + result.add(tableId); + } + } + return result; + } + + @Override + public Set onlineTabletServers() { + return tserverSet.getCurrentServers(); + } + + @Override + public Collection merges() { + List result = new ArrayList(); + for (String tableId : Tables.getIdToNameMap(instance).keySet()) { + result.add(getMergeInfo(new Text(tableId))); + } + return result; + } + + // recovers state from the persistent transaction to shutdown a server + public void shutdownTServer(TServerInstance server) { + nextEvent.event("Tablet Server shutdown requested for %s", server); + serversToShutdown.add(server); + } + + public EventCoordinator getEventCoordinator() { + return nextEvent; + } + + public Instance getInstance() { + return this.instance; + } + + public AccumuloConfiguration getSystemConfiguration() { + return serverConfig.getConfiguration(); + } + + public ServerConfiguration getConfiguration() { + return serverConfig; + } + + public VolumeManager getFileSystem() { + return this.fs; + } + + public void assignedTablet(KeyExtent extent) { + if (extent.isMeta()) { + if (getMasterState().equals(MasterState.UNLOAD_ROOT_TABLET)) { + setMasterState(MasterState.UNLOAD_METADATA_TABLETS); + } + } + if (extent.isRootTablet()) { + // probably too late, but try anyhow + if (getMasterState().equals(MasterState.STOP)) { + setMasterState(MasterState.UNLOAD_ROOT_TABLET); + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1392e07f/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java ---------------------------------------------------------------------- diff --cc server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index 1590348,0000000..cc52e45 mode 100644,000000..100644 --- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ 
b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@@ -1,742 -1,0 +1,747 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master; + +import static java.lang.Math.min; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.master.Master.TabletGoalState; +import org.apache.accumulo.master.state.MergeStats; +import org.apache.accumulo.master.state.TableCounts; +import org.apache.accumulo.master.state.TableStats; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.VolumeManager.FileType; +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; +import org.apache.accumulo.server.master.state.Assignment; +import 
org.apache.accumulo.server.master.state.ClosableIterator; +import org.apache.accumulo.server.master.state.DistributedStoreException; +import org.apache.accumulo.server.master.state.MergeInfo; +import org.apache.accumulo.server.master.state.MergeState; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; +import org.apache.accumulo.server.master.state.TabletState; +import org.apache.accumulo.server.master.state.TabletStateStore; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.accumulo.server.tablets.TabletTime; +import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; + +import com.google.common.collect.Iterators; + +class TabletGroupWatcher extends Daemon { + + private final Master master; + final TabletStateStore store; + final TabletGroupWatcher dependentWatcher; + + final TableStats stats = new TableStats(); + + TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) { + this.master = master; + this.store = store; + this.dependentWatcher = dependentWatcher; + } + + Map getStats() { + return stats.getLast(); + } + + TableCounts getStats(Text tableId) { + return stats.getLast(tableId); + } + + @Override + public void run() { + + Thread.currentThread().setName("Watching " + store.name()); + int[] oldCounts = new int[TabletState.values().length]; + EventCoordinator.Listener eventListener = this.master.nextEvent.getListener(); + + while (this.master.stillMaster()) { + // slow things down a little, otherwise we spam the logs when there are many wake-up events + UtilWaitThread.sleep(100); + + int totalUnloaded = 0; + int unloaded = 0; + ClosableIterator iter = null; + try { + Map mergeStatsCache = new HashMap(); ++ for (MergeInfo merge : master.merges()) { ++ if (merge.getExtent() != null) { ++ mergeStatsCache.put(merge.getExtent().getTableId(), new MergeStats(merge)); ++ } ++ } + + // Get the current status for the current list of tservers + SortedMap currentTServers = new TreeMap(); + for (TServerInstance entry : this.master.tserverSet.getCurrentServers()) { + currentTServers.put(entry, this.master.tserverStatus.get(entry)); + } + + if (currentTServers.size() == 0) { + eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); + continue; + } + + // Don't move tablets to servers that are shutting down + SortedMap destinations = new TreeMap(currentTServers); + destinations.keySet().removeAll(this.master.serversToShutdown); + + List assignments = new ArrayList(); + List assigned = new ArrayList(); + List assignedToDeadServers = new ArrayList(); + Map unassigned = new HashMap(); + + int[] counts = new int[TabletState.values().length]; + stats.begin(); + // Walk through the tablets in our store, and work tablets + // towards their goal + iter = store.iterator(); + while (iter.hasNext()) { + TabletLocationState tls = iter.next(); + if (tls == null) { + continue; + } + // ignore entries for tables that do not exist in zookeeper + if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null) + continue; + + if (Master.log.isTraceEnabled()) + Master.log.trace(tls + " walogs " + tls.walogs.size()); + + // Don't overwhelm the tablet servers with work + if (unassigned.size() + unloaded > 
Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { + flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned); + assignments.clear(); + assigned.clear(); + assignedToDeadServers.clear(); + unassigned.clear(); + unloaded = 0; + eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); + } + Text tableId = tls.extent.getTableId(); + MergeStats mergeStats = mergeStatsCache.get(tableId); + if (mergeStats == null) { - mergeStatsCache.put(tableId, mergeStats = new MergeStats(this.master.getMergeInfo(tls.extent))); ++ mergeStatsCache.put(tableId, mergeStats = new MergeStats(new MergeInfo())); + } + TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo()); + TServerInstance server = tls.getServer(); + TabletState state = tls.getState(currentTServers.keySet()); + if (Master.log.isTraceEnabled()) + Master.log.trace("Goal state " + goal + " current " + state); + stats.update(tableId, state); + mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty()); + sendChopRequest(mergeStats.getMergeInfo(), state, tls); + sendSplitRequest(mergeStats.getMergeInfo(), state, tls); + + // Always follow through with assignments + if (state == TabletState.ASSIGNED) { + goal = TabletGoalState.HOSTED; + } + + // if we are shutting down all the tabletservers, we have to do it in order + if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) { + if (this.master.serversToShutdown.equals(currentTServers.keySet())) { + if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) { + goal = TabletGoalState.HOSTED; + } + } + } + + if (goal == TabletGoalState.HOSTED) { + if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) { + if (this.master.recoveryManager.recoverLogs(tls.extent, tls.walogs)) + continue; + } + switch (state) { + case HOSTED: + if (server.equals(this.master.migrations.get(tls.extent))) + this.master.migrations.remove(tls.extent); + break; + case ASSIGNED_TO_DEAD_SERVER: + assignedToDeadServers.add(tls); + if (server.equals(this.master.migrations.get(tls.extent))) + this.master.migrations.remove(tls.extent); + // log.info("Current servers " + currentTServers.keySet()); + break; + case UNASSIGNED: + // maybe it's a finishing migration + TServerInstance dest = this.master.migrations.get(tls.extent); + if (dest != null) { + // if destination is still good, assign it + if (destinations.keySet().contains(dest)) { + assignments.add(new Assignment(tls.extent, dest)); + } else { + // get rid of this migration + this.master.migrations.remove(tls.extent); + unassigned.put(tls.extent, server); + } + } else { + unassigned.put(tls.extent, server); + } + break; + case ASSIGNED: + // Send another reminder + assigned.add(new Assignment(tls.extent, tls.future)); + break; + } + } else { + switch (state) { + case UNASSIGNED: + break; + case ASSIGNED_TO_DEAD_SERVER: + assignedToDeadServers.add(tls); + // log.info("Current servers " + currentTServers.keySet()); + break; + case HOSTED: + TServerConnection conn = this.master.tserverSet.getConnection(server); + if (conn != null) { + conn.unloadTablet(this.master.masterLock, tls.extent, goal != TabletGoalState.DELETED); + unloaded++; + totalUnloaded++; + } else { + Master.log.warn("Could not connect to server " + server); + } + break; + case ASSIGNED: + break; + } + } + counts[state.ordinal()]++; + } + + flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned); + + // provide stats after flushing changes to avoid race conditions w/ delete table + 
stats.end(); + + // Report changes + for (TabletState state : TabletState.values()) { + int i = state.ordinal(); + if (counts[i] > 0 && counts[i] != oldCounts[i]) { + this.master.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name()); + } + } + Master.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.)); + oldCounts = counts; + if (totalUnloaded > 0) { + this.master.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded); + } + + updateMergeState(mergeStatsCache); + + Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.)); + eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); + } catch (Exception ex) { + Master.log.error("Error processing table state for store " + store.name(), ex); + if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) { + repairMetadata(((BadLocationStateException) ex.getCause()).getEncodedEndRow()); + } else { + UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS); + } + } finally { + if (iter != null) { + try { + iter.close(); + } catch (IOException ex) { + Master.log.warn("Error closing TabletLocationState iterator: " + ex, ex); + } + } + } + } + } + + private void repairMetadata(Text row) { + Master.log.debug("Attempting repair on " + row); + // ACCUMULO-2261 if a dying tserver writes a location before its lock information propagates, it may cause duplicate assignment. + // Attempt to find the dead server entry and remove it. + try { + Map future = new HashMap(); + Map assigned = new HashMap(); + KeyExtent extent = new KeyExtent(row, new Value(new byte[]{0})); + String table = MetadataTable.NAME; + if (extent.isMeta()) + table = RootTable.NAME; + Scanner scanner = this.master.getConnector().createScanner(table, Authorizations.EMPTY); + scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME); + scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME); + scanner.setRange(new Range(row)); + for (Entry entry : scanner) { + if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) { + assigned.put(entry.getKey(), entry.getValue()); + } else if (entry.getKey().getColumnFamily().equals(FutureLocationColumnFamily.NAME)) { + future.put(entry.getKey(), entry.getValue()); + } + } + if (future.size() > 0 && assigned.size() > 0) { + Master.log.warn("Found a tablet assigned and hosted, attempting to repair"); + } else if (future.size() > 1 && assigned.size() == 0) { + Master.log.warn("Found a tablet assigned to multiple servers, attempting to repair"); + } else if (future.size() == 0 && assigned.size() > 1) { + Master.log.warn("Found a tablet hosted on multiple servers, attempting to repair"); + } else { + Master.log.info("Attempted a repair, but nothing seems to be obviously wrong. 
" + assigned + " " + future); + return; + } + Iterator> iter = Iterators.concat(future.entrySet().iterator(), assigned.entrySet().iterator()); + while (iter.hasNext()) { + Entry entry = iter.next(); + TServerInstance alive = master.tserverSet.find(entry.getValue().toString()); + if (alive == null) { + Master.log.info("Removing entry " + entry); + BatchWriter bw = this.master.getConnector().createBatchWriter(table, new BatchWriterConfig()); + Mutation m = new Mutation(entry.getKey().getRow()); + m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); + bw.addMutation(m); + bw.close(); + return; + } + } + Master.log.error("Metadata table is inconsistent at " + row + " and all assigned/future tservers are still online."); + } catch (Throwable e) { + Master.log.error("Error attempting repair of metadata " + row + ": " + e, e); + } + } + + private int assignedOrHosted() { + int result = 0; + for (TableCounts counts : stats.getLast().values()) { + result += counts.assigned() + counts.hosted(); + } + return result; + } + + private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) { + // Already split? + if (!info.getState().equals(MergeState.SPLITTING)) + return; + // Merges don't split + if (!info.isDelete()) + return; + // Online and ready to split? + if (!state.equals(TabletState.HOSTED)) + return; + // Does this extent cover the end points of the delete? + KeyExtent range = info.getExtent(); + if (tls.extent.overlaps(range)) { + for (Text splitPoint : new Text[] {range.getPrevEndRow(), range.getEndRow()}) { + if (splitPoint == null) + continue; + if (!tls.extent.contains(splitPoint)) + continue; + if (splitPoint.equals(tls.extent.getEndRow())) + continue; + if (splitPoint.equals(tls.extent.getPrevEndRow())) + continue; + try { + TServerConnection conn; + conn = this.master.tserverSet.getConnection(tls.current); + if (conn != null) { + Master.log.info("Asking " + tls.current + " to split " + tls.extent + " at " + splitPoint); + conn.splitTablet(this.master.masterLock, tls.extent, splitPoint); + } else { + Master.log.warn("Not connected to server " + tls.current); + } + } catch (NotServingTabletException e) { + Master.log.debug("Error asking tablet server to split a tablet: " + e); + } catch (Exception e) { + Master.log.warn("Error asking tablet server to split a tablet: " + e); + } + } + } + } + + private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) { + // Don't bother if we're in the wrong state + if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED)) + return; + // Tablet must be online + if (!state.equals(TabletState.HOSTED)) + return; + // Tablet isn't already chopped + if (tls.chopped) + return; + // Tablet ranges intersect + if (info.needsToBeChopped(tls.extent)) { + TServerConnection conn; + try { + conn = this.master.tserverSet.getConnection(tls.current); + if (conn != null) { + Master.log.info("Asking " + tls.current + " to chop " + tls.extent); + conn.chop(this.master.masterLock, tls.extent); + } else { + Master.log.warn("Could not connect to server " + tls.current); + } + } catch (TException e) { + Master.log.warn("Communications error asking tablet server to chop a tablet"); + } + } + } + + private void updateMergeState(Map mergeStatsCache) { + for (MergeStats stats : mergeStatsCache.values()) { + try { + MergeState update = stats.nextMergeState(this.master.getConnector(), this.master); + // when next state is MERGING, its important to persist this before + // starting the merge... 
the verification check that is done before + // moving into the merging state could fail if merge starts but does + // not finish + if (update == MergeState.COMPLETE) + update = MergeState.NONE; + if (update != stats.getMergeInfo().getState()) { + this.master.setMergeState(stats.getMergeInfo(), update); + } + + if (update == MergeState.MERGING) { + try { + if (stats.getMergeInfo().isDelete()) { + deleteTablets(stats.getMergeInfo()); + } else { + mergeMetadataRecords(stats.getMergeInfo()); + } + this.master.setMergeState(stats.getMergeInfo(), update = MergeState.COMPLETE); + } catch (Exception ex) { + Master.log.error("Unable to merge metadata table records", ex); + } + } + } catch (Exception ex) { + Master.log.error("Unable to update merge state for merge " + stats.getMergeInfo().getExtent(), ex); + } + } + } + + private void deleteTablets(MergeInfo info) throws AccumuloException { + KeyExtent extent = info.getExtent(); + String targetSystemTable = extent.isMeta() ? RootTable.NAME : MetadataTable.NAME; + Master.log.debug("Deleting tablets for " + extent); + char timeType = '\0'; + KeyExtent followingTablet = null; + if (extent.getEndRow() != null) { + Key nextExtent = new Key(extent.getEndRow()).followingKey(PartialKey.ROW); + followingTablet = getHighTablet(new KeyExtent(extent.getTableId(), nextExtent.getRow(), extent.getEndRow())); + Master.log.debug("Found following tablet " + followingTablet); + } + try { + Connector conn = this.master.getConnector(); + Text start = extent.getPrevEndRow(); + if (start == null) { + start = new Text(); + } + Master.log.debug("Making file deletion entries for " + extent); + Range deleteRange = new Range(KeyExtent.getMetadataEntry(extent.getTableId(), start), false, KeyExtent.getMetadataEntry(extent.getTableId(), + extent.getEndRow()), true); + Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY); + scanner.setRange(deleteRange); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); + TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(DataFileColumnFamily.NAME); + scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); + Set<FileRef> datafiles = new TreeSet<FileRef>(); + for (Entry<Key,Value> entry : scanner) { + Key key = entry.getKey(); + if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) { + datafiles.add(new FileRef(this.master.fs, key)); + if (datafiles.size() > 1000) { + MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get()); + datafiles.clear(); + } + } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { + timeType = entry.getValue().toString().charAt(0); + } else if (key.compareColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) { + throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!"); + } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { + datafiles.add(new FileRef(entry.getValue().toString(), this.master.fs.getFullPath(FileType.TABLE, entry.getValue().toString()))); + if (datafiles.size() > 1000) { + MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get()); + datafiles.clear(); + } + } + } + MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get()); + BatchWriter bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig()); + try { + deleteTablets(info, deleteRange, bw, conn); + } finally { + bw.close(); + } + + if (followingTablet != null) { + Master.log.debug("Updating prevRow of " + 
followingTablet + " to " + extent.getPrevEndRow()); + bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig()); + try { + Mutation m = new Mutation(followingTablet.getMetadataEntry()); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(extent.getPrevEndRow())); + ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m); + bw.addMutation(m); + bw.flush(); + } finally { + bw.close(); + } + } else { + // Recreate the default tablet to hold the end of the table + Master.log.debug("Recreating the last tablet to point to " + extent.getPrevEndRow()); + String tdir = master.getFileSystem().choose(ServerConstants.getTablesDirs()) + "/" + extent.getTableId() + Constants.DEFAULT_TABLET_LOCATION; + MetadataTableUtil.addTablet(new KeyExtent(extent.getTableId(), null, extent.getPrevEndRow()), tdir, + SystemCredentials.get(), timeType, this.master.masterLock); + } + } catch (Exception ex) { + throw new AccumuloException(ex); + } + } + + private void mergeMetadataRecords(MergeInfo info) throws AccumuloException { + KeyExtent range = info.getExtent(); + Master.log.debug("Merging metadata for " + range); + KeyExtent stop = getHighTablet(range); + Master.log.debug("Highest tablet is " + stop); + Value firstPrevRowValue = null; + Text stopRow = stop.getMetadataEntry(); + Text start = range.getPrevEndRow(); + if (start == null) { + start = new Text(); + } + Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false); + String targetSystemTable = MetadataTable.NAME; + if (range.isMeta()) { + targetSystemTable = RootTable.NAME; + } + + BatchWriter bw = null; + try { + long fileCount = 0; + Connector conn = this.master.getConnector(); + // Make file entries in highest tablet + bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig()); + Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY); + scanner.setRange(scanRange); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(DataFileColumnFamily.NAME); + Mutation m = new Mutation(stopRow); + String maxLogicalTime = null; + for (Entry entry : scanner) { + Key key = entry.getKey(); + Value value = entry.getValue(); + if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { + m.put(key.getColumnFamily(), key.getColumnQualifier(), value); + fileCount++; + } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) { + Master.log.debug("prevRow entry for lowest tablet is " + value); + firstPrevRowValue = new Value(value); + } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { + maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString()); + } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { + bw.addMutation(MetadataTableUtil.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString())); + } + } + + // read the logical time from the last tablet in the merge range, it is not included in + // the loop above + scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY); + scanner.setRange(new Range(stopRow)); + TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner); + for (Entry entry : scanner) { + if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) { + maxLogicalTime = 
TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString()); + } + } + + if (maxLogicalTime != null) + TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes())); + + if (!m.getUpdates().isEmpty()) { + bw.addMutation(m); + } + + bw.flush(); + + Master.log.debug("Moved " + fileCount + " files to " + stop); + + if (firstPrevRowValue == null) { + Master.log.debug("tablet already merged"); + return; + } + + stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue)); + Mutation updatePrevRow = stop.getPrevRowUpdateMutation(); + Master.log.debug("Setting the prevRow for last tablet: " + stop); + bw.addMutation(updatePrevRow); + bw.flush(); + + deleteTablets(info, scanRange, bw, conn); + + // Clean-up the last chopped marker + m = new Mutation(stopRow); + ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m); + bw.addMutation(m); + bw.flush(); + + } catch (Exception ex) { + throw new AccumuloException(ex); + } finally { + if (bw != null) + try { + bw.close(); + } catch (Exception ex) { + throw new AccumuloException(ex); + } + } + } + + private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException { + Scanner scanner; + Mutation m; + // Delete everything in the other tablets + // group all deletes for a tablet into one mutation, this makes tablets + // either disappear entirely or not at all.. this is important for the case + // where the process terminates in the loop below... + scanner = conn.createScanner(info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY); + Master.log.debug("Deleting range " + scanRange); + scanner.setRange(scanRange); + RowIterator rowIter = new RowIterator(scanner); + while (rowIter.hasNext()) { + Iterator<Entry<Key,Value>> row = rowIter.next(); + m = null; + while (row.hasNext()) { + Entry<Key,Value> entry = row.next(); + Key key = entry.getKey(); + + if (m == null) + m = new Mutation(key.getRow()); + + m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); + Master.log.debug("deleting entry " + key); + } + bw.addMutation(m); + } + + bw.flush(); + } + + private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException { + try { + Connector conn = this.master.getConnector(); + Scanner scanner = conn.createScanner(range.isMeta() ? 
RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null); + scanner.setRange(new Range(start.getMetadataEntry(), null)); + Iterator> iterator = scanner.iterator(); + if (!iterator.hasNext()) { + throw new AccumuloException("No last tablet for a merge " + range); + } + Entry entry = iterator.next(); + KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue())); + if (highTablet.getTableId() != range.getTableId()) { + throw new AccumuloException("No last tablet for merge " + range + " " + highTablet); + } + return highTablet; + } catch (Exception ex) { + throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex); + } + } + + private void flushChanges(SortedMap currentTServers, List assignments, List assigned, + List assignedToDeadServers, Map unassigned) throws DistributedStoreException, TException { + if (!assignedToDeadServers.isEmpty()) { + int maxServersToShow = min(assignedToDeadServers.size(), 100); + Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "..."); + store.unassign(assignedToDeadServers); + this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size()); + } + + if (!currentTServers.isEmpty()) { + Map assignedOut = new HashMap(); + this.master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), assignedOut); + for (Entry assignment : assignedOut.entrySet()) { + if (unassigned.containsKey(assignment.getKey())) { + if (assignment.getValue() != null) { + if (!currentTServers.containsKey(assignment.getValue())) { + Master.log.warn("balancer assigned " + assignment.getKey() + " to a tablet server that is not current " + assignment.getValue() + " ignoring"); + continue; + } + Master.log.debug(store.name() + " assigning tablet " + assignment); + assignments.add(new Assignment(assignment.getKey(), assignment.getValue())); + } + } else { + Master.log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + assignment.getKey()); + } + } + if (!unassigned.isEmpty() && assignedOut.isEmpty()) + Master.log.warn("Load balancer failed to assign any tablets"); + } + + if (assignments.size() > 0) { + Master.log.info(String.format("Assigning %d tablets", assignments.size())); + store.setFutureLocations(assignments); + } + assignments.addAll(assigned); + for (Assignment a : assignments) { + TServerConnection conn = this.master.tserverSet.getConnection(a.server); + if (conn != null) { + conn.assignTablet(this.master.masterLock, a.tablet); + } else { + Master.log.warn("Could not connect to server " + a.server); + } + master.assignedTablet(a.tablet); + } + } + +}
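
Note on the change above: the substance of ACCUMULO-2412 is in the two small hunks marked '++'/'-' near the top of the TabletGroupWatcher diff. The watcher now builds its per-table MergeStats cache from the merge requests that already existed before the metadata scan starts (master.merges()), and a table encountered during the scan without such a pre-existing request gets a blank MergeInfo instead of a fresh getMergeInfo() lookup. The fragment below only restates those hunks for readability; it is an illustrative sketch, not code from the commit, and the enclosing helper method is invented for illustration.

    // Sketch: seed merge stats from requests that pre-date this scan (ACCUMULO-2412).
    // MergeInfo, MergeStats and master.merges() are the names used in the diff;
    // the helper method itself is hypothetical.
    Map<Text,MergeStats> seedMergeStatsCache(Master master) {
      Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
      for (MergeInfo merge : master.merges()) {
        if (merge.getExtent() != null) {
          mergeStatsCache.put(merge.getExtent().getTableId(), new MergeStats(merge));
        }
      }
      return mergeStatsCache;
    }

    // During the scan, a table with no pre-existing request no longer triggers
    // this.master.getMergeInfo(tls.extent); it simply gets an empty MergeInfo:
    //   if (mergeStats == null)
    //     mergeStatsCache.put(tableId, mergeStats = new MergeStats(new MergeInfo()));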