accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [02/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT
Date Wed, 09 Apr 2014 17:57:33 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
index efadfae,0000000..09fbbd2
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
@@@ -1,279 -1,0 +1,278 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.util;
 +
 +import java.io.BufferedReader;
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.FileNotFoundException;
 +import java.io.FilenameFilter;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.io.UnsupportedEncodingException;
 +import java.net.Socket;
 +import java.net.URLEncoder;
 +import java.text.ParseException;
 +import java.text.SimpleDateFormat;
 +import java.util.Calendar;
 +import java.util.Date;
 +import java.util.Map;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
 +import javax.net.SocketFactory;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.Help;
 +import org.apache.commons.io.filefilter.WildcardFileFilter;
 +import org.apache.commons.lang.math.LongRange;
 +import org.apache.log4j.Category;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +import org.apache.log4j.spi.Filter;
 +import org.apache.log4j.spi.LocationInfo;
 +import org.apache.log4j.spi.LoggingEvent;
 +import org.apache.log4j.spi.ThrowableInformation;
 +import org.apache.log4j.varia.LevelRangeFilter;
 +import org.apache.log4j.xml.XMLLayout;
 +
 +import com.beust.jcommander.IStringConverter;
 +import com.beust.jcommander.Parameter;
 +
 +public class SendLogToChainsaw extends XMLLayout {
 +  
 +  private static Pattern logPattern = Pattern.compile(
 +      "^(\\d\\d)\\s(\\d\\d):(\\d\\d):(\\d\\d),(\\d\\d\\d)\\s\\[(.*)\\]\\s(TRACE|DEBUG|INFO|WARN|FATAL|ERROR)\\s*?:(.*)$", Pattern.UNIX_LINES);
 +  
 +  private File[] logFiles = null;
 +  
 +  private SocketFactory factory = SocketFactory.getDefault();
 +  
 +  private WildcardFileFilter fileFilter = null;
 +  
 +  private Socket socket = null;
 +  
 +  private Pattern lineFilter = null;
 +  
 +  private LongRange dateFilter = null;
 +  
 +  private LevelRangeFilter levelFilter = null;
 +  
 +  public SendLogToChainsaw(String directory, String fileNameFilter, String host, int port, Date start, Date end, String regex, String level) throws Exception {
 +    
 +    // Set up the file name filter
 +    if (null != fileNameFilter) {
 +      fileFilter = new WildcardFileFilter(fileNameFilter);
 +    } else {
 +      fileFilter = new WildcardFileFilter("*");
 +    }
 +    
 +    // Get the list of files that match
 +    File dir = new File(directory);
 +    if (dir.isDirectory()) {
 +      logFiles = dir.listFiles((FilenameFilter) fileFilter);
 +    } else {
 +      throw new IllegalArgumentException(directory + " is not a directory or is not readable.");
 +    }
 +    
 +    if (logFiles.length == 0) {
 +      throw new IllegalArgumentException("No files match the supplied filter.");
 +    }
 +    
 +    socket = factory.createSocket(host, port);
 +    
 +    lineFilter = Pattern.compile(regex);
 +    
 +    // Create Date Filter
 +    if (null != start) {
 +      if (end == null)
 +        end = new Date(System.currentTimeMillis());
 +      dateFilter = new LongRange(start.getTime(), end.getTime());
 +    }
 +    
 +    if (null != level) {
 +      Level base = Level.toLevel(level.toUpperCase());
 +      levelFilter = new LevelRangeFilter();
 +      levelFilter.setAcceptOnMatch(true);
 +      levelFilter.setLevelMin(base);
 +      levelFilter.setLevelMax(Level.FATAL);
 +    }
 +  }
 +  
 +  public void processLogFiles() throws IOException {
 +    String line = null;
 +    String out = null;
 +    InputStreamReader isReader = null;
 +    BufferedReader reader = null;
 +    try {
 +      for (File log : logFiles) {
 +        // Parse the server type and name from the log file name
 +        String threadName = log.getName().substring(0, log.getName().indexOf("."));
 +        try {
 +          isReader = new InputStreamReader(new FileInputStream(log), Constants.UTF8);
 +        } catch (FileNotFoundException e) {
 +          System.out.println("Unable to find file: " + log.getAbsolutePath());
 +          throw e;
 +	    }
 +        reader = new BufferedReader(isReader);
 +        
 +        try {
 +          line = reader.readLine();
 +          while (null != line) {
 +                out = convertLine(line, threadName);
 +                if (null != out) {
 +                  if (socket != null && socket.isConnected())
 +                    socket.getOutputStream().write(out.getBytes(Constants.UTF8));
 +                  else
 +                    System.err.println("Unable to send data to transport");
 +                }
 +              line = reader.readLine();
 +            }
 +        } catch (IOException e) {
 +            System.out.println("Error processing line: " + line + ". Output was " + out);
 +            throw e;
 +        } finally {
 +          if (reader != null) {
 +            reader.close();
 +          }
 +          if (isReader != null) {
 +            isReader.close();
 +          }
 +        }
 +      }
 +    } finally {
 +      if (socket != null && socket.isConnected()) {
 +        socket.close();
 +      }
 +    }
 +  }
 +  
 +  private String convertLine(String line, String threadName) throws UnsupportedEncodingException {
 +    String result = null;
 +    Matcher m = logPattern.matcher(line);
 +    if (m.matches()) {
 +      
 +      Calendar cal = Calendar.getInstance();
 +      cal.setTime(new Date(System.currentTimeMillis()));
 +      Integer date = Integer.parseInt(m.group(1));
 +      Integer hour = Integer.parseInt(m.group(2));
 +      Integer min = Integer.parseInt(m.group(3));
 +      Integer sec = Integer.parseInt(m.group(4));
 +      Integer ms = Integer.parseInt(m.group(5));
 +      String clazz = m.group(6);
 +      String level = m.group(7);
 +      String message = m.group(8);
 +      // Apply the regex filter if supplied
 +      if (null != lineFilter) {
 +        Matcher match = lineFilter.matcher(message);
 +        if (!match.matches())
 +          return null;
 +      }
 +      // URL encode the message
 +      message = URLEncoder.encode(message, "UTF-8");
 +      // Assume that we are processing logs from today.
 +      // If the date in the line is greater than today, then it must be
 +      // from the previous month.
 +      cal.set(Calendar.DATE, date);
 +      cal.set(Calendar.HOUR_OF_DAY, hour);
 +      cal.set(Calendar.MINUTE, min);
 +      cal.set(Calendar.SECOND, sec);
 +      cal.set(Calendar.MILLISECOND, ms);
 +      if (date > cal.get(Calendar.DAY_OF_MONTH)) {
 +        cal.add(Calendar.MONTH, -1);
 +      }
 +      long ts = cal.getTimeInMillis();
 +      // If this event is not between the start and end dates, then skip it.
 +      if (null != dateFilter && !dateFilter.containsLong(ts))
 +        return null;
 +      Category c = Logger.getLogger(clazz);
 +      Level l = Level.toLevel(level);
 +      LoggingEvent event = new LoggingEvent(clazz, c, ts, l, message, threadName, (ThrowableInformation) null, (String) null, (LocationInfo) null,
 +          (Map<?,?>) null);
 +      // Check the log level filter
 +      if (null != levelFilter && (levelFilter.decide(event) == Filter.DENY)) {
 +        return null;
 +      }
 +      result = format(event);
 +    }
 +    return result;
 +  }
 +  
 +  private static class DateConverter implements IStringConverter<Date> {
 +    @Override
 +    public Date convert(String value) {
 +      SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
 +      try {
 +        return formatter.parse(value);
 +      } catch (ParseException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +    
 +  }
 +  
 +  private static class Opts extends Help {
 +    
 +    @Parameter(names={"-d", "--logDirectory"}, description="ACCUMULO log directory path", required=true)
 +    String dir;
 +    
 +    @Parameter(names={"-f", "--fileFilter"}, description="filter to apply to names of logs")
 +    String filter;
 +    
 +    @Parameter(names={"-h", "--host"}, description="host where chainsaw is running", required=true)
 +    String hostname;
 +    
 +    @Parameter(names={"-p", "--port"}, description="port where XMLSocketReceiver is listening", required=true)
 +    int portnum;
 +    
 +    @Parameter(names={"-s", "--start"}, description="start date filter (yyyyMMddHHmmss)", required=true, converter=DateConverter.class)
 +    Date startDate;
 +    
 +    @Parameter(names={"-e", "--end"}, description="end date filter (yyyyMMddHHmmss)", required=true, converter=DateConverter.class)
 +    Date endDate;
 +    
 +    @Parameter(names={"-l", "--level"}, description="filter log level")
 +    String level;
 +    
 +    @Parameter(names={"-m", "--messageFilter"}, description="regex filter for log messages")
 +    String regex;
 +  }
 +  
 +  
 +  
 +  
 +  /**
 +   * 
 +   * @param args
 +   *   0: path to log directory parameter 
 +   *   1: filter to apply for logs to include (uses wildcards (i.e. logger* and IS case sensitive) parameter
 +   *   2: chainsaw host parameter 
 +   *   3: chainsaw port parameter 
 +   *   4: start date filter parameter 
 +   *   5: end date filter parameter 
 +   *   6: optional regex filter to match on each log4j message parameter 
 +   *   7: optional level filter
-    * @throws Exception
 +   */
 +  public static void main(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    opts.parseArgs(SendLogToChainsaw.class.getName(), args);
 +    
 +    SendLogToChainsaw c = new SendLogToChainsaw(opts.dir, opts.filter, opts.hostname, opts.portnum, opts.startDate, opts.endDate, opts.regex, opts.level);
 +    c.processLogFiles();
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index 50476a2,0000000..fa4de30
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@@ -1,313 -1,0 +1,314 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.util;
 +
 +import java.io.IOException;
 +import java.lang.reflect.Field;
 +import java.net.InetSocketAddress;
 +import java.net.ServerSocket;
 +import java.net.Socket;
 +import java.net.UnknownHostException;
 +import java.nio.channels.ServerSocketChannel;
 +import java.util.Random;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ThreadPoolExecutor;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.LoggingRunnable;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.TBufferedSocket;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.thrift.metrics.ThriftMetrics;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TProcessor;
 +import org.apache.thrift.TProcessorFactory;
 +import org.apache.thrift.protocol.TProtocol;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.server.TThreadPoolServer;
 +import org.apache.thrift.transport.TNonblockingSocket;
 +import org.apache.thrift.transport.TServerTransport;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +
 +public class TServerUtils {
 +  private static final Logger log = Logger.getLogger(TServerUtils.class);
 +  
 +  public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
 +  
 +  public static class ServerPort {
 +    public final TServer server;
 +    public final int port;
 +    
 +    public ServerPort(TServer server, int port) {
 +      this.server = server;
 +      this.port = port;
 +    }
 +  }
 +  
 +  /**
 +   * Start a server, at the given port, or higher, if that port is not available.
 +   * 
 +   * @param portHintProperty
 +   *          the port to attempt to open, can be zero, meaning "any available port"
 +   * @param processor
 +   *          the service to be started
 +   * @param serverName
 +   *          the name of the class that is providing the service
 +   * @param threadName
 +   *          name this service's thread for better debugging
-    * @param portSearchProperty
-    * @param minThreadProperty
-    * @param timeBetweenThreadChecksProperty
 +   * @return the server object created, and the port actually used
 +   * @throws UnknownHostException
 +   *           when we don't know our own address
 +   */
 +  public static ServerPort startServer(AccumuloConfiguration conf, Property portHintProperty, TProcessor processor, String serverName, String threadName,
 +      Property portSearchProperty,
 +      Property minThreadProperty, 
 +      Property timeBetweenThreadChecksProperty, 
 +      Property maxMessageSizeProperty) throws UnknownHostException {
 +    int portHint = conf.getPort(portHintProperty);
 +    int minThreads = 2;
 +    if (minThreadProperty != null)
 +      minThreads = conf.getCount(minThreadProperty);
 +    long timeBetweenThreadChecks = 1000;
 +    if (timeBetweenThreadChecksProperty != null)
 +      timeBetweenThreadChecks = conf.getTimeInMillis(timeBetweenThreadChecksProperty);
 +    long maxMessageSize = 10 * 1000 * 1000;
 +    if (maxMessageSizeProperty != null)
 +      maxMessageSize = conf.getMemoryInBytes(maxMessageSizeProperty);
 +    boolean portSearch = false;
 +    if (portSearchProperty != null)
 +      portSearch = conf.getBoolean(portSearchProperty);
 +    Random random = new Random();
 +    for (int j = 0; j < 100; j++) {
 +      
 +      // Are we going to slide around, looking for an open port?
 +      int portsToSearch = 1;
 +      if (portSearch)
 +        portsToSearch = 1000;
 +      
 +      for (int i = 0; i < portsToSearch; i++) {
 +        int port = portHint + i;
 +        if (portHint != 0 && i > 0)
 +          port = 1024 + random.nextInt(65535 - 1024);
 +        if (port > 65535)
 +          port = 1024 + port % (65535 - 1024);
 +        try {
 +          return TServerUtils.startTServer(port, processor, serverName, threadName, minThreads, timeBetweenThreadChecks, maxMessageSize);
 +        } catch (Exception ex) {
 +          log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
 +          UtilWaitThread.sleep(250);
 +        }
 +      }
 +    }
 +    throw new UnknownHostException("Unable to find a listen port");
 +  }
 +  
 +  public static class TimedProcessor implements TProcessor {
 +    
 +    final TProcessor other;
 +    ThriftMetrics metrics = null;
 +    long idleStart = 0;
 +    
 +    TimedProcessor(TProcessor next, String serverName, String threadName) {
 +      this.other = next;
 +      // Register the metrics MBean
 +      try {
 +        metrics = new ThriftMetrics(serverName, threadName);
 +        metrics.register();
 +      } catch (Exception e) {
 +        log.error("Exception registering MBean with MBean Server", e);
 +      }
 +      idleStart = System.currentTimeMillis();
 +    }
 +    
 +    @Override
 +    public boolean process(TProtocol in, TProtocol out) throws TException {
 +      long now = 0;
 +      if (metrics.isEnabled()) {
 +        now = System.currentTimeMillis();
 +        metrics.add(ThriftMetrics.idle, (now - idleStart));
 +      }
 +      try {
 +        try {
 +          return other.process(in, out);
 +        } catch (NullPointerException ex) {
 +          // THRIFT-1447 - remove with thrift 0.9
 +          return true;
 +        }
 +      } finally {
 +        if (metrics.isEnabled()) {
 +          idleStart = System.currentTimeMillis();
 +          metrics.add(ThriftMetrics.execute, idleStart - now);
 +        }
 +      }
 +    }
 +  }
 +  
 +  public static class ClientInfoProcessorFactory extends TProcessorFactory {
 +    
 +    public ClientInfoProcessorFactory(TProcessor processor) {
 +      super(processor);
 +    }
 +    
++    @Override
 +    public TProcessor getProcessor(TTransport trans) {
 +      if (trans instanceof TBufferedSocket) {
 +        TBufferedSocket tsock = (TBufferedSocket) trans;
 +        clientAddress.set(tsock.getClientString());
 +      }
 +      return super.getProcessor(trans);
 +    }
 +  }
 +  
 +  public static class THsHaServer extends org.apache.thrift.server.THsHaServer {
 +    public THsHaServer(Args args) {
 +      super(args);
 +    }
 +    
++    @Override
 +    protected Runnable getRunnable(FrameBuffer frameBuffer) {
 +      return new Invocation(frameBuffer);
 +    }
 +    
 +    private class Invocation implements Runnable {
 +      
 +      private final FrameBuffer frameBuffer;
 +      
 +      public Invocation(final FrameBuffer frameBuffer) {
 +        this.frameBuffer = frameBuffer;
 +      }
 +      
++      @Override
 +      public void run() {
 +        if (frameBuffer.trans_ instanceof TNonblockingSocket) {
 +          TNonblockingSocket tsock = (TNonblockingSocket) frameBuffer.trans_;
 +          Socket sock = tsock.getSocketChannel().socket();
 +          clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
 +        }
 +        frameBuffer.invoke();
 +      }
 +    }
 +  }
 +  
 +  public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads,
 +      long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
 +    TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
 +    if (port == 0) {
 +      port = transport.getPort();
 +    }
 +    THsHaServer.Args options = new THsHaServer.Args(transport);
 +    options.protocolFactory(ThriftUtil.protocolFactory());
 +    options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
 +    options.maxReadBufferBytes = maxMessageSize;
 +    options.stopTimeoutVal(5);
 +    /*
 +     * Create our own very special thread pool.
 +     */
 +    final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
 +    // periodically adjust the number of threads we need by checking how busy our threads are
 +    SimpleTimer.getInstance().schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        if (pool.getCorePoolSize() <= pool.getActiveCount()) {
 +          int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
 +          log.info("Increasing server thread pool size on " + serverName + " to " + larger);
 +          pool.setMaximumPoolSize(larger);
 +          pool.setCorePoolSize(larger);
 +        } else {
 +          if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
 +            int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
 +            if (smaller != pool.getCorePoolSize()) {
 +              // there is a race condition here... the active count could be higher by the time
 +              // we decrease the core pool size... so the active count could end up higher than
 +              // the core pool size, in which case everything will be queued... the increase case
 +              // should handle this and prevent deadlock
 +              log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
 +              pool.setCorePoolSize(smaller);
 +            }
 +          }
 +        }
 +      }
 +    }, timeBetweenThreadChecks, timeBetweenThreadChecks);
 +    options.executorService(pool);
 +    processor = new TServerUtils.TimedProcessor(processor, serverName, threadName);
 +    options.processorFactory(new TProcessorFactory(processor));
 +    return new ServerPort(new THsHaServer(options), port);
 +  }
 +  
 +  public static ServerPort startThreadPoolServer(int port, TProcessor processor, String serverName, String threadName, int numThreads)
 +      throws TTransportException {
 +    
 +    // if port is zero, then we must bind to get the port number
 +    ServerSocket sock;
 +    try {
 +      sock = ServerSocketChannel.open().socket();
 +      sock.setReuseAddress(true);
 +      sock.bind(new InetSocketAddress(port));
 +      port = sock.getLocalPort();
 +    } catch (IOException ex) {
 +      throw new TTransportException(ex);
 +    }
 +    TServerTransport transport = new TBufferedServerSocket(sock, 32 * 1024);
 +    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
 +    options.protocolFactory(ThriftUtil.protocolFactory());
 +    options.transportFactory(ThriftUtil.transportFactory());
 +    processor = new TServerUtils.TimedProcessor(processor, serverName, threadName);
 +    options.processorFactory(new ClientInfoProcessorFactory(processor));
 +    return new ServerPort(new TThreadPoolServer(options), port);
 +  }
 +  
 +  public static ServerPort startTServer(int port, TProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize)
 +      throws TTransportException {
 +    ServerPort result = startHsHaServer(port, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
 +    // ServerPort result = startThreadPoolServer(port, processor, serverName, threadName, -1);
 +    final TServer finalServer = result.server;
 +    Runnable serveTask = new Runnable() {
++      @Override
 +      public void run() {
 +        try {
 +          finalServer.serve();
 +        } catch (Error e) {
 +          Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
 +        }
 +      }
 +    };
 +    serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
 +    Thread thread = new Daemon(serveTask, threadName);
 +    thread.start();
 +    return result;
 +  }
 +  
 +  // Existing connections will keep our thread running: reach in with reflection and insist that they shutdown.
 +  public static void stopTServer(TServer s) {
 +    if (s == null)
 +      return;
 +    s.stop();
 +    try {
 +      Field f = s.getClass().getDeclaredField("executorService_");
 +      f.setAccessible(true);
 +      ExecutorService es = (ExecutorService) f.get(s);
 +      es.shutdownNow();
 +    } catch (Exception e) {
 +      TServerUtils.log.error("Unable to call shutdownNow", e);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
index c3f4a72,0000000..92e0674
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
@@@ -1,48 -1,0 +1,45 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.util;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import org.apache.accumulo.server.cli.ClientOpts;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.conf.DefaultConfiguration;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +
 +import com.beust.jcommander.Parameter;
 +
 +public class TableDiskUsage {
 +  
 +  static class Opts extends ClientOpts {
 +    @Parameter(description=" <table> { <table> ... } ")
 +    List<String> tables = new ArrayList<String>();
 +  }
 +  
-   /**
-    * @param args
-    */
 +  public static void main(String[] args) throws Exception {
 +    FileSystem fs = FileSystem.get(new Configuration());
 +    Opts opts = new Opts();
 +    opts.parseArgs(TableDiskUsage.class.getName(), args);
 +    Connector conn = opts.getConnector();
 +    org.apache.accumulo.core.util.TableDiskUsage.printDiskUsage(DefaultConfiguration.getInstance(), opts.tables, fs, conn, false);
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
index 2fc0bd3,0000000..34c2151
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
@@@ -1,75 -1,0 +1,72 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.util;
 +
 +import java.util.List;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.Help;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooCache;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.zookeeper.ZooLock;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +
 +import com.beust.jcommander.Parameter;
 +
 +public class TabletServerLocks {
 +  
 +  static class Opts extends Help {
 +    @Parameter(names="-list")
 +    boolean list = false;
 +    @Parameter(names="-delete")
 +    String delete = null;
 +  }
-   /**
-    * @param args
-    */
 +  public static void main(String[] args) throws Exception {
 +    
 +    Instance instance = HdfsZooInstance.getInstance();
 +    String tserverPath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
 +    Opts opts = new Opts();
 +    opts.parseArgs(TabletServerLocks.class.getName(), args);
 +    
 +    ZooCache cache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
 +    
 +    if (opts.list) {
 +      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +      
 +      List<String> tabletServers = zoo.getChildren(tserverPath);
 +      
 +      for (String tabletServer : tabletServers) {
 +        byte[] lockData = ZooLock.getLockData(cache, tserverPath + "/" + tabletServer, null);
 +        String holder = null;
 +        if (lockData != null) {
 +          holder = new String(lockData, Constants.UTF8);
 +        }
 +        
 +        System.out.printf("%32s %16s%n", tabletServer, holder);
 +      }
 +    } else if (opts.delete != null) {
 +      ZooLock.deleteLock(tserverPath + "/" + args[1]);
 +    } else {
 +      System.out.println("Usage : " + TabletServerLocks.class.getName() + " -list|-delete <tserver lock>");
 +    }
 +    
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java
index 5f7fd5e,0000000..1e2b4b5
mode 100644,000000..100644
--- a/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java
@@@ -1,255 -1,0 +1,252 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.start.classloader;
 +
 +import java.io.File;
 +import java.io.FilenameFilter;
 +import java.io.IOException;
 +import java.net.MalformedURLException;
 +import java.net.URI;
 +import java.net.URISyntaxException;
 +import java.net.URL;
 +import java.net.URLClassLoader;
 +import java.util.ArrayList;
 +import java.util.Map;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
 +import javax.xml.parsers.DocumentBuilder;
 +import javax.xml.parsers.DocumentBuilderFactory;
 +
 +import org.apache.log4j.Logger;
 +import org.w3c.dom.Document;
 +import org.w3c.dom.Element;
 +import org.w3c.dom.Node;
 +import org.w3c.dom.NodeList;
 +
 +/**
 + * 
 + */
 +public class AccumuloClassLoader {
 +  
 +  public static final String CLASSPATH_PROPERTY_NAME = "general.classpaths";
 +  
 +  public static final String ACCUMULO_CLASSPATH_VALUE = 
 +      "$ACCUMULO_CONF_DIR,\n" + 
 +          "$ACCUMULO_HOME/lib/[^.].*.jar,\n" + 
 +          "$ZOOKEEPER_HOME/zookeeper[^.].*.jar,\n" + 
 +          "$HADOOP_CONF_DIR,\n" +
 +          "$HADOOP_PREFIX/[^.].*.jar,\n" + 
 +          "$HADOOP_PREFIX/lib/[^.].*.jar,\n" + 
 +          "$HADOOP_PREFIX/share/hadoop/common/.*.jar,\n" +
 +          "$HADOOP_PREFIX/share/hadoop/common/lib/.*.jar,\n" +
 +          "$HADOOP_PREFIX/share/hadoop/hdfs/.*.jar,\n" +
 +          "$HADOOP_PREFIX/share/hadoop/mapreduce/.*.jar,\n" +
 +          "/usr/lib/hadoop/[^.].*.jar,\n" +
 +          "/usr/lib/hadoop/lib/[^.].*.jar,\n" +
 +          "/usr/lib/hadoop-hdfs/[^.].*.jar,\n" +
 +          "/usr/lib/hadoop-mapreduce/[^.].*.jar,\n" +
 +          "/usr/lib/hadoop-yarn/[^.].*.jar,\n"
 +          ;
 +  
 +  private static String SITE_CONF;
 +  
 +  private static URLClassLoader classloader;
 +  
 +  private static Logger log = Logger.getLogger(AccumuloClassLoader.class);
 +  
 +  static {
 +    String configFile = System.getProperty("org.apache.accumulo.config.file", "accumulo-site.xml");
 +    if (System.getenv("ACCUMULO_CONF_DIR") != null) {
 +      // accumulo conf dir should be set
 +      SITE_CONF = System.getenv("ACCUMULO_CONF_DIR") + "/" + configFile;
 +    } else if (System.getenv("ACCUMULO_HOME") != null) {
 +      // if no accumulo conf dir, try accumulo home default
 +      SITE_CONF = System.getenv("ACCUMULO_HOME") + "/conf/" + configFile;
 +    } else {
 +      SITE_CONF = null;
 +    }
 +  }
 +  
 +  /**
 +   * Parses and XML Document for a property node for a <name> with the value propertyName if it finds one the function return that property's value for its
 +   * <value> node. If not found the function will return null
 +   * 
 +   * @param d
 +   *          XMLDocument to search through
 +   * @param propertyName
 +   */
 +  private static String getAccumuloClassPathStrings(Document d, String propertyName) {
 +    NodeList pnodes = d.getElementsByTagName("property");
 +    for (int i = pnodes.getLength() - 1; i >= 0; i--) {
 +      Element current_property = (Element) pnodes.item(i);
 +      Node cname = current_property.getElementsByTagName("name").item(0);
 +      if (cname != null && cname.getTextContent().compareTo(propertyName) == 0) {
 +        Node cvalue = current_property.getElementsByTagName("value").item(0);
 +        if (cvalue != null) {
 +          return cvalue.getTextContent();
 +        }
 +      }
 +    }
 +    return null;
 +  }
 +  
 +  /**
 +   * Looks for the site configuration file for Accumulo and if it has a property for propertyName return it otherwise returns defaultValue Should throw an
 +   * exception if the default configuration can not be read;
 +   * 
 +   * @param propertyName
 +   *          Name of the property to pull
 +   * @param defaultValue
 +   *          Value to default to if not found.
 +   * @return site or default class path String
 +   */
 +  
 +  public static String getAccumuloString(String propertyName, String defaultValue) {
 +    
 +    try {
 +      DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
 +      DocumentBuilder db = dbf.newDocumentBuilder();
 +      String site_classpath_string = null;
 +      try {
 +        Document site_conf = db.parse(SITE_CONF);
 +        site_classpath_string = getAccumuloClassPathStrings(site_conf, propertyName);
 +      } catch (Exception e) {
 +        /* we don't care because this is optional and we can use defaults */
 +      }
 +      if (site_classpath_string != null)
 +        return site_classpath_string;
 +      return defaultValue;
 +    } catch (Exception e) {
 +      throw new IllegalStateException("ClassPath Strings Lookup failed", e);
 +    }
 +  }
 +  
 +  /**
 +   * Replace environment variables in the classpath string with their actual value
-    * 
-    * @param classpath
-    * @param env
 +   */
 +  public static String replaceEnvVars(String classpath, Map<String,String> env) {
 +    Pattern envPat = Pattern.compile("\\$[A-Za-z][a-zA-Z0-9_]*");
 +    Matcher envMatcher = envPat.matcher(classpath);
 +    while (envMatcher.find(0)) {
 +      // name comes after the '$'
 +      String varName = envMatcher.group().substring(1);
 +      String varValue = env.get(varName);
 +      if (varValue == null) {
 +        varValue = "";
 +      }
 +      classpath = (classpath.substring(0, envMatcher.start()) + varValue + classpath.substring(envMatcher.end()));
 +      envMatcher.reset(classpath);
 +    }
 +    return classpath;
 +  }
 +  
 +  /**
 +   * Populate the list of URLs with the items in the classpath string
 +   * 
 +   * @param classpath
 +   * @param urls
 +   * @throws MalformedURLException
 +   */
 +  private static void addUrl(String classpath, ArrayList<URL> urls) throws MalformedURLException {
 +    classpath = classpath.trim();
 +    if (classpath.length() == 0)
 +      return;
 +    
 +    classpath = replaceEnvVars(classpath, System.getenv());
 +    
 +    // Try to make a URI out of the classpath
 +    URI uri = null;
 +    try {
 +      uri = new URI(classpath);
 +    } catch (URISyntaxException e) {
 +      // Not a valid URI
 +    }
 +    
 +    if (null == uri || !uri.isAbsolute() || (null != uri.getScheme() && uri.getScheme().equals("file://"))) {
 +      // Then treat this URI as a File.
 +      // This checks to see if the url string is a dir if it expand and get all jars in that directory
 +      final File extDir = new File(classpath);
 +      if (extDir.isDirectory())
 +        urls.add(extDir.toURI().toURL());
 +      else {
 +        if (extDir.getParentFile() != null) {
 +          File[] extJars = extDir.getParentFile().listFiles(new FilenameFilter() {
 +            @Override
 +            public boolean accept(File dir, String name) {
 +              return name.matches("^" + extDir.getName());
 +            }
 +          });
 +          if (extJars != null && extJars.length > 0) {
 +            for (File jar : extJars)
 +              urls.add(jar.toURI().toURL());
 +          } else {
 +            log.debug("ignoring classpath entry " + classpath);
 +          }
 +        } else {
 +          log.debug("ignoring classpath entry " + classpath);
 +        }
 +      }
 +    } else {
 +      urls.add(uri.toURL());
 +    }
 +    
 +  }
 +  
 +  private static ArrayList<URL> findAccumuloURLs() throws IOException {
 +    String cp = getAccumuloString(AccumuloClassLoader.CLASSPATH_PROPERTY_NAME, AccumuloClassLoader.ACCUMULO_CLASSPATH_VALUE);
 +    if (cp == null)
 +      return new ArrayList<URL>();
 +    String[] cps = replaceEnvVars(cp, System.getenv()).split(",");
 +    ArrayList<URL> urls = new ArrayList<URL>();
 +    for (String classpath : cps) {
 +      if (!classpath.startsWith("#")) {
 +        addUrl(classpath, urls);
 +      }
 +    }
 +    return urls;
 +  }
 +  
 +  public static synchronized ClassLoader getClassLoader() throws IOException {
 +    if (classloader == null) {
 +      ArrayList<URL> urls = findAccumuloURLs();
 +      
 +      ClassLoader parentClassLoader = AccumuloClassLoader.class.getClassLoader();
 +      
 +      log.debug("Create 2nd tier ClassLoader using URLs: " + urls.toString());
 +      URLClassLoader aClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentClassLoader) {
 +        @Override
 +        protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
 +          
 +          if (name.startsWith("org.apache.accumulo.start.classloader.vfs")) {
 +            Class<?> c = findLoadedClass(name);
 +            if (c == null) {
 +              try {
 +                // try finding this class here instead of parent
 +                findClass(name);
 +              } catch (ClassNotFoundException e) {}
 +            }
 +          }
 +          return super.loadClass(name, resolve);
 +        }
 +      };
 +      classloader = aClassLoader;
 +    }
 +    
 +    return classloader;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/start/src/main/java/org/apache/accumulo/start/classloader/vfs/ContextManager.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/ContextManager.java
index 7742cbe,0000000..e1ff55e
mode 100644,000000..100644
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/ContextManager.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/ContextManager.java
@@@ -1,207 -1,0 +1,205 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.start.classloader.vfs;
 +
 +import java.io.IOException;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +
 +import org.apache.commons.vfs2.FileSystemException;
 +import org.apache.commons.vfs2.FileSystemManager;
 +
 +public class ContextManager {
 +  
 +  // there is a lock per context so that one context can initialize w/o blocking another context
 +  private class Context {
 +    AccumuloReloadingVFSClassLoader loader;
 +    ContextConfig cconfig;
 +    boolean closed = false;
 +    
 +    Context(ContextConfig cconfig) {
 +      this.cconfig = cconfig;
 +    }
 +    
 +    synchronized ClassLoader getClassLoader() throws FileSystemException {
 +      if (closed)
 +        return null;
 +      
 +      if (loader == null) {
 +        loader = new AccumuloReloadingVFSClassLoader(cconfig.uris, vfs, parent, cconfig.preDelegation);
 +      }
 +      
 +      return loader.getClassLoader();
 +    }
 +    
 +    synchronized void close() {
 +      closed = true;
 +      loader.close();
 +      loader = null;
 +    }
 +  }
 +  
 +  private Map<String,Context> contexts = new HashMap<String,Context>();
 +  
 +  private volatile ContextsConfig config;
 +  private FileSystemManager vfs;
 +  private ReloadingClassLoader parent;
 +  
 +  ContextManager(FileSystemManager vfs, ReloadingClassLoader parent) {
 +    this.vfs = vfs;
 +    this.parent = parent;
 +  }
 +  
 +  public static class ContextConfig {
 +    String uris;
 +    boolean preDelegation;
 +    
 +    public ContextConfig(String uris, boolean preDelegation) {
 +      this.uris = uris;
 +      this.preDelegation = preDelegation;
 +    }
 +    
 +    @Override
 +    public boolean equals(Object o) {
 +      if (o instanceof ContextConfig) {
 +        ContextConfig oc = (ContextConfig) o;
 +        
 +        return uris.equals(oc.uris) && preDelegation == oc.preDelegation;
 +      }
 +      
 +      return false;
 +    }
 +    
 +    @Override
 +    public int hashCode() {
 +      return uris.hashCode() + (preDelegation ? Boolean.TRUE : Boolean.FALSE).hashCode();
 +    }
 +  }
 +  
 +  public interface ContextsConfig {
 +    ContextConfig getContextConfig(String context);
 +  }
 +  
 +  public static class DefaultContextsConfig implements ContextsConfig {
 +    
 +    private Iterable<Entry<String,String>> config;
 +    
 +    public DefaultContextsConfig(Iterable<Entry<String,String>> config) {
 +      this.config = config;
 +    }
 +    
 +    @Override
 +    public ContextConfig getContextConfig(String context) {
 +      
 +      String key = AccumuloVFSClassLoader.VFS_CONTEXT_CLASSPATH_PROPERTY + context;
 +      
 +      String uris = null;
 +      boolean preDelegate = true;
 +      
 +      Iterator<Entry<String,String>> iter = config.iterator();
 +      while (iter.hasNext()) {
 +        Entry<String,String> entry = iter.next();
 +        if (entry.getKey().equals(key)) {
 +          uris = entry.getValue();
 +        }
 +        
 +        if (entry.getKey().equals(key + ".delegation") && entry.getValue().trim().equalsIgnoreCase("post")) {
 +          preDelegate = false;
 +        }
 +      }
 +      
 +      if (uris != null)
 +        return new ContextConfig(uris, preDelegate);
 +      
 +      return null;
 +    }
 +  }
 +
 +  /**
 +   * configuration must be injected for ContextManager to work
-    * 
-    * @param config
 +   */
 +  public synchronized void setContextConfig(ContextsConfig config) {
 +    if (this.config != null)
 +      throw new IllegalStateException("Context manager config already set");
 +    this.config = config;
 +  }
 +  
 +  public ClassLoader getClassLoader(String contextName) throws FileSystemException {
 +    
 +    ContextConfig cconfig = config.getContextConfig(contextName);
 +    
 +    if (cconfig == null)
 +      throw new IllegalArgumentException("Unknown context " + contextName);
 +    
 +    Context context = null;
 +    Context contextToClose = null;
 +    
 +    synchronized (this) {
 +      // only manipulate internal data structs in this sync block... avoid creating or closing classloader, reading config, etc... basically avoid operations
 +      // that may block
 +      context = contexts.get(contextName);
 +      
 +      if (context == null) {
 +        context = new Context(cconfig);
 +        contexts.put(contextName, context);
 +      } else if (!context.cconfig.equals(cconfig)) {
 +        contextToClose = context;
 +        context = new Context(cconfig);
 +        contexts.put(contextName, context);
 +      }
 +    }
 +    
 +    if (contextToClose != null)
 +      contextToClose.close();
 +    
 +    ClassLoader loader = context.getClassLoader();
 +    if (loader == null) {
 +      // ooppss, context was closed by another thread, try again
 +      return getClassLoader(contextName);
 +    }
 +    
 +    return loader;
 +    
 +  }
 +  
 +  public <U> Class<? extends U> loadClass(String context, String classname, Class<U> extension) throws ClassNotFoundException {
 +    try {
 +      return getClassLoader(context).loadClass(classname).asSubclass(extension);
 +    } catch (IOException e) {
 +      throw new ClassNotFoundException("IO Error loading class " + classname, e);
 +    }
 +  }
 +  
 +  public void removeUnusedContexts(Set<String> inUse) {
 +    
 +    Map<String,Context> unused;
 +    
 +    synchronized (this) {
 +      unused = new HashMap<String,Context>(contexts);
 +      unused.keySet().removeAll(inUse);
 +      contexts.keySet().removeAll(unused.keySet());
 +    }
 +    
 +    for (Context context : unused.values()) {
 +      // close outside of lock
 +      context.close();
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
index 0a6931f,0000000..277c741
mode 100644,000000..100644
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
@@@ -1,52 -1,0 +1,47 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.start.classloader.vfs;
 +
 +import org.apache.commons.vfs2.FileObject;
 +import org.apache.commons.vfs2.FileSystemException;
 +import org.apache.commons.vfs2.FileSystemManager;
 +import org.apache.commons.vfs2.impl.VFSClassLoader;
 +
 +/**
 + * 
 + */
 +public class PostDelegatingVFSClassLoader extends VFSClassLoader {
 +  
-   /**
-    * @param files
-    * @param manager
-    * @param parent
-    * @throws FileSystemException
-    */
 +  public PostDelegatingVFSClassLoader(FileObject[] files, FileSystemManager manager, ClassLoader parent) throws FileSystemException {
 +    super(files, manager, parent);
 +  }
 +  
++  @Override
 +  protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
 +    Class<?> c = findLoadedClass(name);
 +    if (c == null) {
 +      try {
 +        // try finding this class here instead of parent
 +        findClass(name);
 +      } catch (ClassNotFoundException e) {
 +
 +      }
 +    }
 +    return super.loadClass(name, resolve);
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
index 92b2720,0000000..104ea09
mode 100644,000000..100644
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
@@@ -1,164 -1,0 +1,159 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.start.classloader.vfs.providers;
 +
 +import java.io.IOException;
 +import java.io.UnsupportedEncodingException;
 +import java.net.URLDecoder;
 +import java.util.Collection;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.commons.vfs2.CacheStrategy;
 +import org.apache.commons.vfs2.Capability;
 +import org.apache.commons.vfs2.FileName;
 +import org.apache.commons.vfs2.FileObject;
 +import org.apache.commons.vfs2.FileSystemException;
 +import org.apache.commons.vfs2.FileSystemOptions;
 +import org.apache.commons.vfs2.provider.AbstractFileName;
 +import org.apache.commons.vfs2.provider.AbstractFileSystem;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +
 +/**
 + * A VFS FileSystem that interacts with HDFS.
 + * 
 + * @since 2.1
 + */
 +public class HdfsFileSystem extends AbstractFileSystem
 +{
 +    private static final Log log = LogFactory.getLog(HdfsFileSystem.class);
 +
 +    private FileSystem fs;
 +
-     /**
-      * 
-      * @param rootName
-      * @param fileSystemOptions
-      */
 +    protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions)
 +    {
 +        super(rootName, null, fileSystemOptions);
 +    }
 +
 +    /**
 +     * @see org.apache.commons.vfs2.provider.AbstractFileSystem#addCapabilities(java.util.Collection)
 +     */
 +    @Override
 +    protected void addCapabilities(final Collection<Capability> capabilities)
 +    {
 +        capabilities.addAll(HdfsFileProvider.CAPABILITIES);
 +    }
 +
 +    /**
 +     * @see org.apache.commons.vfs2.provider.AbstractFileSystem#close()
 +     */
 +    @Override
 +    public void close()
 +    {
 +        try
 +        {
 +            if (null != fs)
 +            {
 +                fs.close();
 +            }
 +        }
 +        catch (final IOException e)
 +        {
 +            throw new RuntimeException("Error closing HDFS client", e);
 +        }
 +        super.close();
 +    }
 +
 +    /**
 +     * @see org.apache.commons.vfs2.provider.AbstractFileSystem#createFile(org.apache.commons.vfs2.provider.AbstractFileName)
 +     */
 +    @Override
 +    protected FileObject createFile(final AbstractFileName name) throws Exception
 +    {
 +        throw new FileSystemException("Operation not supported");
 +    }
 +
 +    /**
 +     * @see org.apache.commons.vfs2.provider.AbstractFileSystem#resolveFile(org.apache.commons.vfs2.FileName)
 +     */
 +    @Override
 +    public FileObject resolveFile(final FileName name) throws FileSystemException
 +    {
 +
 +        synchronized (this)
 +        {
 +            if (null == this.fs)
 +            {
 +                final String hdfsUri = name.getRootURI();
 +                final Configuration conf = new Configuration(true);
 +                conf.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri);
 +                this.fs = null;
 +                try
 +                {
 +                    fs = org.apache.hadoop.fs.FileSystem.get(conf);
 +                }
 +                catch (final IOException e)
 +                {
 +                    log.error("Error connecting to filesystem " + hdfsUri, e);
 +                    throw new FileSystemException("Error connecting to filesystem " + hdfsUri, e);
 +                }
 +            }
 +        }
 +
 +        boolean useCache = (null != getContext().getFileSystemManager().getFilesCache());
 +        FileObject file;
 +        if (useCache)
 +        {
 +            file = this.getFileFromCache(name);
 +        }
 +        else
 +        {
 +            file = null;
 +        }
 +        if (null == file)
 +        {
 +            String path = null;
 +            try
 +            {
 +                path = URLDecoder.decode(name.getPath(), "UTF-8");
 +            }
 +            catch (final UnsupportedEncodingException e)
 +            {
 +                path = name.getPath();
 +            }
 +            final Path filePath = new Path(path);
 +            file = new HdfsFileObject((AbstractFileName) name, this, fs, filePath);
 +            if (useCache)
 +            {
 +        this.putFileToCache(file);
 +            }
 +      
 +    }
 +    
 +    /**
 +     * resync the file information if requested
 +     */
 +    if (getFileSystemManager().getCacheStrategy().equals(CacheStrategy.ON_RESOLVE)) {
 +      file.refresh();
 +    }
 +    
 +    return file;
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
index 65cf80c,0000000..7b1313a
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
@@@ -1,129 -1,0 +1,120 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
- import java.io.IOException;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.accumulo.core.client.impl.MasterClient;
- import org.apache.accumulo.core.master.MasterNotRunningException;
 +import org.apache.accumulo.core.master.thrift.MasterClientService;
 +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 +import org.apache.accumulo.core.master.thrift.RecoveryStatus;
 +import org.apache.accumulo.core.master.thrift.TableInfo;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.monitor.Monitor;
 +import org.apache.accumulo.server.security.SecurityConstants;
- import org.apache.thrift.transport.TTransportException;
 +
 +public class GetMasterStats {
-   /**
-    * @param args
-    * @throws MasterNotRunningException
-    * @throws IOException
-    * @throws TTransportException
-    */
 +  public static void main(String[] args) throws Exception {
 +    MasterClientService.Iface client = null;
 +    MasterMonitorInfo stats = null;
 +    try {
 +      client = MasterClient.getConnectionWithRetry(HdfsZooInstance.getInstance());
 +      stats = client.getMasterStats(Tracer.traceInfo(), SecurityConstants.getSystemCredentials());
 +    } finally {
 +      if (client != null)
 +        MasterClient.close(client);
 +    }
 +    out(0, "State: " + stats.state.name());
 +    out(0, "Goal State: " + stats.goalState.name());
 +    if (stats.serversShuttingDown != null && stats.serversShuttingDown.size() > 0) {
 +      out(0, "Servers to shutdown");
 +      for (String server : stats.serversShuttingDown) {
 +        out(1, "%s", server);
 +      }
 +    }
 +    out(0, "Unassigned tablets: %d", stats.unassignedTablets);
 +    if (stats.badTServers != null && stats.badTServers.size() > 0) {
 +      out(0, "Bad servers");
 +      
 +      for (Entry<String,Byte> entry : stats.badTServers.entrySet()) {
 +        out(1, "%s: %d", entry.getKey(), (int) entry.getValue());
 +      }
 +    }
 +    if (stats.tableMap != null && stats.tableMap.size() > 0) {
 +      out(0, "Tables");
 +      for (Entry<String,TableInfo> entry : stats.tableMap.entrySet()) {
 +        TableInfo v = entry.getValue();
 +        out(1, "%s", entry.getKey());
 +        out(2, "Records: %d", v.recs);
 +        out(2, "Records in Memory: %d", v.recsInMemory);
 +        out(2, "Tablets: %d", v.tablets);
 +        out(2, "Online Tablets: %d", v.onlineTablets);
 +        out(2, "Ingest Rate: %.2f", v.ingestRate);
 +        out(2, "Query Rate: %.2f", v.queryRate);
 +      }
 +    }
 +    if (stats.tServerInfo != null && stats.tServerInfo.size() > 0) {
 +      out(0, "Tablet Servers");
 +      long now = System.currentTimeMillis();
 +      for (TabletServerStatus server : stats.tServerInfo) {
 +        TableInfo summary = Monitor.summarizeTableStats(server);
 +        out(1, "Name: %s", server.name);
 +        out(2, "Ingest: %.2f", summary.ingestRate);
 +        out(2, "Last Contact: %s", server.lastContact);
 +        out(2, "OS Load Average: %.2f", server.osLoad);
 +        out(2, "Queries: %.2f", summary.queryRate);
 +        out(2, "Time Difference: %.1f", ((now - server.lastContact) / 1000.));
 +        out(2, "Total Records: %d", summary.recs);
 +        out(2, "Lookups: %d", server.lookups);
 +        if (server.holdTime > 0)
 +          out(2, "Hold Time: %d", server.holdTime);
 +        if (server.tableMap != null && server.tableMap.size() > 0) {
 +          out(2, "Tables");
 +          for (Entry<String,TableInfo> status : server.tableMap.entrySet()) {
 +            TableInfo info = status.getValue();
 +            out(3, "Table: %s", status.getKey());
 +            out(4, "Tablets: %d", info.onlineTablets);
 +            out(4, "Records: %d", info.recs);
 +            out(4, "Records in Memory: %d", info.recsInMemory);
 +            out(4, "Ingest: %.2f", info.ingestRate);
 +            out(4, "Queries: %.2f", info.queryRate);
 +            out(4, "Major Compacting: %d", info.majors == null ? 0 : info.majors.running);
 +            out(4, "Queued for Major Compaction: %d", info.majors == null ? 0 : info.majors.queued);
 +            out(4, "Minor Compacting: %d", info.minors == null ? 0 : info.minors.running);
 +            out(4, "Queued for Minor Compaction: %d", info.minors == null ? 0 : info.minors.queued);
 +          }
 +        }
 +        out(2, "Recoveries: %d", server.logSorts.size());
 +        for (RecoveryStatus sort : server.logSorts) {
 +          out(3, "File: %s", sort.name);
 +          out(3, "Progress: %.2f%%", sort.progress * 100);
 +          out(3, "Time running: %s", sort.runtime / 1000.);
 +        }
 +      }
 +    }
 +  }
 +  
 +  private static void out(int indent, String string, Object... args) {
 +    for (int i = 0; i < indent; i++) {
 +      System.out.print(" ");
 +    }
 +    System.out.println(String.format(string, args));
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
index 73b73f4,0000000..16e7a98
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
@@@ -1,198 -1,0 +1,195 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +import java.util.concurrent.ConcurrentSkipListMap;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.util.FastFormat;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.tabletserver.NativeMap;
 +import org.apache.hadoop.io.Text;
 +
 +public class NativeMapPerformanceTest {
 +  
 +  private static final byte ROW_PREFIX[] = new byte[] {'r'};
 +  private static final byte COL_PREFIX[] = new byte[] {'c'};
 +  
 +  static Key nk(int r, int c) {
 +    return new Key(new Text(FastFormat.toZeroPaddedString(r, 9, 10, ROW_PREFIX)), new Text(FastFormat.toZeroPaddedString(c, 6, 10, COL_PREFIX)));
 +  }
 +  
 +  static Mutation nm(int r) {
 +    return new Mutation(new Text(FastFormat.toZeroPaddedString(r, 9, 10, ROW_PREFIX)));
 +  }
 +  
 +  static Text ET = new Text();
 +  
 +  private static void pc(Mutation m, int c, Value v) {
 +    m.put(new Text(FastFormat.toZeroPaddedString(c, 6, 10, COL_PREFIX)), ET, Long.MAX_VALUE, v);
 +  }
 +  
 +  static void runPerformanceTest(int numRows, int numCols, int numLookups, String mapType) {
 +    
 +    SortedMap<Key,Value> tm = null;
 +    NativeMap nm = null;
 +    
 +    if (mapType.equals("SKIP_LIST"))
 +      tm = new ConcurrentSkipListMap<Key,Value>();
 +    else if (mapType.equals("TREE_MAP"))
 +      tm = Collections.synchronizedSortedMap(new TreeMap<Key,Value>());
 +    else if (mapType.equals("NATIVE_MAP"))
 +      nm = new NativeMap();
 +    else
 +      throw new IllegalArgumentException(" map type must be SKIP_LIST, TREE_MAP, or NATIVE_MAP");
 +    
 +    Random rand = new Random(19);
 +    
 +    // puts
 +    long tps = System.currentTimeMillis();
 +    
 +    if (nm != null) {
 +      for (int i = 0; i < numRows; i++) {
 +        int row = rand.nextInt(1000000000);
 +        Mutation m = nm(row);
 +        for (int j = 0; j < numCols; j++) {
 +          int col = rand.nextInt(1000000);
 +          Value val = new Value("test".getBytes(Constants.UTF8));
 +          pc(m, col, val);
 +        }
 +        nm.mutate(m, i);
 +      }
 +    } else {
 +      for (int i = 0; i < numRows; i++) {
 +        int row = rand.nextInt(1000000000);
 +        for (int j = 0; j < numCols; j++) {
 +          int col = rand.nextInt(1000000);
 +          Key key = nk(row, col);
 +          Value val = new Value("test".getBytes(Constants.UTF8));
 +          tm.put(key, val);
 +        }
 +      }
 +    }
 +    
 +    long tpe = System.currentTimeMillis();
 +    
 +    // Iteration
 +    Iterator<Entry<Key,Value>> iter;
 +    if (nm != null) {
 +      iter = nm.iterator();
 +    } else {
 +      iter = tm.entrySet().iterator();
 +    }
 +    
 +    long tis = System.currentTimeMillis();
 +    
 +    while (iter.hasNext()) {
 +      iter.next();
 +    }
 +    
 +    long tie = System.currentTimeMillis();
 +    
 +    rand = new Random(19);
 +    int rowsToLookup[] = new int[numLookups];
 +    int colsToLookup[] = new int[numLookups];
 +    for (int i = 0; i < Math.min(numLookups, numRows); i++) {
 +      int row = rand.nextInt(1000000000);
 +      int col = -1;
 +      for (int j = 0; j < numCols; j++) {
 +        col = rand.nextInt(1000000);
 +      }
 +      
 +      rowsToLookup[i] = row;
 +      colsToLookup[i] = col;
 +    }
 +    
 +    // get
 +    
 +    long tgs = System.currentTimeMillis();
 +    if (nm != null) {
 +      for (int i = 0; i < numLookups; i++) {
 +        Key key = nk(rowsToLookup[i], colsToLookup[i]);
 +        if (nm.get(key) == null) {
 +          throw new RuntimeException("Did not find " + rowsToLookup[i] + " " + colsToLookup[i] + " " + i);
 +        }
 +      }
 +    } else {
 +      for (int i = 0; i < numLookups; i++) {
 +        Key key = nk(rowsToLookup[i], colsToLookup[i]);
 +        if (tm.get(key) == null) {
 +          throw new RuntimeException("Did not find " + rowsToLookup[i] + " " + colsToLookup[i] + " " + i);
 +        }
 +      }
 +    }
 +    long tge = System.currentTimeMillis();
 +    
 +    long memUsed = 0;
 +    if (nm != null) {
 +      memUsed = nm.getMemoryUsed();
 +    }
 +    
 +    int size = (nm == null ? tm.size() : nm.size());
 +    
 +    // delete
 +    long tds = System.currentTimeMillis();
 +    
 +    if (nm != null)
 +      nm.delete();
 +    
 +    long tde = System.currentTimeMillis();
 +    
 +    if (tm != null)
 +      tm.clear();
 +    
 +    System.gc();
 +    System.gc();
 +    System.gc();
 +    System.gc();
 +    
 +    UtilWaitThread.sleep(3000);
 +    
 +    System.out.printf("mapType:%10s   put rate:%,6.2f  scan rate:%,6.2f  get rate:%,6.2f  delete time : %6.2f  mem : %,d%n", "" + mapType, (numRows * numCols)
 +        / ((tpe - tps) / 1000.0), (size) / ((tie - tis) / 1000.0), numLookups / ((tge - tgs) / 1000.0), (tde - tds) / 1000.0, memUsed);
 +    
 +  }
 +  
-   /**
-    * @param args
-    */
 +  public static void main(String[] args) {
 +    
 +    if (args.length != 3) {
 +      throw new IllegalArgumentException("Usage : " + NativeMapPerformanceTest.class.getName() + " <map type> <rows> <columns>");
 +    }
 +    
 +    String mapType = args[0];
 +    int rows = Integer.parseInt(args[1]);
 +    int cols = Integer.parseInt(args[2]);
 +    
 +    runPerformanceTest(rows, cols, 10000, mapType);
 +    runPerformanceTest(rows, cols, 10000, mapType);
 +    runPerformanceTest(rows, cols, 10000, mapType);
 +    
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
index 8411c86,0000000..5115541
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
@@@ -1,277 -1,0 +1,274 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.util.OpTimer;
 +import org.apache.accumulo.server.tabletserver.NativeMap;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +
 +public class NativeMapStressTest {
 +  
 +  private static final Logger log = Logger.getLogger(NativeMapStressTest.class);
 +  
-   /**
-    * @param args
-    */
 +  public static void main(String[] args) {
 +    testLotsOfMapDeletes(true);
 +    testLotsOfMapDeletes(false);
 +    testLotsOfOverwrites();
 +    testLotsOfGetsAndScans();
 +  }
 +  
 +  private static void put(NativeMap nm, String row, String val, int mc) {
 +    Mutation m = new Mutation(new Text(row));
 +    m.put(new Text(), new Text(), Long.MAX_VALUE, new Value(val.getBytes(Constants.UTF8)));
 +    nm.mutate(m, mc);
 +  }
 +  
 +  private static void testLotsOfGetsAndScans() {
 +    
 +    ArrayList<Thread> threads = new ArrayList<Thread>();
 +    
 +    final int numThreads = 8;
 +    final int totalGets = 100000000;
 +    final int mapSizePerThread = (int) (4000000 / (double) numThreads);
 +    final int getsPerThread = (int) (totalGets / (double) numThreads);
 +    
 +    for (int tCount = 0; tCount < numThreads; tCount++) {
 +      Runnable r = new Runnable() {
 +        @Override
 +        public void run() {
 +          NativeMap nm = new NativeMap();
 +          
 +          Random r = new Random();
 +          
 +          OpTimer opTimer = new OpTimer(log, Level.INFO);
 +          
 +          opTimer.start("Creating map of size " + mapSizePerThread);
 +          
 +          for (int i = 0; i < mapSizePerThread; i++) {
 +            String row = String.format("r%08d", i);
 +            String val = row + "v";
 +            put(nm, row, val, i);
 +          }
 +          
 +          opTimer.stop("Created map of size " + nm.size() + " in %DURATION%");
 +          
 +          opTimer.start("Doing " + getsPerThread + " gets()");
 +          
 +          for (int i = 0; i < getsPerThread; i++) {
 +            String row = String.format("r%08d", r.nextInt(mapSizePerThread));
 +            String val = row + "v";
 +            
 +            Value value = nm.get(new Key(new Text(row)));
 +            if (value == null || !value.toString().equals(val)) {
 +              log.error("nm.get(" + row + ") failed");
 +            }
 +          }
 +          
 +          opTimer.stop("Finished " + getsPerThread + " gets in %DURATION%");
 +          
 +          int scanned = 0;
 +          
 +          opTimer.start("Doing " + getsPerThread + " random iterations");
 +          
 +          for (int i = 0; i < getsPerThread; i++) {
 +            int startRow = r.nextInt(mapSizePerThread);
 +            String row = String.format("r%08d", startRow);
 +            
 +            Iterator<Entry<Key,Value>> iter = nm.iterator(new Key(new Text(row)));
 +            
 +            int count = 0;
 +            
 +            while (iter.hasNext() && count < 10) {
 +              String row2 = String.format("r%08d", startRow + count);
 +              String val2 = row2 + "v";
 +              
 +              Entry<Key,Value> entry = iter.next();
 +              if (!entry.getValue().toString().equals(val2) || !entry.getKey().equals(new Key(new Text(row2)))) {
 +                log.error("nm.iter(" + row2 + ") failed row = " + row + " count = " + count + " row2 = " + row + " val2 = " + val2);
 +              }
 +              
 +              count++;
 +            }
 +            
 +            scanned += count;
 +          }
 +          
 +          opTimer.stop("Finished " + getsPerThread + " random iterations (scanned = " + scanned + ") in %DURATION%");
 +          
 +          nm.delete();
 +        }
 +      };
 +      
 +      Thread t = new Thread(r);
 +      t.start();
 +      
 +      threads.add(t);
 +    }
 +    
 +    for (Thread thread : threads) {
 +      try {
 +        thread.join();
 +      } catch (InterruptedException e) {
 +        e.printStackTrace();
 +        throw new RuntimeException(e);
 +      }
 +    }
 +  }
 +  
 +  private static void testLotsOfMapDeletes(final boolean doRemoves) {
 +    final int numThreads = 8;
 +    final int rowRange = 10000;
 +    final int mapsPerThread = 50;
 +    final int totalInserts = 100000000;
 +    final int insertsPerMapPerThread = (int) (totalInserts / (double) numThreads / mapsPerThread);
 +    
 +    System.out.println("insertsPerMapPerThread " + insertsPerMapPerThread);
 +    
 +    ArrayList<Thread> threads = new ArrayList<Thread>();
 +    
 +    for (int i = 0; i < numThreads; i++) {
 +      Runnable r = new Runnable() {
 +        @Override
 +        public void run() {
 +          
 +          int inserts = 0;
 +          int removes = 0;
 +          
 +          for (int i = 0; i < mapsPerThread; i++) {
 +            
 +            NativeMap nm = new NativeMap();
 +            
 +            for (int j = 0; j < insertsPerMapPerThread; j++) {
 +              String row = String.format("r%08d", j % rowRange);
 +              String val = row + "v";
 +              put(nm, row, val, j);
 +              inserts++;
 +            }
 +            
 +            if (doRemoves) {
 +              Iterator<Entry<Key,Value>> iter = nm.iterator();
 +              while (iter.hasNext()) {
 +                iter.next();
 +                iter.remove();
 +                removes++;
 +              }
 +            }
 +            
 +            nm.delete();
 +          }
 +          
 +          System.out.println("inserts " + inserts + " removes " + removes + " " + Thread.currentThread().getName());
 +        }
 +      };
 +      
 +      Thread t = new Thread(r);
 +      t.start();
 +      
 +      threads.add(t);
 +    }
 +    
 +    for (Thread thread : threads) {
 +      try {
 +        thread.join();
 +      } catch (InterruptedException e) {
 +        e.printStackTrace();
 +        throw new RuntimeException(e);
 +      }
 +    }
 +  }
 +  
 +  private static void testLotsOfOverwrites() {
 +    final Map<Integer,NativeMap> nativeMaps = new HashMap<Integer,NativeMap>();
 +    
 +    int numThreads = 8;
 +    final int insertsPerThread = (int) (100000000 / (double) numThreads);
 +    final int rowRange = 10000;
 +    final int numMaps = 50;
 +    
 +    ArrayList<Thread> threads = new ArrayList<Thread>();
 +    
 +    for (int i = 0; i < numThreads; i++) {
 +      Runnable r = new Runnable() {
 +        @Override
 +        public void run() {
 +          Random r = new Random();
 +          int inserts = 0;
 +          
 +          for (int i = 0; i < insertsPerThread / 100.0; i++) {
 +            int map = r.nextInt(numMaps);
 +            
 +            NativeMap nm;
 +            
 +            synchronized (nativeMaps) {
 +              nm = nativeMaps.get(map);
 +              if (nm == null) {
 +                nm = new NativeMap();
 +                nativeMaps.put(map, nm);
 +                
 +              }
 +            }
 +            
 +            synchronized (nm) {
 +              for (int j = 0; j < 100; j++) {
 +                String row = String.format("r%08d", r.nextInt(rowRange));
 +                String val = row + "v";
 +                put(nm, row, val, j);
 +                inserts++;
 +              }
 +            }
 +          }
 +          
 +          System.out.println("inserts " + inserts + " " + Thread.currentThread().getName());
 +        }
 +      };
 +      
 +      Thread t = new Thread(r);
 +      t.start();
 +      
 +      threads.add(t);
 +    }
 +    
 +    for (Thread thread : threads) {
 +      try {
 +        thread.join();
 +      } catch (InterruptedException e) {
 +        e.printStackTrace();
 +        throw new RuntimeException(e);
 +      }
 +    }
 +    
 +    Set<Entry<Integer,NativeMap>> es = nativeMaps.entrySet();
 +    for (Entry<Integer,NativeMap> entry : es) {
 +      entry.getValue().delete();
 +    }
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
index a35ca66,0000000..0d52f12
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
@@@ -1,181 -1,0 +1,180 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.io.IOException;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.UUID;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.server.util.reflection.CounterUtils;
 +import org.apache.accumulo.test.continuous.ContinuousIngest.BaseOpts;
 +import org.apache.accumulo.test.continuous.ContinuousIngest.ShortConverter;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.WritableComparator;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +
 +import com.beust.jcommander.Parameter;
 +import com.beust.jcommander.validators.PositiveInteger;
 +
 +/**
 + * A map only job that reads a table created by continuous ingest and creates doubly linked list. This map reduce job tests the ability of a map only job to
 + * read and write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes.
 + * 
 + */
 +public class ContinuousMoru extends Configured implements Tool {
 +  private static final String PREFIX = ContinuousMoru.class.getSimpleName() + ".";
 +  private static final String MAX_CQ = PREFIX + "MAX_CQ";
 +  private static final String MAX_CF = PREFIX + "MAX_CF";
 +  private static final String MAX = PREFIX + "MAX";
 +  private static final String MIN = PREFIX + "MIN";
 +  private static final String CI_ID = PREFIX + "CI_ID";
 +  
 +  static enum Counts {
 +    SELF_READ;
 +  }
 +  
 +  public static class CMapper extends Mapper<Key,Value,Text,Mutation> {
 +    
 +    private short max_cf;
 +    private short max_cq;
 +    private Random random;
 +    private String ingestInstanceId;
 +    private byte[] iiId;
 +    private long count;
 +    
 +    private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility();
 +    
 +    @Override
 +    public void setup(Context context) throws IOException, InterruptedException {
 +      int max_cf = context.getConfiguration().getInt(MAX_CF, -1);
 +      int max_cq = context.getConfiguration().getInt(MAX_CQ, -1);
 +      
 +      if (max_cf > Short.MAX_VALUE || max_cq > Short.MAX_VALUE)
 +        throw new IllegalArgumentException();
 +      
 +      this.max_cf = (short) max_cf;
 +      this.max_cq = (short) max_cq;
 +      
 +      random = new Random();
 +      ingestInstanceId = context.getConfiguration().get(CI_ID);
 +      iiId = ingestInstanceId.getBytes(Constants.UTF8);
 +      
 +      count = 0;
 +    }
 +    
 +    @Override
 +    public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
 +      
 +      ContinuousWalk.validate(key, data);
 +      
 +      if (WritableComparator.compareBytes(iiId, 0, iiId.length, data.get(), 0, iiId.length) != 0) {
 +        // only rewrite data not written by this M/R job
 +        byte[] val = data.get();
 +        
 +        int offset = ContinuousWalk.getPrevRowOffset(val);
 +        if (offset > 0) {
 +          long rowLong = Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16);
 +          Mutation m = ContinuousIngest.genMutation(rowLong, random.nextInt(max_cf), random.nextInt(max_cq), EMPTY_VIS, iiId, count++, key.getRowData()
 +              .toArray(), random, true);
 +          context.write(null, m);
 +        }
 +        
 +      } else {
 +        CounterUtils.increment(context.getCounter(Counts.SELF_READ));
 +      }
 +    }
 +  }
 +  
 +  static class Opts extends BaseOpts {
 +    @Parameter(names = "--maxColF", description = "maximum column family value to use", converter=ShortConverter.class)
 +    short maxColF = Short.MAX_VALUE;
 +    
 +    @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter=ShortConverter.class)
 +    short maxColQ = Short.MAX_VALUE;
 +    
 +    @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
 +    int maxMaps = 0;
 +  }
 +  
 +  @Override
 +  public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException {
 +    Opts opts = new Opts();
 +    BatchWriterOpts bwOpts = new BatchWriterOpts();
 +    opts.parseArgs(ContinuousMoru.class.getName(), args, bwOpts);
 +    
 +    Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
 +    job.setJarByClass(this.getClass());
 +    
 +    job.setInputFormatClass(AccumuloInputFormat.class);
 +    opts.setAccumuloConfigs(job);
 +    
 +    // set up ranges
 +    try {
 +      Set<Range> ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
 +      AccumuloInputFormat.setRanges(job, ranges);
 +      AccumuloInputFormat.setAutoAdjustRanges(job, false);
 +    } catch (Exception e) {
 +      throw new IOException(e);
 +    }
 +    
 +    job.setMapperClass(CMapper.class);
 +    
 +    job.setNumReduceTasks(0);
 +    
 +    job.setOutputFormatClass(AccumuloOutputFormat.class);
 +    AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig());
 +    
 +    Configuration conf = job.getConfiguration();
 +    conf.setLong(MIN, opts.min);
 +    conf.setLong(MAX, opts.max);
 +    conf.setInt(MAX_CF, opts.maxColF);
 +    conf.setInt(MAX_CQ, opts.maxColQ);
 +    conf.set(CI_ID, UUID.randomUUID().toString());
 +    
 +    job.waitForCompletion(true);
 +    opts.stopTracing();
 +    return job.isSuccessful() ? 0 : 1;
 +  }
 +  
 +  /**
 +   * 
 +   * @param args
 +   *          instanceName zookeepers username password table columns outputpath
-    * @throws Exception
 +   */
 +  public static void main(String[] args) throws Exception {
 +    int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousMoru(), args);
 +    if (res != 0)
 +      System.exit(res);
 +  }
 +}


Mime
View raw message