accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [52/59] [abbrv] ACCUMULO-658 consistent package names to avoid overlapped sealed jars
Date Sat, 07 Sep 2013 03:28:55 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
index 765b83a..98bed54 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
@@ -25,9 +25,9 @@ import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.EventCoordinator.Listener;
-import org.apache.accumulo.master.LiveTServerSet.TServerConnection;
-import org.apache.accumulo.master.state.TServerInstance;
 import org.apache.accumulo.master.tableOps.MasterRepo;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
index 20e4291..1580f70 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
@@ -39,15 +39,15 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.master.state.Assignment;
-import org.apache.accumulo.master.state.CurrentState;
-import org.apache.accumulo.master.state.MergeInfo;
-import org.apache.accumulo.master.state.MergeState;
 import org.apache.accumulo.master.state.MergeStats;
-import org.apache.accumulo.master.state.MetaDataStateStore;
-import org.apache.accumulo.master.state.TServerInstance;
-import org.apache.accumulo.master.state.TabletLocationState;
-import org.apache.accumulo.master.state.TabletState;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/master/src/test/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancerTest.java b/server/master/src/test/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancerTest.java
index 7f4b306..7a7dfa2 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancerTest.java
@@ -36,8 +36,8 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.master.balancer.ChaoticLoadBalancer;
-import org.apache.accumulo.master.state.TServerInstance;
 import org.apache.accumulo.master.state.TabletMigration;
+import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/master/src/test/java/org/apache/accumulo/master/balancer/DefaultLoadBalancerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/balancer/DefaultLoadBalancerTest.java b/server/master/src/test/java/org/apache/accumulo/master/balancer/DefaultLoadBalancerTest.java
index d005436..a9abb06 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/balancer/DefaultLoadBalancerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/balancer/DefaultLoadBalancerTest.java
@@ -36,8 +36,8 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.master.balancer.DefaultLoadBalancer;
-import org.apache.accumulo.master.state.TServerInstance;
 import org.apache.accumulo.master.state.TabletMigration;
+import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/master/src/test/java/org/apache/accumulo/master/balancer/TableLoadBalancerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/balancer/TableLoadBalancerTest.java b/server/master/src/test/java/org/apache/accumulo/master/balancer/TableLoadBalancerTest.java
index 653e1aa..749a0aa 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/balancer/TableLoadBalancerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/balancer/TableLoadBalancerTest.java
@@ -35,8 +35,8 @@ import org.apache.accumulo.core.master.thrift.TableInfo;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.master.state.TServerInstance;
 import org.apache.accumulo.master.state.TabletMigration;
+import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java
index 85ee5ec..d7fc619 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java
@@ -17,8 +17,8 @@
 package org.apache.accumulo.master.state;
 
 import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.master.state.MergeInfo;
-import org.apache.accumulo.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -34,9 +34,9 @@ public class MergeInfoTest {
     in.reset(buffer.getData(), 0, buffer.getLength());
     MergeInfo info2 = new MergeInfo();
     info2.readFields(in);
-    Assert.assertEquals(info.extent, info2.extent);
-    Assert.assertEquals(info.state, info2.state);
-    Assert.assertEquals(info.operation, info2.operation);
+    Assert.assertEquals(info.getExtent(), info2.getExtent());
+    Assert.assertEquals(info.getState(), info2.getState());
+    Assert.assertEquals(info.getOperation(), info2.getOperation());
     return info2;
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
index 158e948..0fe86e5 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
@@ -30,13 +30,13 @@ import java.util.List;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.master.state.Assignment;
-import org.apache.accumulo.master.state.DistributedStore;
-import org.apache.accumulo.master.state.DistributedStoreException;
-import org.apache.accumulo.master.state.TServerInstance;
-import org.apache.accumulo.master.state.TabletLocationState;
-import org.apache.accumulo.master.state.ZooTabletStateStore;
-import org.apache.accumulo.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.DistributedStore;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.ZooTabletStateStore;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java
index 4581d96..17c6a80 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java
@@ -25,9 +25,9 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.master.state.DeadServerList;
 import org.apache.accumulo.monitor.LogService;
 import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.master.state.DeadServerList;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TServersServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TServersServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TServersServlet.java
index 48b1feb..abe27b6 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TServersServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TServersServlet.java
@@ -38,7 +38,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.Duration;
 import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.master.state.TabletServerState;
 import org.apache.accumulo.monitor.Monitor;
 import org.apache.accumulo.monitor.util.Table;
 import org.apache.accumulo.monitor.util.TableRow;
@@ -50,6 +49,7 @@ import org.apache.accumulo.monitor.util.celltypes.PercentageType;
 import org.apache.accumulo.monitor.util.celltypes.ProgressChartType;
 import org.apache.accumulo.monitor.util.celltypes.TServerLinkType;
 import org.apache.accumulo.monitor.util.celltypes.TableLinkType;
+import org.apache.accumulo.server.master.state.TabletServerState;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.util.ActionStatsUpdator;
 import org.apache.accumulo.server.util.TableInfoUtil;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java
index 4efedd8..428880e 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java
@@ -35,8 +35,6 @@ import org.apache.accumulo.core.master.thrift.TableInfo;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.master.state.MetaDataTableScanner;
-import org.apache.accumulo.master.state.TabletLocationState;
 import org.apache.accumulo.monitor.Monitor;
 import org.apache.accumulo.monitor.util.Table;
 import org.apache.accumulo.monitor.util.TableRow;
@@ -46,6 +44,8 @@ import org.apache.accumulo.monitor.util.celltypes.NumberType;
 import org.apache.accumulo.monitor.util.celltypes.TableLinkType;
 import org.apache.accumulo.monitor.util.celltypes.TableStateType;
 import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.master.state.MetaDataTableScanner;
+import org.apache.accumulo.server.master.state.TabletLocationState;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.tables.TableManager;
 import org.apache.accumulo.server.util.TableInfoUtil;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java
index e9c4bd3..0cbf957 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java
@@ -30,10 +30,10 @@ import org.apache.accumulo.core.master.thrift.Compacting;
 import org.apache.accumulo.core.master.thrift.DeadServer;
 import org.apache.accumulo.core.master.thrift.TableInfo;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.master.state.TabletServerState;
 import org.apache.accumulo.monitor.Monitor;
 import org.apache.accumulo.monitor.util.celltypes.TServerLinkType;
 import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.master.state.TabletServerState;
 import org.apache.accumulo.server.util.TableInfoUtil;
 
 public class XMLServlet extends BasicServlet {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/server/src/main/java/org/apache/accumulo/server/util/Admin.java
----------------------------------------------------------------------
diff --git a/server/server/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/server/src/main/java/org/apache/accumulo/server/util/Admin.java
deleted file mode 100644
index 8c797cd..0000000
--- a/server/server/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.util;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.InstanceOperations;
-import org.apache.accumulo.core.client.impl.ClientExec;
-import org.apache.accumulo.core.client.impl.MasterClient;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.master.thrift.MasterClientService;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.server.cli.ClientOpts;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.trace.instrument.Tracer;
-import org.apache.log4j.Logger;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-
-public class Admin {
-  private static final Logger log = Logger.getLogger(Admin.class);
-  
-  static class AdminOpts extends ClientOpts {
-    @Parameter(names = {"-f", "--force"}, description = "force the given server to stop by removing its lock")
-    boolean force = false;
-  }
-  
-  @Parameters(commandDescription = "stop the tablet server on the given hosts")
-  static class StopCommand {
-    @Parameter(description = "<host> {<host> ... }")
-    List<String> args = new ArrayList<String>();
-  }
-  
-  @Parameters(commandDescription = "Ping tablet servers.  If no arguments, pings all.")
-  static class PingCommand {
-    @Parameter(description = "{<host> ... }")
-    List<String> args = new ArrayList<String>();
-  }
-  
-  @Parameters(commandDescription = "print tablets that are offline in online tables")
-  static class CheckTabletsCommand {
-    @Parameter(names = "--fixFiles", description = "Remove dangling file pointers")
-    boolean fixFiles = false;
-    
-    @Parameter(names = {"-t", "--table"}, description = "Table to check, if not set checks all tables")
-    String table = null;
-  }
-
-  @Parameters(commandDescription = "stop the master")
-  static class StopMasterCommand {}
-  
-  @Parameters(commandDescription = "stop all the servers")
-  static class StopAllCommand {}
-  
-  @Parameters(commandDescription = "list Accumulo instances in zookeeper")
-  static class ListInstancesCommand {
-    @Parameter(names = "--print-errors", description = "display errors while listing instances")
-    boolean printErrors = false;
-    @Parameter(names = "--print-all", description = "print information for all instances, not just those with names")
-    boolean printAll = false;
-  }
-
-  public static void main(String[] args) {
-    boolean everything;
-    
-    AdminOpts opts = new AdminOpts();
-    JCommander cl = new JCommander(opts);
-    cl.setProgramName(Admin.class.getName());
-
-    CheckTabletsCommand checkTabletsCommand = new CheckTabletsCommand();
-    cl.addCommand("checkTablets", checkTabletsCommand);
-
-    ListInstancesCommand listIntancesOpts = new ListInstancesCommand();
-    cl.addCommand("listInstances", listIntancesOpts);
-    
-    PingCommand pingCommand = new PingCommand();
-    cl.addCommand("ping", pingCommand);
-
-    StopCommand stopOpts = new StopCommand();
-    cl.addCommand("stop", stopOpts);
-    StopAllCommand stopAllOpts = new StopAllCommand();
-    cl.addCommand("stopAll", stopAllOpts);
-    StopMasterCommand stopMasterOpts = new StopMasterCommand();
-    cl.addCommand("stopMaster", stopMasterOpts);
-    cl.parse(args);
-    
-    if (opts.help || cl.getParsedCommand() == null) {
-      cl.usage();
-      return;
-    }
-    Instance instance = opts.getInstance();
-    
-    try {
-      String principal;
-      AuthenticationToken token;
-      if (opts.getToken() == null) {
-        principal = SystemCredentials.get().getPrincipal();
-        token = SystemCredentials.get().getToken();
-      } else {
-        principal = opts.principal;
-        token = opts.getToken();
-      }
-      
-      int rc = 0;
-
-      if (cl.getParsedCommand().equals("listInstances")) {
-        ListInstances.listInstances(instance.getZooKeepers(), listIntancesOpts.printAll, listIntancesOpts.printErrors);
-      } else if (cl.getParsedCommand().equals("ping")) {
-        if (ping(instance, principal, token, pingCommand.args) != 0)
-          rc = 4;
-      } else if (cl.getParsedCommand().equals("checkTablets")) {
-        System.out.println("\n*** Looking for offline tablets ***\n");
-        if (FindOfflineTablets.findOffline(instance, new Credentials(principal, token), checkTabletsCommand.table) != 0)
-          rc = 5;
-        System.out.println("\n*** Looking for missing files ***\n");
-        if (checkTabletsCommand.table == null) {
-          if (RemoveEntriesForMissingFiles.checkAllTables(instance, principal, token, checkTabletsCommand.fixFiles) != 0)
-            rc = 6;
-        } else {
-          if (RemoveEntriesForMissingFiles.checkTable(instance, principal, token, checkTabletsCommand.table, checkTabletsCommand.fixFiles) != 0)
-            rc = 6;
-        }
-
-      }else if (cl.getParsedCommand().equals("stop")) {
-        stopTabletServer(instance, new Credentials(principal, token), stopOpts.args, opts.force);
-      } else {
-        everything = cl.getParsedCommand().equals("stopAll");
-        
-        if (everything)
-          flushAll(instance, principal, token);
-        
-        stopServer(instance, new Credentials(principal, token), everything);
-      }
-      
-      if (rc != 0)
-        System.exit(rc);
-    } catch (AccumuloException e) {
-      log.error(e, e);
-      System.exit(1);
-    } catch (AccumuloSecurityException e) {
-      log.error(e, e);
-      System.exit(2);
-    } catch (Exception e) {
-      log.error(e, e);
-      System.exit(3);
-    }
-  }
-  
-  private static int ping(Instance instance, String principal, AuthenticationToken token, List<String> args) throws AccumuloException,
-      AccumuloSecurityException {
-    
-    InstanceOperations io = instance.getConnector(principal, token).instanceOperations();
-    
-    if (args.size() == 0) {
-      args = io.getTabletServers();
-    }
-    
-    int unreachable = 0;
-
-    for (String tserver : args) {
-      try {
-        io.ping(tserver);
-        System.out.println(tserver + " OK");
-      } catch (AccumuloException ae) {
-        System.out.println(tserver + " FAILED (" + ae.getMessage() + ")");
-        unreachable++;
-      }
-    }
-    
-    System.out.printf("\n%d of %d tablet servers unreachable\n\n", unreachable, args.size());
-    return unreachable;
-  }
-
-  /**
-   * flushing during shutdown is a performance optimization, its not required. The method will make an attempt to initiate flushes of all tables and give up if
-   * it takes too long.
-   * 
-   */
-  private static void flushAll(final Instance instance, final String principal, final AuthenticationToken token) throws AccumuloException,
-      AccumuloSecurityException {
-    
-    final AtomicInteger flushesStarted = new AtomicInteger(0);
-    
-    Runnable flushTask = new Runnable() {
-      
-      @Override
-      public void run() {
-        try {
-          Connector conn = instance.getConnector(principal, token);
-          Set<String> tables = conn.tableOperations().tableIdMap().keySet();
-          for (String table : tables) {
-            if (table.equals(MetadataTable.NAME))
-              continue;
-            try {
-              conn.tableOperations().flush(table, null, null, false);
-              flushesStarted.incrementAndGet();
-            } catch (TableNotFoundException e) {}
-          }
-        } catch (Exception e) {
-          log.warn("Failed to intiate flush " + e.getMessage());
-        }
-      }
-    };
-    
-    Thread flusher = new Thread(flushTask);
-    flusher.setDaemon(true);
-    flusher.start();
-    
-    long start = System.currentTimeMillis();
-    try {
-      flusher.join(3000);
-    } catch (InterruptedException e) {}
-    
-    while (flusher.isAlive() && System.currentTimeMillis() - start < 15000) {
-      int flushCount = flushesStarted.get();
-      try {
-        flusher.join(1000);
-      } catch (InterruptedException e) {}
-      
-      if (flushCount == flushesStarted.get()) {
-        // no progress was made while waiting for join... maybe its stuck, stop waiting on it
-        break;
-      }
-    }
-  }
-  
-  private static void stopServer(final Instance instance, final Credentials credentials, final boolean tabletServersToo) throws AccumuloException,
-      AccumuloSecurityException {
-    MasterClient.execute(instance, new ClientExec<MasterClientService.Client>() {
-      @Override
-      public void execute(MasterClientService.Client client) throws Exception {
-        client.shutdown(Tracer.traceInfo(), credentials.toThrift(instance), tabletServersToo);
-      }
-    });
-  }
-  
-  private static void stopTabletServer(final Instance instance, final Credentials creds, List<String> servers, final boolean force) throws AccumuloException,
-      AccumuloSecurityException {
-    for (String server : servers) {
-      InetSocketAddress address = AddressUtil.parseAddress(server);
-      final String finalServer = org.apache.accumulo.core.util.AddressUtil.toString(address);
-      log.info("Stopping server " + finalServer);
-      MasterClient.execute(instance, new ClientExec<MasterClientService.Client>() {
-        @Override
-        public void execute(MasterClientService.Client client) throws Exception {
-          client.shutdownTabletServer(Tracer.traceInfo(), creds.toThrift(instance), finalServer, force);
-        }
-      });
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java
deleted file mode 100644
index 44b26ca..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.log4j.Logger;
-
-/**
- * 
- */
-public class CompactionWatcher implements Runnable {
-  private Map<List<Long>,ObservedCompactionInfo> observedCompactions = new HashMap<List<Long>,ObservedCompactionInfo>();
-  private AccumuloConfiguration config;
-  private static boolean watching = false;
-  
-  private static class ObservedCompactionInfo {
-    CompactionInfo compactionInfo;
-    long firstSeen;
-    boolean loggedWarning;
-    
-    ObservedCompactionInfo(CompactionInfo ci, long time) {
-      this.compactionInfo = ci;
-      this.firstSeen = time;
-    }
-  }
-
-  public CompactionWatcher(AccumuloConfiguration config) {
-    this.config = config;
-  }
-
-  public void run() {
-    List<CompactionInfo> runningCompactions = Compactor.getRunningCompactions();
-    
-    Set<List<Long>> newKeys = new HashSet<List<Long>>();
-    
-    long time = System.currentTimeMillis();
-
-    for (CompactionInfo ci : runningCompactions) {
-      List<Long> compactionKey = Arrays.asList(ci.getID(), ci.getEntriesRead(), ci.getEntriesWritten());
-      newKeys.add(compactionKey);
-      
-      if (!observedCompactions.containsKey(compactionKey)) {
-        observedCompactions.put(compactionKey, new ObservedCompactionInfo(ci, time));
-      }
-    }
-    
-    // look for compactions that finished or made progress and logged a warning
-    HashMap<List<Long>,ObservedCompactionInfo> copy = new HashMap<List<Long>,ObservedCompactionInfo>(observedCompactions);
-    copy.keySet().removeAll(newKeys);
-    
-    for (ObservedCompactionInfo oci : copy.values()) {
-      if (oci.loggedWarning) {
-        Logger.getLogger(CompactionWatcher.class).info("Compaction of " + oci.compactionInfo.getExtent() + " is no longer stuck");
-      }
-    }
-
-    // remove any compaction that completed or made progress
-    observedCompactions.keySet().retainAll(newKeys);
-    
-    long warnTime = config.getTimeInMillis(Property.TSERV_COMPACTION_WARN_TIME);
-
-    // check for stuck compactions
-    for (ObservedCompactionInfo oci : observedCompactions.values()) {
-      if (time - oci.firstSeen > warnTime && !oci.loggedWarning) {
-        Thread compactionThread = oci.compactionInfo.getThread();
-        if (compactionThread != null) {
-          StackTraceElement[] trace = compactionThread.getStackTrace();
-          Exception e = new Exception("Possible stack trace of compaction stuck on " + oci.compactionInfo.getExtent());
-          e.setStackTrace(trace);
-          Logger.getLogger(CompactionWatcher.class).warn(
-              "Compaction of " + oci.compactionInfo.getExtent() + " has not made progress for at least " + (time - oci.firstSeen) + "ms", e);
-          oci.loggedWarning = true;
-        }
-      }
-    }
-  }
-
-  public static synchronized void startWatching(AccumuloConfiguration config) {
-    if (!watching) {
-      SimpleTimer.getInstance().schedule(new CompactionWatcher(config), 10000, 10000);
-      watching = true;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java
deleted file mode 100644
index c543881..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.trace.TraceFileSystem;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-/**
- * Copy failed bulk imports.
- */
-public class BulkFailedCopyProcessor implements Processor {
-  
-  private static final Logger log = Logger.getLogger(BulkFailedCopyProcessor.class);
-  
-  @Override
-  public Processor newProcessor() {
-    return new BulkFailedCopyProcessor();
-  }
-  
-  @Override
-  public void process(String workID, byte[] data) {
-    
-    String paths[] = new String(data).split(",");
-    
-    Path orig = new Path(paths[0]);
-    Path dest = new Path(paths[1]);
-    Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp");
-    
-    try {
-      FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
-          ServerConfiguration.getSiteConfiguration()));
-      
-      FileUtil.copy(fs, orig, fs, tmp, false, true, CachedConfiguration.getInstance());
-      fs.rename(tmp, dest);
-      log.debug("copied " + orig + " to " + dest);
-    } catch (IOException ex) {
-      try {
-        FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
-            ServerConfiguration.getSiteConfiguration()));
-        
-        fs.create(dest).close();
-        log.warn(" marked " + dest + " failed", ex);
-      } catch (IOException e) {
-        log.error("Unable to create failure flag file " + dest, e);
-      }
-    }
-
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java
deleted file mode 100644
index f5f21e4..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.AbstractQueue;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class CompactionQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
-  
-  private List<Comparable> task = new LinkedList<Comparable>();
-  
-  @Override
-  public synchronized Runnable poll() {
-    if (task.size() == 0)
-      return null;
-    
-    Comparable min = Collections.min(task);
-    task.remove(min);
-    return (Runnable) min;
-  }
-  
-  @Override
-  public synchronized Runnable peek() {
-    if (task.size() == 0)
-      return null;
-    
-    Comparable min = Collections.min(task);
-    return (Runnable) min;
-  }
-  
-  @Override
-  public synchronized boolean offer(Runnable e) {
-    task.add((Comparable) e);
-    notify();
-    return true;
-  }
-  
-  @Override
-  public synchronized void put(Runnable e) throws InterruptedException {
-    task.add((Comparable) e);
-    notify();
-  }
-  
-  @Override
-  public synchronized boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException {
-    task.add((Comparable) e);
-    notify();
-    return true;
-  }
-  
-  @Override
-  public synchronized Runnable take() throws InterruptedException {
-    while (task.size() == 0) {
-      wait();
-    }
-    
-    return poll();
-  }
-  
-  @Override
-  public synchronized Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
-    if (task.size() == 0) {
-      wait(unit.toMillis(timeout));
-    }
-    
-    if (task.size() == 0)
-      return null;
-    
-    return poll();
-  }
-  
-  @Override
-  public synchronized int remainingCapacity() {
-    return Integer.MAX_VALUE;
-  }
-  
-  @Override
-  public synchronized int drainTo(Collection<? super Runnable> c) {
-    return drainTo(c, task.size());
-  }
-  
-  @Override
-  public synchronized int drainTo(Collection<? super Runnable> c, int maxElements) {
-    Collections.sort(task);
-    
-    int num = Math.min(task.size(), maxElements);
-    
-    Iterator<Comparable> iter = task.iterator();
-    for (int i = 0; i < num; i++) {
-      c.add((Runnable) iter.next());
-      iter.remove();
-    }
-    
-    return num;
-  }
-  
-  @Override
-  public synchronized Iterator<Runnable> iterator() {
-    Collections.sort(task);
-    
-    final Iterator<Comparable> iter = task.iterator();
-    
-    return new Iterator<Runnable>() {
-      
-      @Override
-      public boolean hasNext() {
-        return iter.hasNext();
-      }
-      
-      @Override
-      public Runnable next() {
-        return (Runnable) iter.next();
-      }
-      
-      @Override
-      public void remove() {
-        iter.remove();
-      }
-    };
-  }
-  
-  @Override
-  public synchronized int size() {
-    return task.size();
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java
deleted file mode 100644
index 086c5cd..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-public class CompactionStats {
-  private long entriesRead;
-  private long entriesWritten;
-  private long fileSize;
-  
-  CompactionStats(long er, long ew) {
-    this.setEntriesRead(er);
-    this.setEntriesWritten(ew);
-  }
-  
-  public CompactionStats() {}
-  
-  private void setEntriesRead(long entriesRead) {
-    this.entriesRead = entriesRead;
-  }
-  
-  public long getEntriesRead() {
-    return entriesRead;
-  }
-  
-  private void setEntriesWritten(long entriesWritten) {
-    this.entriesWritten = entriesWritten;
-  }
-  
-  public long getEntriesWritten() {
-    return entriesWritten;
-  }
-  
-  public void add(CompactionStats mcs) {
-    this.entriesRead += mcs.entriesRead;
-    this.entriesWritten += mcs.entriesWritten;
-  }
-  
-  public void setFileSize(long fileSize) {
-    this.fileSize = fileSize;
-  }
-  
-  public long getFileSize() {
-    return this.fileSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
deleted file mode 100644
index 8001cad..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.thrift.IterInfo;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.IteratorUtil;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.WrappingIterator;
-import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.iterators.system.DeletingIterator;
-import org.apache.accumulo.core.iterators.system.MultiIterator;
-import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
-import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
-import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
-import org.apache.accumulo.core.util.LocalityGroupUtil;
-import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
-import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.problems.ProblemReport;
-import org.apache.accumulo.server.problems.ProblemReportingIterator;
-import org.apache.accumulo.server.problems.ProblemReports;
-import org.apache.accumulo.server.problems.ProblemType;
-import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
-import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
-
-
-public class Compactor implements Callable<CompactionStats> {
-  
-  public static class CountingIterator extends WrappingIterator {
-    
-    private long count;
-    private ArrayList<CountingIterator> deepCopies;
-    private AtomicLong entriesRead;
-    
-    public CountingIterator deepCopy(IteratorEnvironment env) {
-      return new CountingIterator(this, env);
-    }
-    
-    private CountingIterator(CountingIterator other, IteratorEnvironment env) {
-      setSource(other.getSource().deepCopy(env));
-      count = 0;
-      this.deepCopies = other.deepCopies;
-      this.entriesRead = other.entriesRead;
-      deepCopies.add(this);
-    }
-    
-    public CountingIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong entriesRead) {
-      deepCopies = new ArrayList<Compactor.CountingIterator>();
-      this.setSource(source);
-      count = 0;
-      this.entriesRead = entriesRead;
-    }
-    
-    @Override
-    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) {
-      throw new UnsupportedOperationException();
-    }
-    
-    @Override
-    public void next() throws IOException {
-      super.next();
-      count++;
-      if (count % 1024 == 0) {
-        entriesRead.addAndGet(1024);
-      }
-    }
-    
-    public long getCount() {
-      long sum = 0;
-      for (CountingIterator dc : deepCopies) {
-        sum += dc.count;
-      }
-      
-      return count + sum;
-    }
-  }
-
-  private static final Logger log = Logger.getLogger(Compactor.class);
-  
-  static class CompactionCanceledException extends Exception {
-    private static final long serialVersionUID = 1L;
-  }
-  
-  static interface CompactionEnv {
-    boolean isCompactionEnabled();
-    
-    IteratorScope getIteratorScope();
-  }
-  
-  private Map<FileRef,DataFileValue> filesToCompact;
-  private InMemoryMap imm;
-  private FileRef outputFile;
-  private boolean propogateDeletes;
-  private TableConfiguration acuTableConf;
-  private CompactionEnv env;
-  private Configuration conf;
-  private VolumeManager fs;
-  protected KeyExtent extent;
-  private List<IteratorSetting> iterators;
-  
-  // things to report
-  private String currentLocalityGroup = "";
-  private long startTime;
-
-  private MajorCompactionReason reason;
-  protected MinorCompactionReason mincReason;
-  
-  private AtomicLong entriesRead = new AtomicLong(0);
-  private AtomicLong entriesWritten = new AtomicLong(0);
-  private DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
-  
-  private static AtomicLong nextCompactorID = new AtomicLong(0);
-  
-  // a unique id to identify a compactor
-  private long compactorID = nextCompactorID.getAndIncrement();
-
-  protected volatile Thread thread;
-
-  private synchronized void setLocalityGroup(String name) {
-    this.currentLocalityGroup = name;
-  }
-
-  private void clearStats() {
-    entriesRead.set(0);
-    entriesWritten.set(0);
-  }
-
-  protected static Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>());
-  
-  public static class CompactionInfo {
-    
-    private Compactor compactor;
-    private String localityGroup;
-    private long entriesRead;
-    private long entriesWritten;
-    
-    CompactionInfo(Compactor compactor) {
-      this.localityGroup = compactor.currentLocalityGroup;
-      this.entriesRead = compactor.entriesRead.get();
-      this.entriesWritten = compactor.entriesWritten.get();
-      this.compactor = compactor;
-    }
-
-    public long getID() {
-      return compactor.compactorID;
-    }
-    
-    public KeyExtent getExtent() {
-      return compactor.getExtent();
-    }
-    
-    public long getEntriesRead() {
-      return entriesRead;
-    }
-    
-    public long getEntriesWritten() {
-      return entriesWritten;
-    }
-
-    public Thread getThread() {
-      return compactor.thread;
-    }
-
-    public ActiveCompaction toThrift() {
-      
-      CompactionType type;
-      
-      if (compactor.imm != null)
-        if (compactor.filesToCompact.size() > 0)
-          type = CompactionType.MERGE;
-        else
-          type = CompactionType.MINOR;
-      else if (!compactor.propogateDeletes)
-        type = CompactionType.FULL;
-      else
-        type = CompactionType.MAJOR;
-      
-      CompactionReason reason;
-      
-      if (compactor.imm != null)
-        switch(compactor.mincReason){
-          case USER:
-            reason = CompactionReason.USER;
-            break;
-          case CLOSE:
-            reason = CompactionReason.CLOSE;
-            break;
-          case SYSTEM:
-          default:
-            reason = CompactionReason.SYSTEM;
-            break;
-        }
-      else
-        switch (compactor.reason) {
-          case USER:
-            reason = CompactionReason.USER;
-            break;
-          case CHOP:
-            reason = CompactionReason.CHOP;
-            break;
-          case IDLE:
-            reason = CompactionReason.IDLE;
-            break;
-          case NORMAL:
-          default:
-            reason = CompactionReason.SYSTEM;
-            break;
-        }
-      
-      List<IterInfo> iiList = new ArrayList<IterInfo>();
-      Map<String,Map<String,String>> iterOptions = new HashMap<String,Map<String,String>>();
-      
-      for (IteratorSetting iterSetting : compactor.iterators) {
-        iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName()));
-        iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
-      }
-      List<String> filesToCompact = new ArrayList<String>();
-      for (FileRef ref : compactor.filesToCompact.keySet())
-        filesToCompact.add(ref.toString());
-      return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, filesToCompact, compactor.outputFile.toString(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
-    }
-  }
-  
-  public static List<CompactionInfo> getRunningCompactions() {
-    ArrayList<CompactionInfo> compactions = new ArrayList<Compactor.CompactionInfo>();
-    
-    synchronized (runningCompactions) {
-      for (Compactor compactor : runningCompactions) {
-        compactions.add(new CompactionInfo(compactor));
-      }
-    }
-    
-    return compactions;
-  }
-
-  Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
-      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) {
-    this.extent = extent;
-    this.conf = conf;
-    this.fs = fs;
-    this.filesToCompact = files;
-    this.imm = imm;
-    this.outputFile = outputFile;
-    this.propogateDeletes = propogateDeletes;
-    this.acuTableConf = acuTableConf;
-    this.env = env;
-    this.iterators = iterators;
-    this.reason = reason;
-    
-    startTime = System.currentTimeMillis();
-  }
-  
-  Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
-      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
-    this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null);
-  }
-  
-  public VolumeManager getFileSystem() {
-    return fs;
-  }
-  
-  KeyExtent getExtent() {
-    return extent;
-  }
-  
-  String getOutputFile() {
-    return outputFile.toString();
-  }
-  
-  @Override
-  public CompactionStats call() throws IOException, CompactionCanceledException {
-    
-    FileSKVWriter mfw = null;
-    
-    CompactionStats majCStats = new CompactionStats();
-
-    boolean remove = runningCompactions.add(this);
-    
-    clearStats();
-
-    String oldThreadName = Thread.currentThread().getName();
-    String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new Date()) + " file: " + outputFile;
-    Thread.currentThread().setName(newThreadName);
-    thread = Thread.currentThread();
-    try {
-      FileOperations fileFactory = FileOperations.getInstance();
-      FileSystem ns = this.fs.getFileSystemByPath(outputFile.path());
-      mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf);
-      
-      Map<String,Set<ByteSequence>> lGroups;
-      try {
-        lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf);
-      } catch (LocalityGroupConfigurationError e) {
-        throw new IOException(e);
-      }
-      
-      long t1 = System.currentTimeMillis();
-      
-      HashSet<ByteSequence> allColumnFamilies = new HashSet<ByteSequence>();
-      
-      if (mfw.supportsLocalityGroups()) {
-        for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
-          setLocalityGroup(entry.getKey());
-          compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats);
-          allColumnFamilies.addAll(entry.getValue());
-        }
-      }
-      
-      setLocalityGroup("");
-      compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats);
-      
-      long t2 = System.currentTimeMillis();
-      
-      FileSKVWriter mfwTmp = mfw;
-      mfw = null; // set this to null so we do not try to close it again in finally if the close fails
-      mfwTmp.close(); // if the close fails it will cause the compaction to fail
-      
-      // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
-      try {
-        FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf);
-        openReader.close();
-      } catch (IOException ex) {
-        log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex);
-        throw ex;
-      }
-      
-      log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
-          majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
-      
-      majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf));
-      return majCStats;
-    } catch (IOException e) {
-      log.error(e, e);
-      throw e;
-    } catch (RuntimeException e) {
-      log.error(e, e);
-      throw e;
-    } finally {
-      Thread.currentThread().setName(oldThreadName);
-      if (remove) {
-        thread = null;
-        runningCompactions.remove(this);
-      }
-
-      try {
-        if (mfw != null) {
-          // compaction must not have finished successfully, so close its output file
-          try {
-            mfw.close();
-          } finally {
-            if (!fs.deleteRecursively(outputFile.path()))
-              if (fs.exists(outputFile.path()))
-                log.error("Unable to delete " + outputFile);
-          }
-        }
-      } catch (IOException e) {
-        log.warn(e, e);
-      }
-    }
-  }
-
-  private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName, ArrayList<FileSKVIterator> readers) throws IOException {
-    
-    List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
-    
-    for (FileRef mapFile : filesToCompact.keySet()) {
-      try {
-        
-        FileOperations fileFactory = FileOperations.getInstance();
-        FileSystem fs = this.fs.getFileSystemByPath(mapFile.path());
-        FileSKVIterator reader;
-        
-        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, conf, acuTableConf);
-        
-        readers.add(reader);
-        
-        SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader);
-        
-        if (filesToCompact.get(mapFile).isTimeSet()) {
-          iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
-        }
-        
-        iters.add(iter);
-        
-      } catch (Throwable e) {
-        
-        ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e));
-        
-        log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e);
-        // failed to open some map file... close the ones that were opened
-        for (FileSKVIterator reader : readers) {
-          try {
-            reader.close();
-          } catch (Throwable e2) {
-            log.warn("Failed to close map file", e2);
-          }
-        }
-        
-        readers.clear();
-        
-        if (e instanceof IOException)
-          throw (IOException) e;
-        throw new IOException("Failed to open map data files", e);
-      }
-    }
-    
-    return iters;
-  }
-  
-  private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats)
-      throws IOException, CompactionCanceledException {
-    ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size());
-    Span span = Trace.start("compact");
-    try {
-      long entriesCompacted = 0;
-      List<SortedKeyValueIterator<Key,Value>> iters = openMapDataFiles(lgName, readers);
-      
-      if (imm != null) {
-        iters.add(imm.compactionIterator());
-      }
-      
-      CountingIterator citr = new CountingIterator(new MultiIterator(iters, extent.toDataRange()), entriesRead);
-      DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes);
-      ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
-      
-
-      // if(env.getIteratorScope() )
-      
-      TabletIteratorEnvironment iterEnv;
-      if (env.getIteratorScope() == IteratorScope.majc)
-        iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, !propogateDeletes, acuTableConf);
-      else if (env.getIteratorScope() == IteratorScope.minc)
-        iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, acuTableConf);
-      else
-        throw new IllegalArgumentException();
-      
-      SortedKeyValueIterator<Key,Value> itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), cfsi, extent, acuTableConf,
-          iterators, iterEnv));
-      
-      itr.seek(extent.toDataRange(), columnFamilies, inclusive);
-      
-      if (!inclusive) {
-        mfw.startDefaultLocalityGroup();
-      } else {
-        mfw.startNewLocalityGroup(lgName, columnFamilies);
-      }
-      
-      Span write = Trace.start("write");
-      try {
-        while (itr.hasTop() && env.isCompactionEnabled()) {
-          mfw.append(itr.getTopKey(), itr.getTopValue());
-          itr.next();
-          entriesCompacted++;
-          
-          if (entriesCompacted % 1024 == 0) {
-            // Periodically update stats, do not want to do this too often since its volatile
-            entriesWritten.addAndGet(1024);
-          }
-        }
-
-        if (itr.hasTop() && !env.isCompactionEnabled()) {
-          // cancel major compaction operation
-          try {
-            try {
-              mfw.close();
-            } catch (IOException e) {
-              log.error(e, e);
-            }
-            fs.deleteRecursively(outputFile.path());
-          } catch (Exception e) {
-            log.warn("Failed to delete Canceled compaction output file " + outputFile, e);
-          }
-          throw new CompactionCanceledException();
-        }
-        
-      } finally {
-        CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted);
-        majCStats.add(lgMajcStats);
-        write.stop();
-      }
-      
-    } finally {
-      // close sequence files opened
-      for (FileSKVIterator reader : readers) {
-        try {
-          reader.close();
-        } catch (Throwable e) {
-          log.warn("Failed to close map file", e);
-        }
-      }
-      span.stop();
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
deleted file mode 100644
index 7258a82..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.tserver.data.ServerConditionalMutation;
-import org.apache.hadoop.io.WritableComparator;
-
-/**
- * 
- */
-public class ConditionalMutationSet {
-
-  static interface DeferFilter {
-    void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred);
-  }
-  
-  static class DuplicateFitler implements DeferFilter {
-    public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
-      okMutations.add(scml.get(0));
-      for (int i = 1; i < scml.size(); i++) {
-        if (Arrays.equals(scml.get(i - 1).getRow(), scml.get(i).getRow())) {
-          deferred.add(scml.get(i));
-        } else {
-          okMutations.add(scml.get(i));
-        }
-      }
-    }
-  }
-  
-  static void defer(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferredMutations, DeferFilter filter) {
-    for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
-      List<ServerConditionalMutation> scml = entry.getValue();
-      List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(scml.size());
-      List<ServerConditionalMutation> deferred = new ArrayList<ServerConditionalMutation>();
-      filter.defer(scml, okMutations, deferred);
-      
-      if (deferred.size() > 0) {
-        scml.clear();
-        scml.addAll(okMutations);
-        List<ServerConditionalMutation> l = deferredMutations.get(entry.getKey());
-        if (l == null) {
-          l = deferred;
-          deferredMutations.put(entry.getKey(), l);
-        } else {
-          l.addAll(deferred);
-        }
-
-      }
-    }
-  }
-  
-  static void deferDuplicatesRows(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
-    defer(updates, deferred, new DuplicateFitler());
-  }
-
-  static void sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates) {
-    for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
-      // TODO check if its already in sorted order?
-      // TODO maybe the potential benefit of sorting is not worth the cost
-      Collections.sort(entry.getValue(), new Comparator<ServerConditionalMutation>() {
-        @Override
-        public int compare(ServerConditionalMutation o1, ServerConditionalMutation o2) {
-          return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length);
-        }
-      });
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java
deleted file mode 100644
index 65fa538..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-public class EndOfTableException extends Exception {
-  
-  /**
-	 * 
-	 */
-  private static final long serialVersionUID = 1L;
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
deleted file mode 100644
index 7349d51..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
+++ /dev/null
@@ -1,561 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Semaphore;
-
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
-import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
-import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
-import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.problems.ProblemReport;
-import org.apache.accumulo.server.problems.ProblemReportingIterator;
-import org.apache.accumulo.server.problems.ProblemReports;
-import org.apache.accumulo.server.problems.ProblemType;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-public class FileManager {
-  
-  private static final Logger log = Logger.getLogger(FileManager.class);
-  
-  int maxOpen;
-  
-  private static class OpenReader implements Comparable<OpenReader> {
-    long releaseTime;
-    FileSKVIterator reader;
-    String fileName;
-    
-    public OpenReader(String fileName, FileSKVIterator reader) {
-      this.fileName = fileName;
-      this.reader = reader;
-      this.releaseTime = System.currentTimeMillis();
-    }
-    
-    @Override
-    public int compareTo(OpenReader o) {
-      if (releaseTime < o.releaseTime) {
-        return -1;
-      } else if (releaseTime > o.releaseTime) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-    
-    @Override
-    public boolean equals(Object obj) {
-      if (obj instanceof OpenReader) {
-        return compareTo((OpenReader) obj) == 0;
-      }
-      return false;
-    }
-    
-    @Override
-    public int hashCode() {
-      return fileName.hashCode();
-    }
-  }
-  
-  private Map<String,List<OpenReader>> openFiles;
-  private HashMap<FileSKVIterator,String> reservedReaders;
-  
-  private Semaphore filePermits;
-  
-  private VolumeManager fs;
-  
-  // the data cache and index cache are allocated in
-  // TabletResourceManager and passed through the file opener to
-  // CachableBlockFile which can handle the caches being
-  // null if unallocated
-  private BlockCache dataCache = null;
-  private BlockCache indexCache = null;
-  
-  private long maxIdleTime;
-  
-  private final ServerConfiguration conf;
-  
-  private class IdleFileCloser implements Runnable {
-    
-    @Override
-    public void run() {
-      
-      long curTime = System.currentTimeMillis();
-      
-      ArrayList<FileSKVIterator> filesToClose = new ArrayList<FileSKVIterator>();
-      
-      // determine which files to close in a sync block, and then close the
-      // files outside of the sync block
-      synchronized (FileManager.this) {
-        Iterator<Entry<String,List<OpenReader>>> iter = openFiles.entrySet().iterator();
-        while (iter.hasNext()) {
-          Entry<String,List<OpenReader>> entry = iter.next();
-          List<OpenReader> ofl = entry.getValue();
-          
-          for (Iterator<OpenReader> oflIter = ofl.iterator(); oflIter.hasNext();) {
-            OpenReader openReader = oflIter.next();
-            
-            if (curTime - openReader.releaseTime > maxIdleTime) {
-              
-              filesToClose.add(openReader.reader);
-              oflIter.remove();
-            }
-          }
-          
-          if (ofl.size() == 0) {
-            iter.remove();
-          }
-        }
-      }
-      
-      closeReaders(filesToClose);
-      
-    }
-    
-  }
-  
-  /**
-   * 
-   * @param dataCache
-   *          : underlying file can and should be able to handle a null cache
-   * @param indexCache
-   *          : underlying file can and should be able to handle a null cache
-   */
-  FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) {
-    
-    if (maxOpen <= 0)
-      throw new IllegalArgumentException("maxOpen <= 0");
-    this.conf = conf;
-    this.dataCache = dataCache;
-    this.indexCache = indexCache;
-    
-    this.filePermits = new Semaphore(maxOpen, true);
-    this.maxOpen = maxOpen;
-    this.fs = fs;
-    
-    this.openFiles = new HashMap<String,List<OpenReader>>();
-    this.reservedReaders = new HashMap<FileSKVIterator,String>();
-    
-    this.maxIdleTime = conf.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
-    SimpleTimer.getInstance().schedule(new IdleFileCloser(), maxIdleTime, maxIdleTime / 2);
-    
-  }
-  
-  private static int countReaders(Map<String,List<OpenReader>> files) {
-    int count = 0;
-    
-    for (List<OpenReader> list : files.values()) {
-      count += list.size();
-    }
-    
-    return count;
-  }
-  
-  private List<FileSKVIterator> takeLRUOpenFiles(int numToTake) {
-    
-    ArrayList<OpenReader> openReaders = new ArrayList<OpenReader>();
-    
-    for (Entry<String,List<OpenReader>> entry : openFiles.entrySet()) {
-      openReaders.addAll(entry.getValue());
-    }
-    
-    Collections.sort(openReaders);
-    
-    ArrayList<FileSKVIterator> ret = new ArrayList<FileSKVIterator>();
-    
-    for (int i = 0; i < numToTake; i++) {
-      OpenReader or = openReaders.get(i);
-      
-      List<OpenReader> ofl = openFiles.get(or.fileName);
-      if (!ofl.remove(or)) {
-        throw new RuntimeException("Failed to remove open reader that should have been there");
-      }
-      
-      if (ofl.size() == 0) {
-        openFiles.remove(or.fileName);
-      }
-      
-      ret.add(or.reader);
-    }
-    
-    return ret;
-  }
-  
-  private static <T> List<T> getFileList(String file, Map<String,List<T>> files) {
-    List<T> ofl = files.get(file);
-    if (ofl == null) {
-      ofl = new ArrayList<T>();
-      files.put(file, ofl);
-    }
-    
-    return ofl;
-  }
-  
-  private void closeReaders(List<FileSKVIterator> filesToClose) {
-    for (FileSKVIterator reader : filesToClose) {
-      try {
-        reader.close();
-      } catch (Exception e) {
-        log.error("Failed to close file " + e.getMessage(), e);
-      }
-    }
-  }
-  
-  private List<String> takeOpenFiles(Collection<String> files, List<FileSKVIterator> reservedFiles, Map<FileSKVIterator,String> readersReserved) {
-    List<String> filesToOpen = new LinkedList<String>(files);
-    for (Iterator<String> iterator = filesToOpen.iterator(); iterator.hasNext();) {
-      String file = iterator.next();
-      
-      List<OpenReader> ofl = openFiles.get(file);
-      if (ofl != null && ofl.size() > 0) {
-        OpenReader openReader = ofl.remove(ofl.size() - 1);
-        reservedFiles.add(openReader.reader);
-        readersReserved.put(openReader.reader, file);
-        if (ofl.size() == 0) {
-          openFiles.remove(file);
-        }
-        iterator.remove();
-      }
-      
-    }
-    return filesToOpen;
-  }
-  
-  private synchronized String getReservedReadeFilename(FileSKVIterator reader) {
-    return reservedReaders.get(reader);
-  }
-  
-  private List<FileSKVIterator> reserveReaders(Text table, Collection<String> files, boolean continueOnFailure) throws IOException {
-    
-    if (files.size() >= maxOpen) {
-      throw new IllegalArgumentException("requested files exceeds max open");
-    }
-    
-    if (files.size() == 0) {
-      return Collections.emptyList();
-    }
-    
-    List<String> filesToOpen = null;
-    List<FileSKVIterator> filesToClose = Collections.emptyList();
-    List<FileSKVIterator> reservedFiles = new ArrayList<FileSKVIterator>();
-    Map<FileSKVIterator,String> readersReserved = new HashMap<FileSKVIterator,String>();
-    
-    filePermits.acquireUninterruptibly(files.size());
-    
-    // now that the we are past the semaphore, we have the authority
-    // to open files.size() files
-    
-    // determine what work needs to be done in sync block
-    // but do the work of opening and closing files outside
-    // a synch block
-    synchronized (this) {
-      
-      filesToOpen = takeOpenFiles(files, reservedFiles, readersReserved);
-      
-      int numOpen = countReaders(openFiles);
-      
-      if (filesToOpen.size() + numOpen + reservedReaders.size() > maxOpen) {
-        filesToClose = takeLRUOpenFiles((filesToOpen.size() + numOpen + reservedReaders.size()) - maxOpen);
-      }
-    }
-    
-    // close files before opening files to ensure we stay under resource
-    // limitations
-    closeReaders(filesToClose);
-    
-    // open any files that need to be opened
-    for (String file : filesToOpen) {
-      try {
-        Path path = fs.getFullPath(ServerConstants.getTablesDirs(), file);
-        FileSystem ns = fs.getFileSystemByPath(path);
-        //log.debug("Opening "+file + " path " + path);
-        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
-            dataCache, indexCache);
-        reservedFiles.add(reader);
-        readersReserved.put(reader, file);
-      } catch (Exception e) {
-        
-        ProblemReports.getInstance().report(new ProblemReport(table.toString(), ProblemType.FILE_READ, file, e));
-        
-        if (continueOnFailure) {
-          // release the permit for the file that failed to open
-          filePermits.release(1);
-          log.warn("Failed to open file " + file + " " + e.getMessage() + " continuing...");
-        } else {
-          // close whatever files were opened
-          closeReaders(reservedFiles);
-          
-          filePermits.release(files.size());
-          
-          log.error("Failed to open file " + file + " " + e.getMessage());
-          throw new IOException("Failed to open " + file, e);
-        }
-      }
-    }
-    
-    synchronized (this) {
-      // update set of reserved readers
-      reservedReaders.putAll(readersReserved);
-    }
-    
-    return reservedFiles;
-  }
-  
-  private void releaseReaders(List<FileSKVIterator> readers, boolean sawIOException) {
-    // put files in openFiles
-    
-    synchronized (this) {
-      
-      // check that readers were actually reserved ... want to make sure a thread does
-      // not try to release readers they never reserved
-      if (!reservedReaders.keySet().containsAll(readers)) {
-        throw new IllegalArgumentException("Asked to release readers that were never reserved ");
-      }
-      
-      for (FileSKVIterator reader : readers) {
-        try {
-          reader.closeDeepCopies();
-        } catch (IOException e) {
-          log.warn(e, e);
-          sawIOException = true;
-        }
-      }
-      
-      for (FileSKVIterator reader : readers) {
-        String fileName = reservedReaders.remove(reader);
-        if (!sawIOException)
-          getFileList(fileName, openFiles).add(new OpenReader(fileName, reader));
-      }
-    }
-    
-    if (sawIOException)
-      closeReaders(readers);
-    
-    // decrement the semaphore
-    filePermits.release(readers.size());
-    
-  }
-  
-  static class FileDataSource implements DataSource {
-    
-    private SortedKeyValueIterator<Key,Value> iter;
-    private ArrayList<FileDataSource> deepCopies;
-    private boolean current = true;
-    private IteratorEnvironment env;
-    private String file;
-    
-    FileDataSource(String file, SortedKeyValueIterator<Key,Value> iter) {
-      this.file = file;
-      this.iter = iter;
-      this.deepCopies = new ArrayList<FileManager.FileDataSource>();
-    }
-    
-    public FileDataSource(IteratorEnvironment env, SortedKeyValueIterator<Key,Value> deepCopy, ArrayList<FileDataSource> deepCopies) {
-      this.iter = deepCopy;
-      this.env = env;
-      this.deepCopies = deepCopies;
-      deepCopies.add(this);
-    }
-    
-    @Override
-    public boolean isCurrent() {
-      return current;
-    }
-    
-    @Override
-    public DataSource getNewDataSource() {
-      current = true;
-      return this;
-    }
-    
-    @Override
-    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
-      return new FileDataSource(env, iter.deepCopy(env), deepCopies);
-    }
-    
-    @Override
-    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
-      return iter;
-    }
-    
-    void unsetIterator() {
-      current = false;
-      iter = null;
-      for (FileDataSource fds : deepCopies) {
-        fds.current = false;
-        fds.iter = null;
-      }
-    }
-    
-    void setIterator(SortedKeyValueIterator<Key,Value> iter) {
-      current = false;
-      this.iter = iter;
-      for (FileDataSource fds : deepCopies) {
-        fds.current = false;
-        fds.iter = iter.deepCopy(fds.env);
-      }
-    }
-    
-  }
-  
-  public class ScanFileManager {
-    
-    private ArrayList<FileDataSource> dataSources;
-    private ArrayList<FileSKVIterator> tabletReservedReaders;
-    private KeyExtent tablet;
-    private boolean continueOnFailure;
-    
-    ScanFileManager(KeyExtent tablet) {
-      tabletReservedReaders = new ArrayList<FileSKVIterator>();
-      dataSources = new ArrayList<FileDataSource>();
-      this.tablet = tablet;
-      
-      continueOnFailure = conf.getTableConfiguration(tablet).getBoolean(Property.TABLE_FAILURES_IGNORE);
-      
-      if (tablet.isMeta()) {
-        continueOnFailure = false;
-      }
-    }
-    
-    private List<FileSKVIterator> openFileRefs(Collection<FileRef> files) throws TooManyFilesException, IOException {
-      List<String> strings = new ArrayList<String>(files.size());
-      for (FileRef ref : files)
-        strings.add(ref.path().toString());
-      return openFiles(strings);
-    }
-    
-    private List<FileSKVIterator> openFiles(Collection<String> files) throws TooManyFilesException, IOException {
-      // one tablet can not open more than maxOpen files, otherwise it could get stuck
-      // forever waiting on itself to release files
-      
-      if (tabletReservedReaders.size() + files.size() >= maxOpen) {
-        throw new TooManyFilesException("Request to open files would exceed max open files reservedReaders.size()=" + tabletReservedReaders.size()
-            + " files.size()=" + files.size() + " maxOpen=" + maxOpen + " tablet = " + tablet);
-      }
-      
-      List<FileSKVIterator> newlyReservedReaders = reserveReaders(tablet.getTableId(), files, continueOnFailure);
-      
-      tabletReservedReaders.addAll(newlyReservedReaders);
-      return newlyReservedReaders;
-    }
-    
-    synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException {
-      
-      List<FileSKVIterator> newlyReservedReaders = openFileRefs(files.keySet());
-      
-      ArrayList<InterruptibleIterator> iters = new ArrayList<InterruptibleIterator>();
-      
-      for (FileSKVIterator reader : newlyReservedReaders) {
-        String filename = getReservedReadeFilename(reader);
-        InterruptibleIterator iter;
-        if (detachable) {
-          FileDataSource fds = new FileDataSource(filename, reader);
-          dataSources.add(fds);
-          SourceSwitchingIterator ssi = new SourceSwitchingIterator(fds);
-          iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, ssi);
-        } else {
-          iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, reader);
-        }
-        DataFileValue value = files.get(new FileRef(filename));
-        if (value.isTimeSet()) {
-          iter = new TimeSettingIterator(iter, value.getTime());
-        }
-        
-        iters.add(iter);
-      }
-      
-      return iters;
-    }
-    
-    synchronized void detach() {
-      
-      releaseReaders(tabletReservedReaders, false);
-      tabletReservedReaders.clear();
-      
-      for (FileDataSource fds : dataSources)
-        fds.unsetIterator();
-    }
-    
-    synchronized void reattach() throws IOException {
-      if (tabletReservedReaders.size() != 0)
-        throw new IllegalStateException();
-      
-      Collection<String> files = new ArrayList<String>();
-      for (FileDataSource fds : dataSources)
-        files.add(fds.file);
-      
-      List<FileSKVIterator> newlyReservedReaders = openFiles(files);
-      Map<String,List<FileSKVIterator>> map = new HashMap<String,List<FileSKVIterator>>();
-      for (FileSKVIterator reader : newlyReservedReaders) {
-        String fileName = getReservedReadeFilename(reader);
-        List<FileSKVIterator> list = map.get(fileName);
-        if (list == null) {
-          list = new LinkedList<FileSKVIterator>();
-          map.put(fileName, list);
-        }
-        
-        list.add(reader);
-      }
-      
-      for (FileDataSource fds : dataSources) {
-        FileSKVIterator reader = map.get(fds.file).remove(0);
-        fds.setIterator(reader);
-      }
-    }
-    
-    synchronized void releaseOpenFiles(boolean sawIOException) {
-      releaseReaders(tabletReservedReaders, sawIOException);
-      tabletReservedReaders.clear();
-      dataSources.clear();
-    }
-    
-    synchronized int getNumOpenFiles() {
-      return tabletReservedReaders.size();
-    }
-  }
-  
-  public ScanFileManager newScanFileManager(KeyExtent tablet) {
-    return new ScanFileManager(tablet);
-  }
-}


Mime
View raw message