accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [4/5] git commit: ACCUMULO-2412 use only pre-existing merge requests before processing the metadata table
Date Wed, 05 Mar 2014 21:13:26 GMT
ACCUMULO-2412 use only pre-existing merge requests before processing the metadata table


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1392e07f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1392e07f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1392e07f

Branch: refs/heads/master
Commit: 1392e07fb5a4d5efe803810bcd888568cc57d9b9
Parents: 4fd8686 b135111
Author: Eric Newton <eric.newton@gmail.com>
Authored: Wed Mar 5 16:12:07 2014 -0500
Committer: Eric Newton <eric.newton@gmail.com>
Committed: Wed Mar 5 16:12:07 2014 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/accumulo/master/Master.java      | 4 ----
 .../java/org/apache/accumulo/master/TabletGroupWatcher.java   | 7 ++++++-
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1392e07f/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 6a75872,0000000..e123b49
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -1,1231 -1,0 +1,1227 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.impl.Namespaces;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;
 +import org.apache.accumulo.core.master.thrift.MasterClientService.Processor;
 +import org.apache.accumulo.core.master.thrift.MasterGoalState;
 +import org.apache.accumulo.core.master.thrift.MasterState;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.Credentials;
 +import org.apache.accumulo.core.security.NamespacePermission;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.AgeOffStore;
 +import org.apache.accumulo.fate.Fate;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.accumulo.master.recovery.RecoveryManager;
 +import org.apache.accumulo.master.state.TableCounts;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManager.FileType;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.init.Initialize;
 +import org.apache.accumulo.server.master.LiveTServerSet;
 +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 +import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
 +import org.apache.accumulo.server.master.balancer.TabletBalancer;
 +import org.apache.accumulo.server.master.state.CurrentState;
 +import org.apache.accumulo.server.master.state.DeadServerList;
 +import org.apache.accumulo.server.master.state.MergeInfo;
 +import org.apache.accumulo.server.master.state.MergeState;
 +import org.apache.accumulo.server.master.state.MetaDataStateStore;
 +import org.apache.accumulo.server.master.state.RootTabletStateStore;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TabletLocationState;
 +import org.apache.accumulo.server.master.state.TabletMigration;
 +import org.apache.accumulo.server.master.state.TabletState;
 +import org.apache.accumulo.server.master.state.ZooStore;
 +import org.apache.accumulo.server.master.state.ZooTabletStateStore;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.security.handler.ZKPermHandler;
 +import org.apache.accumulo.server.tables.TableManager;
 +import org.apache.accumulo.server.tables.TableObserver;
 +import org.apache.accumulo.server.util.DefaultMap;
 +import org.apache.accumulo.server.util.Halt;
 +import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.accumulo.server.util.TServerUtils;
 +import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooLock;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.DataInputBuffer;
 +import org.apache.hadoop.io.DataOutputBuffer;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.data.Stat;
 +
 +import com.google.common.collect.Iterables;
 +
 +/**
 + * The Master is responsible for assigning and balancing tablets to tablet servers.
 + * 
 + * The master will also coordinate log recoveries and reports general status.
 + */
 +public class Master implements LiveTServerSet.Listener, TableObserver, CurrentState {
 +
 +  final static Logger log = Logger.getLogger(Master.class);
 +
 +  final static int ONE_SECOND = 1000;
 +  final private static Text METADATA_TABLE_ID = new Text(MetadataTable.ID);
 +  final private static Text ROOT_TABLE_ID = new Text(RootTable.ID);
 +  final static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
 +  final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
 +  final static long WAIT_BETWEEN_ERRORS = ONE_SECOND;
 +  final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
 +  final private static int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
 +  final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
 +  final static int MAX_TSERVER_WORK_CHUNK = 5000;
 +  final private static int MAX_BAD_STATUS_COUNT = 3;
 +
 +  final VolumeManager fs;
 +  final private Instance instance;
 +  final private String hostname;
 +  final LiveTServerSet tserverSet;
 +  final private List<TabletGroupWatcher> watchers = new ArrayList<TabletGroupWatcher>();
 +  final SecurityOperation security;
 +  final Map<TServerInstance,AtomicInteger> badServers = Collections.synchronizedMap(new DefaultMap<TServerInstance,AtomicInteger>(new AtomicInteger()));
 +  final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<TServerInstance>());
 +  final SortedMap<KeyExtent,TServerInstance> migrations = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
 +  final EventCoordinator nextEvent = new EventCoordinator();
 +  final private Object mergeLock = new Object();
 +  RecoveryManager recoveryManager = null;
 +
 +  ZooLock masterLock = null;
 +  private TServer clientService = null;
 +  TabletBalancer tabletBalancer;
 +
 +  private MasterState state = MasterState.INITIAL;
 +
 +  Fate<Master> fate;
 +
 +  volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections.unmodifiableSortedMap(new TreeMap<TServerInstance,TabletServerStatus>());
 +
 +  synchronized MasterState getMasterState() {
 +    return state;
 +  }
 +
 +  /**
 +   * @return {@code false} once the master state is {@code STOP}, {@code true} for every other state
 +   */
 +  public boolean stillMaster() {
 +    final MasterState current = getMasterState();
 +    return !MasterState.STOP.equals(current);
 +  }
 +
 +  // Shorthand that keeps the transition table below readable: X = allowed, O = not allowed.
 +  // A lone underscore is not usable as an identifier on newer compilers (warning in Java 8,
 +  // error from Java 9), so the letter O is used for "false".
 +  static final boolean X = true;
 +  static final boolean O = false;
 +  // Indexed as transitionOK[current.ordinal()][requested.ordinal()] by setMasterState.
 +  // The row/column labels assume MasterState declares its constants in the order shown.
 +  // @formatter:off
 +  static final boolean transitionOK[][] = {
 +      //                              INITIAL HAVE_LOCK SAFE_MODE NORMAL UNLOAD_META UNLOAD_ROOT STOP
 +      /* INITIAL */                   {X,     X,        O,        O,      O,         O,          X},
 +      /* HAVE_LOCK */                 {O,     X,        X,        X,      O,         O,          X},
 +      /* SAFE_MODE */                 {O,     O,        X,        X,      X,         O,          X},
 +      /* NORMAL */                    {O,     O,        X,        X,      X,         O,          X},
 +      /* UNLOAD_METADATA_TABLETS */   {O,     O,        X,        X,      X,         X,          X},
 +      /* UNLOAD_ROOT_TABLET */        {O,     O,        O,        X,      X,         X,          X},
 +      /* STOP */                      {O,     O,        O,        O,      O,         X,          X}};
 +  //@formatter:on
 +  /**
 +   * Moves the master to {@code newState} and fires the side effects tied to that state.
 +   * <p>
 +   * A request for the state the master is already in is a no-op. A transition that the
 +   * {@code transitionOK} table does not allow is logged as an error but still applied.
 +   *
 +   * @param newState
 +   *          the state to switch to
 +   */
 +  synchronized void setMasterState(MasterState newState) {
 +    final MasterState previous = state;
 +    if (previous.equals(newState)) {
 +      return;
 +    }
 +
 +    final boolean permitted = transitionOK[previous.ordinal()][newState.ordinal()];
 +    if (!permitted) {
 +      log.error("Programmer error: master should not transition from " + previous + " to " + newState);
 +    }
 +
 +    state = newState;
 +    nextEvent.event("State changed from %s to %s", previous, newState);
 +
 +    // previous != newState is guaranteed by the early return above, so the
 +    // follow-up actions depend on the new state alone.
 +    switch (newState) {
 +      case STOP:
 +        // Give the server a little time before shutdown so the client
 +        // thread requesting the stop can return
 +        SimpleTimer.getInstance().schedule(new Runnable() {
 +          @Override
 +          public void run() {
 +            // This frees the main thread and will cause the master to exit
 +            clientService.stop();
 +            Master.this.nextEvent.event("stopped event loop");
 +          }
 +
 +        }, 100L, 1000L);
 +        break;
 +      case HAVE_LOCK:
 +        upgradeZookeeper();
 +        break;
 +      case NORMAL:
 +        upgradeMetadata();
 +        break;
 +      default:
 +        break;
 +    }
 +  }
 +
 +  /**
 +   * Upgrade step: relocates the root tablet directory from underneath the metadata table to
 +   * the root table's own directory and records where it lives in ZooKeeper.
 +   * <p>
 +   * Nothing is done when the ZooKeeper node for the root tablet path already exists.
 +   *
 +   * @param zoo
 +   *          ZooKeeper accessor used to test for and create the root tablet path node
 +   * @throws IOException
 +   *           if the old root tablet directory exists but cannot be renamed
 +   * @throws IllegalStateException
 +   *           if the root tablet is found under more than one tables directory, or under none
 +   * @throws Exception
 +   *           for any failure reported by the file system or ZooKeeper calls
 +   */
 +  private void moveRootTabletToRootTable(IZooReaderWriter zoo) throws Exception {
 +    String dirZPath = ZooUtil.getRoot(instance) + RootTable.ZROOT_TABLET_PATH;
 +
 +    if (!zoo.exists(dirZPath)) {
 +      Path oldPath = fs.getFullPath(FileType.TABLE, "/" + MetadataTable.ID + "/root_tablet");
 +      if (fs.exists(oldPath)) {
 +        String newPath = fs.choose(ServerConstants.getTablesDirs()) + "/" + RootTable.ID;
 +        fs.mkdirs(new Path(newPath));
 +        if (!fs.rename(oldPath, new Path(newPath))) {
 +          throw new IOException("Failed to move root tablet from " + oldPath + " to " + newPath);
 +        }
 +
 +        log.info("Upgrade renamed " + oldPath + " to " + newPath);
 +      }
 +
 +      // Look in every configured tables directory; exactly one may hold the root tablet.
 +      Path location = null;
 +
 +      for (String basePath : ServerConstants.getTablesDirs()) {
 +        Path path = new Path(basePath + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION);
 +        if (fs.exists(path)) {
 +          if (location != null) {
 +            throw new IllegalStateException("Root table at multiple locations " + location + " " + path);
 +          }
 +
 +          location = path;
 +        }
 +      }
 +
 +      if (location == null)
 +        throw new IllegalStateException("Failed to find root tablet");
 +
 +      log.info("Upgrade setting root table location in zookeeper " + location);
 +      // Encode explicitly as UTF-8 so the stored bytes do not depend on the JVM's default charset.
 +      zoo.putPersistentData(dirZPath, location.toString().getBytes(Constants.UTF8), NodeExistsPolicy.FAIL);
 +    }
 +  }
 +
 +  /**
 +   * Performs the ZooKeeper portion of an upgrade when the persisted data version equals
 +   * {@code ServerConstants.PREV_DATA_VERSION}; otherwise does nothing.
 +   * <p>
 +   * Steps, in order: create the namespaces node and the two built-in namespaces, create the
 +   * root table entry, assign every existing table to a namespace, rename the metadata table,
 +   * move the root tablet, and grant namespace permissions to existing users. Any failure is
 +   * logged as fatal and terminates the JVM with exit code 1.
 +   */
 +  private void upgradeZookeeper() {
 +    // 1.5.1 and 1.6.0 both do some state checking after obtaining the zoolock for the
 +    // monitor and before starting up. It's not tied to the data version at all (and would
 +    // introduce unnecessary complexity to try to make the master do it), but be aware
 +    // that the master is not the only thing that may alter zookeeper before starting.
 +
 +    if (Accumulo.getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) {
 +      try {
 +        log.info("Upgrading zookeeper");
 +
 +        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +
 +        // create initial namespaces
 +        String namespaces = ZooUtil.getRoot(instance) + Constants.ZNAMESPACES;
 +        zoo.putPersistentData(namespaces, new byte[0], NodeExistsPolicy.SKIP);
 +        for (Pair<String,String> namespace : Iterables.concat(
 +            Collections.singleton(new Pair<String,String>(Namespaces.ACCUMULO_NAMESPACE, Namespaces.ACCUMULO_NAMESPACE_ID)),
 +            Collections.singleton(new Pair<String,String>(Namespaces.DEFAULT_NAMESPACE, Namespaces.DEFAULT_NAMESPACE_ID)))) {
 +          String ns = namespace.getFirst();
 +          String id = namespace.getSecond();
 +          log.debug("Upgrade creating namespace \"" + ns + "\" (ID: " + id + ")");
 +          if (!Namespaces.exists(instance, id))
 +            TableManager.prepareNewNamespaceState(instance.getInstanceID(), id, ns, NodeExistsPolicy.SKIP);
 +        }
 +
 +        // create root table
 +        log.debug("Upgrade creating table " + RootTable.NAME + " (ID: " + RootTable.ID + ")");
 +        TableManager.prepareNewTableState(instance.getInstanceID(), RootTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, RootTable.NAME, TableState.ONLINE,
 +            NodeExistsPolicy.SKIP);
 +        Initialize.initMetadataConfig(RootTable.ID);
 +        // ensure root user can flush root table
 +        security.grantTablePermission(SystemCredentials.get().toThrift(instance), security.getRootUsername(), RootTable.ID, TablePermission.ALTER_TABLE,
 +            Namespaces.ACCUMULO_NAMESPACE_ID);
 +
 +        // put existing tables in the correct namespaces
 +        String tables = ZooUtil.getRoot(instance) + Constants.ZTABLES;
 +        for (String tableId : zoo.getChildren(tables)) {
 +          String targetNamespace = (MetadataTable.ID.equals(tableId) || RootTable.ID.equals(tableId)) ? Namespaces.ACCUMULO_NAMESPACE_ID
 +              : Namespaces.DEFAULT_NAMESPACE_ID;
 +          log.debug("Upgrade moving table " + new String(zoo.getData(tables + "/" + tableId + Constants.ZTABLE_NAME, null), Constants.UTF8) + " (ID: "
 +              + tableId + ") into namespace with ID " + targetNamespace);
 +          zoo.putPersistentData(tables + "/" + tableId + Constants.ZTABLE_NAMESPACE, targetNamespace.getBytes(Constants.UTF8), NodeExistsPolicy.SKIP);
 +        }
 +
 +        // rename metadata table
 +        log.debug("Upgrade renaming table " + MetadataTable.OLD_NAME + " (ID: " + MetadataTable.ID + ") to " + MetadataTable.NAME);
 +        zoo.putPersistentData(tables + "/" + MetadataTable.ID + Constants.ZTABLE_NAME, Tables.qualify(MetadataTable.NAME).getSecond().getBytes(Constants.UTF8),
 +            NodeExistsPolicy.OVERWRITE);
 +
 +        moveRootTabletToRootTable(zoo);
 +
 +        // add system namespace permissions to existing users
 +        ZKPermHandler perm = new ZKPermHandler();
 +        perm.initialize(instance.getInstanceID(), true);
 +        String users = ZooUtil.getRoot(instance) + "/users";
 +        for (String user : zoo.getChildren(users)) {
 +          zoo.putPersistentData(users + "/" + user + "/Namespaces", new byte[0], NodeExistsPolicy.SKIP);
 +          perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
 +        }
 +        // Use the configured root user rather than assuming it is literally named "root",
 +        // matching the table permission grant made earlier in this method.
 +        perm.grantNamespacePermission(security.getRootUsername(), Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
 +
 +      } catch (Exception ex) {
 +        log.fatal("Error performing upgrade", ex);
 +        System.exit(1);
 +      }
 +    }
 +  }
 +
 +  private final AtomicBoolean upgradeMetadataRunning = new AtomicBoolean(false);
 +
 +  private final ServerConfiguration serverConfig;
 +
 +  /**
 +   * Starts the metadata-table portion of an upgrade on a new thread, at most once per master
 +   * instance, when the persisted data version equals {@code ServerConstants.PREV_DATA_VERSION}.
 +   * A failure inside the task is logged as fatal and terminates the JVM with exit code 1.
 +   */
 +  private void upgradeMetadata() {
 +    if (Accumulo.getAccumuloPersistentVersion(fs) != ServerConstants.PREV_DATA_VERSION) {
 +      return;
 +    }
 +    // Only the first caller to flip the flag launches the task.
 +    if (!upgradeMetadataRunning.compareAndSet(false, true)) {
 +      return;
 +    }
 +
 +    // need to run this in a separate thread because a lock is held that prevents metadata tablets from being assigned and this task writes to the
 +    // metadata table
 +    Thread upgrader = new Thread(new Runnable() {
 +      @Override
 +      public void run() {
 +        try {
 +          MetadataTableUtil.moveMetaDeleteMarkers(instance, SystemCredentials.get());
 +          Accumulo.updateAccumuloVersion(fs);
 +
 +          log.info("Upgrade complete");
 +
 +        } catch (Exception ex) {
 +          log.fatal("Error performing upgrade", ex);
 +          System.exit(1);
 +        }
 +      }
 +    });
 +    upgrader.start();
 +  }
 +
 +  private int assignedOrHosted(Text tableId) {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      TableCounts count = watcher.getStats(tableId);
 +      result += count.hosted() + count.assigned();
 +    }
 +    return result;
 +  }
 +
 +  private int totalAssignedOrHosted() {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      for (TableCounts counts : watcher.getStats().values()) {
 +        result += counts.assigned() + counts.hosted();
 +      }
 +    }
 +    return result;
 +  }
 +
 +  private int nonMetaDataTabletsAssignedOrHosted() {
 +    return totalAssignedOrHosted() - assignedOrHosted(new Text(MetadataTable.ID)) - assignedOrHosted(new Text(RootTable.ID));
 +  }
 +
 +  private int notHosted() {
 +    int result = 0;
 +    for (TabletGroupWatcher watcher : watchers) {
 +      for (TableCounts counts : watcher.getStats().values()) {
 +        result += counts.assigned() + counts.assignedToDeadServers();
 +      }
 +    }
 +    return result;
 +  }
 +
 +  // The number of unassigned tablets that should be assigned: displayed on the monitor page
 +  int displayUnassigned() {
 +    int result = 0;
 +    switch (getMasterState()) {
 +      case NORMAL:
 +        // Count offline tablets for online tables
 +        for (TabletGroupWatcher watcher : watchers) {
 +          TableManager manager = TableManager.getInstance();
 +          for (Entry<Text,TableCounts> entry : watcher.getStats().entrySet()) {
 +            Text tableId = entry.getKey();
 +            TableCounts counts = entry.getValue();
 +            TableState tableState = manager.getTableState(tableId.toString());
 +            if (tableState != null && tableState.equals(TableState.ONLINE)) {
 +              result += counts.unassigned() + counts.assignedToDeadServers() + counts.assigned();
 +            }
 +          }
 +        }
 +        break;
 +      case SAFE_MODE:
 +        // Count offline tablets for the metadata table
 +        for (TabletGroupWatcher watcher : watchers) {
 +          result += watcher.getStats(METADATA_TABLE_ID).unassigned();
 +        }
 +        break;
 +      case UNLOAD_METADATA_TABLETS:
 +      case UNLOAD_ROOT_TABLET:
 +        for (TabletGroupWatcher watcher : watchers) {
 +          result += watcher.getStats(METADATA_TABLE_ID).unassigned();
 +        }
 +        break;
 +      default:
 +        break;
 +    }
 +    return result;
 +  }
 +
 +  /**
 +   * Verifies that a table is ONLINE, after clearing the cached table information for this instance.
 +   *
 +   * @param tableId
 +   *          id of the table to check
 +   * @throws ThriftTableOperationException
 +   *           with type OFFLINE (reported against the MERGE operation) when the table is not online
 +   */
 +  public void mustBeOnline(final String tableId) throws ThriftTableOperationException {
 +    Tables.clearCache(instance);
 +    final TableState current = Tables.getTableState(instance, tableId);
 +    if (current.equals(TableState.ONLINE)) {
 +      return;
 +    }
 +    throw new ThriftTableOperationException(tableId, null, TableOperation.MERGE, TableOperationExceptionType.OFFLINE, "table is not online");
 +  }
 +
 +  /**
 +   * Obtains a connector from this master's instance using the system credentials' principal and token.
 +   *
 +   * @return a connector authenticated as the system user
 +   * @throws AccumuloException
 +   *           as reported by {@code Instance.getConnector}
 +   * @throws AccumuloSecurityException
 +   *           as reported by {@code Instance.getConnector}
 +   */
 +  public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
 +    final Connector systemConnector = instance.getConnector(
 +        SystemCredentials.get().getPrincipal(),
 +        SystemCredentials.get().getToken());
 +    return systemConnector;
 +  }
 +
 +  private Master(ServerConfiguration config, VolumeManager fs, String hostname) throws IOException {
 +    this.serverConfig = config;
 +    this.instance = config.getInstance();
 +    this.fs = fs;
 +    this.hostname = hostname;
 +
 +    AccumuloConfiguration aconf = serverConfig.getConfiguration();
 +
 +    log.info("Version " + Constants.VERSION);
 +    log.info("Instance " + instance.getInstanceID());
 +    ThriftTransportPool.getInstance().setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +    security = AuditedSecurityOperation.getInstance();
 +    tserverSet = new LiveTServerSet(instance, config.getConfiguration(), this);
 +    this.tabletBalancer = aconf.instantiateClassProperty(Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer());
 +    this.tabletBalancer.init(serverConfig);
 +  }
 +
 +  /**
 +   * Looks up the connection for a tablet server in the live tablet server set.
 +   *
 +   * @param server
 +   *          the tablet server instance to look up
 +   * @return whatever {@code tserverSet.getConnection(server)} returns for that server
 +   */
 +  public TServerConnection getConnection(TServerInstance server) {
 +    final TServerConnection connection = tserverSet.getConnection(server);
 +    return connection;
 +  }
 +
-   public MergeInfo getMergeInfo(KeyExtent tablet) {
-     return getMergeInfo(tablet.getTableId());
-   }
- 
 +  /**
 +   * Reads the merge information stored in ZooKeeper for a table, under the merge lock.
 +   *
 +   * @param tableId
 +   *          id of the table whose merge node is read
 +   * @return the deserialized merge info, or a fresh empty {@code MergeInfo} when the node is
 +   *         absent or reading it fails
 +   */
 +  public MergeInfo getMergeInfo(Text tableId) {
 +    synchronized (mergeLock) {
 +      try {
 +        final String mergeNode = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge";
 +        if (ZooReaderWriter.getInstance().exists(mergeNode)) {
 +          final byte[] raw = ZooReaderWriter.getInstance().getData(mergeNode, new Stat());
 +          final DataInputBuffer buffer = new DataInputBuffer();
 +          buffer.reset(raw, raw.length);
 +          final MergeInfo stored = new MergeInfo();
 +          stored.readFields(buffer);
 +          return stored;
 +        }
 +        return new MergeInfo();
 +      } catch (KeeperException.NoNodeException ex) {
 +        // The node vanished between the existence check and the read.
 +        log.info("Error reading merge state, it probably just finished");
 +        return new MergeInfo();
 +      } catch (Exception ex) {
 +        log.warn("Unexpected error reading merge state", ex);
 +        return new MergeInfo();
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Updates {@code info} to {@code state} and persists it in ZooKeeper under the merge lock,
 +   * then wakes threads waiting on the lock and signals an event.
 +   * <p>
 +   * State NONE deletes the merge node. STARTED creates it and fails if it already exists.
 +   * Every other state overwrites it.
 +   *
 +   * @param info
 +   *          merge description; its extent's table id selects the ZooKeeper node
 +   * @param state
 +   *          the merge state to record
 +   * @throws IOException
 +   *           declared by the method signature
 +   * @throws KeeperException
 +   *           on ZooKeeper failure
 +   * @throws InterruptedException
 +   *           if interrupted during a ZooKeeper call
 +   */
 +  public void setMergeState(MergeInfo info, MergeState state) throws IOException, KeeperException, InterruptedException {
 +    synchronized (mergeLock) {
 +      final String mergeNode = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + info.getExtent().getTableId().toString() + "/merge";
 +      info.setState(state);
 +      if (!state.equals(MergeState.NONE)) {
 +        final DataOutputBuffer serialized = new DataOutputBuffer();
 +        try {
 +          info.write(serialized);
 +        } catch (IOException ex) {
 +          throw new RuntimeException("Unlikely", ex);
 +        }
 +        final NodeExistsPolicy onExisting;
 +        if (state.equals(MergeState.STARTED)) {
 +          onExisting = ZooUtil.NodeExistsPolicy.FAIL;
 +        } else {
 +          onExisting = ZooUtil.NodeExistsPolicy.OVERWRITE;
 +        }
 +        ZooReaderWriter.getInstance().putPersistentData(mergeNode, serialized.getData(), onExisting);
 +      } else {
 +        ZooReaderWriter.getInstance().recursiveDelete(mergeNode, NodeMissingPolicy.SKIP);
 +      }
 +      mergeLock.notifyAll();
 +    }
 +    nextEvent.event("Merge state of %s set to %s", info.getExtent(), state);
 +  }
 +
 +  /**
 +   * Deletes the merge node of a table from ZooKeeper under the merge lock, wakes threads
 +   * waiting on the lock, and signals an event. A missing node is not an error.
 +   *
 +   * @param tableId
 +   *          id of the table whose merge node is removed
 +   * @throws IOException
 +   *           declared by the method signature
 +   * @throws KeeperException
 +   *           on ZooKeeper failure
 +   * @throws InterruptedException
 +   *           if interrupted during the ZooKeeper call
 +   */
 +  public void clearMergeState(Text tableId) throws IOException, KeeperException, InterruptedException {
 +    synchronized (mergeLock) {
 +      final String mergeNode = ZooUtil.getRoot(instance.getInstanceID()) + Constants.ZTABLES + "/" + tableId.toString() + "/merge";
 +      ZooReaderWriter.getInstance().recursiveDelete(mergeNode, NodeMissingPolicy.SKIP);
 +      mergeLock.notifyAll();
 +    }
 +    nextEvent.event("Merge state of %s cleared", tableId);
 +  }
 +
 +  /**
 +   * Stores the requested master goal state in ZooKeeper, overwriting any previous value.
 +   * <p>
 +   * This is best effort: a failure is logged (with its cause) and not propagated.
 +   *
 +   * @param state
 +   *          the goal state whose name is written
 +   */
 +  void setMasterGoalState(MasterGoalState state) {
 +    try {
 +      // Encode explicitly as UTF-8 so the stored bytes do not depend on the JVM's default charset.
 +      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, state.name().getBytes(Constants.UTF8),
 +          NodeExistsPolicy.OVERWRITE);
 +    } catch (Exception ex) {
 +      // Pass the exception to the logger so the reason for the failure is not lost.
 +      log.error("Unable to set master goal state in zookeeper", ex);
 +    }
 +  }
 +
 +  /**
 +   * Reads the master goal state from ZooKeeper, retrying once a second until it succeeds.
 +   *
 +   * @return the goal state parsed from the stored name; this method does not return until a
 +   *         read and parse succeed
 +   */
 +  MasterGoalState getMasterGoalState() {
 +    while (true)
 +      try {
 +        byte[] data = ZooReaderWriter.getInstance().getData(ZooUtil.getRoot(instance) + Constants.ZMASTER_GOAL_STATE, null);
 +        // Decode explicitly as UTF-8 so the result does not depend on the JVM's default charset.
 +        return MasterGoalState.valueOf(new String(data, Constants.UTF8));
 +      } catch (Exception e) {
 +        // Hand the exception to the logger as well so the stack trace is recorded.
 +        log.error("Problem getting real goal state: " + e, e);
 +        UtilWaitThread.sleep(1000);
 +      }
 +  }
 +
 +  /**
 +   * Tells whether every tablet group watcher has finished a scan at or after the given time.
 +   *
 +   * @param time
 +   *          the time to compare each watcher's {@code lastScanFinished()} against
 +   * @return {@code false} as soon as one watcher's last finished scan is earlier than {@code time}, otherwise {@code true}
 +   */
 +  public boolean hasCycled(long time) {
 +    final Iterator<TabletGroupWatcher> each = watchers.iterator();
 +    while (each.hasNext()) {
 +      final boolean scannedSince = each.next().stats.lastScanFinished() >= time;
 +      if (!scannedSince) {
 +        return false;
 +      }
 +    }
 +    return true;
 +  }
 +
 +  /**
 +   * Removes every pending migration whose extent belongs to the given table, while holding the
 +   * lock on the migrations map.
 +   *
 +   * @param tableId
 +   *          id of the table whose migrations are dropped
 +   */
 +  public void clearMigrations(String tableId) {
 +    synchronized (migrations) {
 +      for (Iterator<KeyExtent> pending = migrations.keySet().iterator(); pending.hasNext();) {
 +        final String owningTable = pending.next().getTableId().toString();
 +        if (owningTable.equals(tableId)) {
 +          pending.remove();
 +        }
 +      }
 +    }
 +  }
 +
 +  /** The goal the master can choose for a tablet, as returned by the goal-state methods below. */
 +  static enum TabletGoalState {
 +    HOSTED,
 +    UNASSIGNED,
 +    DELETED
 +  }
 +
 +  TabletGoalState getSystemGoalState(TabletLocationState tls) {
 +    switch (getMasterState()) {
 +      case NORMAL:
 +        return TabletGoalState.HOSTED;
 +      case HAVE_LOCK: // fall-through intended
 +      case INITIAL: // fall-through intended
 +      case SAFE_MODE:
 +        if (tls.extent.isMeta())
 +          return TabletGoalState.HOSTED;
 +        return TabletGoalState.UNASSIGNED;
 +      case UNLOAD_METADATA_TABLETS:
 +        if (tls.extent.isRootTablet())
 +          return TabletGoalState.HOSTED;
 +        return TabletGoalState.UNASSIGNED;
 +      case UNLOAD_ROOT_TABLET:
 +        return TabletGoalState.UNASSIGNED;
 +      case STOP:
 +        return TabletGoalState.UNASSIGNED;
 +      default:
 +        throw new IllegalStateException("Unknown Master State");
 +    }
 +  }
 +
 +  TabletGoalState getTableGoalState(KeyExtent extent) {
 +    TableState tableState = TableManager.getInstance().getTableState(extent.getTableId().toString());
 +    if (tableState == null)
 +      return TabletGoalState.DELETED;
 +    switch (tableState) {
 +      case DELETING:
 +        return TabletGoalState.DELETED;
 +      case OFFLINE:
 +      case NEW:
 +        return TabletGoalState.UNASSIGNED;
 +      default:
 +        return TabletGoalState.HOSTED;
 +    }
 +  }
 +
 +  TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo) {
 +    KeyExtent extent = tls.extent;
 +    // Shutting down?
 +    TabletGoalState state = getSystemGoalState(tls);
 +    if (state == TabletGoalState.HOSTED) {
 +      if (tls.current != null && serversToShutdown.contains(tls.current)) {
 +        return TabletGoalState.UNASSIGNED;
 +      }
 +      // Handle merge transitions
 +      if (mergeInfo.getExtent() != null) {
 +        log.debug("mergeInfo overlaps: " + extent + " " + mergeInfo.overlaps(extent));
 +        if (mergeInfo.overlaps(extent)) {
 +          switch (mergeInfo.getState()) {
 +            case NONE:
 +            case COMPLETE:
 +              break;
 +            case STARTED:
 +            case SPLITTING:
 +              return TabletGoalState.HOSTED;
 +            case WAITING_FOR_CHOPPED:
 +              if (tls.getState(onlineTabletServers()).equals(TabletState.HOSTED)) {
 +                if (tls.chopped)
 +                  return TabletGoalState.UNASSIGNED;
 +              } else {
 +                if (tls.chopped && tls.walogs.isEmpty())
 +                  return TabletGoalState.UNASSIGNED;
 +              }
 +
 +              return TabletGoalState.HOSTED;
 +            case WAITING_FOR_OFFLINE:
 +            case MERGING:
 +              return TabletGoalState.UNASSIGNED;
 +          }
 +        }
 +      }
 +
 +      // taking table offline?
 +      state = getTableGoalState(extent);
 +      if (state == TabletGoalState.HOSTED) {
 +        // Maybe this tablet needs to be migrated
 +        TServerInstance dest = migrations.get(extent);
 +        if (dest != null && tls.current != null && !dest.equals(tls.current)) {
 +          return TabletGoalState.UNASSIGNED;
 +        }
 +      }
 +    }
 +    return state;
 +  }
 +
 +  /**
 +   * Background thread that, while this process is still master, periodically drops migrations
 +   * referring to tablets that no longer appear in the metadata table.
 +   */
 +  private class MigrationCleanupThread extends Daemon {
 +
 +    @Override
 +    public void run() {
 +      setName("Migration Cleanup Thread");
 +      while (stillMaster()) {
 +        final boolean nothingPending = migrations.isEmpty();
 +        if (!nothingPending) {
 +          try {
 +            cleanupMutations();
 +          } catch (Exception ex) {
 +            log.error("Error cleaning up migrations", ex);
 +          }
 +        }
 +        UtilWaitThread.sleep(TIME_BETWEEN_MIGRATION_CLEANUPS);
 +      }
 +    }
 +
 +    /**
 +     * If a migrating tablet splits, and the tablet dies before sending the master a message, the
 +     * migration will refer to a non-existing tablet, so it can never complete. This scans the
 +     * metadata table and keeps only the migrations whose extent was seen there.
 +     */
 +    private void cleanupMutations() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +      final Scanner metadataScanner = getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(metadataScanner);
 +
 +      final Set<KeyExtent> stillPresent = new HashSet<KeyExtent>();
 +      final Iterator<Entry<Key,Value>> rows = metadataScanner.iterator();
 +      while (rows.hasNext()) {
 +        final Entry<Key,Value> row = rows.next();
 +        final KeyExtent seen = new KeyExtent(row.getKey().getRow(), row.getValue());
 +        if (migrations.containsKey(seen)) {
 +          stillPresent.add(seen);
 +        }
 +      }
 +      migrations.keySet().retainAll(stillPresent);
 +    }
 +  }
 +
 +  /**
 +   * Daemon that, while this process is still the master, moves the master state toward the configured goal state, refreshes the tablet server status map and,
 +   * when conditions allow, asks the balancer for migrations.
 +   */
 +  private class StatusThread extends Daemon {
 +
 +    /**
 +     * Main loop: apply one step of the goal-state transition, then update status and wait for the next event (or until the returned wait elapses). Any
 +     * Throwable raised in an iteration is logged and followed by a pause of WAIT_BETWEEN_ERRORS.
 +     */
 +    @Override
 +    public void run() {
 +      setName("Status Thread");
 +      EventCoordinator.Listener eventListener = nextEvent.getListener();
 +      while (stillMaster()) {
 +        long wait = DEFAULT_WAIT_FOR_WATCHER;
 +        try {
 +          switch (getMasterGoalState()) {
 +            case NORMAL:
 +              setMasterState(MasterState.NORMAL);
 +              break;
 +            case SAFE_MODE:
 +              // enter safe mode from either NORMAL or HAVE_LOCK; other states are left alone
 +              if (getMasterState() == MasterState.NORMAL) {
 +                setMasterState(MasterState.SAFE_MODE);
 +              }
 +              if (getMasterState() == MasterState.HAVE_LOCK) {
 +                setMasterState(MasterState.SAFE_MODE);
 +              }
 +              break;
 +            case CLEAN_STOP:
 +              // step through NORMAL -> SAFE_MODE -> UNLOAD_METADATA_TABLETS -> UNLOAD_ROOT_TABLET -> STOP,
 +              // advancing one stage per pass once the relevant tablet count reaches zero
 +              switch (getMasterState()) {
 +                case NORMAL:
 +                  setMasterState(MasterState.SAFE_MODE);
 +                  break;
 +                case SAFE_MODE: {
 +                  int count = nonMetaDataTabletsAssignedOrHosted();
 +                  log.debug(String.format("There are %d non-metadata tablets assigned or hosted", count));
 +                  if (count == 0)
 +                    setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
 +                }
 +                  break;
 +                case UNLOAD_METADATA_TABLETS: {
 +                  int count = assignedOrHosted(METADATA_TABLE_ID);
 +                  log.debug(String.format("There are %d metadata tablets assigned or hosted", count));
 +                  if (count == 0)
 +                    setMasterState(MasterState.UNLOAD_ROOT_TABLET);
 +                }
 +                  break;
 +                case UNLOAD_ROOT_TABLET: {
 +                  int count = assignedOrHosted(METADATA_TABLE_ID);
 +                  if (count > 0) {
 +                    log.debug(String.format("%d metadata tablets online", count));
 +                    // NOTE(review): this re-sets the state we are already in; confirm whether stepping back was intended
 +                    setMasterState(MasterState.UNLOAD_ROOT_TABLET);
 +                  }
 +                  int root_count = assignedOrHosted(ROOT_TABLE_ID);
 +                  if (root_count > 0)
 +                    log.debug("The root tablet is still assigned or hosted");
 +                  if (count + root_count == 0) {
 +                    // nothing is hosted any more: halt every tablet server that is still registered
 +                    Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
 +                    log.debug("stopping " + currentServers.size() + " tablet servers");
 +                    for (TServerInstance server : currentServers) {
 +                      try {
 +                        serversToShutdown.add(server);
 +                        tserverSet.getConnection(server).fastHalt(masterLock);
 +                      } catch (TException e) {
 +                        // its probably down, and we don't care
 +                      } finally {
 +                        tserverSet.remove(server);
 +                      }
 +                    }
 +                    // STOP is only reached on a pass where no servers were left to halt
 +                    if (currentServers.size() == 0)
 +                      setMasterState(MasterState.STOP);
 +                  }
 +                }
 +                  break;
 +                default:
 +                  break;
 +              }
 +          }
 +          wait = updateStatus();
 +          eventListener.waitForEvents(wait);
 +        } catch (Throwable t) {
 +          // the message mentions balancing, but this also covers failures in the state transitions above
 +          log.error("Error balancing tablets", t);
 +          UtilWaitThread.sleep(WAIT_BETWEEN_ERRORS);
 +        }
 +      }
 +    }
 +
 +    /**
 +     * Refreshes tserverStatus from the live tablet servers, checks for a server stuck holding commits, and balances only when there are no bad servers, no
 +     * unhosted tablets, no clean stop in progress and no servers being shut down.
 +     *
 +     * @return the wait reported by balanceTablets() when balancing ran, otherwise DEFAULT_WAIT_FOR_WATCHER
 +     */
 +    private long updateStatus() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +      tserverStatus = Collections.synchronizedSortedMap(gatherTableInformation());
 +      checkForHeldServer(tserverStatus);
 +
 +      if (!badServers.isEmpty()) {
 +        log.debug("not balancing because the balance information is out-of-date " + badServers.keySet());
 +      } else if (notHosted() > 0) {
 +        log.debug("not balancing because there are unhosted tablets");
 +      } else if (getMasterGoalState() == MasterGoalState.CLEAN_STOP) {
 +        log.debug("not balancing because the master is attempting to stop cleanly");
 +      } else if (!serversToShutdown.isEmpty()) {
 +        log.debug("not balancing while shutting down servers " + serversToShutdown);
 +      } else {
 +        return balanceTablets();
 +      }
 +      return DEFAULT_WAIT_FOR_WATCHER;
 +    }
 +
 +    /**
 +     * Halts a tablet server whose reported hold time exceeds Property.TSERV_HOLD_TIME_SUICIDE, but only when it is the sole server reporting any hold time and
 +     * more than one server reported status. The server is removed from tserverSet afterwards, whether or not the halt request succeeded.
 +     *
 +     * @param tserverStatus
 +     *          the most recently gathered status of each tablet server
 +     */
 +    private void checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> tserverStatus) {
 +      TServerInstance instance = null;
 +      int crazyHoldTime = 0;
 +      int someHoldTime = 0;
 +      final long maxWait = getSystemConfiguration().getTimeInMillis(Property.TSERV_HOLD_TIME_SUICIDE);
 +      for (Entry<TServerInstance,TabletServerStatus> entry : tserverStatus.entrySet()) {
 +        if (entry.getValue().getHoldTime() > 0) {
 +          someHoldTime++;
 +          if (entry.getValue().getHoldTime() > maxWait) {
 +            instance = entry.getKey();
 +            crazyHoldTime++;
 +          }
 +        }
 +      }
 +      // act only when exactly one server is holding at all and that same server is over the limit
 +      if (crazyHoldTime == 1 && someHoldTime == 1 && tserverStatus.size() > 1) {
 +        log.warn("Tablet server " + instance + " exceeded maximum hold time: attempting to kill it");
 +        try {
 +          TServerConnection connection = tserverSet.getConnection(instance);
 +          if (connection != null)
 +            connection.fastHalt(masterLock);
 +        } catch (TException e) {
 +          log.error(e, e);
 +        }
 +        tserverSet.remove(instance);
 +      }
 +    }
 +
 +    /**
 +     * Asks the tablet balancer for migrations, records every sanity-checked migration that is not already pending, and raises an event when the balancer
 +     * proposed any.
 +     *
 +     * @return the wait value returned by the balancer
 +     */
 +    private long balanceTablets() {
 +      List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
 +      // hand the balancer a snapshot of the pending migrations taken under the migrations lock
 +      Set<KeyExtent> migrationsCopy = new HashSet<KeyExtent>();
 +      synchronized (migrations) {
 +        migrationsCopy.addAll(migrations.keySet());
 +      }
 +      long wait = tabletBalancer.balance(Collections.unmodifiableSortedMap(tserverStatus), Collections.unmodifiableSet(migrationsCopy), migrationsOut);
 +
 +      for (TabletMigration m : TabletBalancer.checkMigrationSanity(tserverStatus.keySet(), migrationsOut)) {
 +        if (migrations.containsKey(m.tablet)) {
 +          log.warn("balancer requested migration more than once, skipping " + m);
 +          continue;
 +        }
 +        migrations.put(m.tablet, m.newServer);
 +        log.debug("migration " + m);
 +      }
 +      if (migrationsOut.size() > 0) {
 +        nextEvent.event("Migrating %d more tablets, %d total", migrationsOut.size(), migrations.size());
 +      }
 +      return wait;
 +    }
 +
 +  }
 +
 +  /**
 +   * Polls every currently known tablet server for its status.
 +   *
 +   * A server that cannot be reached has its failure counter in badServers incremented; once the counter exceeds MAX_BAD_STATUS_COUNT the master asks the server
 +   * to halt and removes it from both badServers and tserverSet. After the sweep, badServers is pruned to servers that are still current and did not answer
 +   * this round.
 +   *
 +   * @return the status of each server that answered, keyed by server instance
 +   */
 +  private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation() {
 +    long start = System.currentTimeMillis();
 +    SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<TServerInstance,TabletServerStatus>();
 +    Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
 +    for (TServerInstance server : currentServers) {
 +      try {
 +        // temporarily rename the thread so a stuck status call is attributable to a specific server
 +        Thread t = Thread.currentThread();
 +        String oldName = t.getName();
 +        try {
 +          t.setName("Getting status from " + server);
 +          TServerConnection connection = tserverSet.getConnection(server);
 +          if (connection == null)
 +            throw new IOException("No connection to " + server);
 +          TabletServerStatus status = connection.getTableMap(false);
 +          result.put(server, status);
 +        } finally {
 +          t.setName(oldName);
 +        }
 +      } catch (Exception ex) {
 +        log.error("unable to get tablet server status " + server + " " + ex.toString());
 +        log.debug("unable to get tablet server status " + server, ex);
 +        // NOTE(review): assumes badServers hands back a counter for a server it has not seen before -- confirm
 +        if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
 +          log.warn("attempting to stop " + server);
 +          try {
 +            TServerConnection connection = tserverSet.getConnection(server);
 +            if (connection != null)
 +              connection.halt(masterLock);
 +          } catch (TTransportException e) {
 +            // ignore: it's probably down
 +          } catch (Exception e) {
 +            log.info("error talking to troublesome tablet server ", e);
 +          }
 +          badServers.remove(server);
 +          tserverSet.remove(server);
 +        }
 +      }
 +    }
 +    // keep failure counters only for servers that are still current and failed to respond this round
 +    synchronized (badServers) {
 +      badServers.keySet().retainAll(currentServers);
 +      badServers.keySet().removeAll(result.keySet());
 +    }
 +    log.debug(String.format("Finished gathering information from %d servers in %.2f seconds", result.size(), (System.currentTimeMillis() - start) / 1000.));
 +    return result;
 +  }
 +
 +  /**
 +   * Runs the master until its client service stops serving.
 +   *
 +   * In order: acquires the master lock, creates the recovery manager, registers with the TableManager, starts the status and migration-cleanup threads,
 +   * starts listening for tablet server changes, sets up FATE, installs a watcher on the recovery node, starts the tablet group watchers and the thrift client
 +   * service, then blocks while the service is serving. On the way out it shuts down FATE and waits, up to MAX_CLEANUP_WAIT_TIME in total, for the status thread
 +   * and the watchers to finish.
 +   *
 +   * @throws IOException
 +   *           also used to wrap KeeperException and InterruptedException raised while setting up FATE
 +   */
 +  public void run() throws IOException, InterruptedException, KeeperException {
 +    final String zroot = ZooUtil.getRoot(instance);
 +
 +    getMasterLock(zroot + Constants.ZMASTER_LOCK);
 +
 +    recoveryManager = new RecoveryManager(this);
 +
 +    TableManager.getInstance().addObserver(this);
 +
 +    StatusThread statusThread = new StatusThread();
 +    statusThread.start();
 +
 +    MigrationCleanupThread migrationCleanupThread = new MigrationCleanupThread();
 +    migrationCleanupThread.start();
 +
 +    tserverSet.startListeningForTabletServerChanges();
 +
 +    try {
 +      // FATE transactions are kept in ZooKeeper behind an age-off store constructed with an 8 hour value (1000 * 60 * 60 * 8 ms)
 +      final AgeOffStore<Master> store = new AgeOffStore<Master>(new org.apache.accumulo.fate.ZooStore<Master>(ZooUtil.getRoot(instance) + Constants.ZFATE,
 +          ZooReaderWriter.getRetryingInstance()), 1000 * 60 * 60 * 8);
 +
 +      int threads = this.getConfiguration().getConfiguration().getCount(Property.MASTER_FATE_THREADPOOL_SIZE);
 +
 +      fate = new Fate<Master>(this, store, threads);
 +
 +      // presumably (delay, period) in milliseconds, i.e. age off roughly every 63 seconds -- confirm against SimpleTimer
 +      SimpleTimer.getInstance().schedule(new Runnable() {
 +
 +        @Override
 +        public void run() {
 +          store.ageOff();
 +        }
 +      }, 63000, 63000);
 +    } catch (KeeperException e) {
 +      throw new IOException(e);
 +    } catch (InterruptedException e) {
 +      throw new IOException(e);
 +    }
 +
 +    // wake the watchers whenever the children of the recovery node change
 +    ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
 +      @Override
 +      public void process(WatchedEvent event) {
 +        nextEvent.event("Noticed recovery changes", event.getType());
 +        try {
 +          // watcher only fires once, add it back
 +          ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, this);
 +        } catch (Exception e) {
 +          log.error("Failed to add log recovery watcher back", e);
 +        }
 +      }
 +    });
 +
 +    // three watchers, one per state store; each later watcher receives the previous one as its dependent watcher
 +    Credentials systemCreds = SystemCredentials.get();
 +    watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(instance, systemCreds, this), null));
 +    watchers.add(new TabletGroupWatcher(this, new RootTabletStateStore(instance, systemCreds, this), watchers.get(0)));
 +    watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new ZooStore(zroot)), watchers.get(1)));
 +    for (TabletGroupWatcher watcher : watchers) {
 +      watcher.start();
 +    }
 +
 +    Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler(this)));
 +    ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master",
 +        "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
 +    clientService = sa.server;
 +    // publish the address the service actually bound to in the lock data
 +    String address = sa.address.toString();
 +    log.info("Setting master lock data to " + address);
 +    masterLock.replaceLockData(address.getBytes());
 +
 +    // first wait for the service to come up, then block for as long as it keeps serving
 +    while (!clientService.isServing()) {
 +      UtilWaitThread.sleep(100);
 +    }
 +    while (clientService.isServing()) {
 +      UtilWaitThread.sleep(500);
 +    }
 +    log.info("Shutting down fate.");
 +    fate.shutdown();
 +
 +    // all joins below share one deadline, so total cleanup time is bounded by MAX_CLEANUP_WAIT_TIME
 +    final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
 +    statusThread.join(remaining(deadline));
 +
 +    // quit, even if the tablet servers somehow jam up and the watchers
 +    // don't stop
 +    for (TabletGroupWatcher watcher : watchers) {
 +      watcher.join(remaining(deadline));
 +    }
 +    log.info("exiting");
 +  }
 +
 +  private long remaining(long deadline) {
 +    return Math.max(1, deadline - System.currentTimeMillis());
 +  }
 +
 +  /**
 +   * @return the ZooLock object currently stored in the masterLock field
 +   */
 +  public ZooLock getMasterLock() {
 +    return masterLock;
 +  }
 +
 +  /**
 +   * Lock watcher used while acquiring the master lock. It records whether the asynchronous lock attempt succeeded or failed and lets a caller block in
 +   * waitForChange() until one of the two has happened. Losing the lock, or losing the ability to monitor it, halts the process.
 +   */
 +  private static class MasterLockWatcher implements ZooLock.AsyncLockWatcher {
 +
 +    // set (under this object's monitor) by the acquiredLock()/failedToAcquireLock(Exception) callbacks
 +    boolean acquiredLock = false;
 +    boolean failedToAcquireLock = false;
 +
 +    /** Halts the process when the master lock is lost. */
 +    @Override
 +    public void lostLock(LockLossReason reason) {
 +      Halt.halt("Master lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
 +    }
 +
 +    /** Halts the process when the lock node can no longer be monitored, passing Halt a runnable that logs the cause. */
 +    @Override
 +    public void unableToMonitorLockNode(final Throwable e) {
 +      Halt.halt(-1, new Runnable() {
 +        @Override
 +        public void run() {
 +          log.fatal("No longer able to monitor master lock node", e);
 +        }
 +      });
 +
 +    }
 +
 +    /** Records a successful acquisition and wakes any thread blocked in waitForChange(). */
 +    @Override
 +    public synchronized void acquiredLock() {
 +      log.debug("Acquired master lock");
 +
 +      // a watcher is expected to receive at most one outcome; anything else is treated as fatal
 +      if (acquiredLock || failedToAcquireLock) {
 +        Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1);
 +      }
 +
 +      acquiredLock = true;
 +      notifyAll();
 +    }
 +
 +    /** Records a failed acquisition and wakes any thread blocked in waitForChange(). */
 +    @Override
 +    public synchronized void failedToAcquireLock(Exception e) {
 +      log.warn("Failed to get master lock " + e);
 +
 +      if (acquiredLock) {
 +        Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1);
 +      }
 +
 +      failedToAcquireLock = true;
 +      notifyAll();
 +    }
 +
 +    /**
 +     * Blocks until the lock attempt has either succeeded or failed. Interrupts are ignored: the loop simply waits again and the interrupt status is not
 +     * restored.
 +     */
 +    public synchronized void waitForChange() {
 +      while (!acquiredLock && !failedToAcquireLock) {
 +        try {
 +          wait();
 +        } catch (InterruptedException e) {}
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Blocks until this process holds the master lock, retrying for as long as acquisition keeps failing, then moves the master into the HAVE_LOCK state.
 +   *
 +   * @param zMasterLoc
 +   *          ZooKeeper path of the master lock
 +   * @throws IllegalStateException
 +   *           if a lock attempt finishes without reporting either success or failure
 +   */
 +  private void getMasterLock(final String zMasterLoc) throws KeeperException, InterruptedException {
 +    log.info("trying to get master lock");
 +
 +    final String masterClientAddress = hostname + ":" + getSystemConfiguration().getPort(Property.MASTER_CLIENTPORT);
 +
 +    boolean haveLock = false;
 +    while (!haveLock) {
 +      // every attempt uses a fresh watcher and a fresh lock object
 +      MasterLockWatcher attempt = new MasterLockWatcher();
 +      masterLock = new ZooLock(zMasterLoc);
 +      masterLock.lockAsync(attempt, masterClientAddress.getBytes());
 +
 +      attempt.waitForChange();
 +
 +      if (attempt.acquiredLock) {
 +        haveLock = true;
 +      } else if (!attempt.failedToAcquireLock) {
 +        throw new IllegalStateException("master lock in unknown state");
 +      } else {
 +        // give up this attempt cleanly and pause before trying again
 +        masterLock.tryToCancelAsyncLockOrUnlock();
 +        UtilWaitThread.sleep(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS);
 +      }
 +    }
 +
 +    setMasterState(MasterState.HAVE_LOCK);
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    try {
 +      SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
 +
 +      VolumeManager fs = VolumeManagerImpl.get();
 +      ServerOpts opts = new ServerOpts();
 +      opts.parseArgs("master", args);
 +      String hostname = opts.getAddress();
 +      Instance instance = HdfsZooInstance.getInstance();
 +      ServerConfiguration conf = new ServerConfiguration(instance);
 +      Accumulo.init(fs, conf, "master");
 +      Master master = new Master(conf, fs, hostname);
 +      Accumulo.enableTracing(hostname, "master");
 +      master.run();
 +    } catch (Exception ex) {
 +      log.error("Unexpected exception, exiting", ex);
 +      System.exit(1);
 +    }
 +  }
 +
 +  /**
 +   * Callback invoked when the set of live tablet servers changes.
 +   *
 +   * Clears dead-server records for servers that came up, posts a record for each server that went away (unless the master's goal is CLEAN_STOP), warns about
 +   * servers that disappeared without having been asked to shut down, drops the departed servers from the shutdown and bad-server bookkeeping, cancels
 +   * migrations whose destination is gone, and finally signals an event.
 +   *
 +   * @param current
 +   *          the live tablet server set after the change
 +   * @param deleted
 +   *          servers that are no longer present
 +   * @param added
 +   *          servers that newly appeared
 +   */
 +  @Override
 +  public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
 +    DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS);
 +    if (added.size() > 0) {
 +      log.info("New servers: " + added);
 +      // a server that is back up should no longer be listed as dead
 +      for (TServerInstance up : added)
 +        obit.delete(up.hostPort());
 +    }
 +    for (TServerInstance dead : deleted) {
 +      String cause = "unexpected failure";
 +      if (serversToShutdown.contains(dead))
 +        cause = "clean shutdown"; // maybe an incorrect assumption
 +      if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP))
 +        obit.post(dead.hostPort(), cause);
 +    }
 +
 +    // servers that vanished without a shutdown request are worth a warning, unless we are stopping anyway
 +    Set<TServerInstance> unexpected = new HashSet<TServerInstance>(deleted);
 +    unexpected.removeAll(this.serversToShutdown);
 +    if (unexpected.size() > 0) {
 +      if (stillMaster() && !getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) {
 +        log.warn("Lost servers " + unexpected);
 +      }
 +    }
 +    serversToShutdown.removeAll(deleted);
 +    badServers.keySet().removeAll(deleted);
 +    // clear out any bad server with the same host/port as a new server
 +    synchronized (badServers) {
 +      cleanListByHostAndPort(badServers.keySet(), deleted, added);
 +    }
 +    synchronized (serversToShutdown) {
 +      cleanListByHostAndPort(serversToShutdown, deleted, added);
 +    }
 +
 +    // a migration whose destination server is gone can never finish, so cancel it
 +    synchronized (migrations) {
 +      Iterator<Entry<KeyExtent,TServerInstance>> iter = migrations.entrySet().iterator();
 +      while (iter.hasNext()) {
 +        Entry<KeyExtent,TServerInstance> entry = iter.next();
 +        if (deleted.contains(entry.getValue())) {
 +          log.info("Canceling migration of " + entry.getKey() + " to " + entry.getValue());
 +          iter.remove();
 +        }
 +      }
 +    }
 +    nextEvent.event("There are now %d tablet servers", current.size());
 +  }
 +
 +  /**
 +   * Removes from {@code badServers} every entry whose host:port matches the host:port of a server in {@code added} or {@code deleted}, regardless of the rest
 +   * of the instance identity.
 +   *
 +   * Each entry is removed at most once. Calling {@link Iterator#remove()} a second time for the same element, which could happen when the entry matched both
 +   * an added and a deleted server (for example a restart on the same host and port), would throw IllegalStateException.
 +   *
 +   * @param badServers
 +   *          the collection to prune in place; its iterator must support remove()
 +   * @param deleted
 +   *          servers that went away
 +   * @param added
 +   *          servers that newly appeared
 +   */
 +  private static void cleanListByHostAndPort(Collection<TServerInstance> badServers, Set<TServerInstance> deleted, Set<TServerInstance> added) {
 +    Iterator<TServerInstance> badIter = badServers.iterator();
 +    while (badIter.hasNext()) {
 +      TServerInstance bad = badIter.next();
 +      boolean stale = false;
 +      for (TServerInstance add : added) {
 +        if (bad.hostPort().equals(add.hostPort())) {
 +          stale = true;
 +          break;
 +        }
 +      }
 +      // only look at the deleted servers when the added ones did not already decide it
 +      if (!stale) {
 +        for (TServerInstance del : deleted) {
 +          if (bad.hostPort().equals(del.hostPort())) {
 +            stale = true;
 +            break;
 +          }
 +        }
 +      }
 +      if (stale) {
 +        badIter.remove();
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Table state callback: signals an event naming the table and its new state.
 +   *
 +   * @param tableId
 +   *          id of the table whose state changed
 +   * @param state
 +   *          the new table state
 +   */
 +  @Override
 +  public void stateChanged(String tableId, TableState state) {
 +    nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state);
 +  }
 +
 +  /** Intentionally empty: the master does nothing with the initial table state map. */
 +  @Override
 +  public void initialize(Map<String,TableState> tableIdToStateMap) {}
 +
 +  /** Intentionally empty: no action is taken here when the session expires. */
 +  @Override
 +  public void sessionExpired() {}
 +
 +  /**
 +   * Reports which tables should currently be treated as online.
 +   *
 +   * In the NORMAL master state this is every table whose state in the TableManager is ONLINE. In any other state only the system tables are candidates: the
 +   * metadata table unless the master is in UNLOAD_METADATA_TABLETS, and the root table unless it is in UNLOAD_ROOT_TABLET.
 +   *
 +   * @return a new, mutable set of table ids
 +   */
 +  @Override
 +  public Set<String> onlineTables() {
 +    Set<String> online = new HashSet<String>();
 +    if (getMasterState() == MasterState.NORMAL) {
 +      TableManager tableManager = TableManager.getInstance();
 +      for (String id : Tables.getIdToNameMap(instance).keySet()) {
 +        // a table with no recorded state (null) is never ONLINE, so no separate null check is needed
 +        if (tableManager.getTableState(id) == TableState.ONLINE) {
 +          online.add(id);
 +        }
 +      }
 +      return online;
 +    }
 +    if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS) {
 +      online.add(MetadataTable.ID);
 +    }
 +    if (getMasterState() != MasterState.UNLOAD_ROOT_TABLET) {
 +      online.add(RootTable.ID);
 +    }
 +    return online;
 +  }
 +
 +  /**
 +   * @return the servers currently reported by tserverSet.getCurrentServers()
 +   */
 +  @Override
 +  public Set<TServerInstance> onlineTabletServers() {
 +    return tserverSet.getCurrentServers();
 +  }
 +
 +  @Override
 +  public Collection<MergeInfo> merges() {
 +    List<MergeInfo> result = new ArrayList<MergeInfo>();
 +    for (String tableId : Tables.getIdToNameMap(instance).keySet()) {
 +      result.add(getMergeInfo(new Text(tableId)));
 +    }
 +    return result;
 +  }
 +
 +  /**
 +   * Records that the given tablet server has been asked to shut down and signals an event. Per the original note, this is used to recover state from the
 +   * persistent transaction that shuts down a server.
 +   *
 +   * @param server
 +   *          the tablet server to add to serversToShutdown
 +   */
 +  public void shutdownTServer(TServerInstance server) {
 +    nextEvent.event("Tablet Server shutdown requested for %s", server);
 +    serversToShutdown.add(server);
 +  }
 +
 +  /**
 +   * @return the event coordinator held in the nextEvent field
 +   */
 +  public EventCoordinator getEventCoordinator() {
 +    return nextEvent;
 +  }
 +
 +  /**
 +   * @return the Accumulo instance this master was created for
 +   */
 +  public Instance getInstance() {
 +    return this.instance;
 +  }
 +
 +  /**
 +   * @return the configuration obtained from serverConfig.getConfiguration()
 +   */
 +  public AccumuloConfiguration getSystemConfiguration() {
 +    return serverConfig.getConfiguration();
 +  }
 +
 +  /**
 +   * @return the ServerConfiguration held in the serverConfig field
 +   */
 +  public ServerConfiguration getConfiguration() {
 +    return serverConfig;
 +  }
 +
 +  /**
 +   * @return the volume manager held in the fs field
 +   */
 +  public VolumeManager getFileSystem() {
 +    return this.fs;
 +  }
 +
 +  /**
 +   * Steps the shutdown state machine back by one stage when a tablet is assigned during shutdown: a metadata-level extent seen while in UNLOAD_ROOT_TABLET
 +   * returns the master to UNLOAD_METADATA_TABLETS, and the root tablet seen while in STOP returns it to UNLOAD_ROOT_TABLET.
 +   *
 +   * @param extent
 +   *          the extent of the tablet that was assigned
 +   */
 +  public void assignedTablet(KeyExtent extent) {
 +    if (extent.isMeta() && getMasterState().equals(MasterState.UNLOAD_ROOT_TABLET)) {
 +      setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
 +    }
 +    // probably too late, but try anyhow
 +    if (extent.isRootTablet() && getMasterState().equals(MasterState.STOP)) {
 +      setMasterState(MasterState.UNLOAD_ROOT_TABLET);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1392e07f/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 1590348,0000000..cc52e45
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@@ -1,742 -1,0 +1,747 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master;
 +
 +import static java.lang.Math.min;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +import java.util.TreeSet;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.RowIterator;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.master.Master.TabletGoalState;
 +import org.apache.accumulo.master.state.MergeStats;
 +import org.apache.accumulo.master.state.TableCounts;
 +import org.apache.accumulo.master.state.TableStats;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.fs.FileRef;
 +import org.apache.accumulo.server.fs.VolumeManager.FileType;
 +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 +import org.apache.accumulo.server.master.state.Assignment;
 +import org.apache.accumulo.server.master.state.ClosableIterator;
 +import org.apache.accumulo.server.master.state.DistributedStoreException;
 +import org.apache.accumulo.server.master.state.MergeInfo;
 +import org.apache.accumulo.server.master.state.MergeState;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TabletLocationState;
 +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
 +import org.apache.accumulo.server.master.state.TabletState;
 +import org.apache.accumulo.server.master.state.TabletStateStore;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.tables.TableManager;
 +import org.apache.accumulo.server.tablets.TabletTime;
 +import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.hadoop.io.Text;
 +import org.apache.thrift.TException;
 +
 +import com.google.common.collect.Iterators;
 +
 +class TabletGroupWatcher extends Daemon {
 +  
 +  private final Master master;
 +  final TabletStateStore store;
 +  final TabletGroupWatcher dependentWatcher;
 +  
 +  final TableStats stats = new TableStats();
 +  
 +  /**
 +   * Creates a watcher over the tablets of one state store.
 +   *
 +   * @param master
 +   *          the owning master
 +   * @param store
 +   *          the tablet state store this watcher iterates over
 +   * @param dependentWatcher
 +   *          another watcher stored for later reference, or null when there is none
 +   */
 +  TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) {
 +    this.master = master;
 +    this.store = store;
 +    this.dependentWatcher = dependentWatcher;
 +  }
 +  
 +  /**
 +   * @return per-table counts as reported by stats.getLast() -- presumably from the most recently completed scan; confirm against TableStats
 +   */
 +  Map<Text,TableCounts> getStats() {
 +    return stats.getLast();
 +  }
 +  
 +  /**
 +   * @param tableId
 +   *          id of the table to look up
 +   * @return the counts for that table as reported by stats.getLast(tableId) -- presumably from the most recently completed scan; confirm against TableStats
 +   */
 +  TableCounts getStats(Text tableId) {
 +    return stats.getLast(tableId);
 +  }
 +  
 +  @Override
 +  public void run() {
 +    
 +    Thread.currentThread().setName("Watching " + store.name());
 +    int[] oldCounts = new int[TabletState.values().length];
 +    EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();
 +    
 +    while (this.master.stillMaster()) {
 +      // slow things down a little, otherwise we spam the logs when there are many wake-up events
 +      UtilWaitThread.sleep(100);
 +
 +      int totalUnloaded = 0;
 +      int unloaded = 0;
 +      ClosableIterator<TabletLocationState> iter = null;
 +      try {
 +        Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
++        for (MergeInfo merge : master.merges()) {
++          if (merge.getExtent() != null) {
++            mergeStatsCache.put(merge.getExtent().getTableId(), new MergeStats(merge));
++          }
++        }
 +        
 +        // Get the current status for the current list of tservers
 +        SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
 +        for (TServerInstance entry : this.master.tserverSet.getCurrentServers()) {
 +          currentTServers.put(entry, this.master.tserverStatus.get(entry));
 +        }
 +        
 +        if (currentTServers.size() == 0) {
 +          eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
 +          continue;
 +        }
 +        
 +        // Don't move tablets to servers that are shutting down
 +        SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<TServerInstance,TabletServerStatus>(currentTServers);
 +        destinations.keySet().removeAll(this.master.serversToShutdown);
 +        
 +        List<Assignment> assignments = new ArrayList<Assignment>();
 +        List<Assignment> assigned = new ArrayList<Assignment>();
 +        List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
 +        Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
 +        
 +        int[] counts = new int[TabletState.values().length];
 +        stats.begin();
 +        // Walk through the tablets in our store, and work tablets
 +        // towards their goal
 +        iter = store.iterator();
 +        while (iter.hasNext()) {
 +          TabletLocationState tls = iter.next();
 +          if (tls == null) {
 +            continue;
 +          }
 +          // ignore entries for tables that do not exist in zookeeper
 +          if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
 +            continue;
 +          
 +          if (Master.log.isTraceEnabled())
 +            Master.log.trace(tls + " walogs " + tls.walogs.size());
 +                    
 +          // Don't overwhelm the tablet servers with work
 +          if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
 +            flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
 +            assignments.clear();
 +            assigned.clear();
 +            assignedToDeadServers.clear();
 +            unassigned.clear();
 +            unloaded = 0;
 +            eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
 +          }
 +          Text tableId = tls.extent.getTableId();
 +          MergeStats mergeStats = mergeStatsCache.get(tableId);
 +          if (mergeStats == null) {
-             mergeStatsCache.put(tableId, mergeStats = new MergeStats(this.master.getMergeInfo(tls.extent)));
++            mergeStatsCache.put(tableId, mergeStats = new MergeStats(new MergeInfo()));
 +          }
 +          TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
 +          TServerInstance server = tls.getServer();
 +          TabletState state = tls.getState(currentTServers.keySet());
 +          if (Master.log.isTraceEnabled())
 +            Master.log.trace("Goal state " + goal + " current " + state);
 +          stats.update(tableId, state);
 +          mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
 +          sendChopRequest(mergeStats.getMergeInfo(), state, tls);
 +          sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
 +          
 +          // Always follow through with assignments
 +          if (state == TabletState.ASSIGNED) {
 +            goal = TabletGoalState.HOSTED;
 +          }
 +          
 +          // if we are shutting down all the tabletservers, we have to do it in order
 +          if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) {
 +            if (this.master.serversToShutdown.equals(currentTServers.keySet())) {
 +              if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) {
 +                goal = TabletGoalState.HOSTED;
 +              }
 +            }
 +          }
 +          
 +          if (goal == TabletGoalState.HOSTED) {
 +            if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
 +              if (this.master.recoveryManager.recoverLogs(tls.extent, tls.walogs))
 +                continue;
 +            }
 +            switch (state) {
 +              case HOSTED:
 +                if (server.equals(this.master.migrations.get(tls.extent)))
 +                  this.master.migrations.remove(tls.extent);
 +                break;
 +              case ASSIGNED_TO_DEAD_SERVER:
 +                assignedToDeadServers.add(tls);
 +                if (server.equals(this.master.migrations.get(tls.extent)))
 +                  this.master.migrations.remove(tls.extent);
 +                // log.info("Current servers " + currentTServers.keySet());
 +                break;
 +              case UNASSIGNED:
 +                // maybe it's a finishing migration
 +                TServerInstance dest = this.master.migrations.get(tls.extent);
 +                if (dest != null) {
 +                  // if destination is still good, assign it
 +                  if (destinations.keySet().contains(dest)) {
 +                    assignments.add(new Assignment(tls.extent, dest));
 +                  } else {
 +                    // get rid of this migration
 +                    this.master.migrations.remove(tls.extent);
 +                    unassigned.put(tls.extent, server);
 +                  }
 +                } else {
 +                  unassigned.put(tls.extent, server);
 +                }
 +                break;
 +              case ASSIGNED:
 +                // Send another reminder
 +                assigned.add(new Assignment(tls.extent, tls.future));
 +                break;
 +            }
 +          } else {
 +            switch (state) {
 +              case UNASSIGNED:
 +                break;
 +              case ASSIGNED_TO_DEAD_SERVER:
 +                assignedToDeadServers.add(tls);
 +                // log.info("Current servers " + currentTServers.keySet());
 +                break;
 +              case HOSTED:
 +                TServerConnection conn = this.master.tserverSet.getConnection(server);
 +                if (conn != null) {
 +                  conn.unloadTablet(this.master.masterLock, tls.extent, goal != TabletGoalState.DELETED);
 +                  unloaded++;
 +                  totalUnloaded++;
 +                } else {
 +                  Master.log.warn("Could not connect to server " + server);
 +                }
 +                break;
 +              case ASSIGNED:
 +                break;
 +            }
 +          }
 +          counts[state.ordinal()]++;
 +        }
 +        
 +        flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
 +        
 +        // provide stats after flushing changes to avoid race conditions w/ delete table
 +        stats.end();
 +        
 +        // Report changes
 +        for (TabletState state : TabletState.values()) {
 +          int i = state.ordinal();
 +          if (counts[i] > 0 && counts[i] != oldCounts[i]) {
 +            this.master.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name());
 +          }
 +        }
 +        Master.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.));
 +        oldCounts = counts;
 +        if (totalUnloaded > 0) {
 +          this.master.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded);
 +        }
 +        
 +        updateMergeState(mergeStatsCache);
 +        
 +        Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
 +        eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
 +      } catch (Exception ex) {
 +        Master.log.error("Error processing table state for store " + store.name(), ex);
 +        if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) { 
 +          repairMetadata(((BadLocationStateException) ex.getCause()).getEncodedEndRow());
 +        } else {
 +          UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS);
 +        }
 +      } finally {
 +        if (iter != null) {
 +          try {
 +            iter.close();
 +          } catch (IOException ex) {
 +            Master.log.warn("Error closing TabletLocationState iterator: " + ex, ex);
 +          }
 +        }
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Attempts to repair a metadata row that carries conflicting location entries.
 +   * <p>
 +   * Scans the current and future location columns of {@code row}. If the row has both kinds of entry, or more than one of either kind, the first entry
 +   * whose server cannot be found in the master's tablet server set is deleted. At most one entry is removed per call. Failures are logged and never
 +   * propagated.
 +   * 
 +   * @param row
 +   *          the encoded metadata row (tablet end row) to inspect
 +   */
 +  private void repairMetadata(Text row) {
 +    Master.log.debug("Attempting repair on " + row);
 +    // ACCUMULO-2261 if a dying tserver writes a location before its lock information propagates, it may cause duplicate assignment.
 +    // Attempt to find the dead server entry and remove it.
 +    try {
 +      Map<Key, Value> future = new HashMap<Key, Value>();
 +      Map<Key, Value> assigned = new HashMap<Key, Value>();
 +      KeyExtent extent = new KeyExtent(row, new Value(new byte[]{0}));
 +      // tablets of the metadata table are tracked in the root table
 +      String table = MetadataTable.NAME;
 +      if (extent.isMeta())
 +        table = RootTable.NAME;
 +      Scanner scanner = this.master.getConnector().createScanner(table, Authorizations.EMPTY);
 +      scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
 +      scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
 +      scanner.setRange(new Range(row));
 +      for (Entry<Key,Value> entry : scanner) {
 +        if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) {
 +          assigned.put(entry.getKey(), entry.getValue());
 +        } else if (entry.getKey().getColumnFamily().equals(FutureLocationColumnFamily.NAME)) {
 +          future.put(entry.getKey(), entry.getValue());
 +        }
 +      }
 +      if (future.size() > 0 && assigned.size() > 0) {
 +        Master.log.warn("Found a tablet assigned and hosted, attempting to repair");
 +      } else if (future.size() > 1 && assigned.size() == 0) {
 +        Master.log.warn("Found a tablet assigned to multiple servers, attempting to repair");
 +      } else if (future.size() == 0 && assigned.size() > 1) {
 +        Master.log.warn("Found a tablet hosted on multiple servers, attempting to repair");
 +      } else {
 +        Master.log.info("Attempted a repair, but nothing seems to be obviously wrong. " + assigned + " " + future);
 +        return;
 +      }
 +      Iterator<Entry<Key, Value>> iter = Iterators.concat(future.entrySet().iterator(), assigned.entrySet().iterator());
 +      while (iter.hasNext()) {
 +        Entry<Key, Value> entry = iter.next();
 +        TServerInstance alive = master.tserverSet.find(entry.getValue().toString());
 +        if (alive == null) {
 +          Master.log.info("Removing entry " + entry);
 +          BatchWriter bw = this.master.getConnector().createBatchWriter(table, new BatchWriterConfig());
 +          // close the writer even when adding the mutation fails, so it is not leaked
 +          try {
 +            Mutation m = new Mutation(entry.getKey().getRow());
 +            m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
 +            bw.addMutation(m);
 +          } finally {
 +            bw.close();
 +          }
 +          // only one entry is removed per attempt
 +          return;
 +        }
 +      }
 +      Master.log.error("Metadata table is inconsistent at " + row + " and all assigned/future tservers are still online.");
 +    } catch (Throwable e) {
 +      Master.log.error("Error attempting repair of metadata " + row + ": " + e, e);
 +    }
 +  }
 +
 +  /**
 +   * Adds up the assigned and hosted tablet counts of every table reported by {@code stats.getLast()}.
 +   * 
 +   * @return the combined number of assigned and hosted tablets
 +   */
 +  private int assignedOrHosted() {
 +    int total = 0;
 +    for (TableCounts perTable : stats.getLast().values()) {
 +      total += perTable.assigned();
 +      total += perTable.hosted();
 +    }
 +    return total;
 +  }
 +  
 +  /**
 +   * Asks the hosting tablet server to split a tablet at the boundaries of a pending delete-range operation.
 +   * <p>
 +   * Nothing is sent unless the operation is in the SPLITTING state, is a delete (merges don't split), and the tablet is hosted. A boundary row is used as
 +   * a split point only when it lies inside the tablet and is not already one of the tablet's own end points.
 +   * 
 +   * @param info
 +   *          the merge/delete operation for the tablet's table
 +   * @param state
 +   *          the current state of the tablet
 +   * @param tls
 +   *          the location state of the tablet
 +   */
 +  private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
 +    // Not splitting, not a delete, or tablet not online: nothing to do
 +    if (!info.getState().equals(MergeState.SPLITTING) || !info.isDelete() || !state.equals(TabletState.HOSTED))
 +      return;
 +    // Does this extent cover the end points of the delete?
 +    KeyExtent deleteRange = info.getExtent();
 +    if (!tls.extent.overlaps(deleteRange))
 +      return;
 +    Text[] boundaries = new Text[] {deleteRange.getPrevEndRow(), deleteRange.getEndRow()};
 +    for (Text boundary : boundaries) {
 +      // skip boundaries that are open-ended, outside this tablet, or already a tablet end point
 +      boolean notASplitPoint = boundary == null || !tls.extent.contains(boundary) || boundary.equals(tls.extent.getEndRow())
 +          || boundary.equals(tls.extent.getPrevEndRow());
 +      if (notASplitPoint)
 +        continue;
 +      try {
 +        TServerConnection connection = this.master.tserverSet.getConnection(tls.current);
 +        if (connection == null) {
 +          Master.log.warn("Not connected to server " + tls.current);
 +        } else {
 +          Master.log.info("Asking " + tls.current + " to split " + tls.extent + " at " + boundary);
 +          connection.splitTablet(this.master.masterLock, tls.extent, boundary);
 +        }
 +      } catch (NotServingTabletException e) {
 +        Master.log.debug("Error asking tablet server to split a tablet: " + e);
 +      } catch (Exception e) {
 +        Master.log.warn("Error asking tablet server to split a tablet: " + e);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Asks the hosting tablet server to chop a tablet that takes part in a pending merge.
 +   * <p>
 +   * A request is sent only while the merge is in the WAITING_FOR_CHOPPED state, the tablet is hosted, it is not yet marked chopped, and
 +   * {@code info.needsToBeChopped} reports that the tablet must be chopped. Communication failures are logged and otherwise ignored.
 +   * 
 +   * @param info
 +   *          the merge operation for the tablet's table
 +   * @param state
 +   *          the current state of the tablet
 +   * @param tls
 +   *          the location state of the tablet
 +   */
 +  private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
 +    // Don't bother if we're in the wrong state
 +    if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED))
 +      return;
 +    // Tablet must be online
 +    if (!state.equals(TabletState.HOSTED))
 +      return;
 +    // Nothing to do when the tablet is already chopped
 +    if (tls.chopped)
 +      return;
 +    // Tablet ranges intersect
 +    if (info.needsToBeChopped(tls.extent)) {
 +      TServerConnection conn;
 +      try {
 +        conn = this.master.tserverSet.getConnection(tls.current);
 +        if (conn != null) {
 +          Master.log.info("Asking " + tls.current + " to chop " + tls.extent);
 +          conn.chop(this.master.masterLock, tls.extent);
 +        } else {
 +          Master.log.warn("Could not connect to server " + tls.current);
 +        }
 +      } catch (TException e) {
 +        // include the failure so the cause is not lost, matching how split request errors are reported
 +        Master.log.warn("Communications error asking tablet server to chop a tablet: " + e);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Advances the merge state machine of every table in the cache and performs the metadata work once a merge reaches the MERGING state.
 +   * <p>
 +   * A computed next state of COMPLETE is stored as NONE. A state that differs from the recorded one is persisted through the master. Errors are logged per
 +   * table and do not stop processing of the remaining tables.
 +   * 
 +   * @param mergeStatsCache
 +   *          merge statistics gathered during the last scan, keyed by table id
 +   */
 +  private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
 +    for (MergeStats stats : mergeStatsCache.values()) {
 +      try {
 +        MergeState next = stats.nextMergeState(this.master.getConnector(), this.master);
 +        if (next == MergeState.COMPLETE)
 +          next = MergeState.NONE;
 +        // when next state is MERGING, its important to persist this before
 +        // starting the merge... the verification check that is done before
 +        // moving into the merging state could fail if merge starts but does
 +        // not finish
 +        if (next != stats.getMergeInfo().getState()) {
 +          this.master.setMergeState(stats.getMergeInfo(), next);
 +        }
 +        
 +        if (next != MergeState.MERGING)
 +          continue;
 +        
 +        try {
 +          if (stats.getMergeInfo().isDelete()) {
 +            deleteTablets(stats.getMergeInfo());
 +          } else {
 +            mergeMetadataRecords(stats.getMergeInfo());
 +          }
 +          this.master.setMergeState(stats.getMergeInfo(), MergeState.COMPLETE);
 +        } catch (Exception ex) {
 +          Master.log.error("Unable merge metadata table records", ex);
 +        }
 +      } catch (Exception ex) {
 +        Master.log.error("Unable to update merge state for merge " + stats.getMergeInfo().getExtent(), ex);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Carries out a delete-range operation on the metadata of the affected tablets.
 +   * <p>
 +   * The steps are: locate the tablet that follows the deleted range (if the range has an end row), write file deletion entries for the data files and
 +   * directories of the tablets in the range, delete the metadata rows of those tablets, and finally either point the following tablet's prevRow at the
 +   * start of the deleted range or recreate the last tablet of the table.
 +   * 
 +   * @param info
 +   *          the delete operation; its extent is the range being removed
 +   * @throws AccumuloException
 +   *           wrapping any failure raised while reading or updating the metadata
 +   */
 +  private void deleteTablets(MergeInfo info) throws AccumuloException {
 +    KeyExtent range = info.getExtent();
 +    String systemTable;
 +    if (range.isMeta()) {
 +      systemTable = RootTable.NAME;
 +    } else {
 +      systemTable = MetadataTable.NAME;
 +    }
 +    Master.log.debug("Deleting tablets for " + range);
 +    char timeType = '\0';
 +    KeyExtent successor = null;
 +    if (range.getEndRow() != null) {
 +      Key afterEnd = new Key(range.getEndRow()).followingKey(PartialKey.ROW);
 +      successor = getHighTablet(new KeyExtent(range.getTableId(), afterEnd.getRow(), range.getEndRow()));
 +      Master.log.debug("Found following tablet " + successor);
 +    }
 +    try {
 +      Connector conn = this.master.getConnector();
 +      Text prevEnd = range.getPrevEndRow();
 +      Text start = (prevEnd != null) ? prevEnd : new Text();
 +      Master.log.debug("Making file deletion entries for " + range);
 +      Text firstRow = KeyExtent.getMetadataEntry(range.getTableId(), start);
 +      Text lastRow = KeyExtent.getMetadataEntry(range.getTableId(), range.getEndRow());
 +      Range deleteRange = new Range(firstRow, false, lastRow, true);
 +      Scanner fileScanner = conn.createScanner(systemTable, Authorizations.EMPTY);
 +      fileScanner.setRange(deleteRange);
 +      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(fileScanner);
 +      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(fileScanner);
 +      fileScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
 +      fileScanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
 +      Set<FileRef> pending = new TreeSet<FileRef>();
 +      for (Entry<Key,Value> entry : fileScanner) {
 +        Key key = entry.getKey();
 +        // a data file or tablet directory that must be scheduled for deletion, if this entry names one
 +        FileRef doomed = null;
 +        if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
 +          doomed = new FileRef(this.master.fs, key);
 +        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
 +          timeType = entry.getValue().toString().charAt(0);
 +        } else if (key.compareColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) {
 +          throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
 +        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
 +          String dir = entry.getValue().toString();
 +          doomed = new FileRef(dir, this.master.fs.getFullPath(FileType.TABLE, dir));
 +        }
 +        if (doomed != null) {
 +          pending.add(doomed);
 +          // write deletion entries in batches to bound the size of the set
 +          if (pending.size() > 1000) {
 +            MetadataTableUtil.addDeleteEntries(range, pending, SystemCredentials.get());
 +            pending.clear();
 +          }
 +        }
 +      }
 +      MetadataTableUtil.addDeleteEntries(range, pending, SystemCredentials.get());
 +      BatchWriter rowWriter = conn.createBatchWriter(systemTable, new BatchWriterConfig());
 +      try {
 +        deleteTablets(info, deleteRange, rowWriter, conn);
 +      } finally {
 +        rowWriter.close();
 +      }
 +      
 +      if (successor == null) {
 +        // Recreate the default tablet to hold the end of the table
 +        Master.log.debug("Recreating the last tablet to point to " + range.getPrevEndRow());
 +        String tdir = master.getFileSystem().choose(ServerConstants.getTablesDirs()) + "/" + range.getTableId() + Constants.DEFAULT_TABLET_LOCATION;
 +        KeyExtent lastTablet = new KeyExtent(range.getTableId(), null, range.getPrevEndRow());
 +        MetadataTableUtil.addTablet(lastTablet, tdir, SystemCredentials.get(), timeType, this.master.masterLock);
 +      } else {
 +        Master.log.debug("Updating prevRow of " + successor + " to " + range.getPrevEndRow());
 +        BatchWriter prevRowWriter = conn.createBatchWriter(systemTable, new BatchWriterConfig());
 +        try {
 +          Mutation update = new Mutation(successor.getMetadataEntry());
 +          TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(update, KeyExtent.encodePrevEndRow(range.getPrevEndRow()));
 +          ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(update);
 +          prevRowWriter.addMutation(update);
 +          prevRowWriter.flush();
 +        } finally {
 +          prevRowWriter.close();
 +        }
 +      }
 +    } catch (Exception ex) {
 +      throw new AccumuloException(ex);
 +    }
 +  }
 +  
 +  /**
 +   * Folds the metadata of all tablets in a merge range into the highest tablet of that range.
 +   * <p>
 +   * Data file entries of the lower tablets are copied onto the highest tablet's row, delete mutations are created for their directories, and the largest
 +   * logical time seen (including the highest tablet's own) is written back. The highest tablet's prevRow is then set to the prevRow of the lowest tablet,
 +   * the rows of the lower tablets are deleted, and the chopped marker of the highest tablet is removed. If no prevRow entry is found in the range the
 +   * tablets are treated as already merged.
 +   * 
 +   * @param info
 +   *          the merge operation; its extent is the range being merged
 +   * @throws AccumuloException
 +   *           wrapping any failure raised while reading or updating the metadata, or while closing the writer
 +   */
 +  private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
 +    KeyExtent mergeRange = info.getExtent();
 +    Master.log.debug("Merging metadata for " + mergeRange);
 +    KeyExtent highTablet = getHighTablet(mergeRange);
 +    Master.log.debug("Highest tablet is " + highTablet);
 +    Value lowestPrevRow = null;
 +    Text highRow = highTablet.getMetadataEntry();
 +    Text prevEnd = mergeRange.getPrevEndRow();
 +    Text start = (prevEnd != null) ? prevEnd : new Text();
 +    // everything after the start of the range up to, but not including, the highest tablet's row
 +    Range scanRange = new Range(KeyExtent.getMetadataEntry(mergeRange.getTableId(), start), false, highRow, false);
 +    String systemTable = mergeRange.isMeta() ? RootTable.NAME : MetadataTable.NAME;
 +    
 +    BatchWriter writer = null;
 +    try {
 +      long movedFiles = 0;
 +      Connector conn = this.master.getConnector();
 +      // Make file entries in highest tablet
 +      writer = conn.createBatchWriter(systemTable, new BatchWriterConfig());
 +      Scanner lowerTablets = conn.createScanner(systemTable, Authorizations.EMPTY);
 +      lowerTablets.setRange(scanRange);
 +      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(lowerTablets);
 +      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(lowerTablets);
 +      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(lowerTablets);
 +      lowerTablets.fetchColumnFamily(DataFileColumnFamily.NAME);
 +      Mutation merged = new Mutation(highRow);
 +      String maxLogicalTime = null;
 +      for (Entry<Key,Value> entry : lowerTablets) {
 +        Key key = entry.getKey();
 +        Value value = entry.getValue();
 +        if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
 +          merged.put(key.getColumnFamily(), key.getColumnQualifier(), value);
 +          movedFiles++;
 +        } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && lowestPrevRow == null) {
 +          // only the first prevRow encountered is kept
 +          Master.log.debug("prevRow entry for lowest tablet is " + value);
 +          lowestPrevRow = new Value(value);
 +        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
 +          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
 +        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
 +          writer.addMutation(MetadataTableUtil.createDeleteMutation(mergeRange.getTableId().toString(), value.toString()));
 +        }
 +      }
 +      
 +      // read the logical time from the last tablet in the merge range, it is not included in
 +      // the loop above
 +      Scanner highTabletTime = conn.createScanner(systemTable, Authorizations.EMPTY);
 +      highTabletTime.setRange(new Range(highRow));
 +      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(highTabletTime);
 +      for (Entry<Key,Value> entry : highTabletTime) {
 +        if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
 +          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString());
 +        }
 +      }
 +      
 +      if (maxLogicalTime != null)
 +        TabletsSection.ServerColumnFamily.TIME_COLUMN.put(merged, new Value(maxLogicalTime.getBytes()));
 +      
 +      if (!merged.getUpdates().isEmpty()) {
 +        writer.addMutation(merged);
 +      }
 +      
 +      writer.flush();
 +      
 +      Master.log.debug("Moved " + movedFiles + " files to " + highTablet);
 +      
 +      if (lowestPrevRow == null) {
 +        Master.log.debug("tablet already merged");
 +        return;
 +      }
 +      
 +      highTablet.setPrevEndRow(KeyExtent.decodePrevEndRow(lowestPrevRow));
 +      Mutation prevRowUpdate = highTablet.getPrevRowUpdateMutation();
 +      Master.log.debug("Setting the prevRow for last tablet: " + highTablet);
 +      writer.addMutation(prevRowUpdate);
 +      writer.flush();
 +      
 +      deleteTablets(info, scanRange, writer, conn);
 +      
 +      // Clean-up the last chopped marker
 +      Mutation unchop = new Mutation(highRow);
 +      ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(unchop);
 +      writer.addMutation(unchop);
 +      writer.flush();
 +      
 +    } catch (Exception ex) {
 +      throw new AccumuloException(ex);
 +    } finally {
 +      if (writer != null) {
 +        try {
 +          writer.close();
 +        } catch (Exception ex) {
 +          throw new AccumuloException(ex);
 +        }
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Deletes every metadata entry found in {@code scanRange}, one mutation per row, and flushes the writer.
 +   * 
 +   * @param info
 +   *          the merge/delete operation; used to pick the root or metadata table to scan
 +   * @param scanRange
 +   *          the range of metadata rows to remove
 +   * @param bw
 +   *          the writer that receives the delete mutations; it is flushed but not closed here
 +   * @param conn
 +   *          the connector used to create the scanner
 +   * @throws TableNotFoundException
 +   *           declared by the scanner creation
 +   * @throws MutationsRejectedException
 +   *           declared by the writer operations
 +   */
 +  private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException {
 +    String systemTable = info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME;
 +    Scanner scanner = conn.createScanner(systemTable, Authorizations.EMPTY);
 +    Master.log.debug("Deleting range " + scanRange);
 +    scanner.setRange(scanRange);
 +    // Delete everything in the other tablets
 +    // group all deletes into tablet into one mutation, this makes tablets
 +    // either disappear entirely or not all.. this is important for the case
 +    // where the process terminates in the loop below...
 +    for (RowIterator rows = new RowIterator(scanner); rows.hasNext();) {
 +      Iterator<Entry<Key,Value>> cells = rows.next();
 +      Mutation rowDelete = null;
 +      while (cells.hasNext()) {
 +        Key key = cells.next().getKey();
 +        
 +        if (rowDelete == null)
 +          rowDelete = new Mutation(key.getRow());
 +        
 +        rowDelete.putDelete(key.getColumnFamily(), key.getColumnQualifier());
 +        Master.log.debug("deleting entry " + key);
 +      }
 +      bw.addMutation(rowDelete);
 +    }
 +    
 +    bw.flush();
 +  }
 +  
 +  /**
 +   * Finds the tablet whose metadata row is the first one at or after the end of {@code range}.
 +   * <p>
 +   * Scans the prevRow column of the root or metadata table starting at the metadata entry for the range's end row and builds the extent from the first
 +   * entry returned.
 +   * 
 +   * @param range
 +   *          the merge range whose highest tablet is wanted
 +   * @return the extent of the tablet found
 +   * @throws AccumuloException
 +   *           if no entry is found, if the tablet found belongs to a different table, or if the scan fails
 +   */
 +  private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
 +    try {
 +      Connector conn = this.master.getConnector();
 +      Scanner scanner = conn.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
 +      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
 +      KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null);
 +      scanner.setRange(new Range(start.getMetadataEntry(), null));
 +      Iterator<Entry<Key,Value>> iterator = scanner.iterator();
 +      if (!iterator.hasNext()) {
 +        throw new AccumuloException("No last tablet for a merge " + range);
 +      }
 +      Entry<Key,Value> entry = iterator.next();
 +      KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
 +      // compare table ids by value; reference identity only holds if the ids happen to be the same instance
 +      if (!highTablet.getTableId().equals(range.getTableId())) {
 +        throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
 +      }
 +      return highTablet;
 +    } catch (Exception ex) {
 +      // exceptions thrown above are also caught here and wrapped with the range for context
 +      throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex);
 +    }
 +  }
 +  
 +  /**
 +   * Applies the decisions collected during a scan: unassigns tablets whose servers are gone, asks the balancer to place unassigned tablets, records the
 +   * resulting future locations and sends assignment requests to the chosen servers.
 +   * 
 +   * @param currentTServers
 +   *          candidate destination servers and their status
 +   * @param assignments
 +   *          new assignments to record and send; balancer choices and the entries of {@code assigned} are appended to this list
 +   * @param assigned
 +   *          assignments that already have a future location and only need to be re-sent
 +   * @param assignedToDeadServers
 +   *          tablets to mark unassigned in the store
 +   * @param unassigned
 +   *          tablets that need a server, mapped to the server recorded for each
 +   * @throws DistributedStoreException
 +   *           declared for the store updates
 +   * @throws TException
 +   *           declared for the tablet server calls
 +   */
 +  private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
 +      List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
 +    if (!assignedToDeadServers.isEmpty()) {
 +      // cap the number of tablets printed in the debug message
 +      int shown = min(assignedToDeadServers.size(), 100);
 +      Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, shown) + "...");
 +      store.unassign(assignedToDeadServers);
 +      this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
 +    }
 +    
 +    if (!currentTServers.isEmpty()) {
 +      Map<KeyExtent,TServerInstance> balancerChoices = new HashMap<KeyExtent,TServerInstance>();
 +      this.master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned),
 +          balancerChoices);
 +      for (Entry<KeyExtent,TServerInstance> assignment : balancerChoices.entrySet()) {
 +        KeyExtent tablet = assignment.getKey();
 +        TServerInstance target = assignment.getValue();
 +        if (!unassigned.containsKey(tablet)) {
 +          Master.log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + tablet);
 +          continue;
 +        }
 +        if (target == null) {
 +          continue;
 +        }
 +        if (!currentTServers.containsKey(target)) {
 +          Master.log.warn("balancer assigned " + tablet + " to a tablet server that is not current " + target + " ignoring");
 +          continue;
 +        }
 +        Master.log.debug(store.name() + " assigning tablet " + assignment);
 +        assignments.add(new Assignment(tablet, target));
 +      }
 +      if (!unassigned.isEmpty() && balancerChoices.isEmpty())
 +        Master.log.warn("Load balancer failed to assign any tablets");
 +    }
 +    
 +    if (!assignments.isEmpty()) {
 +      Master.log.info(String.format("Assigning %d tablets", assignments.size()));
 +      store.setFutureLocations(assignments);
 +    }
 +    // reminders for already-assigned tablets are sent along with the new assignments
 +    assignments.addAll(assigned);
 +    for (Assignment pending : assignments) {
 +      TServerConnection connection = this.master.tserverSet.getConnection(pending.server);
 +      if (connection == null) {
 +        Master.log.warn("Could not connect to server " + pending.server);
 +      } else {
 +        connection.assignTablet(this.master.masterLock, pending.tablet);
 +      }
 +      master.assignedTablet(pending.tablet);
 +    }
 +  }
 +  
 +}


Mime
View raw message