accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [07/16] git commit: Splitting out TabletGroupWatcher to make merge easy
Date Mon, 22 Sep 2014 20:48:07 GMT
Splitting out TabletGroupWatcher to make merge easy



git-svn-id: https://svn.apache.org/repos/asf/accumulo/branches/ACCUMULO-CURATOR@1496196 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/51ff32a9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/51ff32a9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/51ff32a9

Branch: refs/heads/ACCUMULO-CURATOR
Commit: 51ff32a99835b42813e9e3b29588cbf69258f7be
Parents: bc335f7
Author: John Vines <vines@apache.org>
Authored: Mon Jun 24 20:24:13 2013 +0000
Committer: John Vines <vines@apache.org>
Committed: Mon Jun 24 20:24:13 2013 +0000

----------------------------------------------------------------------
 .../apache/accumulo/server/master/Master.java   | 619 +-----------------
 .../server/master/TabletGroupWatcher.java       | 645 +++++++++++++++++++
 .../accumulo/server/zookeeper/ZooCache.java     |  31 -
 3 files changed, 660 insertions(+), 635 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/51ff32a9/server/src/main/java/org/apache/accumulo/server/master/Master.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java
index ed096c7..cfeda34 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/Master.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.server.master;
 
-import static java.lang.Math.min;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -33,20 +32,16 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -61,9 +56,6 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.file.FileUtil;
@@ -81,7 +73,6 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.master.thrift.TabletSplit;
 import org.apache.accumulo.core.security.SecurityUtil;
 import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Daemon;
@@ -102,23 +93,18 @@ import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
 import org.apache.accumulo.server.master.balancer.TabletBalancer;
 import org.apache.accumulo.server.master.recovery.RecoveryManager;
-import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.CurrentState;
 import org.apache.accumulo.server.master.state.DeadServerList;
-import org.apache.accumulo.server.master.state.DistributedStoreException;
 import org.apache.accumulo.server.master.state.MergeInfo;
 import org.apache.accumulo.server.master.state.MergeState;
-import org.apache.accumulo.server.master.state.MergeStats;
 import org.apache.accumulo.server.master.state.MetaDataStateStore;
 import org.apache.accumulo.server.master.state.RootTabletStateStore;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.state.TableCounts;
-import org.apache.accumulo.server.master.state.TableStats;
 import org.apache.accumulo.server.master.state.TabletLocationState;
 import org.apache.accumulo.server.master.state.TabletMigration;
 import org.apache.accumulo.server.master.state.TabletServerState;
 import org.apache.accumulo.server.master.state.TabletState;
-import org.apache.accumulo.server.master.state.TabletStateStore;
 import org.apache.accumulo.server.master.state.ZooStore;
 import org.apache.accumulo.server.master.state.ZooTabletStateStore;
 import org.apache.accumulo.server.master.state.tables.TableManager;
@@ -140,7 +126,6 @@ import org.apache.accumulo.server.monitor.Monitor;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.security.SecurityOperation;
-import org.apache.accumulo.server.tabletserver.TabletTime;
 import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.accumulo.server.util.DefaultMap;
@@ -177,41 +162,41 @@ import org.apache.zookeeper.data.Stat;
  */
 public class Master implements LiveTServerSet.Listener, TableObserver, CurrentState {
   
-  final private static Logger log = Logger.getLogger(Master.class);
+  final static Logger log = Logger.getLogger(Master.class);
   
   final private static int ONE_SECOND = 1000;
   final private static Text METADATA_TABLE_ID = new Text(Constants.METADATA_TABLE_ID);
-  final private static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
+  final static long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
   final private static long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
-  final private static long WAIT_BETWEEN_ERRORS = ONE_SECOND;
+  final static long WAIT_BETWEEN_ERRORS = ONE_SECOND;
   final private static long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
   final private static int MAX_CLEANUP_WAIT_TIME = 1000;
   final private static int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = 1000;
-  final private static int MAX_TSERVER_WORK_CHUNK = 5000;
+  final static int MAX_TSERVER_WORK_CHUNK = 5000;
   final private static int MAX_BAD_STATUS_COUNT = 3;
   
   final private FileSystem fs;
   final private Instance instance;
   final private String hostname;
-  final private LiveTServerSet tserverSet;
+  final LiveTServerSet tserverSet;
   final private List<TabletGroupWatcher> watchers = new ArrayList<TabletGroupWatcher>();
   final private SecurityOperation security;
   final private Map<TServerInstance,AtomicInteger> badServers = Collections.synchronizedMap(new DefaultMap<TServerInstance,AtomicInteger>(new AtomicInteger()));
-  final private Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<TServerInstance>());
-  final private SortedMap<KeyExtent,TServerInstance> migrations = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
-  final private EventCoordinator nextEvent = new EventCoordinator();
+  final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<TServerInstance>());
+  final SortedMap<KeyExtent,TServerInstance> migrations = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
+  final EventCoordinator nextEvent = new EventCoordinator();
   final private Object mergeLock = new Object();
-  private RecoveryManager recoveryManager = null;
+  RecoveryManager recoveryManager = null;
   
-  private ZooLock masterLock = null;
+  ZooLock masterLock = null;
   private TServer clientService = null;
-  private TabletBalancer tabletBalancer;
+  TabletBalancer tabletBalancer;
   
   private MasterState state = MasterState.INITIAL;
   
   private Fate<Master> fate;
   
-  volatile private SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections
+  volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections
       .unmodifiableSortedMap(new TreeMap<TServerInstance,TabletServerStatus>());
   
   private final Set<String> recoveriesInProgress = Collections.synchronizedSet(new HashSet<String>());
@@ -1247,581 +1232,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
     return state;
   }
   
-  private class TabletGroupWatcher extends Daemon {
-    
-    final TabletStateStore store;
-    final TabletGroupWatcher dependentWatcher;
-    
-    final TableStats stats = new TableStats();
-    
-    TabletGroupWatcher(TabletStateStore store, TabletGroupWatcher dependentWatcher) {
-      this.store = store;
-      this.dependentWatcher = dependentWatcher;
-    }
-    
-    Map<Text,TableCounts> getStats() {
-      return stats.getLast();
-    }
-    
-    TableCounts getStats(Text tableId) {
-      return stats.getLast(tableId);
-    }
-    
-    @Override
-    public void run() {
-      
-      Thread.currentThread().setName("Watching " + store.name());
-      int[] oldCounts = new int[TabletState.values().length];
-      EventCoordinator.Listener eventListener = nextEvent.getListener();
-      
-      while (stillMaster()) {
-        int totalUnloaded = 0;
-        int unloaded = 0;
-        try {
-          Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
-          
-          // Get the current status for the current list of tservers
-          SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
-          for (TServerInstance entry : tserverSet.getCurrentServers()) {
-            currentTServers.put(entry, tserverStatus.get(entry));
-          }
-          
-          if (currentTServers.size() == 0) {
-            eventListener.waitForEvents(TIME_TO_WAIT_BETWEEN_SCANS);
-            continue;
-          }
-          
-          // Don't move tablets to servers that are shutting down
-          SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<TServerInstance,TabletServerStatus>(currentTServers);
-          destinations.keySet().removeAll(serversToShutdown);
-          
-          List<Assignment> assignments = new ArrayList<Assignment>();
-          List<Assignment> assigned = new ArrayList<Assignment>();
-          List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
-          Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
-          
-          int[] counts = new int[TabletState.values().length];
-          stats.begin();
-          // Walk through the tablets in our store, and work tablets towards their goal
-          for (TabletLocationState tls : store) {
-            if (tls == null) {
-              continue;
-            }
-            // ignore entries for tables that do not exist in zookeeper
-            if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
-              continue;
-            
-            // Don't overwhelm the tablet servers with work
-            if (unassigned.size() + unloaded > MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
-              flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
-              assignments.clear();
-              assigned.clear();
-              assignedToDeadServers.clear();
-              unassigned.clear();
-              unloaded = 0;
-              eventListener.waitForEvents(TIME_TO_WAIT_BETWEEN_SCANS);
-            }
-            Text tableId = tls.extent.getTableId();
-            MergeStats mergeStats = mergeStatsCache.get(tableId);
-            if (mergeStats == null) {
-              mergeStatsCache.put(tableId, mergeStats = new MergeStats(getMergeInfo(tls.extent)));
-            }
-            TabletGoalState goal = getGoalState(tls, mergeStats.getMergeInfo());
-            TServerInstance server = tls.getServer();
-            TabletState state = tls.getState(currentTServers.keySet());
-            stats.update(tableId, state);
-            mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
-            sendChopRequest(mergeStats.getMergeInfo(), state, tls);
-            sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
-            
-            // Always follow through with assignments
-            if (state == TabletState.ASSIGNED) {
-              goal = TabletGoalState.HOSTED;
-            }
-            
-            // if we are shutting down all the tabletservers, we have to do it in order
-            if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) {
-              if (serversToShutdown.equals(currentTServers.keySet())) {
-                if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) {
-                  goal = TabletGoalState.HOSTED;
-                }
-              }
-            }
-            
-            if (goal == TabletGoalState.HOSTED) {
-              if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
-                if (recoveryManager.recoverLogs(tls.extent, tls.walogs))
-                  continue;
-              }
-              switch (state) {
-                case HOSTED:
-                  if (server.equals(migrations.get(tls.extent)))
-                    migrations.remove(tls.extent);
-                  break;
-                case ASSIGNED_TO_DEAD_SERVER:
-                  assignedToDeadServers.add(tls);
-                  if (server.equals(migrations.get(tls.extent)))
-                    migrations.remove(tls.extent);
-                  // log.info("Current servers " + currentTServers.keySet());
-                  break;
-                case UNASSIGNED:
-                  // maybe it's a finishing migration
-                  TServerInstance dest = migrations.get(tls.extent);
-                  if (dest != null) {
-                    // if destination is still good, assign it
-                    if (destinations.keySet().contains(dest)) {
-                      assignments.add(new Assignment(tls.extent, dest));
-                    } else {
-                      // get rid of this migration
-                      migrations.remove(tls.extent);
-                      unassigned.put(tls.extent, server);
-                    }
-                  } else {
-                    unassigned.put(tls.extent, server);
-                  }
-                  break;
-                case ASSIGNED:
-                  // Send another reminder
-                  assigned.add(new Assignment(tls.extent, tls.future));
-                  break;
-              }
-            } else {
-              switch (state) {
-                case UNASSIGNED:
-                  break;
-                case ASSIGNED_TO_DEAD_SERVER:
-                  assignedToDeadServers.add(tls);
-                  // log.info("Current servers " + currentTServers.keySet());
-                  break;
-                case HOSTED:
-                  TServerConnection conn = tserverSet.getConnection(server);
-                  if (conn != null) {
-                    conn.unloadTablet(masterLock, tls.extent, goal != TabletGoalState.DELETED);
-                    unloaded++;
-                    totalUnloaded++;
-                  } else {
-                    log.warn("Could not connect to server " + server);
-                  }
-                  break;
-                case ASSIGNED:
-                  break;
-              }
-            }
-            counts[state.ordinal()]++;
-          }
-          
-          flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
-          
-          // provide stats after flushing changes to avoid race conditions w/ delete table
-          stats.end();
-          
-          // Report changes
-          for (TabletState state : TabletState.values()) {
-            int i = state.ordinal();
-            if (counts[i] > 0 && counts[i] != oldCounts[i]) {
-              nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name());
-            }
-          }
-          log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.));
-          oldCounts = counts;
-          if (totalUnloaded > 0) {
-            nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded);
-          }
-          
-          updateMergeState(mergeStatsCache);
-          
-          log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
-          eventListener.waitForEvents(TIME_TO_WAIT_BETWEEN_SCANS);
-        } catch (Exception ex) {
-          log.error("Error processing table state for store " + store.name(), ex);
-          UtilWaitThread.sleep(WAIT_BETWEEN_ERRORS);
-        }
-      }
-    }
-    
-    private int assignedOrHosted() {
-      int result = 0;
-      for (TableCounts counts : stats.getLast().values()) {
-        result += counts.assigned() + counts.hosted();
-      }
-      return result;
-    }
-    
-    private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
-      // Already split?
-      if (!info.getState().equals(MergeState.SPLITTING))
-        return;
-      // Merges don't split
-      if (!info.isDelete())
-        return;
-      // Online and ready to split?
-      if (!state.equals(TabletState.HOSTED))
-        return;
-      // Does this extent cover the end points of the delete?
-      KeyExtent range = info.getRange();
-      if (tls.extent.overlaps(range)) {
-        for (Text splitPoint : new Text[] {range.getPrevEndRow(), range.getEndRow()}) {
-          if (splitPoint == null)
-            continue;
-          if (!tls.extent.contains(splitPoint))
-            continue;
-          if (splitPoint.equals(tls.extent.getEndRow()))
-            continue;
-          if (splitPoint.equals(tls.extent.getPrevEndRow()))
-            continue;
-          try {
-            TServerConnection conn;
-            conn = tserverSet.getConnection(tls.current);
-            if (conn != null) {
-              log.info("Asking " + tls.current + " to split " + tls.extent + " at " + splitPoint);
-              conn.splitTablet(masterLock, tls.extent, splitPoint);
-            } else {
-              log.warn("Not connected to server " + tls.current);
-            }
-          } catch (NotServingTabletException e) {
-            log.debug("Error asking tablet server to split a tablet: " + e);
-          } catch (Exception e) {
-            log.warn("Error asking tablet server to split a tablet: " + e);
-          }
-        }
-      }
-    }
-    
-    private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
-      // Don't bother if we're in the wrong state
-      if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED))
-        return;
-      // Tablet must be online
-      if (!state.equals(TabletState.HOSTED))
-        return;
-      // Tablet isn't already chopped
-      if (tls.chopped)
-        return;
-      // Tablet ranges intersect
-      if (info.needsToBeChopped(tls.extent)) {
-        TServerConnection conn;
-        try {
-          conn = tserverSet.getConnection(tls.current);
-          if (conn != null) {
-            log.info("Asking " + tls.current + " to chop " + tls.extent);
-            conn.chop(masterLock, tls.extent);
-          } else {
-            log.warn("Could not connect to server " + tls.current);
-          }
-        } catch (TException e) {
-          log.warn("Communications error asking tablet server to chop a tablet");
-        }
-      }
-    }
-    
-    private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
-      for (MergeStats stats : mergeStatsCache.values()) {
-        try {
-          MergeState update = stats.nextMergeState(getConnector(), Master.this);
-          // when next state is MERGING, its important to persist this before
-          // starting the merge... the verification check that is done before
-          // moving into the merging state could fail if merge starts but does
-          // not finish
-          if (update == MergeState.COMPLETE)
-            update = MergeState.NONE;
-          if (update != stats.getMergeInfo().getState()) {
-            setMergeState(stats.getMergeInfo(), update);
-          }
-          
-          if (update == MergeState.MERGING) {
-            try {
-              if (stats.getMergeInfo().isDelete()) {
-                deleteTablets(stats.getMergeInfo());
-              } else {
-                mergeMetadataRecords(stats.getMergeInfo());
-              }
-              setMergeState(stats.getMergeInfo(), update = MergeState.COMPLETE);
-            } catch (Exception ex) {
-              log.error("Unable merge metadata table records", ex);
-            }
-          }
-        } catch (Exception ex) {
-          log.error("Unable to update merge state for merge " + stats.getMergeInfo().getRange(), ex);
-        }
-      }
-    }
-    
-    private void deleteTablets(MergeInfo info) throws AccumuloException {
-      KeyExtent range = info.getRange();
-      log.debug("Deleting tablets for " + range);
-      char timeType = '\0';
-      KeyExtent followingTablet = null;
-      if (range.getEndRow() != null) {
-        Key nextExtent = new Key(range.getEndRow()).followingKey(PartialKey.ROW);
-        followingTablet = getHighTablet(new KeyExtent(range.getTableId(), nextExtent.getRow(), range.getEndRow()));
-        log.debug("Found following tablet " + followingTablet);
-      }
-      try {
-        Connector conn = getConnector();
-        Text start = range.getPrevEndRow();
-        if (start == null) {
-          start = new Text();
-        }
-        log.debug("Making file deletion entries for " + range);
-        Range deleteRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, KeyExtent.getMetadataEntry(range.getTableId(),
-            range.getEndRow()), true);
-        Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
-        scanner.setRange(deleteRange);
-        Constants.METADATA_DIRECTORY_COLUMN.fetch(scanner);
-        Constants.METADATA_TIME_COLUMN.fetch(scanner);
-        scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
-        scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
-        Set<String> datafiles = new TreeSet<String>();
-        for (Entry<Key,Value> entry : scanner) {
-          Key key = entry.getKey();
-          if (key.compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {
-            datafiles.add(key.getColumnQualifier().toString());
-            if (datafiles.size() > 1000) {
-              MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
-              datafiles.clear();
-            }
-          } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) {
-            timeType = entry.getValue().toString().charAt(0);
-          } else if (key.compareColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) == 0) {
-            throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
-          } else if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
-            datafiles.add(entry.getValue().toString());
-            if (datafiles.size() > 1000) {
-              MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
-              datafiles.clear();
-            }
-          }
-        }
-        MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
-        BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
-        try {
-          deleteTablets(deleteRange, bw, conn);
-        } finally {
-          bw.close();
-        }
-        
-        if (followingTablet != null) {
-          log.debug("Updating prevRow of " + followingTablet + " to " + range.getPrevEndRow());
-          bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
-          try {
-            Mutation m = new Mutation(followingTablet.getMetadataEntry());
-            Constants.METADATA_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(range.getPrevEndRow()));
-            Constants.METADATA_CHOPPED_COLUMN.putDelete(m);
-            bw.addMutation(m);
-            bw.flush();
-          } finally {
-            bw.close();
-          }
-        } else {
-          // Recreate the default tablet to hold the end of the table
-          log.debug("Recreating the last tablet to point to " + range.getPrevEndRow());
-          MetadataTable.addTablet(new KeyExtent(range.getTableId(), null, range.getPrevEndRow()), Constants.DEFAULT_TABLET_LOCATION,
-              SecurityConstants.getSystemCredentials(), timeType, masterLock);
-        }
-      } catch (Exception ex) {
-        throw new AccumuloException(ex);
-      }
-    }
-    
-    private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
-      KeyExtent range = info.getRange();
-      log.debug("Merging metadata for " + range);
-      KeyExtent stop = getHighTablet(range);
-      log.debug("Highest tablet is " + stop);
-      Value firstPrevRowValue = null;
-      Text stopRow = stop.getMetadataEntry();
-      Text start = range.getPrevEndRow();
-      if (start == null) {
-        start = new Text();
-      }
-      Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false);
-      if (range.isMeta())
-        scanRange = scanRange.clip(Constants.METADATA_ROOT_TABLET_KEYSPACE);
-      
-      BatchWriter bw = null;
-      try {
-        long fileCount = 0;
-        Connector conn = getConnector();
-        // Make file entries in highest tablet
-        bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
-        Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
-        scanner.setRange(scanRange);
-        Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
-        Constants.METADATA_TIME_COLUMN.fetch(scanner);
-        Constants.METADATA_DIRECTORY_COLUMN.fetch(scanner);
-        scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
-        Mutation m = new Mutation(stopRow);
-        String maxLogicalTime = null;
-        for (Entry<Key,Value> entry : scanner) {
-          Key key = entry.getKey();
-          Value value = entry.getValue();
-          if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
-            m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
-            fileCount++;
-          } else if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) {
-            log.debug("prevRow entry for lowest tablet is " + value);
-            firstPrevRowValue = new Value(value);
-          } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) {
-            maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
-          } else if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
-            if (!range.isMeta())
-              bw.addMutation(MetadataTable.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString()));
-          }
-        }
-        
-        // read the logical time from the last tablet in the merge range, it is not included in
-        // the loop above
-        scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
-        Range last = new Range(stopRow);
-        if (range.isMeta())
-          last = last.clip(Constants.METADATA_ROOT_TABLET_KEYSPACE);
-        scanner.setRange(last);
-        Constants.METADATA_TIME_COLUMN.fetch(scanner);
-        for (Entry<Key,Value> entry : scanner) {
-          if (Constants.METADATA_TIME_COLUMN.hasColumns(entry.getKey())) {
-            maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString());
-          }
-        }
-        
-        if (maxLogicalTime != null)
-          Constants.METADATA_TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes()));
-        
-        if (!m.getUpdates().isEmpty()) {
-          bw.addMutation(m);
-        }
-        
-        bw.flush();
-        
-        log.debug("Moved " + fileCount + " files to " + stop);
-        
-        if (firstPrevRowValue == null) {
-          log.debug("tablet already merged");
-          return;
-        }
-        
-        stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue));
-        Mutation updatePrevRow = stop.getPrevRowUpdateMutation();
-        log.debug("Setting the prevRow for last tablet: " + stop);
-        bw.addMutation(updatePrevRow);
-        bw.flush();
-        
-        deleteTablets(scanRange, bw, conn);
-        
-        // Clean-up the last chopped marker
-        m = new Mutation(stopRow);
-        Constants.METADATA_CHOPPED_COLUMN.putDelete(m);
-        bw.addMutation(m);
-        bw.flush();
-        
-      } catch (Exception ex) {
-        throw new AccumuloException(ex);
-      } finally {
-        if (bw != null)
-          try {
-            bw.close();
-          } catch (Exception ex) {
-            throw new AccumuloException(ex);
-          }
-      }
-    }
-    
-    private void deleteTablets(Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException {
-      Scanner scanner;
-      Mutation m;
-      // Delete everything in the other tablets
-      // group all deletes into tablet into one mutation, this makes tablets
-      // either disappear entirely or not all.. this is important for the case
-      // where the process terminates in the loop below...
-      scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
-      log.debug("Deleting range " + scanRange);
-      scanner.setRange(scanRange);
-      RowIterator rowIter = new RowIterator(scanner);
-      while (rowIter.hasNext()) {
-        Iterator<Entry<Key,Value>> row = rowIter.next();
-        m = null;
-        while (row.hasNext()) {
-          Entry<Key,Value> entry = row.next();
-          Key key = entry.getKey();
-          
-          if (m == null)
-            m = new Mutation(key.getRow());
-          
-          m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
-          log.debug("deleting entry " + key);
-        }
-        bw.addMutation(m);
-      }
-      
-      bw.flush();
-    }
-    
-    private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
-      try {
-        Connector conn = getConnector();
-        Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
-        Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
-        KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null);
-        scanner.setRange(new Range(start.getMetadataEntry(), null));
-        Iterator<Entry<Key,Value>> iterator = scanner.iterator();
-        if (!iterator.hasNext()) {
-          throw new AccumuloException("No last tablet for a merge " + range);
-        }
-        Entry<Key,Value> entry = iterator.next();
-        KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
-        if (highTablet.getTableId() != range.getTableId()) {
-          throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
-        }
-        return highTablet;
-      } catch (Exception ex) {
-        throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex);
-      }
-    }
-    
-    private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
-        List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
-      if (!assignedToDeadServers.isEmpty()) {
-        int maxServersToShow = min(assignedToDeadServers.size(), 100);
-        log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
-        store.unassign(assignedToDeadServers);
-        nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
-      }
-      
-      if (!currentTServers.isEmpty()) {
-        Map<KeyExtent,TServerInstance> assignedOut = new HashMap<KeyExtent,TServerInstance>();
-        tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), assignedOut);
-        for (Entry<KeyExtent,TServerInstance> assignment : assignedOut.entrySet()) {
-          if (unassigned.containsKey(assignment.getKey())) {
-            if (assignment.getValue() != null) {
-              log.debug(store.name() + " assigning tablet " + assignment);
-              assignments.add(new Assignment(assignment.getKey(), assignment.getValue()));
-            }
-          } else {
-            log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + assignment.getKey());
-          }
-        }
-        if (!unassigned.isEmpty() && assignedOut.isEmpty())
-          log.warn("Load balancer failed to assign any tablets");
-      }
-      
-      if (assignments.size() > 0) {
-        log.info(String.format("Assigning %d tablets", assignments.size()));
-        store.setFutureLocations(assignments);
-      }
-      assignments.addAll(assigned);
-      for (Assignment a : assignments) {
-        TServerConnection conn = tserverSet.getConnection(a.server);
-        if (conn != null) {
-          conn.assignTablet(masterLock, a.tablet);
-        } else {
-          log.warn("Could not connect to server " + a.server);
-        }
-      }
-    }
-    
-  }
-  
   private class MigrationCleanupThread extends Daemon {
-    
     @Override
     public void run() {
       setName("Migration Cleanup Thread");
@@ -2098,9 +1509,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
     });
     
     TCredentials systemAuths = SecurityConstants.getSystemCredentials();
-    watchers.add(new TabletGroupWatcher(new MetaDataStateStore(instance, systemAuths, this), null));
-    watchers.add(new TabletGroupWatcher(new RootTabletStateStore(instance, systemAuths, this), watchers.get(0)));
-    watchers.add(new TabletGroupWatcher(new ZooTabletStateStore(new ZooStore(zroot)), watchers.get(1)));
+    watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(instance, systemAuths, this), null));
+    watchers.add(new TabletGroupWatcher(this, new RootTabletStateStore(instance, systemAuths, this), watchers.get(0)));
+    watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new ZooStore(zroot)), watchers.get(1)));
     for (TabletGroupWatcher watcher : watchers) {
       watcher.start();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/51ff32a9/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java b/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java
new file mode 100644
index 0000000..61d5651
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master;
+
+import static java.lang.Math.min;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.Master.TabletGoalState;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MergeStats;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TableCounts;
+import org.apache.accumulo.server.master.state.TableStats;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.master.state.tables.TableManager;
+import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.server.tabletserver.TabletTime;
+import org.apache.accumulo.server.util.MetadataTable;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+
+/**
+ * Background daemon that repeatedly walks every tablet in one {@link TabletStateStore} and works each tablet toward the goal state chosen by the
+ * {@link Master}. It assigns unassigned tablets and re-sends pending assignments. It unassigns tablets whose server is gone and unloads tablets that
+ * should not be hosted. It also sends chop and split requests and advances the state of in-progress merges.
+ */
+class TabletGroupWatcher extends Daemon {
+  
+  private final Master master;
+  /** The tablets this watcher is responsible for. */
+  final TabletStateStore store;
+  /**
+   * When every current tablet server is being shut down, this watcher keeps its own hosted tablets hosted while this other watcher still reports
+   * assigned or hosted tablets. May be {@code null}.
+   */
+  final TabletGroupWatcher dependentWatcher;
+  
+  /** Per-table tablet-state counts gathered during each scan of {@link #store}. */
+  final TableStats stats = new TableStats();
+  
+  /**
+   * @param master
+   *          the master whose tablet servers, migrations, merge state and lock this watcher reads and updates
+   * @param store
+   *          the tablets to watch
+   * @param dependentWatcher
+   *          see {@link #dependentWatcher}; may be {@code null}
+   */
+  TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) {
+    this.master = master;
+    this.store = store;
+    this.dependentWatcher = dependentWatcher;
+  }
+  
+  /** @return the per-table counts as reported by {@link TableStats#getLast()} */
+  Map<Text,TableCounts> getStats() {
+    return stats.getLast();
+  }
+  
+  /** @return the counts for {@code tableId} as reported by {@link TableStats#getLast(Text)} */
+  TableCounts getStats(Text tableId) {
+    return stats.getLast(tableId);
+  }
+  
+  /**
+   * Scans {@link #store} repeatedly for as long as this process is the master. Each pass:
+   * <ul>
+   * <li>computes a goal for every tablet and batches the resulting work;</li>
+   * <li>flushes the batch early whenever it exceeds {@code Master.MAX_TSERVER_WORK_CHUNK} per tablet server;</li>
+   * <li>reports changes in the per-state tablet counts;</li>
+   * <li>advances any in-progress merges.</li>
+   * </ul>
+   * An exception ends the current pass. The loop then sleeps {@code Master.WAIT_BETWEEN_ERRORS} and starts again.
+   */
+  @Override
+  public void run() {
+    
+    Thread.currentThread().setName("Watching " + store.name());
+    int[] oldCounts = new int[TabletState.values().length];
+    EventCoordinator.Listener eventListener = master.nextEvent.getListener();
+    
+    while (master.stillMaster()) {
+      // totalUnloaded covers the whole pass (for reporting); unloaded resets at each flush (for throttling)
+      int totalUnloaded = 0;
+      int unloaded = 0;
+      try {
+        // merge info is looked up at most once per table per pass
+        Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
+        
+        // Get the current status for the current list of tservers
+        SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
+        for (TServerInstance entry : master.tserverSet.getCurrentServers()) {
+          currentTServers.put(entry, master.tserverStatus.get(entry));
+        }
+        
+        if (currentTServers.isEmpty()) {
+          eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+          continue;
+        }
+        
+        // Don't move tablets to servers that are shutting down
+        SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<TServerInstance,TabletServerStatus>(currentTServers);
+        destinations.keySet().removeAll(master.serversToShutdown);
+        
+        List<Assignment> assignments = new ArrayList<Assignment>();
+        List<Assignment> assigned = new ArrayList<Assignment>();
+        List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
+        Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
+        
+        int[] counts = new int[TabletState.values().length];
+        stats.begin();
+        // Walk through the tablets in our store, and work tablets towards their goal
+        for (TabletLocationState tls : store) {
+          if (tls == null) {
+            continue;
+          }
+          // ignore entries for tables that do not exist in zookeeper
+          if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
+            continue;
+          
+          // Don't overwhelm the tablet servers with work
+          if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
+            flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+            assignments.clear();
+            assigned.clear();
+            assignedToDeadServers.clear();
+            unassigned.clear();
+            unloaded = 0;
+            eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+          }
+          Text tableId = tls.extent.getTableId();
+          MergeStats mergeStats = mergeStatsCache.get(tableId);
+          if (mergeStats == null) {
+            mergeStatsCache.put(tableId, mergeStats = new MergeStats(master.getMergeInfo(tls.extent)));
+          }
+          TabletGoalState goal = master.getGoalState(tls, mergeStats.getMergeInfo());
+          TServerInstance server = tls.getServer();
+          TabletState state = tls.getState(currentTServers.keySet());
+          stats.update(tableId, state);
+          mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
+          sendChopRequest(mergeStats.getMergeInfo(), state, tls);
+          sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
+          
+          // Always follow through with assignments
+          if (state == TabletState.ASSIGNED) {
+            goal = TabletGoalState.HOSTED;
+          }
+          
+          // if we are shutting down all the tabletservers, we have to do it in order
+          if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) {
+            if (master.serversToShutdown.equals(currentTServers.keySet())) {
+              if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) {
+                goal = TabletGoalState.HOSTED;
+              }
+            }
+          }
+          
+          if (goal == TabletGoalState.HOSTED) {
+            // skip (and don't count) a tablet while its write-ahead logs are being recovered
+            if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
+              if (master.recoveryManager.recoverLogs(tls.extent, tls.walogs))
+                continue;
+            }
+            switch (state) {
+              case HOSTED:
+                if (server.equals(master.migrations.get(tls.extent)))
+                  master.migrations.remove(tls.extent);
+                break;
+              case ASSIGNED_TO_DEAD_SERVER:
+                assignedToDeadServers.add(tls);
+                if (server.equals(master.migrations.get(tls.extent)))
+                  master.migrations.remove(tls.extent);
+                break;
+              case UNASSIGNED:
+                // maybe it's a finishing migration
+                TServerInstance dest = master.migrations.get(tls.extent);
+                if (dest != null) {
+                  // if destination is still good, assign it
+                  if (destinations.keySet().contains(dest)) {
+                    assignments.add(new Assignment(tls.extent, dest));
+                  } else {
+                    // get rid of this migration
+                    master.migrations.remove(tls.extent);
+                    unassigned.put(tls.extent, server);
+                  }
+                } else {
+                  unassigned.put(tls.extent, server);
+                }
+                break;
+              case ASSIGNED:
+                // Send another reminder
+                assigned.add(new Assignment(tls.extent, tls.future));
+                break;
+            }
+          } else {
+            switch (state) {
+              case UNASSIGNED:
+                break;
+              case ASSIGNED_TO_DEAD_SERVER:
+                assignedToDeadServers.add(tls);
+                break;
+              case HOSTED:
+                TServerConnection conn = master.tserverSet.getConnection(server);
+                if (conn != null) {
+                  // the final flag is false only when the goal is DELETED
+                  conn.unloadTablet(master.masterLock, tls.extent, goal != TabletGoalState.DELETED);
+                  unloaded++;
+                  totalUnloaded++;
+                } else {
+                  Master.log.warn("Could not connect to server " + server);
+                }
+                break;
+              case ASSIGNED:
+                break;
+            }
+          }
+          counts[state.ordinal()]++;
+        }
+        
+        flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+        
+        // provide stats after flushing changes to avoid race conditions w/ delete table
+        stats.end();
+        
+        // Report changes
+        for (TabletState state : TabletState.values()) {
+          int i = state.ordinal();
+          if (counts[i] > 0 && counts[i] != oldCounts[i]) {
+            master.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name());
+          }
+        }
+        Master.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.));
+        oldCounts = counts;
+        if (totalUnloaded > 0) {
+          master.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded);
+        }
+        
+        updateMergeState(mergeStatsCache);
+        
+        Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
+        eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+      } catch (Exception ex) {
+        Master.log.error("Error processing table state for store " + store.name(), ex);
+        UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS);
+      }
+    }
+  }
+  
+  /** @return the number of tablets counted as assigned or hosted, summed over all tables, in {@link #stats}' last snapshot */
+  private int assignedOrHosted() {
+    int result = 0;
+    for (TableCounts counts : stats.getLast().values()) {
+      result += counts.assigned() + counts.hosted();
+    }
+    return result;
+  }
+  
+  /**
+   * Asks the server hosting {@code tls} to split it at each end point of the delete range that lies strictly inside the tablet. This happens only for a
+   * delete-range operation in the {@code SPLITTING} state. Plain merges, other merge states and tablets that are not hosted are ignored. Failures are
+   * logged, not thrown.
+   */
+  private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
+    // Already split?
+    if (!info.getState().equals(MergeState.SPLITTING))
+      return;
+    // Merges don't split
+    if (!info.isDelete())
+      return;
+    // Online and ready to split?
+    if (!state.equals(TabletState.HOSTED))
+      return;
+    // Does this extent cover the end points of the delete?
+    KeyExtent range = info.getRange();
+    if (tls.extent.overlaps(range)) {
+      for (Text splitPoint : new Text[] {range.getPrevEndRow(), range.getEndRow()}) {
+        if (splitPoint == null)
+          continue;
+        if (!tls.extent.contains(splitPoint))
+          continue;
+        // a split point that is already a tablet boundary needs no split
+        if (splitPoint.equals(tls.extent.getEndRow()))
+          continue;
+        if (splitPoint.equals(tls.extent.getPrevEndRow()))
+          continue;
+        try {
+          TServerConnection conn = master.tserverSet.getConnection(tls.current);
+          if (conn != null) {
+            Master.log.info("Asking " + tls.current + " to split " + tls.extent + " at " + splitPoint);
+            conn.splitTablet(master.masterLock, tls.extent, splitPoint);
+          } else {
+            Master.log.warn("Not connected to server " + tls.current);
+          }
+        } catch (NotServingTabletException e) {
+          Master.log.debug("Error asking tablet server to split a tablet: " + e);
+        } catch (Exception e) {
+          Master.log.warn("Error asking tablet server to split a tablet: " + e);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Asks the server hosting {@code tls} to chop it. This happens only while a merge is {@code WAITING_FOR_CHOPPED}, the tablet is hosted and not yet
+   * chopped, and {@link MergeInfo#needsToBeChopped} reports that it must be. Communication failures are logged, not thrown.
+   */
+  private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
+    // Don't bother if we're in the wrong state
+    if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED))
+      return;
+    // Tablet must be online
+    if (!state.equals(TabletState.HOSTED))
+      return;
+    // Tablet isn't already chopped
+    if (tls.chopped)
+      return;
+    // Tablet ranges intersect
+    if (info.needsToBeChopped(tls.extent)) {
+      try {
+        TServerConnection conn = master.tserverSet.getConnection(tls.current);
+        if (conn != null) {
+          Master.log.info("Asking " + tls.current + " to chop " + tls.extent);
+          conn.chop(master.masterLock, tls.extent);
+        } else {
+          Master.log.warn("Could not connect to server " + tls.current);
+        }
+      } catch (TException e) {
+        // keep the cause so the communication failure can be diagnosed
+        Master.log.warn("Communications error asking tablet server to chop a tablet", e);
+      }
+    }
+  }
+  
+  /**
+   * Advances each merge seen during this pass to its next state.
+   * <ul>
+   * <li>A {@code COMPLETE} result from {@link MergeStats#nextMergeState} is stored as {@code NONE}.</li>
+   * <li>When a merge reaches {@code MERGING}, the new state is persisted first. The metadata is then rewritten: tablets are deleted for delete-range
+   * operations, and records are merged otherwise. Finally the merge is marked {@code COMPLETE}.</li>
+   * </ul>
+   * Errors are logged per merge and do not stop the remaining merges.
+   */
+  private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
+    for (MergeStats mergeStats : mergeStatsCache.values()) {
+      MergeInfo info = mergeStats.getMergeInfo();
+      try {
+        MergeState update = mergeStats.nextMergeState(master.getConnector(), master);
+        // when next state is MERGING, its important to persist this before
+        // starting the merge... the verification check that is done before
+        // moving into the merging state could fail if merge starts but does
+        // not finish
+        if (update == MergeState.COMPLETE)
+          update = MergeState.NONE;
+        if (update != info.getState()) {
+          master.setMergeState(info, update);
+        }
+        
+        if (update == MergeState.MERGING) {
+          try {
+            if (info.isDelete()) {
+              deleteTablets(info);
+            } else {
+              mergeMetadataRecords(info);
+            }
+            master.setMergeState(info, MergeState.COMPLETE);
+          } catch (Exception ex) {
+            Master.log.error("Unable to merge metadata table records", ex);
+          }
+        }
+      } catch (Exception ex) {
+        Master.log.error("Unable to update merge state for merge " + info.getRange(), ex);
+      }
+    }
+  }
+  
+  /**
+   * Removes the tablets covered by a delete-range operation from the metadata table, in three steps:
+   * <ol>
+   * <li>Delete entries are added (via {@code MetadataTable.addDeleteEntries}) for the range's data file and directory references, in batches whenever
+   * more than 1000 accumulate.</li>
+   * <li>The tablets' metadata rows are deleted.</li>
+   * <li>If the range has an end row, the following tablet gets its prevRow set to the range's prevEndRow and its chopped marker removed. Otherwise a new
+   * last tablet is created starting at the range's prevEndRow.</li>
+   * </ol>
+   *
+   * @throws AccumuloException
+   *           wrapping any failure, including an {@link IllegalStateException} if a tablet in the range still has a current location
+   */
+  private void deleteTablets(MergeInfo info) throws AccumuloException {
+    KeyExtent range = info.getRange();
+    Master.log.debug("Deleting tablets for " + range);
+    char timeType = '\0';
+    KeyExtent followingTablet = null;
+    if (range.getEndRow() != null) {
+      Key nextExtent = new Key(range.getEndRow()).followingKey(PartialKey.ROW);
+      followingTablet = getHighTablet(new KeyExtent(range.getTableId(), nextExtent.getRow(), range.getEndRow()));
+      Master.log.debug("Found following tablet " + followingTablet);
+    }
+    try {
+      Connector conn = master.getConnector();
+      Text start = range.getPrevEndRow();
+      if (start == null) {
+        start = new Text();
+      }
+      Master.log.debug("Making file deletion entries for " + range);
+      Range deleteRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, KeyExtent.getMetadataEntry(range.getTableId(),
+          range.getEndRow()), true);
+      Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+      scanner.setRange(deleteRange);
+      Constants.METADATA_DIRECTORY_COLUMN.fetch(scanner);
+      Constants.METADATA_TIME_COLUMN.fetch(scanner);
+      scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
+      scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
+      Set<String> datafiles = new TreeSet<String>();
+      for (Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        if (key.compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {
+          datafiles.add(key.getColumnQualifier().toString());
+          if (datafiles.size() > 1000) {
+            MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
+            datafiles.clear();
+          }
+        } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) {
+          // remembered so a recreated last tablet keeps the table's time type
+          timeType = entry.getValue().toString().charAt(0);
+        } else if (key.compareColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) == 0) {
+          throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
+        } else if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
+          datafiles.add(entry.getValue().toString());
+          if (datafiles.size() > 1000) {
+            MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
+            datafiles.clear();
+          }
+        }
+      }
+      MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
+      BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
+      try {
+        deleteTablets(deleteRange, bw, conn);
+      } finally {
+        bw.close();
+      }
+      
+      if (followingTablet != null) {
+        Master.log.debug("Updating prevRow of " + followingTablet + " to " + range.getPrevEndRow());
+        bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
+        try {
+          Mutation m = new Mutation(followingTablet.getMetadataEntry());
+          Constants.METADATA_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(range.getPrevEndRow()));
+          Constants.METADATA_CHOPPED_COLUMN.putDelete(m);
+          bw.addMutation(m);
+          bw.flush();
+        } finally {
+          bw.close();
+        }
+      } else {
+        // Recreate the default tablet to hold the end of the table
+        Master.log.debug("Recreating the last tablet to point to " + range.getPrevEndRow());
+        MetadataTable.addTablet(new KeyExtent(range.getTableId(), null, range.getPrevEndRow()), Constants.DEFAULT_TABLET_LOCATION,
+            SecurityConstants.getSystemCredentials(), timeType, master.masterLock);
+      }
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    }
+  }
+  
+  /**
+   * Collapses the metadata of all tablets in a merge range into the highest tablet of the range:
+   * <ul>
+   * <li>the other tablets' data file entries are copied into it;</li>
+   * <li>its logical time becomes the maximum over the range;</li>
+   * <li>its prevRow is set to that of the lowest tablet;</li>
+   * <li>the other tablets' rows are deleted and its chopped marker is removed.</li>
+   * </ul>
+   * For non-metadata tables, delete entries are written for the other tablets' directories. If no prevRow entry is found below the highest tablet, the
+   * range is treated as already merged, and only the file and time updates are applied.
+   *
+   * @throws AccumuloException
+   *           wrapping any failure. A failure to close the batch writer is thrown only if nothing else failed first.
+   */
+  private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
+    KeyExtent range = info.getRange();
+    Master.log.debug("Merging metadata for " + range);
+    KeyExtent stop = getHighTablet(range);
+    Master.log.debug("Highest tablet is " + stop);
+    Value firstPrevRowValue = null;
+    Text stopRow = stop.getMetadataEntry();
+    Text start = range.getPrevEndRow();
+    if (start == null) {
+      start = new Text();
+    }
+    // every tablet in the range except the highest one
+    Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false);
+    if (range.isMeta())
+      scanRange = scanRange.clip(Constants.METADATA_ROOT_TABLET_KEYSPACE);
+    
+    BatchWriter bw = null;
+    // set when the body fails, so a later close() failure cannot replace the original cause
+    Exception failure = null;
+    try {
+      long fileCount = 0;
+      Connector conn = master.getConnector();
+      // Make file entries in highest tablet
+      bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
+      Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+      scanner.setRange(scanRange);
+      Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
+      Constants.METADATA_TIME_COLUMN.fetch(scanner);
+      Constants.METADATA_DIRECTORY_COLUMN.fetch(scanner);
+      scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
+      Mutation m = new Mutation(stopRow);
+      String maxLogicalTime = null;
+      for (Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        Value value = entry.getValue();
+        if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
+          m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
+          fileCount++;
+        } else if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) {
+          Master.log.debug("prevRow entry for lowest tablet is " + value);
+          firstPrevRowValue = new Value(value);
+        } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) {
+          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
+        } else if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
+          if (!range.isMeta())
+            bw.addMutation(MetadataTable.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString()));
+        }
+      }
+      
+      // read the logical time from the last tablet in the merge range, it is not included in
+      // the loop above
+      scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+      Range last = new Range(stopRow);
+      if (range.isMeta())
+        last = last.clip(Constants.METADATA_ROOT_TABLET_KEYSPACE);
+      scanner.setRange(last);
+      Constants.METADATA_TIME_COLUMN.fetch(scanner);
+      for (Entry<Key,Value> entry : scanner) {
+        if (Constants.METADATA_TIME_COLUMN.hasColumns(entry.getKey())) {
+          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString());
+        }
+      }
+      
+      if (maxLogicalTime != null)
+        Constants.METADATA_TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes()));
+      
+      if (!m.getUpdates().isEmpty()) {
+        bw.addMutation(m);
+      }
+      
+      bw.flush();
+      
+      Master.log.debug("Moved " + fileCount + " files to " + stop);
+      
+      if (firstPrevRowValue == null) {
+        Master.log.debug("tablet already merged");
+        return;
+      }
+      
+      stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue));
+      Mutation updatePrevRow = stop.getPrevRowUpdateMutation();
+      Master.log.debug("Setting the prevRow for last tablet: " + stop);
+      bw.addMutation(updatePrevRow);
+      bw.flush();
+      
+      deleteTablets(scanRange, bw, conn);
+      
+      // Clean-up the last chopped marker
+      m = new Mutation(stopRow);
+      Constants.METADATA_CHOPPED_COLUMN.putDelete(m);
+      bw.addMutation(m);
+      bw.flush();
+      
+    } catch (Exception ex) {
+      failure = ex;
+      throw new AccumuloException(ex);
+    } finally {
+      if (bw != null) {
+        try {
+          bw.close();
+        } catch (Exception ex) {
+          if (failure == null)
+            throw new AccumuloException(ex);
+          // an earlier failure is already propagating; throwing here would silently discard it
+          Master.log.warn("Failed to close metadata batch writer after an earlier merge failure", ex);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Deletes every metadata entry in {@code scanRange} through {@code bw}, then flushes {@code bw}. The caller owns and closes {@code bw}.
+   */
+  private void deleteTablets(Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException {
+    // Delete everything in the other tablets
+    // group all deletes into tablet into one mutation, this makes tablets
+    // either disappear entirely or not all.. this is important for the case
+    // where the process terminates in the loop below...
+    Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+    Master.log.debug("Deleting range " + scanRange);
+    scanner.setRange(scanRange);
+    RowIterator rowIter = new RowIterator(scanner);
+    while (rowIter.hasNext()) {
+      Iterator<Entry<Key,Value>> row = rowIter.next();
+      Mutation m = null;
+      while (row.hasNext()) {
+        Key key = row.next().getKey();
+        if (m == null)
+          m = new Mutation(key.getRow());
+        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+        Master.log.debug("deleting entry " + key);
+      }
+      // guard against an empty row, which would leave m null
+      if (m != null)
+        bw.addMutation(m);
+    }
+    
+    bw.flush();
+  }
+  
+  /**
+   * Returns the first tablet whose metadata row is at or after the metadata row for {@code range}'s end row.
+   *
+   * @throws AccumuloException
+   *           if no such tablet exists, if it belongs to a different table, or if the metadata lookup fails
+   */
+  private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
+    try {
+      Connector conn = master.getConnector();
+      Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+      Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
+      KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null);
+      scanner.setRange(new Range(start.getMetadataEntry(), null));
+      Iterator<Entry<Key,Value>> iterator = scanner.iterator();
+      if (!iterator.hasNext()) {
+        throw new AccumuloException("No last tablet for a merge " + range);
+      }
+      Entry<Key,Value> entry = iterator.next();
+      KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
+      // compare by value: reference comparison of Text only works if both ids happen to be the same instance
+      if (!highTablet.getTableId().equals(range.getTableId())) {
+        throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
+      }
+      return highTablet;
+    } catch (AccumuloException ex) {
+      // already specific; don't bury it under the generic message below
+      throw ex;
+    } catch (Exception ex) {
+      throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex);
+    }
+  }
+  
+  /**
+   * Applies one batch of accumulated work, in four steps:
+   * <ol>
+   * <li>Tablets on dead servers are unassigned.</li>
+   * <li>The balancer is asked to place the {@code unassigned} tablets on {@code destinations}.</li>
+   * <li>Future locations are recorded for every new assignment.</li>
+   * <li>An assign request is sent for each new and previously pending ({@code assigned}) assignment.</li>
+   * </ol>
+   * Note that {@code assignments} is extended in place.
+   *
+   * @param destinations
+   *          servers eligible to receive new tablets
+   */
+  private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> destinations, List<Assignment> assignments, List<Assignment> assigned,
+      List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
+    if (!assignedToDeadServers.isEmpty()) {
+      int maxServersToShow = min(assignedToDeadServers.size(), 100);
+      Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
+      store.unassign(assignedToDeadServers);
+      master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
+    }
+    
+    if (!destinations.isEmpty()) {
+      Map<KeyExtent,TServerInstance> assignedOut = new HashMap<KeyExtent,TServerInstance>();
+      master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(destinations), Collections.unmodifiableMap(unassigned), assignedOut);
+      for (Entry<KeyExtent,TServerInstance> assignment : assignedOut.entrySet()) {
+        if (unassigned.containsKey(assignment.getKey())) {
+          if (assignment.getValue() != null) {
+            Master.log.debug(store.name() + " assigning tablet " + assignment);
+            assignments.add(new Assignment(assignment.getKey(), assignment.getValue()));
+          }
+        } else {
+          Master.log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + assignment.getKey());
+        }
+      }
+      if (!unassigned.isEmpty() && assignedOut.isEmpty())
+        Master.log.warn("Load balancer failed to assign any tablets");
+    }
+    
+    if (assignments.size() > 0) {
+      Master.log.info(String.format("Assigning %d tablets", assignments.size()));
+      store.setFutureLocations(assignments);
+    }
+    assignments.addAll(assigned);
+    for (Assignment a : assignments) {
+      TServerConnection conn = master.tserverSet.getConnection(a.server);
+      if (conn != null) {
+        conn.assignTablet(master.masterLock, a.tablet);
+      } else {
+        Master.log.warn("Could not connect to server " + a.server);
+      }
+    }
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/51ff32a9/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooCache.java b/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooCache.java
deleted file mode 100644
index e9c4320..0000000
--- a/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooCache.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.zookeeper;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.server.curator.CuratorUtil;
-
-public class ZooCache extends org.apache.accumulo.fate.zookeeper.ZooCache {
-  public ZooCache() {
-    super(CuratorUtil.getInstance());
-  }
-  
-  public ZooCache(AccumuloConfiguration conf) {
-    super(conf.get(Property.INSTANCE_ZK_HOST), (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
-  }
-}


Mime
View raw message