accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [41/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules
Date Fri, 01 Nov 2013 00:56:20 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java
new file mode 100644
index 0000000..affe12f
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.zookeeper.KeeperException;
+
+public class SystemPropUtil {
+  public static boolean setSystemProperty(String property, String value) throws KeeperException, InterruptedException {
+    Property p = Property.getPropertyByKey(property);
+    if ((p != null && !p.getType().isValidFormat(value)) || !Property.isValidZooPropertyKey(property))
+      return false;
+    
+    // create the zk node for this property and set its data to the specified value
+    String zPath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZCONFIG + "/" + property;
+    ZooReaderWriter.getInstance().putPersistentData(zPath, value.getBytes(), NodeExistsPolicy.OVERWRITE);
+    
+    return true;
+  }
+  
+  public static void removeSystemProperty(String property) throws InterruptedException, KeeperException {
+    String zPath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZCONFIG + "/" + property;
+    ZooReaderWriter.getInstance().recursiveDelete(zPath, NodeMissingPolicy.FAIL);
+  }
+}
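
For reference, a minimal sketch of how this utility might be called from server-side code, assuming a running instance whose configuration is reachable via HdfsZooInstance; the property key shown is only illustrative:

import org.apache.accumulo.server.util.SystemPropUtil;

public class SystemPropExample {
  public static void main(String[] args) throws Exception {
    // Attempt to store a system-wide property under .../config in ZooKeeper; returns
    // false if the key is not a settable zoo property or the value fails format checks.
    if (!SystemPropUtil.setSystemProperty("table.split.threshold", "512M"))
      System.err.println("rejected: invalid property key or value");

    // Remove it again; NodeMissingPolicy.FAIL means a missing node raises KeeperException.
    SystemPropUtil.removeSystemProperty("table.split.threshold");
  }
}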

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java
new file mode 100644
index 0000000..2962a52
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+import org.apache.accumulo.core.util.TBufferedSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+// THRIFT-959 removed the small buffer from TSocket; this adds it back for servers
+public class TBufferedServerSocket extends TServerTransport {
+  
+  // expose acceptImpl
+  static class TServerSocket extends org.apache.thrift.transport.TServerSocket {
+    public TServerSocket(ServerSocket serverSocket) {
+      super(serverSocket);
+    }
+    
+    public TSocket acceptImplPublic() throws TTransportException {
+      return acceptImpl();
+    }
+  }
+  
+  final TServerSocket impl;
+  final int bufferSize;
+  
+  public TBufferedServerSocket(ServerSocket serverSocket, int bufferSize) {
+    this.impl = new TServerSocket(serverSocket);
+    this.bufferSize = bufferSize;
+  }
+  
+  @Override
+  public void listen() throws TTransportException {
+    impl.listen();
+  }
+  
+  @Override
+  public void close() {
+    impl.close();
+  }
+  
+  // Wrap accepted sockets using buffered IO
+  @Override
+  protected TTransport acceptImpl() throws TTransportException {
+    TSocket sock = impl.acceptImplPublic();
+    try {
+      return new TBufferedSocket(sock, this.bufferSize);
+    } catch (IOException e) {
+      throw new TTransportException(e);
+    }
+  }
+  
+}
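
A brief sketch of how this transport could be used, assuming the caller binds the ServerSocket itself; the 32 KB buffer mirrors the value TServerUtils uses later in this patch:

import java.net.InetSocketAddress;
import java.net.ServerSocket;

import org.apache.accumulo.server.util.TBufferedServerSocket;
import org.apache.thrift.transport.TServerTransport;

public class BufferedTransportExample {
  public static void main(String[] args) throws Exception {
    // Bind a plain ServerSocket first so the chosen port is known up front.
    ServerSocket socket = new ServerSocket();
    socket.setReuseAddress(true);
    socket.bind(new InetSocketAddress("0.0.0.0", 0));

    // Wrap it so each accepted connection gets buffered IO again (see THRIFT-959).
    TServerTransport transport = new TBufferedServerSocket(socket, 32 * 1024);
    transport.listen();
    System.out.println("listening on port " + socket.getLocalPort());
  }
}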

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
new file mode 100644
index 0000000..4154d9d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.server.util;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Wrapper around ServerSocketChannel.
+ * 
+ * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
+ * The only change (apart from the logging statements) is the addition of the {@link #getPort()} method to retrieve the port used by the ServerSocket.
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+  private static final Logger log = Logger.getLogger(TNonblockingServerTransport.class.getName());
+
+  /**
+   * This channel is where all the nonblocking magic happens.
+   */
+  private ServerSocketChannel serverSocketChannel = null;
+
+  /**
+   * Underlying ServerSocket object
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port) throws TTransportException {
+    this(port, 0);
+  }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
+    this(new InetSocketAddress(port), clientTimeout);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+    this(bindAddr, 0);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    clientTimeout_ = clientTimeout;
+    try {
+      serverSocketChannel = ServerSocketChannel.open();
+      serverSocketChannel.configureBlocking(false);
+
+      // Make server socket
+      serverSocket_ = serverSocketChannel.socket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(bindAddr);
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+    }
+  }
+
+  public void listen() throws TTransportException {
+    // Make sure not to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        sx.printStackTrace();
+      }
+    }
+  }
+
+  protected TNonblockingSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      SocketChannel socketChannel = serverSocketChannel.accept();
+      if (socketChannel == null) {
+        return null;
+      }
+
+      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+      tsocket.setTimeout(clientTimeout_);
+      return tsocket;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void registerSelector(Selector selector) {
+    try {
+      // Register the server socket channel, indicating an interest in
+      // accepting new connections
+      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+    } catch (ClosedChannelException e) {
+      // this shouldn't happen, ideally...
+      // TODO: decide what to do with this.
+    }
+  }
+
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        log.warn("WARNING: Could not close server socket: " + iox.getMessage());
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+  public int getPort() {
+    return serverSocket_.getLocalPort();
+  }
+}
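
Since the whole point of this copy is the added getPort() accessor, a short hedged sketch of binding to an ephemeral port and reading the bound port back:

import org.apache.accumulo.server.util.TNonblockingServerSocket;

public class EphemeralPortExample {
  public static void main(String[] args) throws Exception {
    // Port 0 asks the OS for any free port.
    TNonblockingServerSocket transport = new TNonblockingServerSocket(0);
    transport.listen();
    // getPort() is the addition over Thrift's own class: it reports the port actually bound.
    System.out.println("bound to port " + transport.getPort());
    transport.close();
  }
}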

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
new file mode 100644
index 0000000..8abd104
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.TBufferedSocket;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.metrics.ThriftMetrics;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.net.HostAndPort;
+
+public class TServerUtils {
+  private static final Logger log = Logger.getLogger(TServerUtils.class);
+  
+  public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
+  
+  public static class ServerAddress {
+    public final TServer server;
+    public final HostAndPort address;
+    
+    public ServerAddress(TServer server, HostAndPort address) {
+      this.server = server;
+      this.address = address;
+    }
+  }
+  
+  /**
+   * Start a server, at the given port, or higher, if that port is not available.
+   * 
+   * @param portHintProperty
+   *          the port to attempt to open, can be zero, meaning "any available port"
+   * @param processor
+   *          the service to be started
+   * @param serverName
+   *          the name of the class that is providing the service
+   * @param threadName
+   *          name this service's thread for better debugging
+   * @param portSearchProperty if true, search for an open port when the hinted port is unavailable
+   * @param minThreadProperty the minimum number of threads in the client handling pool
+   * @param timeBetweenThreadChecksProperty how often to check whether the client pool should grow or shrink
+   * @return the server object created, and the port actually used
+   * @throws UnknownHostException
+   *           when we don't know our own address
+   */
+  public static ServerAddress startServer(AccumuloConfiguration conf, String address, Property portHintProperty, TProcessor processor, String serverName,
+      String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
+      throws UnknownHostException {
+    int portHint = conf.getPort(portHintProperty);
+    int minThreads = 2;
+    if (minThreadProperty != null)
+      minThreads = conf.getCount(minThreadProperty);
+    long timeBetweenThreadChecks = 1000;
+    if (timeBetweenThreadChecksProperty != null)
+      timeBetweenThreadChecks = conf.getTimeInMillis(timeBetweenThreadChecksProperty);
+    long maxMessageSize = 10 * 1000 * 1000;
+    if (maxMessageSizeProperty != null)
+      maxMessageSize = conf.getMemoryInBytes(maxMessageSizeProperty);
+    boolean portSearch = false;
+    if (portSearchProperty != null)
+      portSearch = conf.getBoolean(portSearchProperty);
+    // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
+    TServerUtils.TimedProcessor timedProcessor = new TServerUtils.TimedProcessor(processor, serverName, threadName);
+    Random random = new Random();
+    for (int j = 0; j < 100; j++) {
+      
+      // Are we going to slide around, looking for an open port?
+      int portsToSearch = 1;
+      if (portSearch)
+        portsToSearch = 1000;
+      
+      for (int i = 0; i < portsToSearch; i++) {
+        int port = portHint + i;
+        if (portHint != 0 && i > 0)
+          port = 1024 + random.nextInt(65535 - 1024);
+        if (port > 65535)
+          port = 1024 + port % (65535 - 1024);
+        try {
+          HostAndPort addr = HostAndPort.fromParts(address, port);
+          return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads, timeBetweenThreadChecks, maxMessageSize);
+        } catch (Exception ex) {
+          log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
+          UtilWaitThread.sleep(250);
+        }
+      }
+    }
+    throw new UnknownHostException("Unable to find a listen port");
+  }
+  
+  public static class TimedProcessor implements TProcessor {
+    
+    final TProcessor other;
+    ThriftMetrics metrics = null;
+    long idleStart = 0;
+    
+    TimedProcessor(TProcessor next, String serverName, String threadName) {
+      this.other = next;
+      // Register the metrics MBean
+      try {
+        metrics = new ThriftMetrics(serverName, threadName);
+        metrics.register();
+      } catch (Exception e) {
+        log.error("Exception registering MBean with MBean Server", e);
+      }
+      idleStart = System.currentTimeMillis();
+    }
+    
+    @Override
+    public boolean process(TProtocol in, TProtocol out) throws TException {
+      long now = 0;
+      if (metrics.isEnabled()) {
+        now = System.currentTimeMillis();
+        metrics.add(ThriftMetrics.idle, (now - idleStart));
+      }
+      try {
+        try {
+          return other.process(in, out);
+        } catch (NullPointerException ex) {
+          // THRIFT-1447 - remove with thrift 0.9
+          return true;
+        }
+      } finally {
+        if (metrics.isEnabled()) {
+          idleStart = System.currentTimeMillis();
+          metrics.add(ThriftMetrics.execute, idleStart - now);
+        }
+      }
+    }
+  }
+  
+  public static class ClientInfoProcessorFactory extends TProcessorFactory {
+    
+    public ClientInfoProcessorFactory(TProcessor processor) {
+      super(processor);
+    }
+    
+    @Override
+    public TProcessor getProcessor(TTransport trans) {
+      if (trans instanceof TBufferedSocket) {
+        TBufferedSocket tsock = (TBufferedSocket) trans;
+        clientAddress.set(tsock.getClientString());
+      }
+      return super.getProcessor(trans);
+    }
+  }
+  
+  public static class THsHaServer extends org.apache.thrift.server.THsHaServer {
+    public THsHaServer(Args args) {
+      super(args);
+    }
+    
+    @Override
+    protected Runnable getRunnable(FrameBuffer frameBuffer) {
+      return new Invocation(frameBuffer);
+    }
+    
+    private class Invocation implements Runnable {
+      
+      private final FrameBuffer frameBuffer;
+      
+      public Invocation(final FrameBuffer frameBuffer) {
+        this.frameBuffer = frameBuffer;
+      }
+      
+      @Override
+      public void run() {
+        if (frameBuffer.trans_ instanceof TNonblockingSocket) {
+          TNonblockingSocket tsock = (TNonblockingSocket) frameBuffer.trans_;
+          Socket sock = tsock.getSocketChannel().socket();
+          clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
+        }
+        frameBuffer.invoke();
+      }
+    }
+  }
+  
+  public static ServerAddress startHsHaServer(HostAndPort address, TProcessor processor, final String serverName, String threadName, final int numThreads,
+      long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
+    TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
+    // check for the special "bind to everything address"
+    String hostname = address.getHostText();
+    if (hostname.equals("0.0.0.0")) {
+      // can't get the address from the bind, so we'll do our best to invent our hostname
+      try {
+        hostname = InetAddress.getLocalHost().getHostName();
+      } catch (UnknownHostException e) {
+        throw new TTransportException(e);
+      }
+    }
+    int port = address.getPort();
+    if (port == 0) {
+      port = transport.getPort();
+    }
+    THsHaServer.Args options = new THsHaServer.Args(transport);
+    options.protocolFactory(ThriftUtil.protocolFactory());
+    options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+    options.stopTimeoutVal(5);
+    /*
+     * Create our own very special thread pool.
+     */
+    final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+    // periodically adjust the number of threads we need by checking how busy our threads are
+    SimpleTimer.getInstance().schedule(new Runnable() {
+      @Override
+      public void run() {
+        if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+          int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+          log.info("Increasing server thread pool size on " + serverName + " to " + larger);
+          pool.setMaximumPoolSize(larger);
+          pool.setCorePoolSize(larger);
+        } else {
+          if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+            int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
+            if (smaller != pool.getCorePoolSize()) {
+              // there is a race condition here... the active count could be higher by the time
+              // we decrease the core pool size... so the active count could end up higher than
+              // the core pool size, in which case everything will be queued... the increase case
+              // should handle this and prevent deadlock
+              log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
+              pool.setCorePoolSize(smaller);
+            }
+          }
+        }
+      }
+    }, timeBetweenThreadChecks, timeBetweenThreadChecks);
+    options.executorService(pool);
+    options.processorFactory(new TProcessorFactory(processor));
+    return new ServerAddress(new THsHaServer(options), HostAndPort.fromParts(hostname, port));
+  }
+  
+  public static ServerAddress startThreadPoolServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads)
+      throws TTransportException {
+    
+    // if port is zero, then we must bind to get the port number
+    ServerSocket sock;
+    try {
+      sock = ServerSocketChannel.open().socket();
+      sock.setReuseAddress(true);
+      sock.bind(new InetSocketAddress(address.getHostText(), address.getPort()));
+      address = HostAndPort.fromParts(address.getHostText(), sock.getLocalPort());
+    } catch (IOException ex) {
+      throw new TTransportException(ex);
+    }
+    TServerTransport transport = new TBufferedServerSocket(sock, 32 * 1024);
+    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+    options.protocolFactory(ThriftUtil.protocolFactory());
+    options.transportFactory(ThriftUtil.transportFactory());
+    options.processorFactory(new ClientInfoProcessorFactory(processor));
+    return new ServerAddress(new TThreadPoolServer(options), address);
+  }
+  
+  public static ServerAddress startTServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize)
+      throws TTransportException {
+    return startTServer(address, new TimedProcessor(processor, serverName, threadName), serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
+  }
+  
+  public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize)
+      throws TTransportException {
+    ServerAddress result = startHsHaServer(address, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
+    //ServerAddress result = startThreadPoolServer(address, processor, serverName, threadName, -1);
+    final TServer finalServer = result.server;
+    Runnable serveTask = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          finalServer.serve();
+        } catch (Error e) {
+          Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
+        }
+      }
+    };
+    serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
+    Thread thread = new Daemon(serveTask, threadName);
+    thread.start();
+    return result;
+  }
+  
+  // Existing connections will keep our thread running: reach in with reflection and insist that they shutdown.
+  public static void stopTServer(TServer s) {
+    if (s == null)
+      return;
+    s.stop();
+    try {
+      Field f = s.getClass().getDeclaredField("executorService_");
+      f.setAccessible(true);
+      ExecutorService es = (ExecutorService) f.get(s);
+      es.shutdownNow();
+    } catch (Exception e) {
+      TServerUtils.log.error("Unable to call shutdownNow", e);
+    }
+  }
+}
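
A hedged sketch of the simpler startTServer/stopTServer path; the TProcessor is assumed to come from a generated Thrift service, and the names and pool sizes below are illustrative only:

import org.apache.accumulo.server.util.TServerUtils;
import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
import org.apache.thrift.TProcessor;

import com.google.common.net.HostAndPort;

public class StartServerExample {
  // 'processor' would normally wrap a generated Thrift service handler.
  static ServerAddress start(TProcessor processor) throws Exception {
    ServerAddress sa = TServerUtils.startTServer(
        HostAndPort.fromParts("0.0.0.0", 0), // port 0: let the transport pick a free port
        processor,
        "ExampleService",          // serverName, used when registering metrics
        "ExampleService client",   // threadName for the daemon serving thread
        4,                         // minimum client pool threads
        1000,                      // ms between thread pool size checks
        10 * 1000 * 1000);         // maximum Thrift message size in bytes
    System.out.println("serving on " + sa.address);
    return sa;
  }

  static void stop(ServerAddress sa) {
    // Stops the server and reflectively shuts down its executor so lingering
    // client connections do not keep the process alive.
    TServerUtils.stopTServer(sa.server);
  }
}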

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java b/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
new file mode 100644
index 0000000..b7019e6
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.NumUtil;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+import com.beust.jcommander.Parameter;
+
+public class TableDiskUsage {
+  
+  private static final Logger log = Logger.getLogger(TableDiskUsage.class);
+  private int nextInternalId = 0;
+  private Map<String,Integer> internalIds = new HashMap<String,Integer>();
+  private Map<Integer,String> externalIds = new HashMap<Integer,String>();
+  private Map<String,Integer[]> tableFiles = new HashMap<String,Integer[]>();
+  private Map<String,Long> fileSizes = new HashMap<String,Long>();
+  
+  void addTable(String tableId) {
+    if (internalIds.containsKey(tableId))
+      throw new IllegalArgumentException("Already added table " + tableId);
+    
+    int iid = nextInternalId++;
+    
+    internalIds.put(tableId, iid);
+    externalIds.put(iid, tableId);
+  }
+  
+  void linkFileAndTable(String tableId, String file) {
+    int internalId = internalIds.get(tableId);
+    
+    Integer[] tables = tableFiles.get(file);
+    if (tables == null) {
+      tables = new Integer[internalIds.size()];
+      for (int i = 0; i < tables.length; i++)
+        tables[i] = 0;
+      tableFiles.put(file, tables);
+    }
+    
+    tables[internalId] = 1;
+  }
+  
+  void addFileSize(String file, long size) {
+    fileSizes.put(file, size);
+  }
+  
+  Map<List<String>,Long> calculateUsage() {
+    
+    Map<List<Integer>,Long> usage = new HashMap<List<Integer>,Long>();
+    
+    for (Entry<String,Integer[]> entry : tableFiles.entrySet()) {
+      log.info("fileSizes " + fileSizes + " key " + Arrays.asList(entry.getKey()));
+      List<Integer> key = Arrays.asList(entry.getValue());
+      Long size = fileSizes.get(entry.getKey());
+      
+      Long tablesUsage = usage.get(key);
+      if (tablesUsage == null)
+        tablesUsage = 0L;
+      
+      tablesUsage += size;
+      
+      usage.put(key, tablesUsage);
+      
+    }
+    
+    Map<List<String>,Long> externalUsage = new HashMap<List<String>,Long>();
+    
+    for (Entry<List<Integer>,Long> entry : usage.entrySet()) {
+      List<String> externalKey = new ArrayList<String>();
+      List<Integer> key = entry.getKey();
+      for (int i = 0; i < key.size(); i++)
+        if (key.get(i) != 0)
+          externalKey.add(externalIds.get(i));
+      
+      externalUsage.put(externalKey, entry.getValue());
+    }
+    
+    return externalUsage;
+  }
+  
+  public interface Printer {
+    void print(String line);
+  }
+  
+  public static void printDiskUsage(AccumuloConfiguration acuConf, Collection<String> tables, VolumeManager fs, Connector conn, boolean humanReadable)
+      throws TableNotFoundException, IOException {
+    printDiskUsage(acuConf, tables, fs, conn, new Printer() {
+      @Override
+      public void print(String line) {
+        System.out.println(line);
+      }
+    }, humanReadable);
+  }
+  
+  public static Map<TreeSet<String>,Long> getDiskUsage(AccumuloConfiguration acuConf, Set<String> tableIds, VolumeManager fs, Connector conn) throws IOException {
+    TableDiskUsage tdu = new TableDiskUsage();
+    
+    for (String tableId : tableIds)
+      tdu.addTable(tableId);
+    
+    HashSet<String> tablesReferenced = new HashSet<String>(tableIds);
+    HashSet<String> emptyTableIds = new HashSet<String>();
+    
+    for (String tableId : tableIds) {
+      Scanner mdScanner = null;
+      try {
+        mdScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      } catch (TableNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+      mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      mdScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+      
+      if (!mdScanner.iterator().hasNext()) {
+        emptyTableIds.add(tableId);
+      }
+      
+      for (Entry<Key,Value> entry : mdScanner) {
+        String file = entry.getKey().getColumnQualifier().toString();
+        String[] parts = file.split("/");
+        String uniqueName = parts[parts.length - 1];
+        if (file.contains(":") || file.startsWith("../")) {
+          String ref = parts[parts.length - 3];
+          if (!ref.equals(tableId)) {
+            tablesReferenced.add(ref);
+          }
+        }
+        
+        tdu.linkFileAndTable(tableId, uniqueName);
+      }
+    }
+    
+    for (String tableId : tablesReferenced) {
+      for (String tableDir : ServerConstants.getTablesDirs()) {
+        FileStatus[] files = fs.globStatus(new Path(tableDir + "/" + tableId + "/*/*"));
+        if (files != null) {
+          for (FileStatus fileStatus : files) {
+            // Assumes that all filenames are unique
+            String name = fileStatus.getPath().getName();
+            tdu.addFileSize(name, fileStatus.getLen());
+          }
+        }
+      }
+    }
+    
+    HashMap<String,String> reverseTableIdMap = new HashMap<String,String>();
+    for (Entry<String,String> entry : conn.tableOperations().tableIdMap().entrySet())
+      reverseTableIdMap.put(entry.getValue(), entry.getKey());
+    
+    TreeMap<TreeSet<String>,Long> usage = new TreeMap<TreeSet<String>,Long>(new Comparator<TreeSet<String>>() {
+      
+      @Override
+      public int compare(TreeSet<String> o1, TreeSet<String> o2) {
+        int len1 = o1.size();
+        int len2 = o2.size();
+        
+        int min = Math.min(len1, len2);
+        
+        Iterator<String> iter1 = o1.iterator();
+        Iterator<String> iter2 = o2.iterator();
+        
+        int count = 0;
+        
+        while (count < min) {
+          String s1 = iter1.next();
+          String s2 = iter2.next();
+          
+          int cmp = s1.compareTo(s2);
+          
+          if (cmp != 0)
+            return cmp;
+          
+          count++;
+        }
+        
+        return len1 - len2;
+      }
+    });
+    
+    for (Entry<List<String>,Long> entry : tdu.calculateUsage().entrySet()) {
+      TreeSet<String> tableNames = new TreeSet<String>();
+      for (String tableId : entry.getKey())
+        tableNames.add(reverseTableIdMap.get(tableId));
+      
+      usage.put(tableNames, entry.getValue());
+    }
+    
+    if (!emptyTableIds.isEmpty()) {
+      TreeSet<String> emptyTables = new TreeSet<String>();
+      for (String tableId : emptyTableIds) {
+        emptyTables.add(reverseTableIdMap.get(tableId));
+      }
+      usage.put(emptyTables, 0L);
+    }
+    
+    return usage;
+  }
+  
+  public static void printDiskUsage(AccumuloConfiguration acuConf, Collection<String> tables, VolumeManager fs, Connector conn, Printer printer,
+      boolean humanReadable) throws TableNotFoundException, IOException {
+    
+    HashSet<String> tableIds = new HashSet<String>();
+    
+    for (String tableName : tables) {
+      String tableId = conn.tableOperations().tableIdMap().get(tableName);
+      if (tableId == null)
+        throw new TableNotFoundException(null, tableName, "Table " + tableName + " not found");
+      
+      tableIds.add(tableId);
+    }
+    
+    Map<TreeSet<String>,Long> usage = getDiskUsage(acuConf, tableIds, fs, conn);
+    
+    String valueFormat = humanReadable ? "%9s" : "%,24d";
+    for (Entry<TreeSet<String>,Long> entry : usage.entrySet()) {
+      Object value = humanReadable ? NumUtil.bigNumberForSize(entry.getValue()) : entry.getValue();
+      printer.print(String.format(valueFormat + " %s", value, entry.getKey()));
+    }
+  }
+  
+  static class Opts extends ClientOpts {
+    @Parameter(description = " <table> { <table> ... } ")
+    List<String> tables = new ArrayList<String>();
+  }
+  
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    VolumeManager fs = VolumeManagerImpl.get();
+    Opts opts = new Opts();
+    opts.parseArgs(TableDiskUsage.class.getName(), args);
+    Connector conn = opts.getConnector();
+    org.apache.accumulo.server.util.TableDiskUsage.printDiskUsage(DefaultConfiguration.getInstance(), opts.tables, fs, conn, false);
+  }
+  
+}
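
Besides the main() entry point above, the class can be driven programmatically; a minimal sketch, assuming a Connector obtained elsewhere (for example via ClientOpts), might look like:

import java.util.Arrays;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.util.TableDiskUsage;

public class DiskUsageExample {
  static void report(Connector conn) throws Exception {
    VolumeManager fs = VolumeManagerImpl.get();
    // Prints one line per group of tables sharing files (e.g. clones), with
    // human readable sizes because of the trailing 'true'.
    TableDiskUsage.printDiskUsage(DefaultConfiguration.getInstance(),
        Arrays.asList("table1", "table2"), fs, conn, true);
  }
}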

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/TableInfoUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TableInfoUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/TableInfoUtil.java
new file mode 100644
index 0000000..cc91ef3
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TableInfoUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.master.thrift.Compacting;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+
+/**
+ * 
+ */
+public class TableInfoUtil {
+  
+  public static void add(TableInfo total, TableInfo more) {
+    if (total.minors == null)
+      total.minors = new Compacting();
+    if (total.majors == null)
+      total.majors = new Compacting();
+    if (total.scans == null)
+      total.scans = new Compacting();
+    if (more.minors != null) {
+      total.minors.running += more.minors.running;
+      total.minors.queued += more.minors.queued;
+    }
+    if (more.majors != null) {
+      total.majors.running += more.majors.running;
+      total.majors.queued += more.majors.queued;
+    }
+    if (more.scans != null) {
+      total.scans.running += more.scans.running;
+      total.scans.queued += more.scans.queued;
+    }
+    total.onlineTablets += more.onlineTablets;
+    total.recs += more.recs;
+    total.recsInMemory += more.recsInMemory;
+    total.tablets += more.tablets;
+    total.ingestRate += more.ingestRate;
+    total.ingestByteRate += more.ingestByteRate;
+    total.queryRate += more.queryRate;
+    total.queryByteRate += more.queryByteRate;
+    total.scanRate += more.scanRate;
+  }
+  
+  public static TableInfo summarizeTableStats(TabletServerStatus status) {
+    TableInfo summary = new TableInfo();
+    summary.majors = new Compacting();
+    summary.minors = new Compacting();
+    summary.scans = new Compacting();
+    for (TableInfo rates : status.tableMap.values()) {
+      TableInfoUtil.add(summary, rates);
+    }
+    return summary;
+  }
+  
+  public static Map<String,Double> summarizeTableStats(MasterMonitorInfo mmi) {
+    Map<String,Double> compactingByTable = new HashMap<String,Double>();
+    if (mmi != null && mmi.tServerInfo != null) {
+      for (TabletServerStatus status : mmi.tServerInfo) {
+        if (status != null && status.tableMap != null) {
+          for (String table : status.tableMap.keySet()) {
+            Double holdTime = compactingByTable.get(table);
+            compactingByTable.put(table, Math.max(holdTime == null ? 0. : holdTime.doubleValue(), status.holdTime));
+          }
+        }
+      }
+    }
+    return compactingByTable;
+  }
+  
+}
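
A small sketch of the intended aggregation, assuming the TabletServerStatus list comes from MasterMonitorInfo.tServerInfo:

import java.util.List;

import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.server.util.TableInfoUtil;

public class ClusterSummaryExample {
  static TableInfo summarize(List<TabletServerStatus> statuses) {
    TableInfo cluster = new TableInfo();
    for (TabletServerStatus status : statuses) {
      // Collapse each server's per-table stats, then fold them into the cluster total.
      TableInfoUtil.add(cluster, TableInfoUtil.summarizeTableStats(status));
    }
    return cluster;
  }
}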

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/TablePropUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TablePropUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/TablePropUtil.java
new file mode 100644
index 0000000..cdee8fb
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TablePropUtil.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.zookeeper.KeeperException;
+
+public class TablePropUtil {
+  public static boolean setTableProperty(String tableId, String property, String value) throws KeeperException, InterruptedException {
+    if (!isPropertyValid(property, value))
+      return false;
+    
+    // create the zk node for per-table properties for this table if it doesn't already exist
+    String zkTablePath = getTablePath(tableId);
+    ZooReaderWriter.getInstance().putPersistentData(zkTablePath, new byte[0], NodeExistsPolicy.SKIP);
+    
+    // create the zk node for this property and set its data to the specified value
+    String zPath = zkTablePath + "/" + property;
+    ZooReaderWriter.getInstance().putPersistentData(zPath, value.getBytes(), NodeExistsPolicy.OVERWRITE);
+    
+    return true;
+  }
+  
+  public static boolean isPropertyValid(String property, String value) {
+    Property p = Property.getPropertyByKey(property);
+    if ((p != null && !p.getType().isValidFormat(value)) || !Property.isValidTablePropertyKey(property))
+      return false;
+    
+    return true;
+  }
+  
+  public static void removeTableProperty(String tableId, String property) throws InterruptedException, KeeperException {
+    String zPath = getTablePath(tableId) + "/" + property;
+    ZooReaderWriter.getInstance().recursiveDelete(zPath, NodeMissingPolicy.SKIP);
+  }
+  
+  private static String getTablePath(String tableId) {
+    return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF;
+  }
+}
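
For comparison with SystemPropUtil above, a minimal sketch of setting and clearing a per-table property; the table id and property key are illustrative:

import org.apache.accumulo.server.util.TablePropUtil;

public class TablePropExample {
  public static void main(String[] args) throws Exception {
    String tableId = "2"; // a table id, not a table name
    // Validates the key/value, creates the table's conf node if needed, then overwrites the property node.
    if (!TablePropUtil.setTableProperty(tableId, "table.file.replication", "2"))
      System.err.println("rejected: not a valid per-table property");

    // Removal is best-effort: NodeMissingPolicy.SKIP tolerates an absent node.
    TablePropUtil.removeTableProperty(tableId, "table.file.replication");
  }
}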

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/TabletIterator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TabletIterator.java b/server/base/src/main/java/org/apache/accumulo/server/util/TabletIterator.java
new file mode 100644
index 0000000..8dd414b
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TabletIterator.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * This class iterates over the metadata table returning all key values for a tablet in one chunk. As it scans the metadata table it checks the correctness of
+ * the metadata table, and rescans if needed. So the tablet key/values returned by this iterator should satisfy the sorted linked list property of the metadata
+ * table.
+ * 
+ * The purpose of this is to hide inconsistencies caused by splits and detect anomalies in the metadata table.
+ * 
+ * If a tablet that was returned by this iterator is subsequently deleted from the metadata table, then this iterator will throw a TabletDeletedException. This
+ * could occur when a table is merged.
+ * 
+ * 
+ */
+public class TabletIterator implements Iterator<Map<Key,Value>> {
+  
+  private static final Logger log = Logger.getLogger(TabletIterator.class);
+  
+  private SortedMap<Key,Value> currentTabletKeys;
+  
+  private Text lastTablet;
+  
+  private Scanner scanner;
+  private Iterator<Entry<Key,Value>> iter;
+  
+  private boolean returnPrevEndRow;
+  
+  private boolean returnDir;
+  
+  private Range range;
+  
+  public static class TabletDeletedException extends RuntimeException {
+    
+    /**
+     * 
+     */
+    
+    private static final long serialVersionUID = 1L;
+    
+    public TabletDeletedException(String msg) {
+      super(msg);
+    }
+  }
+  
+  /*
+   * public TabletIterator(String table, boolean returnPrevEndRow){
+   * 
+   * }
+   */
+  
+  /**
+   * 
+   * @param s
+   *          A scanner over the entire metadata table configured to fetch needed columns.
+   */
+  
+  public TabletIterator(Scanner s, Range range, boolean returnPrevEndRow, boolean returnDir) {
+    this.scanner = s;
+    this.range = range;
+    this.scanner.setRange(range);
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+    TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+    this.iter = s.iterator();
+    this.returnPrevEndRow = returnPrevEndRow;
+    this.returnDir = returnDir;
+  }
+  
+  @Override
+  public boolean hasNext() {
+    while (currentTabletKeys == null) {
+      
+      currentTabletKeys = scanToPrevEndRow();
+      if (currentTabletKeys.size() == 0) {
+        break;
+      }
+      
+      Key prevEndRowKey = currentTabletKeys.lastKey();
+      Value prevEndRowValue = currentTabletKeys.get(prevEndRowKey);
+      
+      if (!TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(prevEndRowKey)) {
+        log.debug(currentTabletKeys);
+        throw new RuntimeException("Unexpected key " + prevEndRowKey);
+      }
+      
+      Text per = KeyExtent.decodePrevEndRow(prevEndRowValue);
+      Text lastEndRow;
+      
+      if (lastTablet == null) {
+        lastEndRow = null;
+      } else {
+        lastEndRow = new KeyExtent(lastTablet, (Text) null).getEndRow();
+        
+        // do table transition sanity check
+        String lastTable = new KeyExtent(lastTablet, (Text) null).getTableId().toString();
+        String currentTable = new KeyExtent(prevEndRowKey.getRow(), (Text) null).getTableId().toString();
+        
+        if (!lastTable.equals(currentTable) && (per != null || lastEndRow != null)) {
+          log.info("Metadata inconsistency on table transition : " + lastTable + " " + currentTable + " " + per + " " + lastEndRow);
+          
+          currentTabletKeys = null;
+          resetScanner();
+          
+          UtilWaitThread.sleep(250);
+          
+          continue;
+        }
+      }
+      
+      boolean perEqual = (per == null && lastEndRow == null) || (per != null && lastEndRow != null && per.equals(lastEndRow));
+      
+      if (!perEqual) {
+        
+        log.info("Metadata inconsistency : " + per + " != " + lastEndRow + " metadataKey = " + prevEndRowKey);
+        
+        currentTabletKeys = null;
+        resetScanner();
+        
+        UtilWaitThread.sleep(250);
+        
+        continue;
+        
+      }
+      // this tablet is good, so set it as the last tablet
+      lastTablet = prevEndRowKey.getRow();
+    }
+    
+    return currentTabletKeys.size() > 0;
+  }
+  
+  @Override
+  public Map<Key,Value> next() {
+    
+    if (!hasNext())
+      throw new NoSuchElementException();
+    
+    Map<Key,Value> tmp = currentTabletKeys;
+    currentTabletKeys = null;
+    
+    Set<Entry<Key,Value>> es = tmp.entrySet();
+    Iterator<Entry<Key,Value>> esIter = es.iterator();
+    
+    while (esIter.hasNext()) {
+      Map.Entry<Key,Value> entry = esIter.next();
+      if (!returnPrevEndRow && TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(entry.getKey())) {
+        esIter.remove();
+      }
+      
+      if (!returnDir && TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(entry.getKey())) {
+        esIter.remove();
+      }
+    }
+    
+    return tmp;
+  }
+  
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+  
+  private SortedMap<Key,Value> scanToPrevEndRow() {
+    
+    Text curMetaDataRow = null;
+    
+    TreeMap<Key,Value> tm = new TreeMap<Key,Value>();
+    
+    boolean sawPrevEndRow = false;
+    
+    while (true) {
+      while (iter.hasNext()) {
+        Entry<Key,Value> entry = iter.next();
+        
+        if (curMetaDataRow == null) {
+          curMetaDataRow = entry.getKey().getRow();
+        }
+        
+        if (!curMetaDataRow.equals(entry.getKey().getRow())) {
+          // tablet must not have a prev end row, try scanning again
+          break;
+        }
+        
+        tm.put(entry.getKey(), entry.getValue());
+        
+        if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(entry.getKey())) {
+          sawPrevEndRow = true;
+          break;
+        }
+      }
+      
+      if (!sawPrevEndRow && tm.size() > 0) {
+        log.warn("Metadata problem : tablet " + curMetaDataRow + " has no prev end row");
+        resetScanner();
+        curMetaDataRow = null;
+        tm.clear();
+        UtilWaitThread.sleep(250);
+      } else {
+        break;
+      }
+    }
+    
+    return tm;
+  }
+  
+  protected void resetScanner() {
+    
+    Range range;
+    
+    if (lastTablet == null) {
+      range = this.range;
+    } else {
+      // check to see if the last tablet still exist
+      range = new Range(lastTablet, true, lastTablet, true);
+      scanner.setRange(range);
+      int count = 0;
+      for (@SuppressWarnings("unused")
+      Entry<Key,Value> entry : scanner) {
+        count++;
+      }
+      
+      if (count == 0)
+        throw new TabletDeletedException("Tablet " + lastTablet + " was deleted while iterating");
+      
+      // start right after the last good tablet
+      range = new Range(new Key(lastTablet).followingKey(PartialKey.ROW), true, this.range.getEndKey(), this.range.isEndKeyInclusive());
+    }
+    
+    log.info("Resetting " + MetadataTable.NAME + " scanner to " + range);
+    
+    scanner.setRange(range);
+    iter = scanner.iterator();
+    
+  }
+  
+}
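
A hedged sketch of walking the metadata table one tablet at a time; it assumes a Connector with read access to the metadata table and uses TabletsSection.getRange() to bound the scan to the tablets section:

import java.util.Map;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.server.util.TabletIterator;

public class MetadataWalkExample {
  static void walk(Connector conn) throws Exception {
    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
    // The iterator fetches the prev-row and directory columns itself and re-scans
    // whenever it detects a split-induced inconsistency.
    TabletIterator ti = new TabletIterator(scanner, TabletsSection.getRange(), true, true);
    while (ti.hasNext()) {
      Map<Key,Value> tablet = ti.next(); // all metadata entries for one tablet
      System.out.println("tablet row: " + tablet.keySet().iterator().next().getRow());
    }
  }
}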

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java b/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
new file mode 100644
index 0000000..14cf37b
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.tablets.UniqueNameAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class TabletOperations {
+  
+  private static final Logger log = Logger.getLogger(TabletOperations.class);
+  
+  public static String createTabletDirectory(VolumeManager fs, String tableId, Text endRow) {
+    String lowDirectory;
+    
+    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+    String volume = fs.choose(ServerConstants.getTablesDirs());
+    
+    while (true) {
+      try {
+        if (endRow == null) {
+          lowDirectory = Constants.DEFAULT_TABLET_LOCATION;
+          Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
+          if (fs.exists(lowDirectoryPath) || fs.mkdirs(lowDirectoryPath)) {
+            FileSystem pathFs = fs.getFileSystemByPath(lowDirectoryPath);
+            return lowDirectoryPath.makeQualified(pathFs.getUri(), pathFs.getWorkingDirectory()).toString();
+          }
+          log.warn("Failed to create " + lowDirectoryPath + " for unknown reason");
+        } else {
+          lowDirectory = "/" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
+          Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
+          if (fs.exists(lowDirectoryPath))
+            throw new IllegalStateException("Directory exists when it should not: " + lowDirectoryPath);
+          if (fs.mkdirs(lowDirectoryPath)) {
+            FileSystem lowDirectoryFs = fs.getFileSystemByPath(lowDirectoryPath);
+            return lowDirectoryPath.makeQualified(lowDirectoryFs.getUri(), lowDirectoryFs.getWorkingDirectory()).toString();
+          }
+        }
+      } catch (IOException e) {
+        log.warn(e);
+      }
+      
+      log.warn("Failed to create dir for tablet in table " + tableId + " in volume " + volume + " + will retry ...");
+      UtilWaitThread.sleep(3000);
+      
+    }
+  }
+  
+  public static String createTabletDirectory(String tableId, Text endRow) {
+    while (true) {
+      try {
+        VolumeManager fs = VolumeManagerImpl.get();
+        return createTabletDirectory(fs, tableId, endRow);
+      } catch (IOException e) {
+        log.warn(e);
+      }
+      UtilWaitThread.sleep(3000);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java b/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
new file mode 100644
index 0000000..dfb05d0
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.List;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+import com.beust.jcommander.Parameter;
+
+public class TabletServerLocks {
+  
+  static class Opts extends Help {
+    @Parameter(names="-list")
+    boolean list = false;
+    @Parameter(names="-delete")
+    String delete = null;
+  }
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    
+    Instance instance = HdfsZooInstance.getInstance();
+    String tserverPath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+    Opts opts = new Opts();
+    opts.parseArgs(TabletServerLocks.class.getName(), args);
+    
+    ZooCache cache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+    
+    if (opts.list) {
+      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+      
+      List<String> tabletServers = zoo.getChildren(tserverPath);
+      
+      for (String tabletServer : tabletServers) {
+        byte[] lockData = ZooLock.getLockData(cache, tserverPath + "/" + tabletServer, null);
+        String holder = null;
+        if (lockData != null) {
+          holder = new String(lockData);
+        }
+        
+        System.out.printf("%32s %16s%n", tabletServer, holder);
+      }
+    } else if (opts.delete != null) {
+      ZooLock.deleteLock(tserverPath + "/" + opts.delete);
+    } else {
+      System.out.println("Usage : " + TabletServerLocks.class.getName() + " -list|-delete <tserver lock>");
+    }
+    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
new file mode 100644
index 0000000..5938bc0
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.thrift.InitialMultiScan;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.data.thrift.TRange;
+import org.apache.accumulo.core.metadata.MetadataServicer;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.accumulo.trace.thrift.TInfo;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.apache.thrift.TServiceClient;
+
+import com.beust.jcommander.Parameter;
+
+public class VerifyTabletAssignments {
+  
+  static class Opts extends ClientOpts {
+    @Parameter(names = {"-v", "--verbose"}, description = "verbose mode (prints locations of tablets)")
+    boolean verbose = false;
+  }
+  
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(VerifyTabletAssignments.class.getName(), args);
+    
+    Connector conn = opts.getConnector();
+    for (String table : conn.tableOperations().list())
+      checkTable(opts, table, null);
+    
+  }
+  
+  private static void checkTable(final Opts opts, String tableName, HashSet<KeyExtent> check) throws AccumuloException, AccumuloSecurityException,
+      TableNotFoundException, InterruptedException {
+    
+    if (check == null)
+      System.out.println("Checking table " + tableName);
+    else
+      System.out.println("Checking table " + tableName + " again, failures " + check.size());
+    
+    TreeMap<KeyExtent,String> tabletLocations = new TreeMap<KeyExtent,String>();
+    
+    Connector conn = opts.getConnector();
+    final Instance inst = conn.getInstance();
+    String tableId = Tables.getNameToIdMap(inst).get(tableName);
+    Credentials credentials = new Credentials(opts.principal, opts.getToken());
+    MetadataServicer.forTableId(inst, credentials, tableId).getTabletLocations(tabletLocations);
+    
+    final HashSet<KeyExtent> failures = new HashSet<KeyExtent>();
+    
+    Map<String,List<KeyExtent>> extentsPerServer = new TreeMap<String,List<KeyExtent>>();
+    
+    for (Entry<KeyExtent,String> entry : tabletLocations.entrySet()) {
+      KeyExtent keyExtent = entry.getKey();
+      String loc = entry.getValue();
+      if (loc == null)
+        System.out.println(" Tablet " + keyExtent + " has no location");
+      else if (opts.verbose)
+        System.out.println(" Tablet " + keyExtent + " is located at " + loc);
+      
+      if (loc != null) {
+        List<KeyExtent> extentList = extentsPerServer.get(loc);
+        if (extentList == null) {
+          extentList = new ArrayList<KeyExtent>();
+          extentsPerServer.put(loc, extentList);
+        }
+        
+        if (check == null || check.contains(keyExtent))
+          extentList.add(keyExtent);
+      }
+    }
+    
+    ExecutorService tp = Executors.newFixedThreadPool(20);
+    final ServerConfiguration conf = new ServerConfiguration(inst);
+    for (final Entry<String,List<KeyExtent>> entry : extentsPerServer.entrySet()) {
+      Runnable r = new Runnable() {
+        
+        @Override
+        public void run() {
+          try {
+            checkTabletServer(inst, conf.getConfiguration(), new Credentials(opts.principal, opts.getToken()), entry, failures);
+          } catch (Exception e) {
+            System.err.println("Failure on ts " + entry.getKey() + " " + e.getMessage());
+            e.printStackTrace();
+            failures.addAll(entry.getValue());
+          }
+        }
+        
+      };
+      
+      tp.execute(r);
+    }
+    
+    tp.shutdown();
+    
+    while (!tp.awaitTermination(1, TimeUnit.HOURS)) {}
+    
+    if (failures.size() > 0)
+      checkTable(opts, tableName, failures);
+  }
+  
+  private static void checkFailures(String server, HashSet<KeyExtent> failures, MultiScanResult scanResult) {
+    for (TKeyExtent tke : scanResult.failures.keySet()) {
+      KeyExtent ke = new KeyExtent(tke);
+      System.out.println(" Tablet " + ke + " failed at " + server);
+      failures.add(ke);
+    }
+  }
+  
+  private static void checkTabletServer(Instance inst, AccumuloConfiguration conf, Credentials creds, Entry<String,List<KeyExtent>> entry,
+      HashSet<KeyExtent> failures) throws ThriftSecurityException, TException, NoSuchScanIDException {
+    TabletClientService.Iface client = ThriftUtil.getTServerClient(entry.getKey(), conf);
+    
+    Map<TKeyExtent,List<TRange>> batch = new TreeMap<TKeyExtent,List<TRange>>();
+    
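+    // For each tablet, build one small probe range that falls within the tablet's extent, so the
+    // multi-scan below exercises the server that is supposed to be hosting that tablet.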
+    for (KeyExtent keyExtent : entry.getValue()) {
+      Text row = keyExtent.getEndRow();
+      Text row2 = null;
+      
+      if (row == null) {
+        row = keyExtent.getPrevEndRow();
+        
+        if (row != null) {
+          row = new Text(row);
+          row.append(new byte[] {'a'}, 0, 1);
+        } else {
+          row = new Text("1234567890");
+        }
+        
+        row2 = new Text(row);
+        row2.append(new byte[] {'!'}, 0, 1);
+      } else {
+        row = new Text(row);
+        row2 = new Text(row);
+        
+        row.getBytes()[row.getLength() - 1] = (byte) (row.getBytes()[row.getLength() - 1] - 1);
+      }
+      
+      Range r = new Range(row, true, row2, false);
+      batch.put(keyExtent.toThrift(), Collections.singletonList(r.toThrift()));
+    }
+    TInfo tinfo = Tracer.traceInfo();
+    Map<String,Map<String,String>> emptyMapSMapSS = Collections.emptyMap();
+    List<IterInfo> emptyListIterInfo = Collections.emptyList();
+    List<TColumn> emptyListColumn = Collections.emptyList();
+    InitialMultiScan is = client.startMultiScan(tinfo, creds.toThrift(inst), batch, emptyListColumn, emptyListIterInfo, emptyMapSMapSS,
+        Authorizations.EMPTY.getAuthorizationsBB(), false);
+    if (is.result.more) {
+      MultiScanResult result = client.continueMultiScan(tinfo, is.scanID);
+      checkFailures(entry.getKey(), failures, result);
+      
+      while (result.more) {
+        result = client.continueMultiScan(tinfo, is.scanID);
+        checkFailures(entry.getKey(), failures, result);
+      }
+    }
+    
+    client.closeMultiScan(tinfo, is.scanID);
+    
+    ThriftUtil.returnClient((TServiceClient) client);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java
new file mode 100644
index 0000000..37edb1a
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.beust.jcommander.Parameter;
+
+public class ZooKeeperMain {
+  
+  static class Opts extends Help {
+    
+    @Parameter(names = {"-z", "--keepers"}, description = "Comma separated list of zookeeper hosts (host:port,host:port)")
+    String servers = null;
+    
+    @Parameter(names = {"-t", "--timeout"}, description = "timeout, in seconds to timeout the zookeeper connection")
+    long timeout = 30;
+  }
+  
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(ZooKeeperMain.class.getName(), args);
+    FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+    String baseDir = ServerConstants.getBaseDirs()[0];
+    System.out.println("Using " + fs.makeQualified(new Path(baseDir + "/instance_id")) + " to lookup accumulo instance");
+    Instance instance = HdfsZooInstance.getInstance();
+    if (opts.servers == null) {
+      opts.servers = instance.getZooKeepers();
+    }
+    System.out.println("The accumulo instance id is " + instance.getInstanceID());
+    if (!opts.servers.contains("/"))
+      opts.servers += "/accumulo/" + instance.getInstanceID();
+    org.apache.zookeeper.ZooKeeperMain.main(new String[] {"-server", opts.servers, "-timeout", "" + (opts.timeout * 1000)});
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
new file mode 100644
index 0000000..489d4bc
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.List;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+public class ZooZap {
+  
+  static boolean verbose = false;
+  
+  /**
+   * Print a progress message when -verbose is set.
+   */
+  private static void message(String msg) {
+    if (verbose)
+      System.out.println(msg);
+  }
+  
+  static class Opts extends ClientOpts {
+    @Parameter(names="-master", description="remove master locks")
+    boolean zapMaster = false;
+    @Parameter(names="-tservers", description="remove tablet server locks")
+    boolean zapTservers = false;
+    @Parameter(names="-tracers", description="remove tracer locks")
+    boolean zapTracers = false;
+    @Parameter(names="-verbose", description="print out messages about progress")
+    boolean verbose = false;
+  }
+  
+  public static void main(String[] args) {
+    Opts opts = new Opts();
+    opts.parseArgs(ZooZap.class.getName(), args);
+    verbose = opts.verbose;
+    
+    if (!opts.zapMaster && !opts.zapTservers && !opts.zapTracers) {
+      new JCommander(opts).usage();
+      return;
+    }
+    
+    String iid = opts.getInstance().getInstanceID();
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    
+    if (opts.zapMaster) {
+      String masterLockPath = Constants.ZROOT + "/" + iid + Constants.ZMASTER_LOCK;
+      
+      zapDirectory(zoo, masterLockPath);
+    }
+    
+    if (opts.zapTservers) {
+      String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS;
+      try {
+        List<String> children = zoo.getChildren(tserversPath);
+        for (String child : children) {
+          message("Deleting " + tserversPath + "/" + child + " from zookeeper");
+          
+          if (opts.zapMaster)
+            ZooReaderWriter.getInstance().recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP);
+          else {
+            String path = tserversPath + "/" + child;
+            if (zoo.getChildren(path).size() > 0) {
+              if (!ZooLock.deleteLock(path, "tserver")) {
+                message("Did not delete " + tserversPath + "/" + child);
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    
+    if (opts.zapTracers) {
+      String path = Constants.ZROOT + "/" + iid + Constants.ZTRACERS;
+      zapDirectory(zoo, path);
+    }
+    
+  }
+  
+  private static void zapDirectory(IZooReaderWriter zoo, String path) {
+    try {
+      List<String> children = zoo.getChildren(path);
+      for (String child : children) {
+        message("Deleting " + path + "/" + child + " from zookeeper");
+        zoo.recursiveDelete(path + "/" + child, NodeMissingPolicy.SKIP);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/time/BaseRelativeTime.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/time/BaseRelativeTime.java b/server/base/src/main/java/org/apache/accumulo/server/util/time/BaseRelativeTime.java
new file mode 100644
index 0000000..393c6d2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/time/BaseRelativeTime.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util.time;
+
+/**
+ * Provides time from a local source, adjusted by hints from another time source.
+ * 
+ * RelativeTime and BaseRelativeTime are separated so the core relative-timekeeping logic can be unit tested.
+ * 
+ */
+public class BaseRelativeTime implements ProvidesTime {
+  
+  private long diff = 0;
+  private long lastReportedTime = 0;
+  ProvidesTime local;
+  
+  BaseRelativeTime(ProvidesTime real, long lastReportedTime) {
+    this.local = real;
+    this.lastReportedTime = lastReportedTime;
+  }
+  
+  BaseRelativeTime(ProvidesTime real) {
+    this(real, 0);
+  }
+  
+  @Override
+  synchronized public long currentTime() {
+    long localNow = local.currentTime();
+    long result = localNow + diff;
+    if (result < lastReportedTime)
+      return lastReportedTime;
+    lastReportedTime = result;
+    return result;
+  }
+  
+  synchronized public void updateTime(long advice) {
+    long localNow = local.currentTime();
+    long newDiff = advice - localNow;
+    // smooth in 20% of the change, not the whole thing.
+    this.diff = (this.diff * 4 / 5) + newDiff / 5;
+  }
+  
+}
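
For illustration only (not part of this patch): a minimal sketch of how the 20% smoothing in updateTime() behaves, assuming a fixed local clock. The FixedTime class below is hypothetical, and because the constructors are package-private this would only compile from within the same package.

    // Hypothetical fixed clock used only for this sketch.
    class FixedTime implements ProvidesTime {
      private final long now;
      FixedTime(long now) { this.now = now; }
      @Override
      public long currentTime() { return now; }
    }

    BaseRelativeTime brt = new BaseRelativeTime(new FixedTime(1000));
    brt.updateTime(2000);       // advice is 1000 ms ahead of local; diff becomes 0*4/5 + 1000/5 = 200
    long t = brt.currentTime(); // 1000 + 200 = 1200, and never less than the last reported time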

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/time/ProvidesTime.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/time/ProvidesTime.java b/server/base/src/main/java/org/apache/accumulo/server/util/time/ProvidesTime.java
new file mode 100644
index 0000000..92f0d2c
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/time/ProvidesTime.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util.time;
+
+/**
+ * An interface for anything that returns the time in the same format as System.currentTimeMillis().
+ * 
+ */
+public interface ProvidesTime {
+  public long currentTime();
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/time/RelativeTime.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/time/RelativeTime.java b/server/base/src/main/java/org/apache/accumulo/server/util/time/RelativeTime.java
new file mode 100644
index 0000000..99581e9
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/time/RelativeTime.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util.time;
+
+/**
+ * Provide time from System time and hints from another time source.
+ * 
+ * Provides a convenient static replacement for System.currentTimeMillis()
+ */
+public class RelativeTime extends BaseRelativeTime {
+  
+  private RelativeTime() {
+    super(new SystemTime());
+  }
+  
+  private static BaseRelativeTime instance = new RelativeTime();
+  
+  public static BaseRelativeTime getInstance() {
+    return instance;
+  }
+  
+  public static void setInstance(BaseRelativeTime newInstance) {
+    instance = newInstance;
+  }
+  
+  public static long currentTimeMillis() {
+    return getInstance().currentTime();
+  }
+  
+}
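
Again for illustration only (not part of this patch): a sketch of swapping in a deterministic clock via setInstance(), e.g. from a unit test in the same package, since the BaseRelativeTime constructors are package-private. The anonymous ProvidesTime below is hypothetical.

    // Substitute a deterministic time source for testing.
    ProvidesTime fixed = new ProvidesTime() {
      @Override
      public long currentTime() { return 42L; }
    };
    RelativeTime.setInstance(new BaseRelativeTime(fixed));
    long now = RelativeTime.currentTimeMillis(); // 42, taken from the injected source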

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java b/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java
new file mode 100644
index 0000000..499b0de
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util.time;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Generic singleton timer for short, simple events: it exists to reduce the number of threads dedicated to trivial periodic work. Do not schedule anything
+ * long-running on it, since all tasks share a single thread.
+ * 
+ */
+public class SimpleTimer {
+  
+  static class LoggingTimerTask extends TimerTask {
+    
+    private Runnable task;
+    
+    LoggingTimerTask(Runnable task) {
+      this.task = task;
+    }
+    
+    @Override
+    public void run() {
+      try {
+        task.run();
+      } catch (Throwable t) {
+        Logger.getLogger(LoggingTimerTask.class).warn("Timer task failed " + task.getClass().getName() + " " + t.getMessage(), t);
+      }
+    }
+    
+  }
+
+  private static SimpleTimer instance;
+  private Timer timer;
+  
+  public static synchronized SimpleTimer getInstance() {
+    if (instance == null)
+      instance = new SimpleTimer();
+    return instance;
+  }
+  
+  private SimpleTimer() {
+    timer = new Timer("SimpleTimer", true);
+  }
+  
+  public void schedule(Runnable task, long delay) {
+    timer.schedule(new LoggingTimerTask(task), delay);
+  }
+  
+  public void schedule(Runnable task, long delay, long period) {
+    timer.schedule(new LoggingTimerTask(task), delay, period);
+  }
+  
+}
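
A short usage sketch (not part of this patch) of the singleton timer; the task body is illustrative. All tasks share one daemon Timer thread, which is why the class asks callers to keep work short.

    // Schedule a lightweight periodic task: 1 s initial delay, repeating every 5 s.
    SimpleTimer.getInstance().schedule(new Runnable() {
      @Override
      public void run() {
        // keep work here short; a slow task delays every other scheduled task
      }
    }, 1000, 5000);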

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/time/SystemTime.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/time/SystemTime.java b/server/base/src/main/java/org/apache/accumulo/server/util/time/SystemTime.java
new file mode 100644
index 0000000..c421f5f
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/time/SystemTime.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util.time;
+
+/**
+ * The most obvious implementation of ProvidesTime.
+ * 
+ */
+public class SystemTime implements ProvidesTime {
+  
+  @Override
+  public long currentTime() {
+    return System.currentTimeMillis();
+  }
+  
+}

