accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [40/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT
Date Wed, 09 Apr 2014 17:58:11 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
----------------------------------------------------------------------
diff --cc minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
index a366c16,0000000..e2b2f83
mode 100644,000000..100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
@@@ -1,392 -1,0 +1,382 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.minicluster;
 +
 +import java.io.BufferedReader;
 +import java.io.BufferedWriter;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.InputStreamReader;
 +import java.io.OutputStreamWriter;
 +import java.io.Writer;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Properties;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.gc.SimpleGarbageCollector;
 +import org.apache.accumulo.server.master.Master;
 +import org.apache.accumulo.server.tabletserver.TabletServer;
 +import org.apache.accumulo.server.util.Initialize;
 +import org.apache.accumulo.server.util.PortUtils;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.start.Main;
 +import org.apache.zookeeper.server.ZooKeeperServerMain;
 +
 +/**
 + * A utility class that will create Zookeeper and Accumulo processes that write all of their data to a single local directory. This class makes it easy to test
 + * code against a real Accumulo instance. It's much more accurate for testing than MockAccumulo, but much slower than MockAccumulo.
 + * 
 + * @since 1.5.0
 + */
 +public class MiniAccumuloCluster {
 +  
 +  private static final String INSTANCE_SECRET = "DONTTELL";
 +  private static final String INSTANCE_NAME = "miniInstance";
 +  
 +  private static class LogWriter extends Thread {
 +    private BufferedReader in;
 +    private BufferedWriter out;
 +    
-     /**
-      * @throws IOException
-      */
 +    public LogWriter(InputStream stream, File logFile) throws IOException {
 +      this.setDaemon(true);
 +      this.in = new BufferedReader(new InputStreamReader(stream, Constants.UTF8));
 +      out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(logFile), Constants.UTF8));
 +      
 +      SimpleTimer.getInstance().schedule(new Runnable() {
 +        @Override
 +        public void run() {
 +          try {
 +            flush();
 +          } catch (IOException e) {
 +            e.printStackTrace();
 +          }
 +        }
 +      }, 1000, 1000);
 +    }
 +    
 +    public synchronized void flush() throws IOException {
 +      if (out != null)
 +        out.flush();
 +    }
 +    
 +    @Override
 +    public void run() {
 +      String line;
 +      
 +      try {
 +        while ((line = in.readLine()) != null) {
 +          out.append(line);
 +          out.append("\n");
 +        }
 +        
 +        synchronized (this) {
 +          out.close();
 +          out = null;
 +          in.close();
 +        }
 +        
 +      } catch (IOException e) {}
 +    }
 +  }
 +  
 +  private File libDir;
 +  private File libExtDir;
 +  private File confDir;
 +  private File zooKeeperDir;
 +  private File accumuloDir;
 +  private File zooCfgFile;
 +  private File logDir;
 +  private File walogDir;
 +  
 +  private Process zooKeeperProcess;
 +  private Process masterProcess;
 +  private Process gcProcess;
 +  
 +  private int zooKeeperPort;
 +  
 +  private List<LogWriter> logWriters = new ArrayList<MiniAccumuloCluster.LogWriter>();
 +  
 +  private MiniAccumuloConfig config;
 +  private Process[] tabletServerProcesses;
 +  
 +  private Process exec(Class<? extends Object> clazz, String... args) throws IOException {
 +    String javaHome = System.getProperty("java.home");
 +    String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
 +    String classpath = System.getProperty("java.class.path");
 +    
 +    classpath = confDir.getAbsolutePath() + File.pathSeparator + classpath;
 +    
 +    String className = clazz.getCanonicalName();
 +    
 +    ArrayList<String> argList = new ArrayList<String>();
 +    
 +    argList.addAll(Arrays.asList(javaBin, "-cp", classpath, "-Xmx128m", "-XX:+UseConcMarkSweepGC", "-XX:CMSInitiatingOccupancyFraction=75",
 +        "-Dapple.awt.UIElement=true", Main.class.getName(), className));
 +    
 +    argList.addAll(Arrays.asList(args));
 +    
 +    ProcessBuilder builder = new ProcessBuilder(argList);
 +    
 +    builder.environment().put("ACCUMULO_HOME", config.getDir().getAbsolutePath());
 +    builder.environment().put("ACCUMULO_LOG_DIR", logDir.getAbsolutePath());
 +    
 +    // if we're running under accumulo.start, we forward these env vars
 +    String env = System.getenv("HADOOP_PREFIX");
 +    if (env != null)
 +      builder.environment().put("HADOOP_PREFIX", env);
 +    env = System.getenv("ZOOKEEPER_HOME");
 +    if (env != null)
 +      builder.environment().put("ZOOKEEPER_HOME", env);
 +    
 +    Process process = builder.start();
 +    
 +    LogWriter lw;
 +    lw = new LogWriter(process.getErrorStream(), new File(logDir, clazz.getSimpleName() + "_" + process.hashCode() + ".err"));
 +    logWriters.add(lw);
 +    lw.start();
 +    lw = new LogWriter(process.getInputStream(), new File(logDir, clazz.getSimpleName() + "_" + process.hashCode() + ".out"));
 +    logWriters.add(lw);
 +    lw.start();
 +    
 +    return process;
 +  }
 +  
 +  private void appendProp(Writer fileWriter, Property key, String value, Map<String,String> siteConfig) throws IOException {
 +    appendProp(fileWriter, key.getKey(), value, siteConfig);
 +  }
 +  
 +  private void appendProp(Writer fileWriter, String key, String value, Map<String,String> siteConfig) throws IOException {
 +    if (!siteConfig.containsKey(key))
 +      fileWriter.append("<property><name>" + key + "</name><value>" + value + "</value></property>\n");
 +  }
 +
 +  /**
 +   * Sets a given key with a random port for the value on the site config if it doesn't already exist.
 +   */
 +  private void mergePropWithRandomPort(Map<String,String> siteConfig, String key) {
 +    if (!siteConfig.containsKey(key)) {
 +      siteConfig.put(key, "0");
 +    }
 +  }
 +  
 +  /**
 +   * 
 +   * @param dir
 +   *          An empty or nonexistent temp directory that Accumulo and Zookeeper can store data in. Creating the directory is left to the user. Java 7, Guava,
 +   *          and Junit provide methods for creating temporary directories.
 +   * @param rootPassword
 +   *          Initial root password for instance.
-    * @throws IOException
 +   */
 +  public MiniAccumuloCluster(File dir, String rootPassword) throws IOException {
 +    this(new MiniAccumuloConfig(dir, rootPassword));
 +  }
 +  
 +  /**
 +   * @param config
 +   *          initial configuration
-    * @throws IOException
 +   */
 +  
 +  public MiniAccumuloCluster(MiniAccumuloConfig config) throws IOException {
 +    
 +    if (config.getDir().exists() && !config.getDir().isDirectory())
 +      throw new IllegalArgumentException("Must pass in directory, " + config.getDir() + " is a file");
 +    
 +    if (config.getDir().exists() && config.getDir().list().length != 0)
 +      throw new IllegalArgumentException("Directory " + config.getDir() + " is not empty");
 +    
 +    this.config = config;
 +    
 +    libDir = new File(config.getDir(), "lib");
 +    libExtDir = new File(libDir, "ext");
 +    confDir = new File(config.getDir(), "conf");
 +    accumuloDir = new File(config.getDir(), "accumulo");
 +    zooKeeperDir = new File(config.getDir(), "zookeeper");
 +    logDir = new File(config.getDir(), "logs");
 +    walogDir = new File(config.getDir(), "walogs");
 +    
 +    confDir.mkdirs();
 +    accumuloDir.mkdirs();
 +    zooKeeperDir.mkdirs();
 +    logDir.mkdirs();
 +    walogDir.mkdirs();
 +    libDir.mkdirs();
 +    
 +    // Avoid the classloader yelling that the general.dynamic.classpaths value is invalid because
 +    // $ACCUMULO_HOME/lib/ext isn't defined.
 +    libExtDir.mkdirs();
 +    
 +    zooKeeperPort = PortUtils.getRandomFreePort();
 +    
 +    File siteFile = new File(confDir, "accumulo-site.xml");
 +    
 +    OutputStreamWriter fileWriter = new OutputStreamWriter(new FileOutputStream(siteFile), Constants.UTF8);
 +    fileWriter.append("<configuration>\n");
 +    
 +    HashMap<String,String> siteConfig = new HashMap<String,String>(config.getSiteConfig());
 +    
 +    appendProp(fileWriter, Property.INSTANCE_DFS_URI, "file:///", siteConfig);
 +    appendProp(fileWriter, Property.INSTANCE_DFS_DIR, accumuloDir.getAbsolutePath(), siteConfig);
 +    appendProp(fileWriter, Property.INSTANCE_ZK_HOST, "localhost:" + zooKeeperPort, siteConfig);
 +    appendProp(fileWriter, Property.INSTANCE_SECRET, INSTANCE_SECRET, siteConfig);
 +    appendProp(fileWriter, Property.TSERV_PORTSEARCH, "true", siteConfig);
 +    appendProp(fileWriter, Property.LOGGER_DIR, walogDir.getAbsolutePath(), siteConfig);
 +    appendProp(fileWriter, Property.TSERV_DATACACHE_SIZE, "10M", siteConfig);
 +    appendProp(fileWriter, Property.TSERV_INDEXCACHE_SIZE, "10M", siteConfig);
 +    appendProp(fileWriter, Property.TSERV_MAXMEM, "50M", siteConfig);
 +    appendProp(fileWriter, Property.TSERV_WALOG_MAX_SIZE, "100M", siteConfig);
 +    appendProp(fileWriter, Property.TSERV_NATIVEMAP_ENABLED, "false", siteConfig);
 +    appendProp(fileWriter, Property.TRACE_TOKEN_PROPERTY_PREFIX + ".password", config.getRootPassword(), siteConfig);
 +    appendProp(fileWriter, Property.GC_CYCLE_DELAY, "4s", siteConfig);
 +    appendProp(fileWriter, Property.GC_CYCLE_START, "0s", siteConfig);
 +    mergePropWithRandomPort(siteConfig, Property.MASTER_CLIENTPORT.getKey());
 +    mergePropWithRandomPort(siteConfig, Property.TRACE_PORT.getKey());
 +    mergePropWithRandomPort(siteConfig, Property.TSERV_CLIENTPORT.getKey());
 +    mergePropWithRandomPort(siteConfig, Property.MONITOR_PORT.getKey());
 +    mergePropWithRandomPort(siteConfig, Property.GC_PORT.getKey());
 +    
 +    // since there is a small amount of memory, check more frequently for majc... setting may not be needed in 1.5
 +    appendProp(fileWriter, Property.TSERV_MAJC_DELAY, "3", siteConfig);
 +    
 +    // ACCUMULO-1472 -- Use the classpath, not what might be installed on the system.
 +    // We have to set *something* here, otherwise the AccumuloClassLoader will default to pulling from 
 +    // environment variables (e.g. ACCUMULO_HOME, HADOOP_HOME/PREFIX) which will result in multiple copies
 +    // of artifacts on the classpath as they'll be provided by the invoking application
 +    appendProp(fileWriter, Property.GENERAL_CLASSPATHS, libDir.getAbsolutePath() + "/[^.].*.jar", siteConfig);
 +    appendProp(fileWriter, Property.GENERAL_DYNAMIC_CLASSPATHS, libExtDir.getAbsolutePath() + "/[^.].*.jar", siteConfig);
 +
 +    for (Entry<String,String> entry : siteConfig.entrySet())
 +      fileWriter.append("<property><name>" + entry.getKey() + "</name><value>" + entry.getValue() + "</value></property>\n");
 +    fileWriter.append("</configuration>\n");
 +    fileWriter.close();
 +    
 +    zooCfgFile = new File(confDir, "zoo.cfg");
 +    fileWriter = new OutputStreamWriter(new FileOutputStream(zooCfgFile), Constants.UTF8);
 +    
 +    // zookeeper uses Properties to read its config, so use that to write in order to properly escape things like Windows paths
 +    Properties zooCfg = new Properties();
 +    zooCfg.setProperty("tickTime", "1000");
 +    zooCfg.setProperty("initLimit", "10");
 +    zooCfg.setProperty("syncLimit", "5");
 +    zooCfg.setProperty("clientPort", zooKeeperPort + "");
 +    zooCfg.setProperty("maxClientCnxns", "100");
 +    zooCfg.setProperty("dataDir", zooKeeperDir.getAbsolutePath());
 +    zooCfg.store(fileWriter, null);
 +    
 +    fileWriter.close();
 +    
 +  }
 +  
 +  /**
 +   * Starts Accumulo and Zookeeper processes. Can only be called once.
 +   * 
-    * @throws IOException
-    * @throws InterruptedException
 +   * @throws IllegalStateException
 +   *           if already started
 +   */
 +  
 +  public void start() throws IOException, InterruptedException {
 +    if (zooKeeperProcess != null)
 +      throw new IllegalStateException("Already started");
 +    
 +    Runtime.getRuntime().addShutdownHook(new Thread() {
 +      @Override
 +      public void run() {
 +        try {
 +          MiniAccumuloCluster.this.stop();
 +        } catch (IOException e) {
 +          e.printStackTrace();
 +        } catch (InterruptedException e) {
 +          e.printStackTrace();
 +        }
 +      }
 +    });
 +    
 +    zooKeeperProcess = exec(Main.class, ZooKeeperServerMain.class.getName(), zooCfgFile.getAbsolutePath());
 +    
 +    // sleep a little bit to let zookeeper come up before calling init, seems to work better
 +    UtilWaitThread.sleep(250);
 +    
 +    Process initProcess = exec(Initialize.class, "--instance-name", INSTANCE_NAME, "--password", config.getRootPassword());
 +    int ret = initProcess.waitFor();
 +    if (ret != 0) {
 +      throw new RuntimeException("Initialize process returned " + ret + ". Check the logs in " + logDir + " for errors.");
 +    }
 +    
 +    tabletServerProcesses = new Process[config.getNumTservers()];
 +    for (int i = 0; i < config.getNumTservers(); i++) {
 +      tabletServerProcesses[i] = exec(TabletServer.class);
 +    }
 +    
 +    masterProcess = exec(Master.class);
 +    
 +    gcProcess = exec(SimpleGarbageCollector.class);
 +  }
 +  
 +  /**
 +   * @return Accumulo instance name
 +   */
 +  
 +  public String getInstanceName() {
 +    return INSTANCE_NAME;
 +  }
 +  
 +  /**
 +   * @return zookeeper connection string
 +   */
 +  
 +  public String getZooKeepers() {
 +    return "localhost:" + zooKeeperPort;
 +  }
 +  
 +  /**
 +   * Stops Accumulo and Zookeeper processes. If stop is not called, there is a shutdown hook that is set up to kill the processes. However, it's probably best to
 +   * call stop in a finally block as soon as possible.
-    * 
-    * @throws IOException
-    * @throws InterruptedException
 +   */
 +  
 +  public void stop() throws IOException, InterruptedException {
 +    if (zooKeeperProcess != null) {
 +      zooKeeperProcess.destroy();
 +      zooKeeperProcess.waitFor();
 +    }
 +    if (masterProcess != null) {
 +      masterProcess.destroy();
 +      masterProcess.waitFor();
 +    }
 +    if (tabletServerProcesses != null) {
 +      for (Process tserver : tabletServerProcesses) {
 +        tserver.destroy();
 +        tserver.waitFor();
 +      }
 +    }
 +    
 +    for (LogWriter lw : logWriters)
 +      lw.flush();
 +
 +    if (gcProcess != null) {
 +      gcProcess.destroy();
 +      gcProcess.waitFor();
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/proxy/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
----------------------------------------------------------------------
diff --cc proxy/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
index 99a3218,0000000..c0049a0
mode 100644,000000..100644
--- a/proxy/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
+++ b/proxy/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
@@@ -1,488 -1,0 +1,478 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.proxy;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Properties;
 +import java.util.Set;
 +import java.util.TreeMap;
 +
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.iterators.user.RegExFilter;
 +import org.apache.accumulo.proxy.thrift.BatchScanOptions;
 +import org.apache.accumulo.proxy.thrift.ColumnUpdate;
 +import org.apache.accumulo.proxy.thrift.IteratorSetting;
 +import org.apache.accumulo.proxy.thrift.Key;
 +import org.apache.accumulo.proxy.thrift.KeyValue;
 +import org.apache.accumulo.proxy.thrift.Range;
 +import org.apache.accumulo.proxy.thrift.ScanColumn;
 +import org.apache.accumulo.proxy.thrift.ScanOptions;
 +import org.apache.accumulo.proxy.thrift.ScanResult;
 +import org.apache.accumulo.proxy.thrift.TimeType;
 +import org.apache.thrift.protocol.TCompactProtocol;
 +import org.apache.thrift.server.TServer;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +public class TestProxyReadWrite {
 +  protected static TServer proxy;
 +  protected static Thread thread;
 +  protected static TestProxyClient tpc;
 +  protected static ByteBuffer userpass;
 +  protected static final int port = 10194;
 +  protected static final String testtable = "testtable";
 +  
 +  @SuppressWarnings("serial")
 +  @BeforeClass
 +  public static void setup() throws Exception {
 +    Properties prop = new Properties();
 +    prop.setProperty("useMockInstance", "true");
 +    prop.put("tokenClass", PasswordToken.class.getName());
 +    
 +    proxy = Proxy.createProxyServer(Class.forName("org.apache.accumulo.proxy.thrift.AccumuloProxy"), Class.forName("org.apache.accumulo.proxy.ProxyServer"),
 +        port, TCompactProtocol.Factory.class, prop);
 +    thread = new Thread() {
 +      @Override
 +      public void run() {
 +        proxy.serve();
 +      }
 +    };
 +    thread.start();
 +    tpc = new TestProxyClient("localhost", port);
 +    userpass = tpc.proxy().login("root", new TreeMap<String, String>() {{put("password",""); }});
 +  }
 +  
 +  @AfterClass
 +  public static void tearDown() throws InterruptedException {
 +    proxy.stop();
 +    thread.join();
 +  }
 +  
 +  @Before
 +  public void makeTestTable() throws Exception {
 +    tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS);
 +  }
 +  
 +  @After
 +  public void deleteTestTable() throws Exception {
 +    tpc.proxy().deleteTable(userpass, testtable);
 +  }
 +  
 +  private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String value) {
 +    ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes()));
 +    update.setValue(value.getBytes());
 +    mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update));
 +  }
 +  
 +  private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String vis, String value) {
 +    ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes()));
 +    update.setValue(value.getBytes());
 +    update.setColVisibility(vis.getBytes());
 +    mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update));
 +  }
 +  
 +  /**
 +   * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a range so only the entries between -Inf...5 come back (there should be
 +   * 50,000)
-    * 
-    * @throws Exception
 +   */
 +  @Test
 +  public void readWriteBatchOneShotWithRange() throws Exception {
 +    int maxInserts = 100000;
 +    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
 +    String format = "%1$05d";
 +    for (int i = 0; i < maxInserts; i++) {
 +      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
 +      
 +      if (i % 1000 == 0 || i == maxInserts - 1) {
 +        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
 +        mutations.clear();
 +      }
 +    }
 +    
 +    Key stop = new Key();
 +    stop.setRow("5".getBytes());
 +    BatchScanOptions options = new BatchScanOptions();
 +    options.ranges = Collections.singletonList(new Range(null, false, stop, false));
 +    String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
 +    
 +    int i = 0;
 +    boolean hasNext = true;
 +    
 +    int k = 1000;
 +    while (hasNext) {
 +      ScanResult kvList = tpc.proxy().nextK(cookie, k);
 +      i += kvList.getResultsSize();
 +      hasNext = kvList.isMore();
 +    }
 +    assertEquals(i, 50000);
 +  }
 +
 +  /**
 +   * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a columnFamily so only the entries with specified column family come back (there should be
 +   * 50,000)
-    * 
-    * @throws Exception
 +   */
 +  @Test
 +  public void readWriteBatchOneShotWithColumnFamilyOnly() throws Exception {
 +    int maxInserts = 100000;
 +    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
 +    String format = "%1$05d";
 +    for (int i = 0; i < maxInserts; i++) {
 +	
 +      addMutation(mutations, String.format(format, i), "cf" + (i % 2) , "cq" + (i % 2), Util.randString(10));
 +      
 +      if (i % 1000 == 0 || i == maxInserts - 1) {
 +        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
 +        mutations.clear();
 +      }
 +    }
 +    
 +    BatchScanOptions options = new BatchScanOptions();
 +
 +	ScanColumn sc = new ScanColumn();
 +	sc.colFamily = ByteBuffer.wrap("cf0".getBytes());
 +
 +    options.columns = Collections.singletonList(sc);
 +    String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
 +    
 +    int i = 0;
 +    boolean hasNext = true;
 +    
 +    int k = 1000;
 +    while (hasNext) {
 +      ScanResult kvList = tpc.proxy().nextK(cookie, k);
 +      i += kvList.getResultsSize();
 +      hasNext = kvList.isMore();
 +    }
 +    assertEquals(i, 50000);
 +  }
 +
 +
 +  /**
 +   * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a columnFamily + columnQualifier so only the entries with specified column 
 +   * come back (there should be 50,000)
-    * 
-    * @throws Exception
 +   */
 +  @Test
 +  public void readWriteBatchOneShotWithFullColumn() throws Exception {
 +    int maxInserts = 100000;
 +    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
 +    String format = "%1$05d";
 +    for (int i = 0; i < maxInserts; i++) {
 +	
 +      addMutation(mutations, String.format(format, i), "cf" + (i % 2) , "cq" + (i % 2), Util.randString(10));
 +      
 +      if (i % 1000 == 0 || i == maxInserts - 1) {
 +        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
 +        mutations.clear();
 +      }
 +    }
 +    
 +    BatchScanOptions options = new BatchScanOptions();
 +
 +	ScanColumn sc = new ScanColumn();
 +	sc.colFamily = ByteBuffer.wrap("cf0".getBytes());
 +	sc.colQualifier = ByteBuffer.wrap("cq0".getBytes());
 +
 +    options.columns = Collections.singletonList(sc);
 +    String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
 +    
 +    int i = 0;
 +    boolean hasNext = true;
 +    
 +    int k = 1000;
 +    while (hasNext) {
 +      ScanResult kvList = tpc.proxy().nextK(cookie, k);
 +      i += kvList.getResultsSize();
 +      hasNext = kvList.isMore();
 +    }
 +    assertEquals(i, 50000);
 +  }
 +
 +
 +  /**
 +   * Insert 10000 cells which have as the row [0..9999] (padded with zeros). Filter the results so only the even numbers come back.
-    * 
-    * @throws Exception
 +   */
 +  @Test
 +  public void readWriteBatchOneShotWithFilterIterator() throws Exception {
 +    int maxInserts = 10000;
 +    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
 +    String format = "%1$05d";
 +    for (int i = 0; i < maxInserts; i++) {
 +      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
 +      
 +      if (i % 1000 == 0 || i == maxInserts - 1) {
 +        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
 +        mutations.clear();
 +      }
 +      
 +    }
 +    
 +    String regex = ".*[02468]";
 +    
 +    org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
 +    RegExFilter.setRegexs(is, regex, null, null, null, false);
 +    
 +    IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
 +    ScanOptions opts = new ScanOptions();
 +    opts.iterators = Collections.singletonList(pis);
 +    String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
 +    
 +    int i = 0;
 +    boolean hasNext = true;
 +    
 +    int k = 1000;
 +    while (hasNext) {
 +      ScanResult kvList = tpc.proxy().nextK(cookie, k);
 +      for (KeyValue kv : kvList.getResults()) {
 +        assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
 +        
 +        i += 2;
 +      }
 +      hasNext = kvList.isMore();
 +    }
 +  }
 +  
 +  @Test
 +  public void readWriteOneShotWithRange() throws Exception {
 +    int maxInserts = 100000;
 +    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
 +    String format = "%1$05d";
 +    for (int i = 0; i < maxInserts; i++) {
 +      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
 +      
 +      if (i % 1000 == 0 || i == maxInserts - 1) {
 +        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
 +        mutations.clear();
 +      }
 +    }
 +    
 +    Key stop = new Key();
 +    stop.setRow("5".getBytes());
 +    ScanOptions opts = new ScanOptions();
 +    opts.range = new Range(null, false, stop, false);
 +    String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
 +    
 +    int i = 0;
 +    boolean hasNext = true;
 +    
 +    int k = 1000;
 +    while (hasNext) {
 +      ScanResult kvList = tpc.proxy().nextK(cookie, k);
 +      i += kvList.getResultsSize();
 +      hasNext = kvList.isMore();
 +    }
 +    assertEquals(i, 50000);
 +  }
 +  
 +  /**
 +   * Insert 10000 cells which have as the row [0..9999] (padded with zeros). Filter the results so only the even numbers come back.
-    * 
-    * @throws Exception
 +   */
 +  @Test
 +  public void readWriteOneShotWithFilterIterator() throws Exception {
 +    int maxInserts = 10000;
 +    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
 +    String format = "%1$05d";
 +    for (int i = 0; i < maxInserts; i++) {
 +      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
 +      
 +      if (i % 1000 == 0 || i == maxInserts - 1) {
 +        
 +        tpc.proxy().updateAndFlush(userpass, testtable, mutations);
 +        mutations.clear();
 +        
 +      }
 +      
 +    }
 +    
 +    String regex = ".*[02468]";
 +    
 +    org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
 +    RegExFilter.setRegexs(is, regex, null, null, null, false);
 +    
 +    IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
 +    ScanOptions opts = new ScanOptions();
 +    opts.iterators = Collections.singletonList(pis);
 +    String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
 +    
 +    int i = 0;
 +    boolean hasNext = true;
 +    
 +    int k = 1000;
 +    while (hasNext) {
 +      ScanResult kvList = tpc.proxy().nextK(cookie, k);
 +      for (KeyValue kv : kvList.getResults()) {
 +        assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
 +        
 +        i += 2;
 +      }
 +      hasNext = kvList.isMore();
 +    }
 +  }
 +  
 +  // @Test
 +  // This test takes kind of a long time. Enable it if you think you may have memory issues.
 +  public void manyWritesAndReads() throws Exception {
 +    int maxInserts = 1000000;
 +    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
 +    String format = "%1$06d";
 +    String writer = tpc.proxy().createWriter(userpass, testtable, null);
 +    for (int i = 0; i < maxInserts; i++) {
 +      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
 +      
 +      if (i % 1000 == 0 || i == maxInserts - 1) {
 +        
 +        tpc.proxy().update(writer, mutations);
 +        mutations.clear();
 +        
 +      }
 +      
 +    }
 +    
 +    tpc.proxy().flush(writer);
 +    tpc.proxy().closeWriter(writer);
 +    
 +    String cookie = tpc.proxy().createScanner(userpass, testtable, null);
 +    
 +    int i = 0;
 +    boolean hasNext = true;
 +    
 +    int k = 1000;
 +    while (hasNext) {
 +      ScanResult kvList = tpc.proxy().nextK(cookie, k);
 +      for (KeyValue kv : kvList.getResults()) {
 +        assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
 +        i++;
 +      }
 +      hasNext = kvList.isMore();
 +      if (hasNext)
 +        assertEquals(k, kvList.getResults().size());
 +    }
 +    assertEquals(maxInserts, i);
 +  }
 +  
 +  @Test
 +  public void asynchReadWrite() throws Exception {
 +    int maxInserts = 10000;
 +    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
 +    String format = "%1$05d";
 +    String writer = tpc.proxy().createWriter(userpass, testtable, null);
 +    for (int i = 0; i < maxInserts; i++) {
 +      addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
 +      
 +      if (i % 1000 == 0 || i == maxInserts - 1) {
 +        tpc.proxy().update(writer, mutations);
 +        mutations.clear();
 +      }
 +    }
 +    
 +    tpc.proxy().flush(writer);
 +    tpc.proxy().closeWriter(writer);
 +    
 +    String regex = ".*[02468]";
 +    
 +    org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
 +    RegExFilter.setRegexs(is, regex, null, null, null, false);
 +    
 +    IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
 +    ScanOptions opts = new ScanOptions();
 +    opts.iterators = Collections.singletonList(pis);
 +    String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
 +    
 +    int i = 0;
 +    boolean hasNext = true;
 +    
 +    int k = 1000;
 +    int numRead = 0;
 +    while (hasNext) {
 +      ScanResult kvList = tpc.proxy().nextK(cookie, k);
 +      for (KeyValue kv : kvList.getResults()) {
 +        assertEquals(i, Integer.parseInt(new String(kv.getKey().getRow())));
 +        numRead++;
 +        i += 2;
 +      }
 +      hasNext = kvList.isMore();
 +    }
 +    assertEquals(maxInserts / 2, numRead);
 +  }
 +  
 +  @Test
 +  public void testVisibility() throws Exception {
 +    
 +    Set<ByteBuffer> auths = new HashSet<ByteBuffer>();
 +    auths.add(ByteBuffer.wrap("even".getBytes()));
 +    tpc.proxy().changeUserAuthorizations(userpass, "root", auths);
 +    
 +    int maxInserts = 10000;
 +    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
 +    String format = "%1$05d";
 +    String writer = tpc.proxy().createWriter(userpass, testtable, null);
 +    for (int i = 0; i < maxInserts; i++) {
 +      if (i % 2 == 0)
 +        addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, "even", Util.randString(10));
 +      else
 +        addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, "odd", Util.randString(10));
 +      
 +      if (i % 1000 == 0 || i == maxInserts - 1) {
 +        tpc.proxy().update(writer, mutations);
 +        mutations.clear();
 +      }
 +    }
 +    
 +    tpc.proxy().flush(writer);
 +    tpc.proxy().closeWriter(writer);
 +    ScanOptions opts = new ScanOptions();
 +    opts.authorizations = auths;
 +    String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
 +    
 +    int i = 0;
 +    boolean hasNext = true;
 +    
 +    int k = 1000;
 +    int numRead = 0;
 +    while (hasNext) {
 +      ScanResult kvList = tpc.proxy().nextK(cookie, k);
 +      for (KeyValue kv : kvList.getResults()) {
 +        assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
 +        i += 2;
 +        numRead++;
 +      }
 +      hasNext = kvList.isMore();
 +      
 +    }
 +    assertEquals(maxInserts / 2, numRead);
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
index 442294f,0000000..05806ca
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
+++ b/server/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
@@@ -1,30 -1,0 +1,27 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.conf;
 +
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +
 +public class ConfigSanityCheck {
 +  
-   /**
-    * @param args
-    */
 +  /**
 +   * Builds the server configuration for the current HDFS/ZooKeeper instance and fetches it once. Nothing is printed here; presumably loading the configuration
 +   * is what performs the sanity check (TODO confirm against ServerConfiguration).
 +   *
 +   * @param args
 +   *          not used
 +   */
 +  public static void main(String[] args) {
 +    ServerConfiguration serverConf = new ServerConfiguration(HdfsZooInstance.getInstance());
 +    serverConf.getConfiguration();
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
index 4f9d33a,0000000..01626ad
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
+++ b/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
@@@ -1,191 -1,0 +1,190 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.logger;
 +
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.Help;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.file.FileUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.tabletserver.log.DfsLogger;
 +import org.apache.accumulo.server.tabletserver.log.MultiReader;
 +import org.apache.accumulo.server.trace.TraceFileSystem;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.JCommander;
 +import com.beust.jcommander.Parameter;
 +
 +public class LogReader {
 +  
 +  /**
 +   * Command-line options for {@link LogReader}. The fields are filled in from the annotated parameters when the arguments are parsed in main.
 +   */
 +  static class Opts extends Help {
 +    // exact row to filter on; null (the default) disables the exact-row filter
 +    @Parameter(names = "-r", description = "print only mutations associated with the given row")
 +    String row;
 +    // handed to LogFileValue.format when an entry is printed; defaults to 5
 +    @Parameter(names = "-m", description = "limit the number of mutations printed per row")
 +    int maxMutations = 5;
 +    // key extent given as three ';'-separated fields; null disables the extent filter
 +    @Parameter(names = "-t", description = "print only mutations that fall within the given key extent")
 +    String extent;
 +    // regular expression that must match the whole row; null disables the regex filter
 +    @Parameter(names = "-p", description = "search for a row that matches the given regex")
 +    String regexp;
 +    // the unnamed arguments: one or more log files to dump
 +    @Parameter(description = "<logfile> { <logfile> ... }")
 +    List<String> files = new ArrayList<String>();
 +  }
 +  
 +  /**
 +   * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
 +   * 
 +   * @param args
 +   *          - first argument is the file to print
-    * @throws IOException
 +   */
 +  public static void main(String[] args) throws IOException {
 +    Opts opts = new Opts();
 +    opts.parseArgs(LogReader.class.getName(), args);
 +    Configuration conf = CachedConfiguration.getInstance();
 +    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration()));
 +    FileSystem local = TraceFileSystem.wrap(FileSystem.getLocal(conf));
 +
 +    Matcher rowMatcher = null;
 +    KeyExtent ke = null;
 +    Text row = null;
 +    if (opts.files.isEmpty()) {
 +      new JCommander(opts).usage();
 +      return;
 +    }
 +    if (opts.row != null)
 +      row = new Text(opts.row);
 +    if (opts.extent != null) {
 +      // the three ';'-separated fields are passed, in order, to the KeyExtent constructor
 +      String sa[] = opts.extent.split(";");
 +      if (sa.length < 3)
 +        throw new IllegalArgumentException("Bad key extent '" + opts.extent + "': expected three ';'-separated fields");
 +      ke = new KeyExtent(new Text(sa[0]), new Text(sa[1]), new Text(sa[2]));
 +    }
 +    if (opts.regexp != null) {
 +      // one matcher is compiled up front and reset against each row as entries are scanned
 +      Pattern pattern = Pattern.compile(opts.regexp);
 +      rowMatcher = pattern.matcher("");
 +    }
 +
 +    // ids of tablets whose DEFINE_TABLET entry matched the requested extent; shared across all files
 +    Set<Integer> tabletIds = new HashSet<Integer>();
 +
 +    for (String file : opts.files) {
 +      Path path = new Path(file);
 +
 +      if (fs.isFile(path)) {
 +        // read log entries from a simple hdfs file
 +        printSimpleLog(fs, path, row, rowMatcher, ke, tabletIds, opts.maxMutations);
 +      } else if (local.isFile(path)) {
 +        // read log entries from a simple file; it must be opened through the local
 +        // file system, since the configured file system has just reported no such file
 +        printSimpleLog(local, path, row, rowMatcher, ke, tabletIds, opts.maxMutations);
 +      } else {
 +        // read the log entries sorted in a map file
 +        LogFileKey key = new LogFileKey();
 +        LogFileValue value = new LogFileValue();
 +        MultiReader input = new MultiReader(fs, conf, file);
 +        while (input.next(key, value)) {
 +          printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Reads a plain log file from the given file system and prints every entry that passes the filters. Reading stops at the first EOFException, which is treated
 +   * as the normal end of the file; the stream is always closed.
 +   *
 +   * @param fileSystem
 +   *          the file system that actually holds {@code path}
 +   * @param path
 +   *          the log file to read
 +   * @throws IOException
 +   *           if the header or an entry cannot be read for a reason other than end of file
 +   */
 +  private static void printSimpleLog(FileSystem fileSystem, Path path, Text row, Matcher rowMatcher, KeyExtent ke, Set<Integer> tabletIds, int maxMutations)
 +      throws IOException {
 +    Map<String,String> meta = new HashMap<String,String>();
 +    LogFileKey key = new LogFileKey();
 +    LogFileValue value = new LogFileValue();
 +    FSDataInputStream f = DfsLogger.readHeader(fileSystem, path, meta);
 +    try {
 +      while (true) {
 +        try {
 +          key.readFields(f);
 +          value.readFields(f);
 +        } catch (EOFException ex) {
 +          break;
 +        }
 +        printLogEvent(key, value, row, rowMatcher, ke, tabletIds, maxMutations);
 +      }
 +    } finally {
 +      f.close();
 +    }
 +  }
 +  
 +  public static void printLogEvent(LogFileKey key, LogFileValue value, Text row, Matcher rowMatcher, KeyExtent ke, Set<Integer> tabletIds, int maxMutations) {
 +
 +    // Extent filter: a DEFINE_TABLET entry for the requested extent registers its tablet id;
 +    // every other entry is printed only if its tablet id has been registered that way.
 +    if (ke != null) {
 +      boolean definesTablet = key.event == LogEvents.DEFINE_TABLET;
 +      if (definesTablet) {
 +        if (!key.tablet.equals(ke))
 +          return;
 +        tabletIds.add(key.tid);
 +      } else if (!tabletIds.contains(key.tid)) {
 +        return;
 +      }
 +    }
 +
 +    // Row filters: only mutation entries can match, so anything else is suppressed while a row filter is active.
 +    boolean filteringByRow = row != null || rowMatcher != null;
 +    if (filteringByRow) {
 +      boolean carriesMutations = key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS;
 +      if (!carriesMutations)
 +        return;
 +
 +      boolean matched = false;
 +      for (Mutation candidate : value.mutations) {
 +        if (row != null && new Text(candidate.getRow()).equals(row)) {
 +          matched = true;
 +        } else if (rowMatcher != null) {
 +          rowMatcher.reset(new String(candidate.getRow(), Constants.UTF8));
 +          matched = rowMatcher.matches();
 +        }
 +        if (matched)
 +          break;
 +      }
 +
 +      if (!matched)
 +        return;
 +    }
 +
 +    System.out.println(key);
 +    System.out.println(LogFileValue.format(value, maxMutations));
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
index 4930bc2,0000000..e14008a
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
@@@ -1,152 -1,0 +1,144 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master.balancer;
 +
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.SortedMap;
 +
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.master.thrift.TableInfo;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TabletMigration;
 +import org.apache.thrift.TException;
 +
 +/**
 + * A chaotic load balancer used for testing. It constantly shuffles tablets, preventing them from resting in a single location for very long. This is not
 + * designed for performance, do not use on production systems. I'm calling it the LokiLoadBalancer.
 + */
 +public class ChaoticLoadBalancer extends TabletBalancer {
 +  Random r = new Random();
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.master.balancer.TabletBalancer#getAssignments(java.util.SortedMap, java.util.Map, java.util.Map)
-    */
 +  /**
 +   * Assigns every unassigned tablet to a randomly chosen tablet server. Servers reporting fewer tablets than the per-server average of the tablets being
 +   * assigned are preferred, each up to the number of tablets it is short. Once no such server is left, the remaining tablets go to any current server at
 +   * random. Nothing is assigned when there are no current servers.
 +   *
 +   * @param current
 +   *          status of the online tablet servers; a null status is counted as hosting no tablets
 +   * @param unassigned
 +   *          the tablets that need a home
 +   * @param assignments
 +   *          receives one entry per unassigned tablet
 +   */
 +  @Override
 +  public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
 +      Map<KeyExtent,TServerInstance> assignments) {
 +    // without servers there is nothing to assign to, and the average below would divide by zero
 +    if (current.isEmpty())
 +      return;
 +
 +    long total = assignments.size() + unassigned.size();
 +    long avg = (long) Math.ceil(((double) total) / current.size());
 +    Map<TServerInstance,Long> toAssign = new HashMap<TServerInstance,Long>();
 +    List<TServerInstance> tServerArray = new ArrayList<TServerInstance>();
 +    for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
 +      long numTablets = 0;
 +      TabletServerStatus status = e.getValue();
 +      // the status may be missing for a server that has not answered a status request yet
 +      if (status != null && status.getTableMap() != null) {
 +        for (TableInfo ti : status.getTableMap().values()) {
 +          numTablets += ti.tablets;
 +        }
 +      }
 +      if (numTablets < avg) {
 +        tServerArray.add(e.getKey());
 +        toAssign.put(e.getKey(), avg - numTablets);
 +      }
 +    }
 +
 +    // fallback pool for when every server is already at or above its share
 +    List<TServerInstance> everyServer = new ArrayList<TServerInstance>(current.keySet());
 +
 +    for (KeyExtent ke : unassigned.keySet()) {
 +      if (tServerArray.isEmpty()) {
 +        // Random.nextInt(0) would throw; still place the tablet somewhere
 +        assignments.put(ke, everyServer.get(r.nextInt(everyServer.size())));
 +        continue;
 +      }
 +      int index = r.nextInt(tServerArray.size());
 +      TServerInstance dest = tServerArray.get(index);
 +      assignments.put(ke, dest);
 +      long remaining = toAssign.get(dest).longValue() - 1;
 +      if (remaining == 0) {
 +        // this server has received its share; stop offering it
 +        tServerArray.remove(index);
 +        toAssign.remove(dest);
 +      } else {
 +        toAssign.put(dest, remaining);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Will balance randomly, maintaining distribution. Does nothing while earlier migrations are still outstanding. Otherwise every online tablet is offered a
 +   * random destination among the servers currently considered under capacity, and a migration is recorded unless that destination is the tablet's own server.
 +   * Tablets of the "!METADATA" table are only moved on roughly one call in four. Tables whose tablet list cannot be fetched are skipped for this round.
 +   *
 +   * @param current
 +   *          status of the online tablet servers
 +   * @param migrations
 +   *          migrations already in progress
 +   * @param migrationsOut
 +   *          receives the new migrations
 +   * @return always 100, the delay before the next call
 +   */
 +  @Override
 +  public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
 +    Map<TServerInstance,Long> numTablets = new HashMap<TServerInstance,Long>();
 +    List<TServerInstance> underCapacityTServer = new ArrayList<TServerInstance>();
 +
 +    if (!migrations.isEmpty())
 +      return 100;
 +
 +    boolean moveMetadata = r.nextInt(4) == 0;
 +    long totalTablets = 0;
 +    for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
 +      long tabletCount = 0;
 +      for (TableInfo ti : e.getValue().getTableMap().values()) {
 +        tabletCount += ti.tablets;
 +      }
 +      numTablets.put(e.getKey(), tabletCount);
 +      underCapacityTServer.add(e.getKey());
 +      totalTablets += tabletCount;
 +    }
 +    // totalTablets is fuzzy due to asynchronicity of the stats
 +    // *1.2 to handle fuzziness, and prevent locking for 'perfect' balancing scenarios
 +    long avg = (long) Math.ceil(((double) totalTablets) / current.size() * 1.2);
 +
 +    for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
 +      for (String table : e.getValue().getTableMap().keySet()) {
 +        if (!moveMetadata && "!METADATA".equals(table))
 +          continue;
 +        try {
 +          List<TabletStats> onlineTablets = getOnlineTabletsForTable(e.getKey(), table);
 +          // null means the tablet server could not be reached; skip this table for now
 +          if (onlineTablets == null)
 +            continue;
 +          for (TabletStats ts : onlineTablets) {
 +            KeyExtent ke = new KeyExtent(ts.extent);
 +            int index = r.nextInt(underCapacityTServer.size());
 +            TServerInstance dest = underCapacityTServer.get(index);
 +            if (dest.equals(e.getKey()))
 +              continue;
 +            migrationsOut.add(new TabletMigration(ke, e.getKey(), dest));
 +            // Map.put returns the previous count, so these comparisons use the values from before this move
 +            if (numTablets.put(dest, numTablets.get(dest) + 1) > avg)
 +              underCapacityTServer.remove(index);
 +            if (numTablets.put(e.getKey(), numTablets.get(e.getKey()) - 1) <= avg && !underCapacityTServer.contains(e.getKey()))
 +              underCapacityTServer.add(e.getKey());
 +
 +            // We can get some craziness with only 1 tserver, so lets make sure there's always an option!
 +            if (underCapacityTServer.isEmpty())
 +              underCapacityTServer.addAll(numTablets.keySet());
 +          }
 +        } catch (ThriftSecurityException e1) {
 +          // Shouldn't happen, but carry on if it does
 +          e1.printStackTrace();
 +        } catch (TException e1) {
 +          // Shouldn't happen, but carry on if it does
 +          e1.printStackTrace();
 +        }
 +      }
 +    }
 +
 +    return 100;
 +  }
 +  
-   /*
-    * (non-Javadoc)
-    * 
-    * @see org.apache.accumulo.server.master.balancer.TabletBalancer#init(org.apache.accumulo.server.conf.ServerConfiguration)
-    */
 +  /**
 +   * Passes the configuration on to the base class. The base class stores it and reads it again in getOnlineTabletsForTable, which balance() calls, so it must
 +   * not be dropped here.
 +   */
 +  @Override
 +  public void init(ServerConfiguration conf) {
 +    super.init(conf);
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
index d6dce2f,0000000..69387d3
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
@@@ -1,150 -1,0 +1,148 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master.balancer;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.SortedMap;
 +
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TabletMigration;
 +import org.apache.accumulo.server.security.SecurityConstants;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.transport.TTransportException;
 +
 +public abstract class TabletBalancer {
 +  
 +  private static final Logger log = Logger.getLogger(TabletBalancer.class);
 +  
 +  protected ServerConfiguration configuration;
 +
 +  /**
 +   * Hands the balancer its server configuration. The base implementation only stores it in {@link #configuration}; subclasses may override this to read
 +   * their own settings.
 +   */
 +  public void init(ServerConfiguration conf) {
 +    this.configuration = conf;
 +  }
 +  
 +  /**
 +   * Assign tablets to tablet servers. This method is called whenever the master finds tablets that are unassigned.
 +   * 
 +   * @param current
 +   *          The current table-summary state of all the online tablet servers. Read-only. The TabletServerStatus for each server may be {@code null} if the
 +   *          tablet server has not yet responded to a recent request for status, so implementations need to check it before use.
 +   * @param unassigned
 +   *          A map from unassigned tablet to the last known tablet server. Read-only.
 +   * @param assignments
 +   *          A map from tablet to assigned server. Write-only.
 +   */
 +  abstract public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
 +      Map<KeyExtent,TServerInstance> assignments);
 +  
 +  /**
 +   * Ask the balancer if any migrations are necessary. This method will not be called when there are unassigned tablets.
 +   * 
 +   * @param current
 +   *          The current table-summary state of all the online tablet servers. Read-only.
 +   * @param migrations
 +   *          the current set of migrations. Read-only.
 +   * @param migrationsOut
 +   *          new migrations to perform; should not contain tablets in the current set of migrations. Write-only.
 +   * @return the time, in milliseconds, to wait before re-balancing.
 +   */
 +  public abstract long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut);
 +  
 +  /**
 +   * Asks a tablet server for the statistics of the tablets it hosts for one table. Useful when a balance strategy needs tablet-level detail to decide what to
 +   * move. The thrift client is handed back to ThriftUtil whether or not the call succeeds.
 +   * 
 +   * @param tserver
 +   *          The tablet server to ask.
 +   * @param tableId
 +   *          The table id
 +   * @return a list of tablet statistics, or {@code null} if the request failed with a transport error (the failure is logged, not thrown)
 +   * @throws ThriftSecurityException
 +   *           tablet server disapproves of your internal System password.
 +   * @throws TException
 +   *           any other problem
 +   */
 +  public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String tableId) throws ThriftSecurityException, TException {
 +    log.debug("Scanning tablet server " + tserver + " for table " + tableId);
 +    Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), tserver.getLocation(), configuration.getConfiguration());
 +    List<TabletStats> stats = null;
 +    try {
 +      stats = client.getTabletStats(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), tableId);
 +    } catch (TTransportException e) {
 +      // stats stays null so the caller can tell the server was unreachable
 +      log.error("Unable to connect to " + tserver + ": " + e);
 +    } finally {
 +      ThriftUtil.returnClient(client);
 +    }
 +    return stats;
 +  }
 +  
 +  /**
 +   * Utility to ensure that the migrations from balance() are consistent:
 +   * <ul>
 +   * <li>Tablet objects are not null
 +   * <li>Source and destination tablet servers are not null and current
 +   * </ul>
 +   * 
-    * @param current
-    * @param migrations
 +   * @return A list of TabletMigration object that passed sanity checks.
 +   */
 +  public static List<TabletMigration> checkMigrationSanity(Set<TServerInstance> current, List<TabletMigration> migrations) {
 +    List<TabletMigration> accepted = new ArrayList<TabletMigration>(migrations.size());
 +    for (TabletMigration candidate : migrations) {
 +      // the first failing check decides the warning; a migration is kept only if it passes all of them
 +      if (candidate.tablet == null) {
 +        log.warn("Balancer gave back a null tablet " + candidate);
 +      } else if (candidate.newServer == null) {
 +        log.warn("Balancer did not set the destination " + candidate);
 +      } else if (candidate.oldServer == null) {
 +        log.warn("Balancer did not set the source " + candidate);
 +      } else if (!current.contains(candidate.oldServer)) {
 +        log.warn("Balancer wants to move a tablet from a server that is not current: " + candidate);
 +      } else if (!current.contains(candidate.newServer)) {
 +        log.warn("Balancer wants to move a tablet to a server that is not current: " + candidate);
 +      } else {
 +        accepted.add(candidate);
 +      }
 +    }
 +    return accepted;
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index f9f03bd,0000000..540ebc0
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@@ -1,87 -1,0 +1,81 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master.state;
 +
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Iterator;
 +
 +/**
 + * Interface for storing information about tablet assignments. There are three implementations:
 + * 
 + * ZooTabletStateStore: information about the root tablet is stored in ZooKeeper MetaDataStateStore: information about the other tablets are stored in the
 + * metadata table
 + * 
 + */
 +public abstract class TabletStateStore implements Iterable<TabletLocationState> {
 +  
 +  /**
 +   * Identifying name for this tablet state store.
 +   */
 +  abstract public String name();
 +  
 +  /**
 +   * Scan the information about the tablets covered by this store
 +   */
++  @Override
 +  abstract public Iterator<TabletLocationState> iterator();
 +  
 +  /**
 +   * Store the assigned locations in the data store.
-    * 
-    * @param assignments
-    * @throws DistributedStoreException
 +   */
 +  abstract public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException;
 +  
 +  /**
 +   * Tablet servers will update the data store with the location when they bring the tablet online
-    * 
-    * @param assignments
-    * @throws DistributedStoreException
 +   */
 +  abstract public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException;
 +  
 +  /**
 +   * Mark the tablets as having no known or future location.
 +   * 
 +   * @param tablets
 +   *          the tablets' current information
-    * @throws DistributedStoreException
 +   */
 +  abstract public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException;
 +  
 +  /**
 +   * Marks a single tablet as having no location, using the ZooKeeper-backed store for the root tablet and the metadata-table store for any other tablet.
 +   */
 +  public static void unassign(TabletLocationState tls) throws DistributedStoreException {
 +    TabletStateStore target = tls.extent.isRootTablet() ? new ZooTabletStateStore() : new MetaDataStateStore();
 +    target.unassign(Collections.singletonList(tls));
 +  }
 +  
 +  /**
 +   * Records the location for a single assignment, using the ZooKeeper-backed store for the root tablet and the metadata-table store for any other tablet.
 +   */
 +  public static void setLocation(Assignment assignment) throws DistributedStoreException {
 +    TabletStateStore target = assignment.tablet.isRootTablet() ? new ZooTabletStateStore() : new MetaDataStateStore();
 +    target.setLocations(Collections.singletonList(assignment));
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
index 58a337f,0000000..45f6a60
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
@@@ -1,109 -1,0 +1,84 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master.tableOps;
 +
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.accumulo.trace.thrift.TInfo;
 +import org.apache.accumulo.fate.Repo;
 +
 +
 +/**
 + * 
 + */
 +public class TraceRepo<T> implements Repo<T> {
 +  
 +  private static final long serialVersionUID = 1L;
 +
 +  TInfo tinfo;
 +  Repo<T> repo;
 +  
 +  /**
 +   * Wraps the given repo and records the trace info current at construction time; that info is what the later isReady/call/undo spans are started from.
 +   */
 +  public TraceRepo(Repo<T> repo) {
 +    this.repo = repo;
 +    this.tinfo = Tracer.traceInfo();
 +  }
 +  
-   /*
-    * (non-Javadoc)
-    * 
-    * @see org.apache.accumulo.server.fate.Repo#isReady(long, java.lang.Object)
-    */
 +  /**
 +   * Delegates to the wrapped repo inside a trace span named after the repo's description. The span is stopped even if the delegate throws.
 +   */
 +  @Override
 +  public long isReady(long tid, T environment) throws Exception {
 +    String description = repo.getDescription();
 +    Span tracing = Trace.trace(tinfo, description);
 +    try {
 +      long delay = repo.isReady(tid, environment);
 +      return delay;
 +    } finally {
 +      tracing.stop();
 +    }
 +  }
 +  
-   /*
-    * (non-Javadoc)
-    * 
-    * @see org.apache.accumulo.server.fate.Repo#call(long, java.lang.Object)
-    */
 +  /**
 +   * Runs the wrapped repo inside a trace span named after its description. A non-null follow-up repo is wrapped in a new TraceRepo so it is traced as well;
 +   * a null result is passed through unchanged.
 +   */
 +  @Override
 +  public Repo<T> call(long tid, T environment) throws Exception {
 +    String description = repo.getDescription();
 +    Span tracing = Trace.trace(tinfo, description);
 +    try {
 +      Repo<T> next = repo.call(tid, environment);
 +      return next == null ? null : new TraceRepo<T>(next);
 +    } finally {
 +      tracing.stop();
 +    }
 +  }
 +  
-   /*
-    * (non-Javadoc)
-    * 
-    * @see org.apache.accumulo.server.fate.Repo#undo(long, java.lang.Object)
-    */
 +  /**
 +   * Undoes the wrapped repo inside a trace span named after its description. The span is stopped even if the delegate throws.
 +   */
 +  @Override
 +  public void undo(long tid, T environment) throws Exception {
 +    String description = repo.getDescription();
 +    Span tracing = Trace.trace(tinfo, description);
 +    try {
 +      repo.undo(tid, environment);
 +    } finally {
 +      tracing.stop();
 +    }
 +  }
 +  
-   /*
-    * (non-Javadoc)
-    * 
-    * @see org.apache.accumulo.server.fate.Repo#getDescription()
-    */
 +  /**
 +   * Returns the wrapped repo's description unchanged.
 +   */
 +  @Override
 +  public String getDescription() {
 +    String description = repo.getDescription();
 +    return description;
 +  }
 +  
-   /*
-    * (non-Javadoc)
-    * 
-    * @see org.apache.accumulo.server.fate.Repo#getReturn()
-    */
 +  /**
 +   * Returns the wrapped repo's return value unchanged.
 +   */
 +  @Override
 +  public String getReturn() {
 +    String result = repo.getReturn();
 +    return result;
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
index 829d7bc,0000000..7e50754
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
+++ b/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
@@@ -1,70 -1,0 +1,66 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.metanalysis;
 +
 +import java.io.IOException;
 +
 +import org.apache.accumulo.server.logger.LogFileKey;
 +import org.apache.accumulo.server.logger.LogFileValue;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.mapreduce.RecordWriter;
 +import org.apache.hadoop.mapreduce.TaskAttemptContext;
 +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 +
 +/**
 + * Output format for Accumulo write ahead logs.
 + */
 +public class LogFileOutputFormat extends FileOutputFormat<LogFileKey,LogFileValue> {
 +  
 +  private static class LogFileRecordWriter extends RecordWriter<LogFileKey,LogFileValue> {
 +    
 +    private FSDataOutputStream out;
 +    
-     /**
-      * @param outputPath
-      * @throws IOException
-      */
 +    public LogFileRecordWriter(Path outputPath) throws IOException {
 +      Configuration conf = new Configuration();
 +      FileSystem fs = FileSystem.get(conf);
 +      
 +      out = fs.create(outputPath);
 +    }
 +
 +    @Override
 +    public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
 +      out.close();
 +    }
 +    
 +    @Override
 +    public void write(LogFileKey key, LogFileValue val) throws IOException, InterruptedException {
 +      key.write(out);
 +      val.write(out);
 +    }
 +    
 +  }
 +
 +  @Override
 +  public RecordWriter<LogFileKey,LogFileValue> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
 +    Path outputPath = getDefaultWorkFile(context, "");
 +    return new LogFileRecordWriter(outputPath);
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
index 88f5cbe,0000000..0478d83
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
+++ b/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
@@@ -1,109 -1,0 +1,106 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.metanalysis;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.DataInputStream;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.core.Constants;
- import org.apache.accumulo.server.cli.ClientOpts;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.ColumnUpdate;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.server.cli.ClientOpts;
 +import org.apache.accumulo.server.logger.LogFileValue;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.Parameter;
 +
 +/**
 + * Looks up and prints mutations indexed by IndexMeta
 + */
 +public class PrintEvents {
 +  
 +  static class Opts extends ClientOpts {
 +    @Parameter(names={"-t", "--tableId"}, description="table id", required=true)
 +    String tableId;
 +    @Parameter(names={"-e", "--endRow"}, description="end row")
 +    String endRow;
 +    @Parameter(names={"-t", "--time"}, description="time, in milliseconds", required=true)
 +    long time;
 +  }
 +  
-   /**
-    * @param args
-    */
 +  public static void main(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    opts.parseArgs(PrintEvents.class.getName(), args);
 +    
 +    Connector conn = opts.getConnector();
 +    
 +    printEvents(conn, opts.tableId, opts.endRow, opts.time);
 +  }
 +  
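 +  /**
 +   * Scans the "tabletEvents" table for one tablet and prints the log names and mutations found.
 +   * @param conn connector used to create the scanner
 +   * @param tableId table id that forms the first part of the scanned row
 +   * @param endRow end row appended to the table id after ";", or null to scan the tableId + "<" row
 +   * @param time start of the scanned range, in milliseconds
 +   */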
 +  private static void printEvents(Connector conn, String tableId, String endRow, Long time) throws Exception {
 +    Scanner scanner = conn.createScanner("tabletEvents", new Authorizations());
 +    String metaRow = tableId + (endRow == null ? "<" : ";" + endRow);
 +    scanner.setRange(new Range(new Key(metaRow, String.format("%020d", time)), true, new Key(metaRow).followingKey(PartialKey.ROW), false));
 +    int count = 0;
 +    
 +    String lastLog = null;
 +
 +    loop1: for (Entry<Key,Value> entry : scanner) {
 +      if (entry.getKey().getColumnQualifier().toString().equals("log")) {
 +        if (lastLog == null || !lastLog.equals(entry.getValue().toString()))
 +          System.out.println("Log : " + entry.getValue());
 +        lastLog = entry.getValue().toString();
 +      } else if (entry.getKey().getColumnQualifier().toString().equals("mut")) {
 +        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(entry.getValue().get()));
 +        Mutation m = new Mutation();
 +        m.readFields(dis);
 +        
 +        LogFileValue lfv = new LogFileValue();
 +        lfv.mutations = Collections.singletonList(m);
 +        
 +        System.out.println(LogFileValue.format(lfv, 1));
 +        
 +        List<ColumnUpdate> columnsUpdates = m.getUpdates();
 +        for (ColumnUpdate cu : columnsUpdates) {
 +          if (Constants.METADATA_PREV_ROW_COLUMN.equals(new Text(cu.getColumnFamily()), new Text(cu.getColumnQualifier())) && count > 0) {
 +            System.out.println("Saw change to prevrow, stopping printing events.");
 +            break loop1;
 +          }
 +        }
 +        count++;
 +      }
 +    }
 +
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
index 5a8ddec,0000000..d76c7a3
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
+++ b/server/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
@@@ -1,277 -1,0 +1,273 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.metrics;
 +
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStreamWriter;
 +import java.io.Writer;
 +import java.lang.management.ManagementFactory;
 +import java.text.SimpleDateFormat;
 +import java.util.Date;
 +import java.util.concurrent.ConcurrentHashMap;
 +
 +import javax.management.MBeanServer;
 +import javax.management.ObjectName;
 +import javax.management.StandardMBean;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.commons.lang.builder.ToStringBuilder;
 +import org.apache.commons.lang.time.DateUtils;
 +
 +public abstract class AbstractMetricsImpl {
 +  
 +  public class Metric {
 +    
 +    private long count = 0;
 +    private long avg = 0;
 +    private long min = 0;
 +    private long max = 0;
 +    
 +    public long getCount() {
 +      return count;
 +    }
 +    
 +    public long getAvg() {
 +      return avg;
 +    }
 +    
 +    public long getMin() {
 +      return min;
 +    }
 +    
 +    public long getMax() {
 +      return max;
 +    }
 +    
 +    public void incCount() {
 +      count++;
 +    }
 +    
 +    public void addAvg(long a) {
 +      if (a < 0)
 +        return;
 +      avg = (long) ((avg * .8) + (a * .2));
 +    }
 +    
 +    public void addMin(long a) {
 +      if (a < 0)
 +        return;
 +      min = Math.min(min, a);
 +    }
 +    
 +    public void addMax(long a) {
 +      if (a < 0)
 +        return;
 +      max = Math.max(max, a);
 +    }
 +    
 +    @Override
 +    public String toString() {
 +      return new ToStringBuilder(this).append("count", count).append("average", avg).append("minimum", min).append("maximum", max).toString();
 +    }
 +    
 +  }
 +  
 +  static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AbstractMetricsImpl.class);
 +  
 +  private static ConcurrentHashMap<String,Metric> registry = new ConcurrentHashMap<String,Metric>();
 +  
 +  private boolean currentlyLogging = false;
 +  
 +  private File logDir = null;
 +  
 +  private String metricsPrefix = null;
 +  
 +  private Date today = new Date();
 +  
 +  private File logFile = null;
 +  
 +  private Writer logWriter = null;
 +  
 +  private SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
 +  
 +  private SimpleDateFormat logFormatter = new SimpleDateFormat("yyyyMMddhhmmssz");
 +  
 +  private MetricsConfiguration config = null;
 +  
 +  public AbstractMetricsImpl() {
 +    this.metricsPrefix = getMetricsPrefix();
 +    config = new MetricsConfiguration(metricsPrefix);
 +  }
 +  
 +  /**
 +   * Registers a StandardMBean with the MBean Server
-    * 
-    * @throws Exception
 +   */
 +  public void register(StandardMBean mbean) throws Exception {
 +    // Register this object with the MBeanServer
 +    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 +    if (null == getObjectName())
 +      throw new IllegalArgumentException("MBean object name must be set.");
 +    mbs.registerMBean(mbean, getObjectName());
 +    
 +    setupLogging();
 +  }
 +  
 +  /**
 +   * Registers this MBean with the MBean Server
-    * 
-    * @throws Exception
 +   */
 +  public void register() throws Exception {
 +    // Register this object with the MBeanServer
 +    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 +    if (null == getObjectName())
 +      throw new IllegalArgumentException("MBean object name must be set.");
 +    mbs.registerMBean(this, getObjectName());
 +    
 +    setupLogging();
 +  }
 +  
 +  public void createMetric(String name) {
 +    registry.put(name, new Metric());
 +  }
 +  
 +  public Metric getMetric(String name) {
 +    return registry.get(name);
 +  }
 +  
 +  public long getMetricCount(String name) {
 +    return registry.get(name).getCount();
 +  }
 +  
 +  public long getMetricAvg(String name) {
 +    return registry.get(name).getAvg();
 +  }
 +  
 +  public long getMetricMin(String name) {
 +    return registry.get(name).getMin();
 +  }
 +  
 +  public long getMetricMax(String name) {
 +    return registry.get(name).getMax();
 +  }
 +  
 +  private void setupLogging() throws IOException {
 +    if (null == config.getMetricsConfiguration())
 +      return;
 +    // If we are already logging, then return
 +    if (!currentlyLogging && config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
 +      // Check to see if directory exists, else make it
 +      String mDir = config.getMetricsConfiguration().getString("logging.dir");
 +      if (null != mDir) {
 +        File dir = new File(mDir);
 +        if (!dir.isDirectory())
 +          if (!dir.mkdir()) 
 +            log.warn("Could not create log directory: " + dir);
 +        logDir = dir;
 +        // Create new log file
 +        startNewLog();
 +      }
 +      currentlyLogging = true;
 +    }
 +  }
 +  
 +  private void startNewLog() throws IOException {
 +    if (null != logWriter) {
 +      logWriter.flush();
 +      logWriter.close();
 +    }
 +    logFile = new File(logDir, metricsPrefix + "-" + formatter.format(today) + ".log");
 +    if (!logFile.exists()) {
 +      if (!logFile.createNewFile()) {
 +        log.error("Unable to create new log file");
 +        currentlyLogging = false;
 +        return;
 +      }
 +    }
 +    logWriter = new OutputStreamWriter(new FileOutputStream(logFile, true), Constants.UTF8);
 +  }
 +  
 +  private void writeToLog(String name) throws IOException {
 +    if (null == logWriter)
 +      return;
 +    // Increment the date if we have to
 +    Date now = new Date();
 +    if (!DateUtils.isSameDay(today, now)) {
 +      today = now;
 +      startNewLog();
 +    }
 +    logWriter.append(logFormatter.format(now)).append(" Metric: ").append(name).append(": ").append(registry.get(name).toString()).append("\n");
 +  }
 +  
 +  public void add(String name, long time) {
 +    if (isEnabled()) {
 +      registry.get(name).incCount();
 +      registry.get(name).addAvg(time);
 +      registry.get(name).addMin(time);
 +      registry.get(name).addMax(time);
 +      // If we are not currently logging and should be, then initialize
 +      if (!currentlyLogging && config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
 +        try {
 +          setupLogging();
 +        } catch (IOException ioe) {
 +          log.error("Error setting up log", ioe);
 +        }
 +      } else if (currentlyLogging && !config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
 +        // if we are currently logging and shouldn't be, then close logs
 +        try {
 +          logWriter.flush();
 +          logWriter.close();
 +          logWriter = null;
 +          logFile = null;
 +        } catch (Exception e) {
 +          log.error("Error stopping metrics logging", e);
 +        }
 +        currentlyLogging = false;
 +      }
 +      if (currentlyLogging) {
 +        try {
 +          writeToLog(name);
 +        } catch (IOException ioe) {
 +          log.error("Error writing to metrics log", ioe);
 +        }
 +      }
 +    }
 +  }
 +  
 +  public boolean isEnabled() {
 +    return config.isEnabled();
 +  }
 +  
 +  protected abstract ObjectName getObjectName();
 +  
 +  protected abstract String getMetricsPrefix();
 +  
 +  @Override
 +  protected void finalize() {
 +    if (null != logWriter) {
 +      try {
 +        logWriter.close();
 +      } catch (Exception e) {
 +        // do nothing
 +      } finally {
 +        logWriter = null;
 +      }
 +    }
 +    logFile = null;
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java
index d51f3f9,0000000..3bda9d6
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java
+++ b/server/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java
@@@ -1,146 -1,0 +1,104 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.security.handler;
 +
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.security.SystemPermission;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +
 +/**
 + * This is a Permission Handler implementation that doesn't actually do any security. Use at your own risk.
 + */
 +public class InsecurePermHandler implements PermissionHandler {
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#initialize(java.lang.String)
-    */
 +  @Override
 +  public void initialize(String instanceId, boolean initialize) {
 +    return;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#validSecurityHandlers(org.apache.accumulo.server.security.handler.Authenticator, org.apache.accumulo.server.security.handler.Authorizor)
-    */
 +  @Override
 +  public boolean validSecurityHandlers(Authenticator authent, Authorizor author) {
 +    return true;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#initializeSecurity(java.lang.String)
-    */
 +  @Override
 +  public void initializeSecurity(TCredentials token, String rootuser) throws AccumuloSecurityException {
 +    return;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#hasSystemPermission(java.lang.String, org.apache.accumulo.core.security.SystemPermission)
-    */
 +  @Override
 +  public boolean hasSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
 +    return true;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#hasCachedSystemPermission(java.lang.String, org.apache.accumulo.core.security.SystemPermission)
-    */
 +  @Override
 +  public boolean hasCachedSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
 +    return true;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#hasTablePermission(java.lang.String, java.lang.String, org.apache.accumulo.core.security.TablePermission)
-    */
 +  @Override
 +  public boolean hasTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
 +    return true;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#hasCachedTablePermission(java.lang.String, java.lang.String, org.apache.accumulo.core.security.TablePermission)
-    */
 +  @Override
 +  public boolean hasCachedTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
 +    return true;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#grantSystemPermission(java.lang.String, org.apache.accumulo.core.security.SystemPermission)
-    */
 +  @Override
 +  public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
 +    return;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#revokeSystemPermission(java.lang.String, org.apache.accumulo.core.security.SystemPermission)
-    */
 +  @Override
 +  public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
 +    return;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#grantTablePermission(java.lang.String, java.lang.String, org.apache.accumulo.core.security.TablePermission)
-    */
 +  @Override
 +  public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
 +    return;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#revokeTablePermission(java.lang.String, java.lang.String, org.apache.accumulo.core.security.TablePermission)
-    */
 +  @Override
 +  public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
 +    return;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#cleanTablePermissions(java.lang.String)
-    */
 +  @Override
 +  public void cleanTablePermissions(String table) throws AccumuloSecurityException, TableNotFoundException {
 +    return;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#initUser(java.lang.String)
-    */
 +  @Override
 +  public void initUser(String user) throws AccumuloSecurityException {
 +    return;
 +  }
 +  
-   /* (non-Javadoc)
-    * @see org.apache.accumulo.server.security.handler.PermissionHandler#dropUser(java.lang.String)
-    */
 +  @Override
 +  public void cleanUser(String user) throws AccumuloSecurityException {
 +    return;
 +  }
 +
 +  @Override
 +  public void initTable(String table) throws AccumuloSecurityException {
 +  }
 +  
 +}


Mime
View raw message