accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [11/59] [abbrv] ACCUMULO-658 Move master to its own module
Date Sat, 07 Sep 2013 03:28:14 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
new file mode 100644
index 0000000..ee31cb9
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import static java.lang.Math.min;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.master.Master.TabletGoalState;
+import org.apache.accumulo.master.state.Assignment;
+import org.apache.accumulo.master.state.DistributedStoreException;
+import org.apache.accumulo.master.state.MergeInfo;
+import org.apache.accumulo.master.state.MergeState;
+import org.apache.accumulo.master.state.MergeStats;
+import org.apache.accumulo.master.state.TServerInstance;
+import org.apache.accumulo.master.state.TableCounts;
+import org.apache.accumulo.master.state.TableStats;
+import org.apache.accumulo.master.state.TabletLocationState;
+import org.apache.accumulo.master.state.TabletState;
+import org.apache.accumulo.master.state.TabletStateStore;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.tabletserver.TabletTime;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+
+class TabletGroupWatcher extends Daemon {
+  
+  private final Master master;
+  final TabletStateStore store;
+  final TabletGroupWatcher dependentWatcher;
+  
+  final TableStats stats = new TableStats();
+  
+  TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) {
+    this.master = master;
+    this.store = store;
+    this.dependentWatcher = dependentWatcher;
+  }
+  
+  Map<Text,TableCounts> getStats() {
+    return stats.getLast();
+  }
+  
+  TableCounts getStats(Text tableId) {
+    return stats.getLast(tableId);
+  }
+  
+  @Override
+  public void run() {
+    
+    Thread.currentThread().setName("Watching " + store.name());
+    int[] oldCounts = new int[TabletState.values().length];
+    EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();
+    
+    while (this.master.stillMaster()) {
+      int totalUnloaded = 0;
+      int unloaded = 0;
+      try {
+        Map<Text,MergeStats> mergeStatsCache = new HashMap<Text,MergeStats>();
+        
+        // Get the current status for the current list of tservers
+        SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<TServerInstance,TabletServerStatus>();
+        for (TServerInstance entry : this.master.tserverSet.getCurrentServers()) {
+          currentTServers.put(entry, this.master.tserverStatus.get(entry));
+        }
+        
+        if (currentTServers.size() == 0) {
+          eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+          continue;
+        }
+        
+        // Don't move tablets to servers that are shutting down
+        SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<TServerInstance,TabletServerStatus>(currentTServers);
+        destinations.keySet().removeAll(this.master.serversToShutdown);
+        
+        List<Assignment> assignments = new ArrayList<Assignment>();
+        List<Assignment> assigned = new ArrayList<Assignment>();
+        List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
+        Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
+        
+        int[] counts = new int[TabletState.values().length];
+        stats.begin();
+        // Walk through the tablets in our store, and work tablets
+        // towards their goal
+        for (TabletLocationState tls : store) {
+          if (tls == null) {
+            continue;
+          }
+          // ignore entries for tables that do not exist in zookeeper
+          if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
+            continue;
+          
+          // Don't overwhelm the tablet servers with work
+          if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
+            flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+            assignments.clear();
+            assigned.clear();
+            assignedToDeadServers.clear();
+            unassigned.clear();
+            unloaded = 0;
+            eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+          }
+          Text tableId = tls.extent.getTableId();
+          MergeStats mergeStats = mergeStatsCache.get(tableId);
+          if (mergeStats == null) {
+            mergeStatsCache.put(tableId, mergeStats = new MergeStats(this.master.getMergeInfo(tls.extent)));
+          }
+          TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
+          TServerInstance server = tls.getServer();
+          TabletState state = tls.getState(currentTServers.keySet());
+          stats.update(tableId, state);
+          mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
+          sendChopRequest(mergeStats.getMergeInfo(), state, tls);
+          sendSplitRequest(mergeStats.getMergeInfo(), state, tls);
+          
+          // Always follow through with assignments
+          if (state == TabletState.ASSIGNED) {
+            goal = TabletGoalState.HOSTED;
+          }
+          
+          // if we are shutting down all the tabletservers, we have to do it in order
+          if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) {
+            if (this.master.serversToShutdown.equals(currentTServers.keySet())) {
+              if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) {
+                goal = TabletGoalState.HOSTED;
+              }
+            }
+          }
+          
+          if (goal == TabletGoalState.HOSTED) {
+            if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
+              if (this.master.recoveryManager.recoverLogs(tls.extent, tls.walogs))
+                continue;
+            }
+            switch (state) {
+              case HOSTED:
+                if (server.equals(this.master.migrations.get(tls.extent)))
+                  this.master.migrations.remove(tls.extent);
+                break;
+              case ASSIGNED_TO_DEAD_SERVER:
+                assignedToDeadServers.add(tls);
+                if (server.equals(this.master.migrations.get(tls.extent)))
+                  this.master.migrations.remove(tls.extent);
+                // log.info("Current servers " + currentTServers.keySet());
+                break;
+              case UNASSIGNED:
+                // maybe it's a finishing migration
+                TServerInstance dest = this.master.migrations.get(tls.extent);
+                if (dest != null) {
+                  // if destination is still good, assign it
+                  if (destinations.keySet().contains(dest)) {
+                    assignments.add(new Assignment(tls.extent, dest));
+                  } else {
+                    // get rid of this migration
+                    this.master.migrations.remove(tls.extent);
+                    unassigned.put(tls.extent, server);
+                  }
+                } else {
+                  unassigned.put(tls.extent, server);
+                }
+                break;
+              case ASSIGNED:
+                // Send another reminder
+                assigned.add(new Assignment(tls.extent, tls.future));
+                break;
+            }
+          } else {
+            switch (state) {
+              case UNASSIGNED:
+                break;
+              case ASSIGNED_TO_DEAD_SERVER:
+                assignedToDeadServers.add(tls);
+                // log.info("Current servers " + currentTServers.keySet());
+                break;
+              case HOSTED:
+                TServerConnection conn = this.master.tserverSet.getConnection(server);
+                if (conn != null) {
+                  conn.unloadTablet(this.master.masterLock, tls.extent, goal != TabletGoalState.DELETED);
+                  unloaded++;
+                  totalUnloaded++;
+                } else {
+                  Master.log.warn("Could not connect to server " + server);
+                }
+                break;
+              case ASSIGNED:
+                break;
+            }
+          }
+          counts[state.ordinal()]++;
+        }
+        
+        flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+        
+        // provide stats after flushing changes to avoid race conditions w/ delete table
+        stats.end();
+        
+        // Report changes
+        for (TabletState state : TabletState.values()) {
+          int i = state.ordinal();
+          if (counts[i] > 0 && counts[i] != oldCounts[i]) {
+            this.master.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name());
+          }
+        }
+        Master.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.));
+        oldCounts = counts;
+        if (totalUnloaded > 0) {
+          this.master.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded);
+        }
+        
+        updateMergeState(mergeStatsCache);
+        
+        Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
+        eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+      } catch (Exception ex) {
+        Master.log.error("Error processing table state for store " + store.name(), ex);
+        UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS);
+      }
+    }
+  }
+  
+  private int assignedOrHosted() {
+    int result = 0;
+    for (TableCounts counts : stats.getLast().values()) {
+      result += counts.assigned() + counts.hosted();
+    }
+    return result;
+  }
+  
+  private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
+    // Already split?
+    if (!info.getState().equals(MergeState.SPLITTING))
+      return;
+    // Merges don't split
+    if (!info.isDelete())
+      return;
+    // Online and ready to split?
+    if (!state.equals(TabletState.HOSTED))
+      return;
+    // Does this extent cover the end points of the delete?
+    KeyExtent range = info.getExtent();
+    if (tls.extent.overlaps(range)) {
+      for (Text splitPoint : new Text[] {range.getPrevEndRow(), range.getEndRow()}) {
+        if (splitPoint == null)
+          continue;
+        if (!tls.extent.contains(splitPoint))
+          continue;
+        if (splitPoint.equals(tls.extent.getEndRow()))
+          continue;
+        if (splitPoint.equals(tls.extent.getPrevEndRow()))
+          continue;
+        try {
+          TServerConnection conn;
+          conn = this.master.tserverSet.getConnection(tls.current);
+          if (conn != null) {
+            Master.log.info("Asking " + tls.current + " to split " + tls.extent + " at " + splitPoint);
+            conn.splitTablet(this.master.masterLock, tls.extent, splitPoint);
+          } else {
+            Master.log.warn("Not connected to server " + tls.current);
+          }
+        } catch (NotServingTabletException e) {
+          Master.log.debug("Error asking tablet server to split a tablet: " + e);
+        } catch (Exception e) {
+          Master.log.warn("Error asking tablet server to split a tablet: " + e);
+        }
+      }
+    }
+  }
+  
+  private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
+    // Don't bother if we're in the wrong state
+    if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED))
+      return;
+    // Tablet must be online
+    if (!state.equals(TabletState.HOSTED))
+      return;
+    // Tablet isn't already chopped
+    if (tls.chopped)
+      return;
+    // Tablet ranges intersect
+    if (info.needsToBeChopped(tls.extent)) {
+      TServerConnection conn;
+      try {
+        conn = this.master.tserverSet.getConnection(tls.current);
+        if (conn != null) {
+          Master.log.info("Asking " + tls.current + " to chop " + tls.extent);
+          conn.chop(this.master.masterLock, tls.extent);
+        } else {
+          Master.log.warn("Could not connect to server " + tls.current);
+        }
+      } catch (TException e) {
+        Master.log.warn("Communications error asking tablet server to chop a tablet");
+      }
+    }
+  }
+  
+  private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
+    for (MergeStats stats : mergeStatsCache.values()) {
+      try {
+        MergeState update = stats.nextMergeState(this.master.getConnector(), this.master);
+        // when the next state is MERGING, it's important to persist this before
+        // starting the merge... the verification check that is done before
+        // moving into the merging state could fail if the merge starts but does
+        // not finish
+        if (update == MergeState.COMPLETE)
+          update = MergeState.NONE;
+        if (update != stats.getMergeInfo().getState()) {
+          this.master.setMergeState(stats.getMergeInfo(), update);
+        }
+        
+        if (update == MergeState.MERGING) {
+          try {
+            if (stats.getMergeInfo().isDelete()) {
+              deleteTablets(stats.getMergeInfo());
+            } else {
+              mergeMetadataRecords(stats.getMergeInfo());
+            }
+            this.master.setMergeState(stats.getMergeInfo(), update = MergeState.COMPLETE);
+          } catch (Exception ex) {
+            Master.log.error("Unable to merge metadata table records", ex);
+          }
+        }
+      } catch (Exception ex) {
+        Master.log.error("Unable to update merge state for merge " + stats.getMergeInfo().getExtent(), ex);
+      }
+    }
+  }
+  
+  private void deleteTablets(MergeInfo info) throws AccumuloException {
+    KeyExtent extent = info.getExtent();
+    String targetSystemTable = extent.isMeta() ? RootTable.NAME : MetadataTable.NAME;
+    Master.log.debug("Deleting tablets for " + extent);
+    char timeType = '\0';
+    KeyExtent followingTablet = null;
+    if (extent.getEndRow() != null) {
+      Key nextExtent = new Key(extent.getEndRow()).followingKey(PartialKey.ROW);
+      followingTablet = getHighTablet(new KeyExtent(extent.getTableId(), nextExtent.getRow(), extent.getEndRow()));
+      Master.log.debug("Found following tablet " + followingTablet);
+    }
+    try {
+      Connector conn = this.master.getConnector();
+      Text start = extent.getPrevEndRow();
+      if (start == null) {
+        start = new Text();
+      }
+      Master.log.debug("Making file deletion entries for " + extent);
+      Range deleteRange = new Range(KeyExtent.getMetadataEntry(extent.getTableId(), start), false, KeyExtent.getMetadataEntry(extent.getTableId(),
+          extent.getEndRow()), true);
+      Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+      scanner.setRange(deleteRange);
+      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+      Set<FileRef> datafiles = new TreeSet<FileRef>();
+      for (Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
+          datafiles.add(new FileRef(this.master.fs, key));
+          if (datafiles.size() > 1000) {
+            MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
+            datafiles.clear();
+          }
+        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
+          timeType = entry.getValue().toString().charAt(0);
+        } else if (key.compareColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) {
+          throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
+        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
+          datafiles.add(new FileRef(this.master.fs, key));
+          if (datafiles.size() > 1000) {
+            MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
+            datafiles.clear();
+          }
+        }
+      }
+      MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get());
+      BatchWriter bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+      try {
+        deleteTablets(info, deleteRange, bw, conn);
+      } finally {
+        bw.close();
+      }
+      
+      if (followingTablet != null) {
+        Master.log.debug("Updating prevRow of " + followingTablet + " to " + extent.getPrevEndRow());
+        bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+        try {
+          Mutation m = new Mutation(followingTablet.getMetadataEntry());
+          TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(extent.getPrevEndRow()));
+          ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
+          bw.addMutation(m);
+          bw.flush();
+        } finally {
+          bw.close();
+        }
+      } else {
+        // Recreate the default tablet to hold the end of the table
+        Master.log.debug("Recreating the last tablet to point to " + extent.getPrevEndRow());
+        MetadataTableUtil.addTablet(new KeyExtent(extent.getTableId(), null, extent.getPrevEndRow()), Constants.DEFAULT_TABLET_LOCATION,
+            SystemCredentials.get(), timeType, this.master.masterLock);
+      }
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    }
+  }
+  
+  private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
+    KeyExtent range = info.getExtent();
+    Master.log.debug("Merging metadata for " + range);
+    KeyExtent stop = getHighTablet(range);
+    Master.log.debug("Highest tablet is " + stop);
+    Value firstPrevRowValue = null;
+    Text stopRow = stop.getMetadataEntry();
+    Text start = range.getPrevEndRow();
+    if (start == null) {
+      start = new Text();
+    }
+    Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false);
+    String targetSystemTable = MetadataTable.NAME;
+    if (range.isMeta()) {
+      targetSystemTable = RootTable.NAME;
+    }
+    
+    BatchWriter bw = null;
+    try {
+      long fileCount = 0;
+      Connector conn = this.master.getConnector();
+      // Make file entries in highest tablet
+      bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig());
+      Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+      scanner.setRange(scanRange);
+      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      Mutation m = new Mutation(stopRow);
+      String maxLogicalTime = null;
+      for (Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        Value value = entry.getValue();
+        if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+          m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
+          fileCount++;
+        } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) {
+          Master.log.debug("prevRow entry for lowest tablet is " + value);
+          firstPrevRowValue = new Value(value);
+        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
+          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
+        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
+          bw.addMutation(MetadataTableUtil.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString()));
+        }
+      }
+      
+      // read the logical time from the last tablet in the merge range; it is not included in
+      // the loop above
+      scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY);
+      scanner.setRange(new Range(stopRow));
+      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+      for (Entry<Key,Value> entry : scanner) {
+        if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
+          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString());
+        }
+      }
+      
+      if (maxLogicalTime != null)
+        TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes()));
+      
+      if (!m.getUpdates().isEmpty()) {
+        bw.addMutation(m);
+      }
+      
+      bw.flush();
+      
+      Master.log.debug("Moved " + fileCount + " files to " + stop);
+      
+      if (firstPrevRowValue == null) {
+        Master.log.debug("tablet already merged");
+        return;
+      }
+      
+      stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue));
+      Mutation updatePrevRow = stop.getPrevRowUpdateMutation();
+      Master.log.debug("Setting the prevRow for last tablet: " + stop);
+      bw.addMutation(updatePrevRow);
+      bw.flush();
+      
+      deleteTablets(info, scanRange, bw, conn);
+      
+      // Clean-up the last chopped marker
+      m = new Mutation(stopRow);
+      ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
+      bw.addMutation(m);
+      bw.flush();
+      
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    } finally {
+      if (bw != null)
+        try {
+          bw.close();
+        } catch (Exception ex) {
+          throw new AccumuloException(ex);
+        }
+    }
+  }
+  
+  private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException {
+    Scanner scanner;
+    Mutation m;
+    // Delete everything in the other tablets
+    // group all deletes for a tablet into one mutation; this makes tablets
+    // either disappear entirely or not at all, which is important for the case
+    // where the process terminates in the loop below...
+    scanner = conn.createScanner(info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
+    Master.log.debug("Deleting range " + scanRange);
+    scanner.setRange(scanRange);
+    RowIterator rowIter = new RowIterator(scanner);
+    while (rowIter.hasNext()) {
+      Iterator<Entry<Key,Value>> row = rowIter.next();
+      m = null;
+      while (row.hasNext()) {
+        Entry<Key,Value> entry = row.next();
+        Key key = entry.getKey();
+        
+        if (m == null)
+          m = new Mutation(key.getRow());
+        
+        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+        Master.log.debug("deleting entry " + key);
+      }
+      bw.addMutation(m);
+    }
+    
+    bw.flush();
+  }
+  
+  private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
+    try {
+      Connector conn = this.master.getConnector();
+      Scanner scanner = conn.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
+      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null);
+      scanner.setRange(new Range(start.getMetadataEntry(), null));
+      Iterator<Entry<Key,Value>> iterator = scanner.iterator();
+      if (!iterator.hasNext()) {
+        throw new AccumuloException("No last tablet for a merge " + range);
+      }
+      Entry<Key,Value> entry = iterator.next();
+      KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
+      if (!highTablet.getTableId().equals(range.getTableId())) {
+        throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
+      }
+      return highTablet;
+    } catch (Exception ex) {
+      throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex);
+    }
+  }
+  
+  private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
+      List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
+    if (!assignedToDeadServers.isEmpty()) {
+      int maxServersToShow = min(assignedToDeadServers.size(), 100);
+      Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
+      store.unassign(assignedToDeadServers);
+      this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
+    }
+    
+    if (!currentTServers.isEmpty()) {
+      Map<KeyExtent,TServerInstance> assignedOut = new HashMap<KeyExtent,TServerInstance>();
+      this.master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), assignedOut);
+      for (Entry<KeyExtent,TServerInstance> assignment : assignedOut.entrySet()) {
+        if (unassigned.containsKey(assignment.getKey())) {
+          if (assignment.getValue() != null) {
+            Master.log.debug(store.name() + " assigning tablet " + assignment);
+            assignments.add(new Assignment(assignment.getKey(), assignment.getValue()));
+          }
+        } else {
+          Master.log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + assignment.getKey());
+        }
+      }
+      if (!unassigned.isEmpty() && assignedOut.isEmpty())
+        Master.log.warn("Load balancer failed to assign any tablets");
+    }
+    
+    if (assignments.size() > 0) {
+      Master.log.info(String.format("Assigning %d tablets", assignments.size()));
+      store.setFutureLocations(assignments);
+    }
+    assignments.addAll(assigned);
+    for (Assignment a : assignments) {
+      TServerConnection conn = this.master.tserverSet.getConnection(a.server);
+      if (conn != null) {
+        conn.assignTablet(this.master.masterLock, a.tablet);
+      } else {
+        Master.log.warn("Could not connect to server " + a.server);
+      }
+    }
+  }
+  
+}
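
flushChanges() above hands the load balancer an unmodifiable view of the tablets it nominated as unassigned, then accepts only suggestions for tablets that were actually nominated and skips null suggestions. A minimal standalone sketch of that nominate-then-validate pattern, using plain strings and hypothetical tablet and server names in place of KeyExtent and TServerInstance:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the nominate-then-validate pattern in flushChanges():
// only tablets that were nominated as unassigned may be assigned, and null
// suggestions from the balancer are ignored. All names are hypothetical.
public class AssignmentValidationSketch {
  public static void main(String[] args) {
    // Tablets the watcher nominated, mapped to their last known server.
    Map<String,String> unassigned = new HashMap<String,String>();
    unassigned.put("tablet-1", "tserver-a");
    unassigned.put("tablet-2", null);

    // Suggestions a (hypothetical) balancer handed back.
    Map<String,String> assignedOut = new HashMap<String,String>();
    assignedOut.put("tablet-1", "tserver-b"); // nominated: accepted
    assignedOut.put("tablet-2", null);        // no suggestion: skipped
    assignedOut.put("tablet-9", "tserver-c"); // never nominated: warned about

    Map<String,String> assignments = new HashMap<String,String>();
    for (Map.Entry<String,String> suggestion : assignedOut.entrySet()) {
      if (unassigned.containsKey(suggestion.getKey())) {
        if (suggestion.getValue() != null)
          assignments.put(suggestion.getKey(), suggestion.getValue());
      } else {
        System.out.println("balancer assigned a tablet that was not nominated: " + suggestion.getKey());
      }
    }
    System.out.println("accepted assignments: " + assignments);
  }
}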

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancer.java b/server/master/src/main/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancer.java
new file mode 100644
index 0000000..e0ecc59
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.balancer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.master.state.TServerInstance;
+import org.apache.accumulo.master.state.TabletMigration;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.thrift.TException;
+
+/**
+ * A chaotic load balancer used for testing. It constantly shuffles tablets, preventing them from resting in a single location for very long. It is not
+ * designed for performance; do not use it on production systems. I'm calling it the LokiLoadBalancer.
+ */
+public class ChaoticLoadBalancer extends TabletBalancer {
+  Random r = new Random();
+  
+  @Override
+  public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
+      Map<KeyExtent,TServerInstance> assignments) {
+    long total = assignments.size() + unassigned.size();
+    long avg = (long) Math.ceil(((double) total) / current.size());
+    Map<TServerInstance,Long> toAssign = new HashMap<TServerInstance,Long>();
+    List<TServerInstance> tServerArray = new ArrayList<TServerInstance>();
+    for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+      long numTablets = 0;
+      for (TableInfo ti : e.getValue().getTableMap().values()) {
+        numTablets += ti.tablets;
+      }
+      if (numTablets < avg) {
+        tServerArray.add(e.getKey());
+        toAssign.put(e.getKey(), avg - numTablets);
+      }
+    }
+    
+    for (KeyExtent ke : unassigned.keySet()) {
+      int index = r.nextInt(tServerArray.size());
+      TServerInstance dest = tServerArray.get(index);
+      assignments.put(ke, dest);
+      long remaining = toAssign.get(dest).longValue() - 1;
+      if (remaining == 0) {
+        tServerArray.remove(index);
+        toAssign.remove(dest);
+      } else {
+        toAssign.put(dest, remaining);
+      }
+    }
+  }
+  
+  /**
+   * Will balance randomly, maintaining distribution
+   */
+  @Override
+  public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
+    Map<TServerInstance,Long> numTablets = new HashMap<TServerInstance,Long>();
+    List<TServerInstance> underCapacityTServer = new ArrayList<TServerInstance>();
+    
+    if (!migrations.isEmpty())
+      return 100;
+    
+    boolean moveMetadata = r.nextInt(4) == 0;
+    long totalTablets = 0;
+    for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+      long tabletCount = 0;
+      for (TableInfo ti : e.getValue().getTableMap().values()) {
+        tabletCount += ti.tablets;
+      }
+      numTablets.put(e.getKey(), tabletCount);
+      underCapacityTServer.add(e.getKey());
+      totalTablets += tabletCount;
+    }
+    // totalTablets is fuzzy due to the asynchronicity of the stats;
+    // the *1.2 factor handles the fuzziness and prevents locking in 'perfect' balancing scenarios
+    long avg = (long) Math.ceil(((double) totalTablets) / current.size() * 1.2);
+    
+    for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+      for (String table : e.getValue().getTableMap().keySet()) {
+        if (!moveMetadata && MetadataTable.NAME.equals(table))
+          continue;
+        try {
+          for (TabletStats ts : getOnlineTabletsForTable(e.getKey(), table)) {
+            KeyExtent ke = new KeyExtent(ts.extent);
+            int index = r.nextInt(underCapacityTServer.size());
+            TServerInstance dest = underCapacityTServer.get(index);
+            if (dest.equals(e.getKey()))
+              continue;
+            migrationsOut.add(new TabletMigration(ke, e.getKey(), dest));
+            if (numTablets.put(dest, numTablets.get(dest) + 1) > avg)
+              underCapacityTServer.remove(index);
+            if (numTablets.put(e.getKey(), numTablets.get(e.getKey()) - 1) <= avg && !underCapacityTServer.contains(e.getKey()))
+              underCapacityTServer.add(e.getKey());
+            
+            // We can get some craziness with only 1 tserver, so let's make sure there's always an option!
+            if (underCapacityTServer.isEmpty())
+              underCapacityTServer.addAll(numTablets.keySet());
+          }
+        } catch (ThriftSecurityException e1) {
+          // Shouldn't happen, but carry on if it does
+          e1.printStackTrace();
+        } catch (TException e1) {
+          // Shouldn't happen, but carry on if it does
+          e1.printStackTrace();
+        }
+      }
+    }
+    
+    return 100;
+  }
+  
+  @Override
+  public void init(ServerConfiguration conf) {
+    super.init(conf);
+  }
+  
+}
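
getAssignments() in ChaoticLoadBalancer estimates an average load per server, keeps only the servers below that estimate, and hands each unassigned tablet to one of them at random, decrementing that server's remaining capacity as it goes. A minimal standalone sketch of the same bookkeeping, with plain strings standing in for the Accumulo types and made-up counts:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Illustrative sketch of the ChaoticLoadBalancer assignment bookkeeping, with
// plain strings standing in for TServerInstance and KeyExtent.
public class RandomAssignmentSketch {
  public static void main(String[] args) {
    Random r = new Random();

    // Hypothetical current tablet counts per server.
    Map<String,Long> currentCounts = new HashMap<String,Long>();
    currentCounts.put("tserver-a", 3L);
    currentCounts.put("tserver-b", 1L);
    currentCounts.put("tserver-c", 0L);

    List<String> unassignedTablets = Arrays.asList("t1", "t2", "t3", "t4");

    long total = unassignedTablets.size();
    for (long count : currentCounts.values())
      total += count;
    long avg = (long) Math.ceil((double) total / currentCounts.size());

    // Only servers below the average are candidates; remember how much room each has.
    List<String> candidates = new ArrayList<String>();
    Map<String,Long> toAssign = new HashMap<String,Long>();
    for (Map.Entry<String,Long> e : currentCounts.entrySet()) {
      if (e.getValue() < avg) {
        candidates.add(e.getKey());
        toAssign.put(e.getKey(), avg - e.getValue());
      }
    }

    Map<String,String> assignments = new HashMap<String,String>();
    for (String tablet : unassignedTablets) {
      if (candidates.isEmpty())
        break; // out of capacity; the real balancer assumes this does not happen
      int index = r.nextInt(candidates.size());
      String dest = candidates.get(index);
      assignments.put(tablet, dest);
      long remaining = toAssign.get(dest) - 1;
      if (remaining == 0) {
        candidates.remove(index);
        toAssign.remove(dest);
      } else {
        toAssign.put(dest, remaining);
      }
    }
    System.out.println(assignments);
  }
}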

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/balancer/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/balancer/DefaultLoadBalancer.java b/server/master/src/main/java/org/apache/accumulo/master/balancer/DefaultLoadBalancer.java
new file mode 100644
index 0000000..571316e
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/balancer/DefaultLoadBalancer.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.balancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.master.state.TServerInstance;
+import org.apache.accumulo.master.state.TabletMigration;
+import org.apache.log4j.Logger;
+
+public class DefaultLoadBalancer extends TabletBalancer {
+  
+  private static final Logger log = Logger.getLogger(DefaultLoadBalancer.class);
+  
+  Iterator<TServerInstance> assignments;
+  // if tableToBalance is set, then only balance the given table
+  String tableToBalance = null;
+  
+  public DefaultLoadBalancer() {
+    
+  }
+  
+  public DefaultLoadBalancer(String table) {
+    tableToBalance = table;
+  }
+  
+  List<TServerInstance> randomize(Set<TServerInstance> locations) {
+    List<TServerInstance> result = new ArrayList<TServerInstance>(locations);
+    Collections.shuffle(result);
+    return result;
+  }
+  
+  public TServerInstance getAssignment(SortedMap<TServerInstance,TabletServerStatus> locations, KeyExtent extent, TServerInstance last) {
+    if (locations.size() == 0)
+      return null;
+    
+    if (last != null) {
+      // Maintain locality
+      TServerInstance simple = new TServerInstance(last.getLocation(), "");
+      Iterator<TServerInstance> find = locations.tailMap(simple).keySet().iterator();
+      if (find.hasNext()) {
+        TServerInstance current = find.next();
+        if (current.host().equals(last.host()))
+          return current;
+      }
+    }
+    
+    // The strategy here is to walk through the locations and hand them back, one at a time
+    // Grab an iterator off of the set of options; use a new iterator if it hands back something not in the current list.
+    if (assignments == null || !assignments.hasNext())
+      assignments = randomize(locations.keySet()).iterator();
+    TServerInstance result = assignments.next();
+    if (!locations.containsKey(result)) {
+      assignments = null;
+      return randomize(locations.keySet()).iterator().next();
+    }
+    return result;
+  }
+  
+  static class ServerCounts implements Comparable<ServerCounts> {
+    public final TServerInstance server;
+    public final int count;
+    public final TabletServerStatus status;
+    
+    ServerCounts(int count, TServerInstance server, TabletServerStatus status) {
+      this.count = count;
+      this.server = server;
+      this.status = status;
+    }
+    
+    public int compareTo(ServerCounts obj) {
+      int result = count - obj.count;
+      if (result == 0)
+        return server.compareTo(obj.server);
+      return result;
+    }
+  }
+  
+  public boolean getMigrations(Map<TServerInstance,TabletServerStatus> current, List<TabletMigration> result) {
+    boolean moreBalancingNeeded = false;
+    try {
+      // no moves possible
+      if (current.size() < 2) {
+        return false;
+      }
+      
+      // Sort by total number of online tablets, per server
+      int total = 0;
+      ArrayList<ServerCounts> totals = new ArrayList<ServerCounts>();
+      for (Entry<TServerInstance,TabletServerStatus> entry : current.entrySet()) {
+        int serverTotal = 0;
+        if (entry.getValue() != null && entry.getValue().tableMap != null) {
+          for (Entry<String,TableInfo> e : entry.getValue().tableMap.entrySet()) {
+            /*
+             * The check below used to be on entry.getKey(), but that resolves to a tablet server, not a table name; it should be e.getKey(), which is a table name.
+             */
+            if (tableToBalance == null || tableToBalance.equals(e.getKey()))
+              serverTotal += e.getValue().onlineTablets;
+          }
+        }
+        totals.add(new ServerCounts(serverTotal, entry.getKey(), entry.getValue()));
+        total += serverTotal;
+      }
+      
+      // order from most loaded to least loaded (sort ascending, then reverse)
+      Collections.sort(totals);
+      Collections.reverse(totals);
+      int even = total / totals.size();
+      int numServersOverEven = total % totals.size();
+      
+      // Move tablets from the servers with too many to the servers with
+      // the fewest but only nominate tablets to move once. This allows us
+      // to fill new servers with tablets from a mostly balanced server
+      // very quickly. However, it may take several balancing passes to move
+      // tablets from one hugely overloaded server to many slightly
+      // under-loaded servers.
+      int end = totals.size() - 1;
+      int movedAlready = 0;
+      for (int tooManyIndex = 0; tooManyIndex < totals.size(); tooManyIndex++) {
+        ServerCounts tooMany = totals.get(tooManyIndex);
+        int goal = even;
+        if (tooManyIndex < numServersOverEven) {
+          goal++;
+        }
+        int needToUnload = tooMany.count - goal;
+        ServerCounts tooLittle = totals.get(end);
+        int needToLoad = goal - tooLittle.count - movedAlready;
+        if (needToUnload < 1 && needToLoad < 1) {
+          break;
+        }
+        if (needToUnload >= needToLoad) {
+          result.addAll(move(tooMany, tooLittle, needToLoad));
+          end--;
+          movedAlready = 0;
+        } else {
+          result.addAll(move(tooMany, tooLittle, needToUnload));
+          movedAlready += needToUnload;
+        }
+        if (needToUnload > needToLoad)
+          moreBalancingNeeded = true;
+      }
+      
+    } finally {
+      log.debug("balance ended with " + result.size() + " migrations");
+    }
+    return moreBalancingNeeded;
+  }
+  
+  static class TableDiff {
+    int diff;
+    String table;
+    
+    public TableDiff(int diff, String table) {
+      this.diff = diff;
+      this.table = table;
+    }
+  }
+  
+  /**
+   * Select a tablet based on differences between table loads; if the loads are even, use the busiest table
+   */
+  List<TabletMigration> move(ServerCounts tooMuch, ServerCounts tooLittle, int count) {
+    
+    List<TabletMigration> result = new ArrayList<TabletMigration>();
+    if (count == 0)
+      return result;
+    
+    Map<String,Map<KeyExtent,TabletStats>> onlineTablets = new HashMap<String,Map<KeyExtent,TabletStats>>();
+    // Copy counts so we can update them as we propose migrations
+    Map<String,Integer> tooMuchMap = tabletCountsPerTable(tooMuch.status);
+    Map<String,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status);
+    
+    for (int i = 0; i < count; i++) {
+      String table;
+      Integer tooLittleCount;
+      if (tableToBalance == null) {
+        // find a table to migrate
+        // look for an uneven table count
+        int biggestDifference = 0;
+        String biggestDifferenceTable = null;
+        for (Entry<String,Integer> tableEntry : tooMuchMap.entrySet()) {
+          String tableID = tableEntry.getKey();
+          if (tooLittleMap.get(tableID) == null)
+            tooLittleMap.put(tableID, 0);
+          int diff = tableEntry.getValue() - tooLittleMap.get(tableID);
+          if (diff > biggestDifference) {
+            biggestDifference = diff;
+            biggestDifferenceTable = tableID;
+          }
+        }
+        if (biggestDifference < 2) {
+          table = busiest(tooMuch.status.tableMap);
+        } else {
+          table = biggestDifferenceTable;
+        }
+      } else {
+        // just balance the given table
+        table = tableToBalance;
+      }
+      Map<KeyExtent,TabletStats> onlineTabletsForTable = onlineTablets.get(table);
+      try {
+        if (onlineTabletsForTable == null) {
+          onlineTabletsForTable = new HashMap<KeyExtent,TabletStats>();
+          for (TabletStats stat : getOnlineTabletsForTable(tooMuch.server, table))
+            onlineTabletsForTable.put(new KeyExtent(stat.extent), stat);
+          onlineTablets.put(table, onlineTabletsForTable);
+        }
+      } catch (Exception ex) {
+        log.error("Unable to select a tablet to move", ex);
+        return result;
+      }
+      KeyExtent extent = selectTablet(tooMuch.server, onlineTabletsForTable);
+      if (extent == null)
+        return result;
+      onlineTabletsForTable.remove(extent);
+      tooMuchMap.put(table, tooMuchMap.get(table) - 1);
+      /*
+       * If a table has only recently grown beyond one tablet, tooLittleMap.get(table) can return null, since a single tablet server holds all of its tablets.
+       * Treat a missing entry as a count of 0.
+       */
+      tooLittleCount = tooLittleMap.get(table);
+      if (tooLittleCount == null) {
+        tooLittleCount = 0;
+      }
+      tooLittleMap.put(table, tooLittleCount + 1);
+      
+      result.add(new TabletMigration(extent, tooMuch.server, tooLittle.server));
+    }
+    return result;
+  }
+  
+  static Map<String,Integer> tabletCountsPerTable(TabletServerStatus status) {
+    Map<String,Integer> result = new HashMap<String,Integer>();
+    if (status != null && status.tableMap != null) {
+      Map<String,TableInfo> tableMap = status.tableMap;
+      for (Entry<String,TableInfo> entry : tableMap.entrySet()) {
+        result.put(entry.getKey(), entry.getValue().onlineTablets);
+      }
+    }
+    return result;
+  }
+  
+  static KeyExtent selectTablet(TServerInstance tserver, Map<KeyExtent,TabletStats> extents) {
+    if (extents.size() == 0)
+      return null;
+    KeyExtent mostRecentlySplit = null;
+    long splitTime = 0;
+    for (Entry<KeyExtent,TabletStats> entry : extents.entrySet())
+      if (entry.getValue().splitCreationTime >= splitTime) {
+        splitTime = entry.getValue().splitCreationTime;
+        mostRecentlySplit = entry.getKey();
+      }
+    return mostRecentlySplit;
+  }
+  
+  // define what it means for a table to be busy
+  private static String busiest(Map<String,TableInfo> tables) {
+    String result = null;
+    double busiest = Double.NEGATIVE_INFINITY;
+    for (Entry<String,TableInfo> entry : tables.entrySet()) {
+      TableInfo info = entry.getValue();
+      double busy = info.ingestRate + info.queryRate;
+      if (busy > busiest) {
+        busiest = busy;
+        result = entry.getKey();
+      }
+    }
+    return result;
+  }
+  
+  @Override
+  public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
+      Map<KeyExtent,TServerInstance> assignments) {
+    for (Entry<KeyExtent,TServerInstance> entry : unassigned.entrySet()) {
+      assignments.put(entry.getKey(), getAssignment(current, entry.getKey(), entry.getValue()));
+    }
+  }
+  
+  @Override
+  public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
+    // do we have any servers?
+    if (current.size() > 0) {
+      // Don't migrate if we have migrations in progress
+      if (migrations.size() == 0) {
+        if (getMigrations(current, migrationsOut))
+          return 1 * 1000;
+      }
+    }
+    return 5 * 1000;
+  }
+  
+}
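
getMigrations() in DefaultLoadBalancer orders servers from most to least loaded and computes each server's goal as total / servers, allowing the first total % servers of them one extra tablet. A minimal standalone sketch of just that even-split arithmetic, using hypothetical per-server counts:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative sketch of the even-split arithmetic behind getMigrations():
// with T tablets across S servers, every server should hold T / S tablets and
// the first T % S servers (ordered most to least loaded) may hold one extra.
public class EvenSplitSketch {
  public static void main(String[] args) {
    // Hypothetical per-server tablet counts for four tablet servers.
    List<Integer> counts = new ArrayList<Integer>(Arrays.asList(9, 4, 2, 1));

    int total = 0;
    for (int count : counts)
      total += count;

    int even = total / counts.size();               // 16 / 4 = 4
    int numServersOverEven = total % counts.size(); // 16 % 4 = 0

    // Order from most loaded to least loaded, mirroring the sort + reverse
    // in DefaultLoadBalancer.getMigrations().
    Collections.sort(counts, Collections.reverseOrder());

    for (int i = 0; i < counts.size(); i++) {
      int goal = even + (i < numServersOverEven ? 1 : 0);
      int needToUnload = counts.get(i) - goal;
      System.out.println("server with " + counts.get(i) + " tablets has goal " + goal
          + " and needs to unload " + Math.max(0, needToUnload));
    }
  }
}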

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/balancer/TableLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/balancer/TableLoadBalancer.java b/server/master/src/main/java/org/apache/accumulo/master/balancer/TableLoadBalancer.java
new file mode 100644
index 0000000..64dc6de
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/balancer/TableLoadBalancer.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.balancer;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.master.state.TServerInstance;
+import org.apache.accumulo.master.state.TabletMigration;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.log4j.Logger;
+
+public class TableLoadBalancer extends TabletBalancer {
+  
+  private static final Logger log = Logger.getLogger(TableLoadBalancer.class);
+  
+  Map<String,TabletBalancer> perTableBalancers = new HashMap<String,TabletBalancer>();
+  
+  private TabletBalancer constructNewBalancerForTable(String clazzName, String table) throws Exception {
+    Class<? extends TabletBalancer> clazz = AccumuloVFSClassLoader.loadClass(clazzName, TabletBalancer.class);
+    Constructor<? extends TabletBalancer> constructor = clazz.getConstructor(String.class);
+    return constructor.newInstance(table);
+  }
+  
+  protected String getLoadBalancerClassNameForTable(String table) {
+    return configuration.getTableConfiguration(table).get(Property.TABLE_LOAD_BALANCER);
+  }
+  
+  protected TabletBalancer getBalancerForTable(String table) {
+    TabletBalancer balancer = perTableBalancers.get(table);
+    
+    String clazzName = getLoadBalancerClassNameForTable(table);
+    
+    if (clazzName == null)
+      clazzName = DefaultLoadBalancer.class.getName();
+    if (balancer != null) {
+      if (!clazzName.equals(balancer.getClass().getName())) {
+        // the balancer class for this table does not match the class specified in the configuration
+        try {
+          // attempt to construct a balancer with the specified class
+          TabletBalancer newBalancer = constructNewBalancerForTable(clazzName, table);
+          if (newBalancer != null) {
+            balancer = newBalancer;
+            perTableBalancers.put(table, balancer);
+            balancer.init(configuration);
+          }
+        } catch (Exception e) {
+          log.warn("Failed to load table balancer class " + clazzName + " for table " + table, e);
+        }
+      }
+    }
+    if (balancer == null) {
+      try {
+        balancer = constructNewBalancerForTable(clazzName, table);
+        log.info("Loaded class " + clazzName + " for table " + table);
+      } catch (Exception e) {
+        log.warn("Failed to load table balancer class " + clazzName + " for table " + table, e);
+      }
+      
+      if (balancer == null) {
+        log.info("Using balancer " + DefaultLoadBalancer.class.getName() + " for table " + table);
+        balancer = new DefaultLoadBalancer(table);
+      }
+      perTableBalancers.put(table, balancer);
+      balancer.init(configuration);
+    }
+    return balancer;
+  }
+  
+  @Override
+  public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
+      Map<KeyExtent,TServerInstance> assignments) {
+    // separate the unassigned into tables
+    Map<String,Map<KeyExtent,TServerInstance>> groupedUnassigned = new HashMap<String,Map<KeyExtent,TServerInstance>>();
+    for (Entry<KeyExtent,TServerInstance> e : unassigned.entrySet()) {
+      Map<KeyExtent,TServerInstance> tableUnassigned = groupedUnassigned.get(e.getKey().getTableId().toString());
+      if (tableUnassigned == null) {
+        tableUnassigned = new HashMap<KeyExtent,TServerInstance>();
+        groupedUnassigned.put(e.getKey().getTableId().toString(), tableUnassigned);
+      }
+      tableUnassigned.put(e.getKey(), e.getValue());
+    }
+    for (Entry<String,Map<KeyExtent,TServerInstance>> e : groupedUnassigned.entrySet()) {
+      Map<KeyExtent,TServerInstance> newAssignments = new HashMap<KeyExtent,TServerInstance>();
+      getBalancerForTable(e.getKey()).getAssignments(current, e.getValue(), newAssignments);
+      assignments.putAll(newAssignments);
+    }
+  }
+  
+  private TableOperations tops = null;
+  
+  protected TableOperations getTableOperations() {
+    if (tops == null)
+      try {
+        tops = configuration.getInstance().getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken()).tableOperations();
+      } catch (AccumuloException e) {
+        log.error("Unable to access table operations from within table balancer", e);
+      } catch (AccumuloSecurityException e) {
+        log.error("Unable to access table operations from within table balancer", e);
+      }
+    return tops;
+  }
+  
+  @Override
+  public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
+    long minBalanceTime = 5 * 1000;
+    // Iterate over the tables and balance each of them
+    TableOperations t = getTableOperations();
+    if (t == null)
+      return minBalanceTime;
+    for (String s : t.tableIdMap().values()) {
+      ArrayList<TabletMigration> newMigrations = new ArrayList<TabletMigration>();
+      long tableBalanceTime = getBalancerForTable(s).balance(current, migrations, newMigrations);
+      if (tableBalanceTime < minBalanceTime)
+        minBalanceTime = tableBalanceTime;
+      migrationsOut.addAll(newMigrations);
+    }
+    return minBalanceTime;
+  }
+}
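
getAssignments() in TableLoadBalancer first buckets the unassigned tablets by table id so that each per-table balancer only sees its own tablets. A minimal standalone sketch of that grouping step, with made-up "tableId;endRow" strings standing in for KeyExtent:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the grouping step in TableLoadBalancer.getAssignments():
// unassigned tablets are bucketed by table id so each table's balancer only
// sees its own tablets. The "tableId;endRow" strings are hypothetical stand-ins
// for KeyExtent, and the server names are made up.
public class GroupByTableSketch {
  public static void main(String[] args) {
    Map<String,String> unassigned = new HashMap<String,String>();
    unassigned.put("2a;row1", "tserver-a"); // last known server
    unassigned.put("2a;row9", null);
    unassigned.put("3f;rowX", "tserver-b");

    Map<String,Map<String,String>> groupedUnassigned = new HashMap<String,Map<String,String>>();
    for (Map.Entry<String,String> e : unassigned.entrySet()) {
      String tableId = e.getKey().split(";")[0];
      Map<String,String> tableUnassigned = groupedUnassigned.get(tableId);
      if (tableUnassigned == null) {
        tableUnassigned = new HashMap<String,String>();
        groupedUnassigned.put(tableId, tableUnassigned);
      }
      tableUnassigned.put(e.getKey(), e.getValue());
    }

    // Each bucket would then be handed to that table's balancer for assignment.
    System.out.println(groupedUnassigned);
  }
}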

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/balancer/TabletBalancer.java b/server/master/src/main/java/org/apache/accumulo/master/balancer/TabletBalancer.java
new file mode 100644
index 0000000..a776656
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/balancer/TabletBalancer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.balancer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.master.state.TServerInstance;
+import org.apache.accumulo.master.state.TabletMigration;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+public abstract class TabletBalancer {
+  
+  private static final Logger log = Logger.getLogger(TabletBalancer.class);
+  
+  protected ServerConfiguration configuration;
+  
+  /**
+   * Initialize the TabletBalancer. This gives the balancer the opportunity to read the configuration.
+   */
+  public void init(ServerConfiguration conf) {
+    configuration = conf;
+  }
+  
+  /**
+   * Assign tablets to tablet servers. This method is called whenever the master finds tablets that are unassigned.
+   * 
+   * @param current
+   *          The current table-summary state of all the online tablet servers. Read-only. The TabletServerStatus for each server may be null if the tablet
+   *          server has not yet responded to a recent request for status.
+   * @param unassigned
+   *          A map from unassigned tablet to the last known tablet server. Read-only.
+   * @param assignments
+   *          A map from tablet to assigned server. Write-only.
+   */
+  abstract public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
+      Map<KeyExtent,TServerInstance> assignments);
+  
+  /**
+   * Ask the balancer if any migrations are necessary. This method will not be called when there are unassigned tablets.
+   * 
+   * @param current
+   *          The current table-summary state of all the online tablet servers. Read-only.
+   * @param migrations
+   *          the current set of migrations. Read-only.
+   * @param migrationsOut
+   *          new migrations to perform; should not contain tablets in the current set of migrations. Write-only.
+   * @return the time, in milliseconds, to wait before re-balancing.
+   */
+  public abstract long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut);
+  
+  /**
+   * Fetch the tablets for the given table by asking the tablet server. Useful if your balance strategy needs details at the tablet level to decide what tablets
+   * to move.
+   * 
+   * @param tserver
+   *          The tablet server to ask.
+   * @param tableId
+   *          The table id
+   * @return a list of tablet statistics
+   * @throws ThriftSecurityException
+   *           if the tablet server rejects the system credentials
+   * @throws TException
+   *           any other problem
+   */
+  public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String tableId) throws ThriftSecurityException, TException {
+    log.debug("Scanning tablet server " + tserver + " for table " + tableId);
+    Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), tserver.getLocation(), configuration.getConfiguration());
+    try {
+      List<TabletStats> onlineTabletsForTable = client.getTabletStats(Tracer.traceInfo(), SystemCredentials.get().toThrift(configuration.getInstance()),
+          tableId);
+      return onlineTabletsForTable;
+    } catch (TTransportException e) {
+      log.error("Unable to connect to " + tserver + ": " + e);
+    } finally {
+      ThriftUtil.returnClient(client);
+    }
+    return null;
+  }
+  
+  /**
+   * Utility to ensure that the migrations from balance() are consistent:
+   * <ul>
+   * <li>Tablet objects are not null
+   * <li>Source and destination tablet servers are not null and current
+   * </ul>
+   * 
+   * @param current
+   *          the set of tablet servers that are currently online
+   * @param migrations
+   *          the migrations proposed by balance()
+   * @return A list of TabletMigration objects that passed sanity checks.
+   */
+  public static List<TabletMigration> checkMigrationSanity(Set<TServerInstance> current, List<TabletMigration> migrations) {
+    List<TabletMigration> result = new ArrayList<TabletMigration>(migrations.size());
+    for (TabletMigration m : migrations) {
+      if (m.tablet == null) {
+        log.warn("Balancer gave back a null tablet " + m);
+        continue;
+      }
+      if (m.newServer == null) {
+        log.warn("Balancer did not set the destination " + m);
+        continue;
+      }
+      if (m.oldServer == null) {
+        log.warn("Balancer did not set the source " + m);
+        continue;
+      }
+      if (!current.contains(m.oldServer)) {
+        log.warn("Balancer wants to move a tablet from a server that is not current: " + m);
+        continue;
+      }
+      if (!current.contains(m.newServer)) {
+        log.warn("Balancer wants to move a tablet to a server that is not current: " + m);
+        continue;
+      }
+      result.add(m);
+    }
+    return result;
+  }
+  
+}
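
To make the contract documented above concrete, here is a minimal sketch of what a subclass might look like: it assigns every unassigned tablet to the first online server and never proposes migrations. The class is invented for illustration and is not part of this patch.

    // Minimal illustrative TabletBalancer subclass (hypothetical, not in this commit).
    package com.example;

    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.SortedMap;

    import org.apache.accumulo.core.data.KeyExtent;
    import org.apache.accumulo.core.master.thrift.TabletServerStatus;
    import org.apache.accumulo.master.balancer.TabletBalancer;
    import org.apache.accumulo.master.state.TServerInstance;
    import org.apache.accumulo.master.state.TabletMigration;

    public class EverythingToOneServerBalancer extends TabletBalancer {

      @Override
      public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
          Map<KeyExtent,TServerInstance> assignments) {
        if (current.isEmpty())
          return; // nothing is online yet; the master will ask again later
        TServerInstance first = current.firstKey();
        for (KeyExtent extent : unassigned.keySet())
          assignments.put(extent, first);
      }

      @Override
      public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
        // Propose no migrations and ask to be called back in 60 seconds.
        return 60 * 1000;
      }
    }

A real balancer would normally inspect the TabletServerStatus values in current (and possibly getOnlineTabletsForTable) to spread load, and would run its proposed moves through checkMigrationSanity before returning them.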

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/recovery/HadoopLogCloser.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/recovery/HadoopLogCloser.java b/server/master/src/main/java/org/apache/accumulo/master/recovery/HadoopLogCloser.java
new file mode 100644
index 0000000..28c412c
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/HadoopLogCloser.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.recovery;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.log4j.Logger;
+
+public class HadoopLogCloser implements LogCloser {
+  
+  private static Logger log = Logger.getLogger(HadoopLogCloser.class);
+
+  @Override
+  public long close(Master master, VolumeManager fs, Path source) throws IOException {
+    FileSystem ns = fs.getFileSystemByPath(source);
+    if (ns instanceof DistributedFileSystem) {
+      DistributedFileSystem dfs = (DistributedFileSystem) ns;
+      try {
+        if (!dfs.recoverLease(source)) {
+          log.info("Waiting for file to be closed " + source.toString());
+          return master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_LEASE_RECOVERY_WAITING_PERIOD);
+        }
+        log.info("Recovered lease on " + source.toString());
+      } catch (FileNotFoundException ex) {
+        throw ex;
+      } catch (Exception ex) {
+        log.warn("Error recovering lease on " + source.toString(), ex);
+        ns.append(source).close();
+        log.info("Recovered lease on " + source.toString() + " using append");
+      }
+    } else if (ns instanceof LocalFileSystem) {
+      // ignore
+    } else {
+      throw new IllegalStateException("Don't know how to recover a lease for " + ns.getClass().getName());
+    }
+    return 0;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/recovery/LogCloser.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/recovery/LogCloser.java b/server/master/src/main/java/org/apache/accumulo/master/recovery/LogCloser.java
new file mode 100644
index 0000000..45a4d66
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/LogCloser.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.Path;
+
+public interface LogCloser {
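+  /**
+   * Attempt to close the given write-ahead log so it can be sorted and recovered.
+   * 
+   * @return 0 when the log is closed and ready to be sorted, otherwise the number of milliseconds to wait before the close is retried
+   */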
+  public long close(Master master, VolumeManager fs, Path path) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/recovery/MapRLogCloser.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/recovery/MapRLogCloser.java b/server/master/src/main/java/org/apache/accumulo/master/recovery/MapRLogCloser.java
new file mode 100644
index 0000000..9eeec80
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/MapRLogCloser.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.log4j.Logger;
+
+public class MapRLogCloser implements LogCloser {
+  
+  private static Logger log = Logger.getLogger(MapRLogCloser.class);
+  
+  @Override
+  public long close(Master m, VolumeManager fs, Path path) throws IOException {
+    log.info("Recovering file " + path.toString() + " by changing its permissions to read-only");
+    FileSystem ns = fs.getFileSystemByPath(path);
+    FsPermission roPerm = new FsPermission((short) 0444);
+    try {
+      ns.setPermission(path, roPerm);
+      return 0;
+    } catch (IOException ex) {
+      log.error("Error recovering lease on " + path.toString(), ex);
+      // lets do this again
+      return 1000;
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
new file mode 100644
index 0000000..454cf42
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.recovery;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+public class RecoveryManager {
+  
+  private static Logger log = Logger.getLogger(RecoveryManager.class);
+  
+  private Map<String,Long> recoveryDelay = new HashMap<String,Long>();
+  private Set<String> closeTasksQueued = new HashSet<String>();
+  private Set<String> sortsQueued = new HashSet<String>();
+  private ScheduledExecutorService executor;
+  private Master master;
+  private ZooCache zooCache;
+  
+  public RecoveryManager(Master master) {
+    this.master = master;
+    executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter "));
+    zooCache = new ZooCache();
+    try {
+      List<String> workIDs = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).getWorkQueued();
+      sortsQueued.addAll(workIDs);
+    } catch (Exception e) {
+      log.warn(e, e);
+    }
+  }
+  
+  private class LogSortTask implements Runnable {
+    private String source;
+    private String destination;
+    private String sortId;
+    private LogCloser closer;
+    
+    public LogSortTask(LogCloser closer, String source, String destination, String sortId) {
+      this.closer = closer;
+      this.source = source;
+      this.destination = destination;
+      this.sortId = sortId;
+    }
+    
+    @Override
+    public void run() {
+      boolean rescheduled = false;
+      try {
+        
+        long time = closer.close(master, master.getFileSystem(), new Path(source));
+        
+        if (time > 0) {
+          executor.schedule(this, time, TimeUnit.MILLISECONDS);
+          rescheduled = true;
+        } else {
+          initiateSort(sortId, source, destination);
+        }
+      } catch (FileNotFoundException e) {
+        log.debug("Unable to initiate log sort for " + source + ": " + e);
+      } catch (Exception e) {
+        log.warn("Failed to initiate log sort " + source, e);
+      } finally {
+        if (!rescheduled) {
+          synchronized (RecoveryManager.this) {
+            closeTasksQueued.remove(sortId);
+          }
+        }
+      }
+    }
+    
+  }
+  
+  private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException {
+    String work = source + "|" + destination;
+    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes());
+    
+    synchronized (this) {
+      sortsQueued.add(sortId);
+    }
+    
+    final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId;
+    log.info("Created zookeeper entry " + path + " with data " + work);
+  }
+  
+  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
+    boolean recoveryNeeded = false;
+    for (Collection<String> logs : walogs) {
+      for (String walog : logs) {
+        String[] hostFilename = walog.split("/", 2);
+        String host = hostFilename[0];
+        String filename = hostFilename[1];
+        String[] parts = filename.split("/");
+        String sortId = parts[parts.length - 1];
+        String dest = master.getFileSystem().choose(ServerConstants.getRecoveryDirs()) + "/" + sortId;
+        log.debug("Recovering " + filename + " to " + dest);
+        
+        boolean sortQueued;
+        synchronized (this) {
+          sortQueued = sortsQueued.contains(sortId);
+        }
+        
+        if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId) == null) {
+          synchronized (this) {
+            sortsQueued.remove(sortId);
+          }
+        }
+        
+        if (master.getFileSystem().exists(new Path(dest, "finished"))) {
+          synchronized (this) {
+            closeTasksQueued.remove(sortId);
+            recoveryDelay.remove(sortId);
+            sortsQueued.remove(sortId);
+          }
+          continue;
+        }
+        
+        recoveryNeeded = true;
+        synchronized (this) {
+          if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
+            AccumuloConfiguration aconf = master.getConfiguration().getConfiguration();
+            LogCloser closer = aconf.instantiateClassProperty(Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class, new HadoopLogCloser());
+            Long delay = recoveryDelay.get(sortId);
+            if (delay == null) {
+              delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
+            } else {
+              delay = Math.min(2 * delay, 1000 * 60 * 5l);
+            }
+            
+            log.info("Starting recovery of " + filename + " (in " + (delay / 1000) + "s), created for " + host + "; tablet " + extent + " holds a reference");
+            
+            executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
+            closeTasksQueued.add(sortId);
+            recoveryDelay.put(sortId, delay);
+          }
+        }
+      }
+    }
+    return recoveryNeeded;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/Assignment.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/Assignment.java b/server/master/src/main/java/org/apache/accumulo/master/state/Assignment.java
new file mode 100644
index 0000000..efda721
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/Assignment.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+public class Assignment {
+  public KeyExtent tablet;
+  public TServerInstance server;
+  
+  public Assignment(KeyExtent tablet, TServerInstance server) {
+    this.tablet = tablet;
+    this.server = server;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/DistributedStore.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/DistributedStore.java b/server/master/src/main/java/org/apache/accumulo/master/state/DistributedStore.java
new file mode 100644
index 0000000..786f342
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/DistributedStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+import java.util.List;
+
+/**
+ * An abstract version of ZooKeeper that we can write tests against.
+ */
+public interface DistributedStore {
+  
+  public List<String> getChildren(String path) throws DistributedStoreException;
+  
+  public byte[] get(String path) throws DistributedStoreException;
+  
+  public void put(String path, byte[] bs) throws DistributedStoreException;
+  
+  public void remove(String path) throws DistributedStoreException;
+  
+}
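
Since DistributedStore is described above as an abstract version of ZooKeeper to write tests against, a minimal in-memory implementation might look like the following sketch; the class is hypothetical and not part of this patch.

    // Hypothetical in-memory DistributedStore, a simple test double.
    // Paths are flat strings; getChildren returns immediate children only.
    package com.example;

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.accumulo.master.state.DistributedStore;
    import org.apache.accumulo.master.state.DistributedStoreException;

    public class InMemoryDistributedStore implements DistributedStore {

      private final Map<String,byte[]> data = new TreeMap<String,byte[]>();

      @Override
      public synchronized List<String> getChildren(String path) throws DistributedStoreException {
        String prefix = path.endsWith("/") ? path : path + "/";
        List<String> children = new ArrayList<String>();
        for (String key : data.keySet()) {
          if (key.startsWith(prefix) && !key.substring(prefix.length()).contains("/"))
            children.add(key.substring(prefix.length()));
        }
        return children;
      }

      @Override
      public synchronized byte[] get(String path) throws DistributedStoreException {
        return data.get(path);
      }

      @Override
      public synchronized void put(String path, byte[] bs) throws DistributedStoreException {
        data.put(path, bs);
      }

      @Override
      public synchronized void remove(String path) throws DistributedStoreException {
        // Delete the node and anything beneath it, roughly like a recursive ZooKeeper delete.
        Iterator<String> it = data.keySet().iterator();
        while (it.hasNext()) {
          String key = it.next();
          if (key.equals(path) || key.startsWith(path + "/"))
            it.remove();
        }
      }
    }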

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/state/DistributedStoreException.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/DistributedStoreException.java b/server/master/src/main/java/org/apache/accumulo/master/state/DistributedStoreException.java
new file mode 100644
index 0000000..42652f8
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/DistributedStoreException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.state;
+
+public class DistributedStoreException extends Exception {
+  
+  private static final long serialVersionUID = 1L;
+  
+  public DistributedStoreException(String why) {
+    super(why);
+  }
+  
+  public DistributedStoreException(Exception cause) {
+    super(cause);
+  }
+  
+  public DistributedStoreException(String why, Exception cause) {
+    super(why, cause);
+  }
+}

