accumulo-commits mailing list archives

From: els...@apache.org
Subject: [36/48] Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Date: Tue, 04 Feb 2014 17:55:05 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index e80cd15,0000000..33d7722
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@@ -1,544 -1,0 +1,545 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.log;
 +
 +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
 +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 +import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
 +import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
 +import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 +
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.lang.reflect.Method;
 +import java.nio.channels.ClosedChannelException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.LinkedBlockingQueue;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.security.crypto.CryptoModule;
 +import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 +import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
 +import org.apache.accumulo.core.security.crypto.DefaultCryptoModule;
 +import org.apache.accumulo.core.security.crypto.NoFlushOutputStream;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.StringUtil;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.tserver.TabletMutations;
 +import org.apache.accumulo.tserver.logger.LogFileKey;
 +import org.apache.accumulo.tserver.logger.LogFileValue;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * Wraps a write-ahead log file in DFS.
 + */
 +public class DfsLogger {
 +  // Package private so that LogSorter can find this
 +  static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
 +  static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
 +  
 +  private static Logger log = Logger.getLogger(DfsLogger.class);
 +  
 +  public static class LogClosedException extends IOException {
 +    private static final long serialVersionUID = 1L;
 +    
 +    public LogClosedException() {
 +      super("LogClosed");
 +    }
 +  }
 +  
 +  public static class DFSLoggerInputStreams {
 +    
 +    private FSDataInputStream originalInput;
 +    private DataInputStream decryptingInputStream;
 +
 +    public DFSLoggerInputStreams(FSDataInputStream originalInput, DataInputStream decryptingInputStream) {
 +      this.originalInput = originalInput;
 +      this.decryptingInputStream = decryptingInputStream;
 +    }
 +
 +    public FSDataInputStream getOriginalInput() {
 +      return originalInput;
 +    }
 +
 +    public void setOriginalInput(FSDataInputStream originalInput) {
 +      this.originalInput = originalInput;
 +    }
 +
 +    public DataInputStream getDecryptingInputStream() {
 +      return decryptingInputStream;
 +    }
 +
 +    public void setDecryptingInputStream(DataInputStream decryptingInputStream) {
 +      this.decryptingInputStream = decryptingInputStream;
 +    }
 +  }
 +  
 +  public interface ServerResources {
 +    AccumuloConfiguration getConfiguration();
 +    
 +    VolumeManager getFileSystem();
 +    
 +    Set<TServerInstance> getCurrentTServers();
 +  }
 +  
 +  private final LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<DfsLogger.LogWork>();
 +  
 +  private final Object closeLock = new Object();
 +  
 +  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null);
 +  
 +  private static final LogFileValue EMPTY = new LogFileValue();
 +  
 +  private boolean closed = false;
 +  
 +  private class LogSyncingTask implements Runnable {
 +    
 +    @Override
 +    public void run() {
 +      ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
 +      while (true) {
 +        work.clear();
 +        
 +        try {
 +          work.add(workQueue.take());
 +        } catch (InterruptedException ex) {
 +          continue;
 +        }
 +        workQueue.drainTo(work);
 +        
 +        synchronized (closeLock) {
 +          if (!closed) {
 +            try {
 +              sync.invoke(logFile);
 +            } catch (Exception ex) {
 +              log.warn("Exception syncing", ex);
 +              for (DfsLogger.LogWork logWork : work) {
 +                logWork.exception = ex;
 +              }
 +            }
 +          } else {
 +            for (DfsLogger.LogWork logWork : work) {
 +              logWork.exception = new LogClosedException();
 +            }
 +          }
 +        }
 +        
 +        boolean sawClosedMarker = false;
 +        for (DfsLogger.LogWork logWork : work)
 +          if (logWork == CLOSED_MARKER)
 +            sawClosedMarker = true;
 +          else
 +            logWork.latch.countDown();
 +        
 +        if (sawClosedMarker) {
 +          synchronized (closeLock) {
 +            closeLock.notifyAll();
 +          }
 +          break;
 +        }
 +      }
 +    }
 +  }
 +  
 +  static class LogWork {
 +    CountDownLatch latch;
 +    volatile Exception exception;
 +
 +    public LogWork(CountDownLatch latch) {
 +      this.latch = latch;
 +    }
 +  }
 +  
 +  public static class LoggerOperation {
 +    private final LogWork work;
 +    
 +    public LoggerOperation(LogWork work) {
 +      this.work = work;
 +    }
 +    
 +    public void await() throws IOException {
 +      try {
 +        work.latch.await();
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +      
 +      if (work.exception != null) {
 +        if (work.exception instanceof IOException)
 +          throw (IOException) work.exception;
 +        else if (work.exception instanceof RuntimeException)
 +          throw (RuntimeException) work.exception;
 +        else
 +          throw new RuntimeException(work.exception);
 +      }
 +    }
 +  }
 +  
 +  @Override
 +  public boolean equals(Object obj) {
 +    // filename is unique
 +    if (obj == null)
 +      return false;
 +    if (obj instanceof DfsLogger)
 +      return getFileName().equals(((DfsLogger) obj).getFileName());
 +    return false;
 +  }
 +  
 +  @Override
 +  public int hashCode() {
 +    // filename is unique
 +    return getFileName().hashCode();
 +  }
 +  
 +  private final ServerResources conf;
 +  private FSDataOutputStream logFile;
 +  private DataOutputStream encryptingLogFile = null;
 +  private Method sync;
 +  private String logPath;
 +  
 +  public DfsLogger(ServerResources conf) throws IOException {
 +    this.conf = conf;
 +  }
 +  
 +  public DfsLogger(ServerResources conf, String filename) throws IOException {
 +    this.conf = conf;
 +    this.logPath = filename;
 +  }
 +  
 +  public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException {
 +    FSDataInputStream input = fs.open(path);
 +    DataInputStream decryptingInput = null;
 +    
 +    byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes();
 +    byte[] magicBuffer = new byte[magic.length];
 +    input.readFully(magicBuffer);
 +    if (Arrays.equals(magicBuffer, magic)) {
 +      // Read the crypto module class name; the module will read any
 +      // additional parameters it needs from the underlying stream.
 +      String cryptoModuleClassname = input.readUTF();
 +      CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
 +
 +      // Create the parameters and set the input stream into those parameters
 +      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
 +      params.setEncryptedInputStream(input);
 +
 +      // Create the plaintext input stream from the encrypted one
 +      params = cryptoModule.getDecryptingInputStream(params);
 +
 +      if (params.getPlaintextInputStream() instanceof DataInputStream) {
 +        decryptingInput = (DataInputStream) params.getPlaintextInputStream();
 +      } else {
 +        decryptingInput = new DataInputStream(params.getPlaintextInputStream());
 +      }
 +    } else {
 +      input.seek(0);
 +      byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
 +      byte[] magicBufferV2 = new byte[magicV2.length];
 +      input.readFully(magicBufferV2);
 +
 +      if (Arrays.equals(magicBufferV2, magicV2)) {
 +        // Log files from 1.5 wrote their crypto options directly into the log file.  Since we don't know
 +        // which class wrote those options, we make a basic assumption: the writer was either the
 +        // NullCryptoModule (no crypto) or the DefaultCryptoModule.
 +        
 +        // With the NullCryptoModule there are no parameters at all, so first attempt to read the
 +        // parameter map and decide from what is found.
 +        Map<String,String> opts = new HashMap<String,String>();
 +        int count = input.readInt();
 +        for (int i = 0; i < count; i++) {
 +          String key = input.readUTF();
 +          String value = input.readUTF();
 +          opts.put(key, value);
 +        }
 +        
 +        if (opts.size() == 0) {
 +          // NullCryptoModule, we're done
 +          decryptingInput = input;
 +        } else {
 +          
 +          // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot.
 +          org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
 +              .getCryptoModule(DefaultCryptoModule.class.getName());
 +
 +          CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
 +          
 +          input.seek(0);
 +          input.readFully(magicBuffer);
 +          params.setEncryptedInputStream(input);
 +
 +          params = cryptoModule.getDecryptingInputStream(params);
 +          if (params.getPlaintextInputStream() instanceof DataInputStream) {
 +            decryptingInput = (DataInputStream) params.getPlaintextInputStream();
 +          } else {
 +            decryptingInput = new DataInputStream(params.getPlaintextInputStream());
 +          }
 +        }
 +        
 +      } else {
 +
 +        input.seek(0);
 +        decryptingInput = input;
 +      }
 +
 +    }
 +    return new DFSLoggerInputStreams(input, decryptingInput);
 +  }
 +  
 +  public synchronized void open(String address) throws IOException {
 +    String filename = UUID.randomUUID().toString();
 +    String logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
 +
 +    log.debug("DfsLogger.open() begin");
 +    VolumeManager fs = conf.getFileSystem();
 +    
 +    logPath = fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename;
 +    try {
 +      short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
 +      if (replication == 0)
 +        replication = fs.getDefaultReplication(new Path(logPath));
 +      long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
 +      if (blockSize == 0)
 +        blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
 +      if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
 +        logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize);
 +      else
 +        logFile = fs.create(new Path(logPath), true, 0, replication, blockSize);
 +      
 +      try {
 +        NoSuchMethodException e = null;
 +        try {
 +          // sync: send data to datanodes
 +          sync = logFile.getClass().getMethod("sync");
 +        } catch (NoSuchMethodException ex) {
 +          e = ex;
 +        }
 +        try {
 +          // hsync: send data to datanodes and sync the data to disk
 +          sync = logFile.getClass().getMethod("hsync");
 +          e = null;
 +        } catch (NoSuchMethodException ex) {
 +          // hsync is unavailable on this Hadoop version; fall back to sync if that was found above
 +        }
 +        if (e != null)
 +          throw new RuntimeException(e);
 +      } catch (Exception e) {
 +        throw new RuntimeException(e);
 +      }
 +      
 +      // Initialize the crypto operations.
 +      org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf
 +          .getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
 +      
 +      // Initialize the log file with a header and the crypto params used to set up this log file.
-       logFile.write(LOG_FILE_HEADER_V3.getBytes());
++      logFile.write(LOG_FILE_HEADER_V3.getBytes(Constants.UTF8));
 +
 +      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
 +
 +      params.setPlaintextOutputStream(new NoFlushOutputStream(logFile));
 +
 +      // In order to bootstrap the reading of this file later, we record here the CryptoModule that was
 +      // used to encipher it, so that the same crypto module can re-read its own parameters.
 +
 +      logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
 +
 +      
 +      params = cryptoModule.getEncryptingOutputStream(params);
 +      OutputStream encipheringOutputStream = params.getEncryptedOutputStream();
 +      
 +      // If the module just handed back our original stream, use it as-is; don't wrap it in
 +      // another DataOutputStream.
 +      if (encipheringOutputStream == logFile) {
 +        encryptingLogFile = logFile;
 +      } else {
 +        encryptingLogFile = new DataOutputStream(encipheringOutputStream);
 +      }
 +      
 +      LogFileKey key = new LogFileKey();
 +      key.event = OPEN;
 +      key.tserverSession = filename;
 +      key.filename = filename;
 +      write(key, EMPTY);
 +      sync.invoke(logFile);
 +      log.debug("Got new write-ahead log: " + this);
 +    } catch (Exception ex) {
 +      if (logFile != null)
 +        logFile.close();
 +      logFile = null;
 +      encryptingLogFile = null;
 +      throw new IOException(ex);
 +    }
 +    
 +    Thread t = new Daemon(new LogSyncingTask());
 +    t.setName("Accumulo WALog thread " + toString());
 +    t.start();
 +  }
 +  
 +  @Override
 +  public String toString() {
 +    String fileName = getFileName();
 +    if (fileName.contains(":"))
 +      return getLogger() + "/" + getFileName();
 +    return fileName;
 +  }
 +  
 +  public String getFileName() {
 +    return logPath.toString();
 +  }
 +  
 +  public void close() throws IOException {
 +    
 +    synchronized (closeLock) {
 +      if (closed)
 +        return;
 +      // after closed is set to true, nothing else should be added to the queue
 +      // CLOSED_MARKER should be the last thing on the queue, therefore when the
 +      // background thread sees the marker and exits there should be nothing else
 +      // to process... so nothing should be left waiting for the background
 +      // thread to do work
 +      closed = true;
 +      workQueue.add(CLOSED_MARKER);
 +      while (!workQueue.isEmpty())
 +        try {
 +          closeLock.wait();
 +        } catch (InterruptedException e) {
 +          log.info("Interrupted");
 +        }
 +    }
 +
 +    if (encryptingLogFile != null)
 +      try {
 +        logFile.close();
 +      } catch (IOException ex) {
 +        log.error(ex);
 +        throw new LogClosedException();
 +      }
 +  }
 +  
 +  public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException {
 +    // write this log to the METADATA table
 +    final LogFileKey key = new LogFileKey();
 +    key.event = DEFINE_TABLET;
 +    key.seq = seq;
 +    key.tid = tid;
 +    key.tablet = tablet;
 +    try {
 +      write(key, EMPTY);
 +      sync.invoke(logFile);
 +    } catch (Exception ex) {
 +      log.error(ex);
 +      throw new IOException(ex);
 +    }
 +  }
 +  
 +  /**
 +   * @param key
 +   * @param value
 +   * @throws IOException
 +   */
 +  private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
 +    key.write(encryptingLogFile);
 +    value.write(encryptingLogFile);
 +    encryptingLogFile.flush();
 +  }
 +  
 +  public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
 +    return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
 +  }
 +  
 +  private LoggerOperation logFileData(List<Pair<LogFileKey, LogFileValue>> keys) throws IOException {
 +    DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1));
 +    synchronized (DfsLogger.this) {
 +      try {
 +        for (Pair<LogFileKey,LogFileValue> pair : keys) {
 +          write(pair.getFirst(), pair.getSecond());
 +        }
 +      } catch (ClosedChannelException ex) {
 +        throw new LogClosedException();
 +      } catch (Exception e) {
 +        log.error(e, e);
 +        work.exception = e;
 +      }
 +    }
 +
 +    synchronized (closeLock) {
 +      // use a different lock for close check so that adding to work queue does not need
 +      // to wait on walog I/O operations
 +      
 +      if (closed)
 +        throw new LogClosedException();
 +      workQueue.add(work);
 +    }
 +    
 +    return new LoggerOperation(work);
 +  }
 +  
 +  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
 +    List<Pair<LogFileKey, LogFileValue>> data = new ArrayList<Pair<LogFileKey, LogFileValue>>();
 +    for (TabletMutations tabletMutations : mutations) {
 +      LogFileKey key = new LogFileKey();
 +      key.event = MANY_MUTATIONS;
 +      key.seq = tabletMutations.getSeq();
 +      key.tid = tabletMutations.getTid();
 +      LogFileValue value = new LogFileValue();
 +      value.mutations = tabletMutations.getMutations();
 +      data.add(new Pair<LogFileKey, LogFileValue>(key, value));
 +    }
 +    return logFileData(data);
 +  }
 +
 +  public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
 +    LogFileKey key = new LogFileKey();
 +    key.event = COMPACTION_FINISH;
 +    key.seq = seq;
 +    key.tid = tid;
 +    return logFileData(Collections.singletonList(new Pair<LogFileKey, LogFileValue>(key, EMPTY)));
 +  }
 +  
 +  public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
 +    LogFileKey key = new LogFileKey();
 +    key.event = COMPACTION_START;
 +    key.seq = seq;
 +    key.tid = tid;
 +    key.filename = fqfn;
 +    return logFileData(Collections.singletonList(new Pair<LogFileKey, LogFileValue>(key, EMPTY)));
 +  }
 +
 +  public String getLogger() {
 +    String[] parts = logPath.split("/");
 +    return StringUtil.join(Arrays.asList(parts[parts.length - 2].split("[+]")), ":");
 +  }
 +  
 +}
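
The recurring change in this file (and across this merge) replaces charset-default conversions such as getBytes() with an explicit Constants.UTF8. A minimal standalone sketch of the failure mode this avoids; the class name and sample string are illustrative, and StandardCharsets.UTF_8 is the JDK equivalent of Accumulo's Constants.UTF8:

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class CharsetPitfall {
      public static void main(String[] args) {
        String s = "r\u00e9sum\u00e9";  // non-ASCII data, e.g. a row or visibility label
        // Varies with -Dfile.encoding and the platform locale:
        byte[] platformBytes = s.getBytes();
        // Identical bytes on every JVM and OS:
        byte[] utf8Bytes = s.getBytes(StandardCharsets.UTF_8);
        System.out.println("default charset: " + Charset.defaultCharset());
        System.out.println("encodings agree: " + Arrays.equals(platformBytes, utf8Bytes));
      }
    }

Bytes written with the platform default on one machine may not round-trip when read on a machine with a different default, which matters for on-disk formats like the write-ahead log above.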

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
index dbab215,0000000..997f71b
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
@@@ -1,94 -1,0 +1,96 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.logger;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.data.ColumnUpdate;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.server.data.ServerMutation;
 +import org.apache.hadoop.io.Writable;
 +
 +public class LogFileValue implements Writable {
 +  
 +  private static final List<Mutation> empty = Collections.emptyList();
 +  
 +  public List<Mutation> mutations = empty;
 +  
 +  @Override
 +  public void readFields(DataInput in) throws IOException {
 +    int count = in.readInt();
 +    mutations = new ArrayList<Mutation>(count);
 +    for (int i = 0; i < count; i++) {
 +      ServerMutation mutation = new ServerMutation();
 +      mutation.readFields(in);
 +      mutations.add(mutation);
 +    }
 +  }
 +  
 +  @Override
 +  public void write(DataOutput out) throws IOException {
 +    out.writeInt(mutations.size());
 +    for (Mutation m : mutations) {
 +      m.write(out);
 +    }
 +  }
 +  
 +  public static void print(LogFileValue value) {
 +    System.out.println(value.toString());
 +  }
 +  
 +  private static String displayLabels(byte[] labels) {
-     String s = new String(labels);
++    String s = new String(labels, Constants.UTF8);
 +    s = s.replace("&", " & ");
 +    s = s.replace("|", " | ");
 +    return s;
 +  }
 +  
 +  public static String format(LogFileValue lfv, int maxMutations) {
 +    if (lfv.mutations.size() == 0)
 +      return "";
 +    StringBuilder builder = new StringBuilder();
 +    builder.append(lfv.mutations.size()).append(" mutations:\n");
 +    int i = 0;
 +    for (Mutation m : lfv.mutations) {
 +      if (i++ >= maxMutations) {
 +        builder.append("...");
 +        break;
 +      }
-       builder.append("  " + new String(m.getRow()) + "\n");
++      builder.append("  ").append(new String(m.getRow(), Constants.UTF8)).append("\n");
 +      for (ColumnUpdate update : m.getUpdates()) {
 +        String value = new String(update.getValue(), Constants.UTF8);
-         builder.append("      " + new String(update.getColumnFamily()) + ":" + new String(update.getColumnQualifier()) + " "
-             + (update.hasTimestamp() ? "[user]:" : "[system]:") + update.getTimestamp() + " [" + displayLabels(update.getColumnVisibility()) + "] "
-             + (update.isDeleted() ? "<deleted>" : value) + "\n");
++        builder.append("      ").append(new String(update.getColumnFamily(), Constants.UTF8)).append(":")
++                .append(new String(update.getColumnQualifier(), Constants.UTF8)).append(" ").append(update.hasTimestamp() ? "[user]:" : "[system]:")
++                .append(update.getTimestamp()).append(" [").append(displayLabels(update.getColumnVisibility())).append("] ")
++                .append(update.isDeleted() ? "<deleted>" : value).append("\n");
 +      }
 +    }
 +    return builder.toString();
 +  }
 +  
 +  @Override
 +  public String toString() {
 +    return format(this, 5);
 +  }
 +  
 +}
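
Besides the charset fixes, format() above was rewritten from string concatenation inside a single append() into chained appends. Each '+' chain compiles to its own temporary StringBuilder plus an intermediate String, so the chained form does less allocation per mutation line. The two shapes side by side (a sketch; row stands in for the formatted fields):

    // before: a hidden StringBuilder and an intermediate String per call
    builder.append("  " + row + "\n");
    // after: reuses the one existing builder
    builder.append("  ").append(row).append("\n");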

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 7f723ca,0000000..b32ace9
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@@ -1,170 -1,0 +1,171 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.logger;
 +
 +import java.io.DataInputStream;
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.Help;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.tserver.log.DfsLogger;
 +import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 +import org.apache.accumulo.tserver.log.MultiReader;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.JCommander;
 +import com.beust.jcommander.Parameter;
 +
 +public class LogReader {
 +
 +  static class Opts extends Help {
 +    @Parameter(names = "-r", description = "print only mutations associated with the given row")
 +    String row;
 +    @Parameter(names = "-m", description = "limit the number of mutations printed per row")
 +    int maxMutations = 5;
 +    @Parameter(names = "-t", description = "print only mutations that fall within the given key extent")
 +    String extent;
 +    @Parameter(names = "-p", description = "search for a row that matches the given regex")
 +    String regexp;
 +    @Parameter(description = "<logfile> { <logfile> ... }")
 +    List<String> files = new ArrayList<String>();
 +  }
 +
 +  /**
 +   * Dump a log file (map or sequence) to stdout. Reads from HDFS or the local file system.
 +   * 
 +   * @param args
 +   *          the write-ahead log files to print
 +   * @throws IOException
 +   */
 +  public static void main(String[] args) throws IOException {
 +    Opts opts = new Opts();
 +    opts.parseArgs(LogReader.class.getName(), args);
 +    VolumeManager fs = VolumeManagerImpl.get();
 +
 +    Matcher rowMatcher = null;
 +    KeyExtent ke = null;
 +    Text row = null;
 +    if (opts.files.isEmpty()) {
 +      new JCommander(opts).usage();
 +      return;
 +    }
 +    if (opts.row != null)
 +      row = new Text(opts.row);
 +    if (opts.extent != null) {
 +      String[] sa = opts.extent.split(";");
 +      ke = new KeyExtent(new Text(sa[0]), new Text(sa[1]), new Text(sa[2]));
 +    }
 +    if (opts.regexp != null) {
 +      Pattern pattern = Pattern.compile(opts.regexp);
 +      rowMatcher = pattern.matcher("");
 +    }
 +
 +    Set<Integer> tabletIds = new HashSet<Integer>();
 +
 +    for (String file : opts.files) {
 +
 +      Path path = new Path(file);
 +      LogFileKey key = new LogFileKey();
 +      LogFileValue value = new LogFileValue();
 +
 +      if (fs.isFile(path)) {
 +        // read log entries from a simple hdfs file
 +        @SuppressWarnings("deprecation")
 +        DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getSiteConfiguration());
 +        DataInputStream input = streams.getDecryptingInputStream();
 +
 +        try {
 +          while (true) {
 +            try {
 +              key.readFields(input);
 +              value.readFields(input);
 +            } catch (EOFException ex) {
 +              break;
 +            }
 +            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
 +          }
 +        } finally {
 +          input.close();
 +        }
 +      } else {
 +        // read the log entries sorted in a map file
 +        MultiReader input = new MultiReader(fs, path);
 +        while (input.next(key, value)) {
 +          printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
 +        }
 +      }
 +    }
 +  }
 +
 +  public static void printLogEvent(LogFileKey key, LogFileValue value, Text row, Matcher rowMatcher, KeyExtent ke, Set<Integer> tabletIds, int maxMutations) {
 +
 +    if (ke != null) {
 +      if (key.event == LogEvents.DEFINE_TABLET) {
 +        if (key.tablet.equals(ke)) {
 +          tabletIds.add(key.tid);
 +        } else {
 +          return;
 +        }
 +      } else if (!tabletIds.contains(key.tid)) {
 +        return;
 +      }
 +    }
 +
 +    if (row != null || rowMatcher != null) {
 +      if (key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) {
 +        boolean found = false;
 +        for (Mutation m : value.mutations) {
 +          if (row != null && new Text(m.getRow()).equals(row)) {
 +            found = true;
 +            break;
 +          }
 +
 +          if (rowMatcher != null) {
-             rowMatcher.reset(new String(m.getRow()));
++            rowMatcher.reset(new String(m.getRow(), Constants.UTF8));
 +            if (rowMatcher.matches()) {
 +              found = true;
 +              break;
 +            }
 +          }
 +        }
 +
 +        if (!found)
 +          return;
 +      } else {
 +        return;
 +      }
 +
 +    }
 +
 +    System.out.println(key);
 +    System.out.println(LogFileValue.format(value, maxMutations));
 +  }
 +
 +}
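
Note how LogReader handles the -p option: the Pattern is compiled once, a single Matcher is created up front with pattern.matcher(""), and each candidate row is tested by rebinding that Matcher with rowMatcher.reset(...). This reuses one Matcher across all mutations instead of allocating one per row. A minimal sketch of the idiom:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    Pattern pattern = Pattern.compile("row_[0-9]+");
    Matcher m = pattern.matcher("");            // bound to an empty input for now
    for (String row : new String[] {"row_1", "other", "row_42"}) {
      if (m.reset(row).matches()) {             // rebind and test; no new Matcher
        System.out.println("matched " + row);
      }
    }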

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
index 8612c0c,0000000..142f171
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
@@@ -1,88 -1,0 +1,88 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.metrics;
 +
 +import javax.management.ObjectName;
 +
 +import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
 +
 +public class TabletServerScanMetrics extends AbstractMetricsImpl implements TabletServerScanMetricsMBean {
 +  
 +  static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(TabletServerScanMetrics.class);
 +  
 +  public static final String METRICS_PREFIX = "tserver.scan";
 +  
-   public static ObjectName OBJECT_NAME = null;
++  static ObjectName OBJECT_NAME = null;
 +  
 +  public TabletServerScanMetrics() {
 +    super();
 +    reset();
 +    try {
 +      OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerScanMetricsMBean,instance=" + Thread.currentThread().getName());
 +    } catch (Exception e) {
 +      log.error("Exception setting MBean object name", e);
 +    }
 +  }
 +  
 +  @Override
 +  protected ObjectName getObjectName() {
 +    return OBJECT_NAME;
 +  }
 +  
 +  @Override
 +  protected String getMetricsPrefix() {
 +    return METRICS_PREFIX;
 +  }
 +  
 +  public long getResultAvgSize() {
 +    return this.getMetricAvg(resultSize);
 +  }
 +  
 +  public long getResultCount() {
 +    return this.getMetricCount(resultSize);
 +  }
 +  
 +  public long getResultMaxSize() {
 +    return this.getMetricMax(resultSize);
 +  }
 +  
 +  public long getResultMinSize() {
 +    return this.getMetricMin(resultSize);
 +  }
 +  
 +  public long getScanAvgTime() {
 +    return this.getMetricAvg(scan);
 +  }
 +  
 +  public long getScanCount() {
 +    return this.getMetricCount(scan);
 +  }
 +  
 +  public long getScanMaxTime() {
 +    return this.getMetricMax(scan);
 +  }
 +  
 +  public long getScanMinTime() {
 +    return this.getMetricMin(scan);
 +  }
 +  
 +  public void reset() {
 +    createMetric(scan);
 +    createMetric(resultSize);
 +  }
 +  
 +}
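
The change above narrows OBJECT_NAME from public to package private. The name itself combines a fixed JMX domain with the current thread name, so each tablet server thread registers a distinct MBean instance. A hedged sketch of building such a name and checking it against the platform MBean server (the domain and key values are illustrative; actual registration happens in AbstractMetricsImpl, which is outside this diff):

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class MBeanNameSketch {
      public static void main(String[] args) throws Exception {
        ObjectName name = new ObjectName(
            "accumulo.server.metrics:service=TServerInfo,name=TabletServerScanMetricsMBean,instance="
                + Thread.currentThread().getName());
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        System.out.println(name + " registered: " + mbs.isRegistered(name));
      }
    }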

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/NativeMapConcurrencyTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
index bf3f883,f48b8cf..a679c48
--- a/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
@@@ -70,98 -71,71 +71,98 @@@ public class TestBinaryRows 
      return l;
    }
    
 -  static class Opts extends ClientOnRequiredTable {
 +  public static class Opts extends ClientOnRequiredTable {
      @Parameter(names="--mode", description="either 'ingest', 'delete', 'randomLookups', 'split', 'verify', 'verifyDeleted'", required=true)
 -    String mode;
 +    public String mode;
      @Parameter(names="--start", description="the lowest numbered row")
 -    long start = 0;
 +    public long start = 0;
      @Parameter(names="--count", description="number of rows to ingest", required=true)
 -    long num = 0;
 +    public long num = 0;
    }
    
 -  public static void main(String[] args) {
 -    Opts opts = new Opts();
 -    BatchWriterOpts bwOpts = new BatchWriterOpts();
 -    ScannerOpts scanOpts = new ScannerOpts();
 -    opts.parseArgs(TestBinaryRows.class.getName(), args, scanOpts, bwOpts);
 +  public static void runTest(Connector connector, Opts opts, BatchWriterOpts bwOpts, ScannerOpts scanOpts) throws Exception {
      
 -    try {
 -      Connector connector = opts.getConnector();
 -      
 -      final Text CF = new Text("cf"), CQ = new Text("cq");
 -      final byte[] CF_BYTES = "cf".getBytes(Constants.UTF8), CQ_BYTES = "cq".getBytes(Constants.UTF8);
++    final Text CF = new Text("cf"), CQ = new Text("cq");
++    final byte[] CF_BYTES = "cf".getBytes(Constants.UTF8), CQ_BYTES = "cq".getBytes(Constants.UTF8);
 +    if (opts.mode.equals("ingest") || opts.mode.equals("delete")) {
 +      BatchWriter bw = connector.createBatchWriter(opts.tableName, bwOpts.getBatchWriterConfig());
 +      boolean delete = opts.mode.equals("delete");
        
 -      if (opts.mode.equals("ingest") || opts.mode.equals("delete")) {
 -        BatchWriter bw = connector.createBatchWriter(opts.tableName, bwOpts.getBatchWriterConfig());
 -        boolean delete = opts.mode.equals("delete");
 -        
 -        for (long i = 0; i < opts.num; i++) {
 -          byte[] row = encodeLong(i + opts.start);
 -          String value = "" + (i + opts.start);
 -          
 -          Mutation m = new Mutation(new Text(row));
 -          if (delete) {
 -            m.putDelete(CF, CQ);
 -          } else {
 -            m.put(CF, CQ, new Value(value.getBytes(Constants.UTF8)));
 -          }
 -          bw.addMutation(m);
 +      for (long i = 0; i < opts.num; i++) {
 +        byte[] row = encodeLong(i + opts.start);
 +        String value = "" + (i + opts.start);
 +        
 +        Mutation m = new Mutation(new Text(row));
 +        if (delete) {
-           m.putDelete(new Text("cf"), new Text("cq"));
++          m.putDelete(CF, CQ);
 +        } else {
-           m.put(new Text("cf"), new Text("cq"), new Value(value.getBytes()));
++          m.put(CF, CQ, new Value(value.getBytes(Constants.UTF8)));
          }
 -        
 -        bw.close();
 -      } else if (opts.mode.equals("verifyDeleted")) {
 -        Scanner s = connector.createScanner(opts.tableName, opts.auths);
 -        s.setBatchSize(scanOpts.scanBatchSize);
 -        Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE);
 -        Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0);
 -        s.setBatchSize(50000);
 -        s.setRange(new Range(startKey, stopKey));
 -        
 -        for (Entry<Key,Value> entry : s) {
 -          System.err.println("ERROR : saw entries in range that should be deleted ( first value : " + entry.getValue().toString() + ")");
 -          System.err.println("exiting...");
 -          System.exit(1);
 -        }
 -        
 -      } else if (opts.mode.equals("verify")) {
 -        long t1 = System.currentTimeMillis();
 +        bw.addMutation(m);
 +      }
 +      
 +      bw.close();
 +    } else if (opts.mode.equals("verifyDeleted")) {
 +      Scanner s = connector.createScanner(opts.tableName, opts.auths);
 +      s.setBatchSize(scanOpts.scanBatchSize);
-       Key startKey = new Key(encodeLong(opts.start), "cf".getBytes(), "cq".getBytes(), new byte[0], Long.MAX_VALUE);
-       Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), "cf".getBytes(), "cq".getBytes(), new byte[0], 0);
++      Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE);
++      Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0);
 +      s.setBatchSize(50000);
 +      s.setRange(new Range(startKey, stopKey));
 +      
 +      for (Entry<Key,Value> entry : s) {
 +        throw new Exception("ERROR : saw entries in range that should be deleted ( first value : " + entry.getValue().toString() + ")");
 +      }
 +      
 +    } else if (opts.mode.equals("verify")) {
 +      long t1 = System.currentTimeMillis();
 +      
 +      Scanner s = connector.createScanner(opts.tableName, opts.auths);
-       Key startKey = new Key(encodeLong(opts.start), "cf".getBytes(), "cq".getBytes(), new byte[0], Long.MAX_VALUE);
-       Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), "cf".getBytes(), "cq".getBytes(), new byte[0], 0);
++      Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE);
++      Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0);
 +      s.setBatchSize(scanOpts.scanBatchSize);
 +      s.setRange(new Range(startKey, stopKey));
 +      
 +      long i = opts.start;
 +      
 +      for (Entry<Key,Value> e : s) {
 +        Key k = e.getKey();
 +        Value v = e.getValue();
-         
-         // System.out.println("v = "+v);
-         
++
 +        checkKeyValue(i, k, v);
-         
++
 +        i++;
 +      }
 +      
 +      if (i != opts.start + opts.num) {
 +        throw new Exception("ERROR : did not see expected number of rows, saw " + (i - opts.start) + " expected " + opts.num);
 +      }
 +      
 +      long t2 = System.currentTimeMillis();
 +      
 +      System.out.printf("time : %9.2f secs%n", ((t2 - t1) / 1000.0));
 +      System.out.printf("rate : %9.2f entries/sec%n", opts.num / ((t2 - t1) / 1000.0));
 +      
 +    } else if (opts.mode.equals("randomLookups")) {
 +      int numLookups = 1000;
 +      
 +      Random r = new Random();
 +      
 +      long t1 = System.currentTimeMillis();
 +      
 +      for (int i = 0; i < numLookups; i++) {
 +        long row = ((r.nextLong() & 0x7fffffffffffffffL) % opts.num) + opts.start;
          
          Scanner s = connector.createScanner(opts.tableName, opts.auths);
 -        Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE);
 -        Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0);
          s.setBatchSize(scanOpts.scanBatchSize);
-         Key startKey = new Key(encodeLong(row), "cf".getBytes(), "cq".getBytes(), new byte[0], Long.MAX_VALUE);
-         Key stopKey = new Key(encodeLong(row), "cf".getBytes(), "cq".getBytes(), new byte[0], 0);
++        Key startKey = new Key(encodeLong(row), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE);
++        Key stopKey = new Key(encodeLong(row), CF_BYTES, CQ_BYTES, new byte[0], 0);
          s.setRange(new Range(startKey, stopKey));
          
 -        long i = opts.start;
 +        Iterator<Entry<Key,Value>> si = s.iterator();
          
 -        for (Entry<Key,Value> e : s) {
 +        if (si.hasNext()) {
 +          Entry<Key,Value> e = si.next();
            Key k = e.getKey();
            Value v = e.getValue();
            

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/TestIngest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TestIngest.java
index 1efd872,33c3b0c..bc3ff05
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@@ -22,9 -21,8 +22,10 @@@ import java.util.Random
  import java.util.Set;
  import java.util.TreeSet;
  
+ import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Instance;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
index 1458065,46174b5..2a6d9ae
--- a/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
@@@ -17,9 -17,9 +17,10 @@@
  package org.apache.accumulo.test;
  
  import java.util.ArrayList;
 +import java.util.List;
  import java.util.Map.Entry;
  
+ import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.cli.BatchWriterOpts;
  import org.apache.accumulo.core.cli.ScannerOpts;
  import org.apache.accumulo.core.client.AccumuloException;
@@@ -96,8 -96,8 +97,8 @@@ public class TestMultiTableIngest 
        
        // populate
        for (int i = 0; i < opts.count; i++) {
 -        Mutation m = new Mutation(new Text(String.format("%05d", i)));
 +        Mutation m = new Mutation(new Text(String.format("%06d", i)));
-         m.put(new Text("col" + Integer.toString((i % 3) + 1)), new Text("qual"), new Value("junk".getBytes()));
+         m.put(new Text("col" + Integer.toString((i % 3) + 1)), new Text("qual"), new Value("junk".getBytes(Constants.UTF8)));
          b.getBatchWriter(tableNames.get(i % tableNames.size())).addMutation(m);
        }
        try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
index 7d710fa,fe30965..4ecd6f9
--- a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
@@@ -21,9 -21,8 +21,10 @@@ import java.util.Iterator
  import java.util.Map.Entry;
  import java.util.Random;
  
+ import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
  import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Instance;
  import org.apache.accumulo.core.client.Scanner;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
index bc63858,4032dfa..ea54aa4
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
@@@ -26,6 -26,9 +26,7 @@@ import java.util.Map.Entry
  import java.util.Random;
  import java.util.zip.CRC32;
  
 -import org.apache.accumulo.trace.instrument.Span;
 -import org.apache.accumulo.trace.instrument.Trace;
+ import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Scanner;
  import org.apache.accumulo.core.data.Key;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
index 9af578c,a9b072e..cb17340
--- a/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java
@@@ -103,10 -101,10 +104,10 @@@ public class MetadataBatchScanTest 
          
          String dir = "/t-" + UUID.randomUUID();
          
-         TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes()));
 -        Constants.METADATA_DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes(Constants.UTF8)));
++        TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes(Constants.UTF8)));
          
          for (int i = 0; i < 5; i++) {
-           mut.put(DataFileColumnFamily.NAME, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes()));
 -          mut.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes(Constants.UTF8)));
++          mut.put(DataFileColumnFamily.NAME, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes(Constants.UTF8)));
          }
          
          bw.addMutation(mut);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java
index 2e52f12,a2bad7c..f002274
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java
@@@ -50,10 -51,10 +51,10 @@@ public class BatchWrite extends Test 
        try {
          int numRows = rand.nextInt(100000);
          for (int i = 0; i < numRows; i++) {
 -          Mutation m = new Mutation(String.format("%016x", (rand.nextLong() & 0x7fffffffffffffffl)));
 -          long val = (rand.nextLong() & 0x7fffffffffffffffl);
 +          Mutation m = new Mutation(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffL));
 +          long val = rand.nextLong() & 0x7fffffffffffffffL;
            for (int j = 0; j < 10; j++) {
-             m.put("cf", "cq" + j, new Value(String.format("%016x", val).getBytes()));
+             m.put("cf", "cq" + j, new Value(String.format("%016x", val).getBytes(Constants.UTF8)));
            }
            
            bw.addMutation(m);
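
The mask rand.nextLong() & 0x7fffffffffffffffL clears the sign bit, folding every long into [0, Long.MAX_VALUE]. Unlike Math.abs(), it cannot overflow on Long.MIN_VALUE, and it keeps modulo arithmetic such as (val % n) non-negative. A one-liner sketch:

    long raw = new java.util.Random().nextLong();
    long nonNegative = raw & 0x7fffffffffffffffL;   // sign bit cleared, always >= 0
    String row = String.format("%016x", nonNegative);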

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java
index 9012c10,de4961e..c8da3d8
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java
@@@ -47,8 -48,8 +47,8 @@@ public class StopTabletServer extends T
            Collections.sort(children);
            Stat stat = new Stat();
            byte[] data = rdr.getData(base + "/" + child + "/" + children.get(0), stat);
-           if (!"master".equals(new String(data))) {
+           if (!"master".equals(new String(data, Constants.UTF8))) {
 -            result.add(new TServerInstance(AddressUtil.parseAddress(child, Property.TSERV_CLIENTPORT), stat.getEphemeralOwner()));
 +            result.add(new TServerInstance(AddressUtil.parseAddress(child, false), stat.getEphemeralOwner()));
            }
          }
        } catch (KeeperException.NoNodeException ex) {
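
rdr above is Accumulo's ZooKeeper reader; the underlying primitive is ZooKeeper's getData, which fills a Stat whose ephemeralOwner is the id of the session holding an ephemeral lock node, and that session id is what the TServerInstance is built from. A hedged sketch against the raw ZooKeeper client (the connect string and node path are illustrative only):

    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, null);
    Stat stat = new Stat();
    byte[] data = zk.getData("/some/lock/node", false, stat);
    System.out.println(new String(data, java.nio.charset.StandardCharsets.UTF_8)
        + " owner session=0x" + Long.toHexString(stat.getEphemeralOwner()));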

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/randomwalk/image/Write.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/randomwalk/security/SecurityHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/randomwalk/security/WalkingSecurity.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/MapRedVerifyTool.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/Write.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/randomwalk/shard/BulkInsert.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/main/java/org/apache/accumulo/test/randomwalk/shard/ShardFixture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/test/java/org/apache/accumulo/test/functional/AddSplitIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/AddSplitIT.java
index e4760ba,0000000..442d21e
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/AddSplitIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/AddSplitIT.java
@@@ -1,134 -1,0 +1,135 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import java.util.Collection;
 +import java.util.Iterator;
 +import java.util.Map.Entry;
 +import java.util.TreeSet;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +public class AddSplitIT extends SimpleMacIT {
 +
 +  @Test(timeout = 60 * 1000)
 +  public void addSplitTest() throws Exception {
 +
 +    String tableName = getTableNames(1)[0];
 +    Connector c = getConnector();
 +    c.tableOperations().create(tableName);
 +
 +    insertData(tableName, 1L);
 +
 +    TreeSet<Text> splits = new TreeSet<Text>();
 +    splits.add(new Text(String.format("%09d", 333)));
 +    splits.add(new Text(String.format("%09d", 666)));
 +
 +    c.tableOperations().addSplits(tableName, splits);
 +
 +    UtilWaitThread.sleep(100);
 +
 +    Collection<Text> actualSplits = c.tableOperations().listSplits(tableName);
 +
 +    if (!splits.equals(new TreeSet<Text>(actualSplits))) {
 +      throw new Exception(splits + " != " + actualSplits);
 +    }
 +
 +    verifyData(tableName, 1L);
 +    insertData(tableName, 2L);
 +
 +    // the splits set is deliberately not cleared; addSplits() should ignore
 +    // the existing split points and still create the three additional ones
 +
 +    splits.add(new Text(String.format("%09d", 200)));
 +    splits.add(new Text(String.format("%09d", 500)));
 +    splits.add(new Text(String.format("%09d", 800)));
 +
 +    c.tableOperations().addSplits(tableName, splits);
 +
 +    UtilWaitThread.sleep(100);
 +
 +    actualSplits = c.tableOperations().listSplits(tableName);
 +
 +    if (!splits.equals(new TreeSet<Text>(actualSplits))) {
 +      throw new Exception(splits + " != " + actualSplits);
 +    }
 +
 +    verifyData(tableName, 2L);
 +  }
 +
 +  private void verifyData(String tableName, long ts) throws Exception {
 +    Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
 +
 +    Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +
 +    for (int i = 0; i < 10000; i++) {
 +      if (!iter.hasNext()) {
 +        throw new Exception("row " + i + " not found");
 +      }
 +
 +      Entry<Key,Value> entry = iter.next();
 +
 +      String row = String.format("%09d", i);
 +
 +      if (!entry.getKey().getRow().equals(new Text(row))) {
 +        throw new Exception("unexpected row " + entry.getKey() + " " + i);
 +      }
 +
 +      if (entry.getKey().getTimestamp() != ts) {
 +        throw new Exception("unexpected ts " + entry.getKey() + " " + ts);
 +      }
 +
 +      if (Integer.parseInt(entry.getValue().toString()) != i) {
 +        throw new Exception("unexpected value " + entry + " " + i);
 +      }
 +    }
 +
 +    if (iter.hasNext()) {
 +      throw new Exception("found more than expected " + iter.next());
 +    }
 +
 +  }
 +
 +  private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException {
 +    BatchWriter bw = getConnector().createBatchWriter(tableName, null);
 +
 +    for (int i = 0; i < 10000; i++) {
 +      String row = String.format("%09d", i);
 +
 +      Mutation m = new Mutation(new Text(row));
-       m.put(new Text("cf1"), new Text("cq1"), ts, new Value(("" + i).getBytes()));
++      m.put(new Text("cf1"), new Text("cq1"), ts, new Value(Integer.toString(i).getBytes(Constants.UTF8)));
 +      bw.addMutation(m);
 +    }
 +
 +    bw.close();
 +  }
 +
 +}
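
A note on the second addSplits() round in AddSplitIT: the test relies on split points that already exist being ignored, so the expected TreeSet can keep accumulating and still compare equal to listSplits(). A hedged sketch of that idiom (conn and tableName stand in for the test's connector and table):

    // assumes addSplits() treats already-existing split points as no-ops,
    // which is the behavior the test above exercises
    SortedSet<Text> expected = new TreeSet<Text>();
    expected.add(new Text(String.format("%09d", 333)));
    conn.tableOperations().addSplits(tableName, expected);  // creates one split
    expected.add(new Text(String.format("%09d", 200)));
    conn.tableOperations().addSplits(tableName, expected);  // 333 is a no-op
    boolean ok = expected.equals(new TreeSet<Text>(conn.tableOperations().listSplits(tableName)));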

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/test/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
index 8de84f2,0000000..ac425b3
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
@@@ -1,108 -1,0 +1,110 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import java.util.EnumSet;
 +import java.util.Map.Entry;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +public class BadIteratorMincIT extends SimpleMacIT {
 +
 +  @Test(timeout = 60 * 1000)
 +  public void test() throws Exception {
 +    Connector c = getConnector();
 +
 +    String tableName = getTableNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    IteratorSetting is = new IteratorSetting(30, BadIterator.class);
 +    c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc));
 +    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +
 +    Mutation m = new Mutation(new Text("r1"));
-     m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes()));
++    m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(Constants.UTF8)));
++
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    c.tableOperations().flush(tableName, null, null, false);
 +    UtilWaitThread.sleep(1000);
 +
 +    // minc should fail, so there should be no files
 +    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 0, 0);
 +
 +    // try to scan table
 +    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
 +
 +    int count = 0;
 +    for (@SuppressWarnings("unused")
 +    Entry<Key,Value> entry : scanner) {
 +      count++;
 +    }
 +
 +    if (count != 1)
 +      throw new Exception("Did not see expected # entries " + count);
 +
 +    // remove the bad iterator
 +    c.tableOperations().removeIterator(tableName, BadIterator.class.getSimpleName(), EnumSet.of(IteratorScope.minc));
 +
 +    UtilWaitThread.sleep(5000);
 +
 +    // minc should complete
 +    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 1, 1);
 +
 +    count = 0;
 +    for (@SuppressWarnings("unused")
 +    Entry<Key,Value> entry : scanner) {
 +      count++;
 +    }
 +
 +    if (count != 1)
 +      throw new Exception("Did not see expected # entries " + count);
 +
 +    // now try putting bad iterator back and deleting the table
 +    c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc));
 +    bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +    m = new Mutation(new Text("r2"));
-     m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes()));
++    m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(Constants.UTF8)));
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    // make sure property is given time to propagate
 +    UtilWaitThread.sleep(500);
 +
 +    c.tableOperations().flush(tableName, null, null, false);
 +
 +    // make sure the flush has time to start
 +    UtilWaitThread.sleep(1000);
 +
 +    // this should not hang
 +    c.tableOperations().delete(tableName);
 +  }
 +
 +}
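
One non-obvious detail in BadIteratorMincIT: removeIterator() addresses the iterator by BadIterator.class.getSimpleName(). That works because the two-argument IteratorSetting constructor defaults the iterator's name to the class's simple name, so (assuming that default) the two settings below should be equivalent:

    IteratorSetting byClass = new IteratorSetting(30, BadIterator.class);
    IteratorSetting byName  = new IteratorSetting(30, "BadIterator", BadIterator.class.getName());
    // either one can later be removed with:
    // c.tableOperations().removeIterator(tableName, "BadIterator", EnumSet.of(IteratorScope.minc));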

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
index 0afd651,0000000..faf8777
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
@@@ -1,118 -1,0 +1,119 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +public class BatchScanSplitIT extends ConfigurableMacIT {
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg) {
 +    cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_MAJC_DELAY.getKey(), "0"));
 +  }
 +
 +  @Test(timeout = 2 * 60 * 1000)
 +  public void test() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getTableNames(1)[0];
 +    c.tableOperations().create(tableName);
 +
 +    int numRows = 1 << 18;
 +
 +    BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
 +
 +    for (int i = 0; i < numRows; i++) {
 +      Mutation m = new Mutation(new Text(String.format("%09x", i)));
-       m.put(new Text("cf1"), new Text("cq1"), new Value(String.format("%016x", numRows - i).getBytes()));
++      m.put(new Text("cf1"), new Text("cq1"), new Value(String.format("%016x", numRows - i).getBytes(Constants.UTF8)));
 +      bw.addMutation(m);
 +    }
 +
 +    bw.close();
 +
 +    getConnector().tableOperations().flush(tableName, null, null, true);
 +
 +    getConnector().tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "4K");
 +
 +    Collection<Text> splits = getConnector().tableOperations().listSplits(tableName);
 +    while (splits.size() < 2) {
 +      UtilWaitThread.sleep(1);
 +      splits = getConnector().tableOperations().listSplits(tableName);
 +    }
 +
 +    System.out.println("splits : " + splits);
 +
 +    Random random = new Random(19011230);
 +    HashMap<Text,Value> expected = new HashMap<Text,Value>();
 +    ArrayList<Range> ranges = new ArrayList<Range>();
 +    for (int i = 0; i < 100; i++) {
 +      int r = random.nextInt(numRows);
 +      Text row = new Text(String.format("%09x", r));
-       expected.put(row, new Value(String.format("%016x", numRows - r).getBytes()));
++      expected.put(row, new Value(String.format("%016x", numRows - r).getBytes(Constants.UTF8)));
 +      ranges.add(new Range(row));
 +    }
 +
 +    // logger.setLevel(Level.TRACE);
 +
 +    HashMap<Text,Value> found = new HashMap<Text,Value>();
 +
 +    for (int i = 0; i < 20; i++) {
 +      BatchScanner bs = getConnector().createBatchScanner(tableName, Authorizations.EMPTY, 4);
 +
 +      found.clear();
 +
 +      long t1 = System.currentTimeMillis();
 +
 +      bs.setRanges(ranges);
 +
 +      for (Entry<Key,Value> entry : bs) {
 +        found.put(entry.getKey().getRow(), entry.getValue());
 +      }
 +      bs.close();
 +
 +      long t2 = System.currentTimeMillis();
 +
 +      log.info(String.format("rate : %06.2f%n", ranges.size() / ((t2 - t1) / 1000.0)));
 +
 +      if (!found.equals(expected))
 +        throw new Exception("Found and expected differ " + found + " " + expected);
 +    }
 +
 +    splits = getConnector().tableOperations().listSplits(tableName);
 +    log.info("splits : " + splits);
 +  }
 +
 +}
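
On the lookup loop in BatchScanSplitIT: a BatchScanner fetches its ranges in parallel and returns entries in no defined order, which is why results are collected into a HashMap keyed by row rather than compared as an ordered list. A minimal sketch (conn, tableName, and ranges as in the test):

    BatchScanner bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 4); // 4 query threads
    try {
      bs.setRanges(ranges);                           // one Range per row looked up
      Map<Text,Value> found = new HashMap<Text,Value>();
      for (Entry<Key,Value> entry : bs)
        found.put(entry.getKey().getRow(), entry.getValue()); // order-independent
    } finally {
      bs.close();                                     // releases the query threads
    }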

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
index 3458de0,0000000..e455075
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@@ -1,175 -1,0 +1,176 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import java.util.Iterator;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.concurrent.TimeUnit;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +public class BatchWriterFlushIT extends SimpleMacIT {
 +
 +  private static final int NUM_TO_FLUSH = 100000;
 +
 +  @Test(timeout = 90 * 1000)
 +  public void run() throws Exception {
 +    Connector c = getConnector();
 +    String[] tableNames = getTableNames(2);
 +    String bwft = tableNames[0];
 +    c.tableOperations().create(bwft);
 +    String bwlt = tableNames[1];
 +    c.tableOperations().create(bwlt);
 +    runFlushTest(bwft);
 +    runLatencyTest(bwlt);
 +
 +  }
 +
 +  private void runLatencyTest(String tableName) throws Exception {
 +    // should automatically flush after 1 second (max latency below is 1000 ms)
 +    BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig().setMaxLatency(1000, TimeUnit.MILLISECONDS));
 +    Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
 +
 +    Mutation m = new Mutation(new Text(String.format("r_%10d", 1)));
-     m.put(new Text("cf"), new Text("cq"), new Value(("" + 1).getBytes()));
++    m.put(new Text("cf"), new Text("cq"), new Value("1".getBytes(Constants.UTF8)));
 +    bw.addMutation(m);
 +
 +    UtilWaitThread.sleep(500);
 +
 +    int count = 0;
 +    for (@SuppressWarnings("unused")
 +    Entry<Key,Value> entry : scanner) {
 +      count++;
 +    }
 +
 +    if (count != 0) {
 +      throw new Exception("Flushed too soon");
 +    }
 +
 +    UtilWaitThread.sleep(1500);
 +
 +    for (@SuppressWarnings("unused")
 +    Entry<Key,Value> entry : scanner) {
 +      count++;
 +    }
 +
 +    if (count != 1) {
 +      throw new Exception("Did not flush");
 +    }
 +
 +    bw.close();
 +  }
 +
 +  private void runFlushTest(String tableName) throws Exception {
 +    BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
 +    Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
 +    Random r = new Random();
 +
 +    for (int i = 0; i < 4; i++) {
 +      for (int j = 0; j < NUM_TO_FLUSH; j++) {
 +        int row = i * NUM_TO_FLUSH + j;
 +
 +        Mutation m = new Mutation(new Text(String.format("r_%10d", row)));
 +        m.put(new Text("cf"), new Text("cq"), new Value(Integer.toString(row).getBytes(Constants.UTF8)));
 +        bw.addMutation(m);
 +      }
 +
 +      bw.flush();
 +
 +      // do a few random lookups into the data just flushed
 +
 +      for (int k = 0; k < 10; k++) {
 +        int rowToLookup = r.nextInt(NUM_TO_FLUSH) + i * NUM_TO_FLUSH;
 +
 +        scanner.setRange(new Range(new Text(String.format("r_%10d", rowToLookup))));
 +
 +        Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +
 +        if (!iter.hasNext())
 +          throw new Exception(" row " + rowToLookup + " not found after flush");
 +
 +        Entry<Key,Value> entry = iter.next();
 +
 +        if (iter.hasNext())
 +          throw new Exception("Scanner returned too much");
 +
 +        verifyEntry(rowToLookup, entry);
 +      }
 +
 +      // scan all data just flushed
 +      scanner.setRange(new Range(new Text(String.format("r_%10d", i * NUM_TO_FLUSH)), true, new Text(String.format("r_%10d", (i + 1) * NUM_TO_FLUSH)), false));
 +      Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +
 +      for (int j = 0; j < NUM_TO_FLUSH; j++) {
 +        int row = i * NUM_TO_FLUSH + j;
 +
 +        if (!iter.hasNext())
 +          throw new Exception("Scan stopped permaturely at " + row);
 +
 +        Entry<Key,Value> entry = iter.next();
 +
 +        verifyEntry(row, entry);
 +      }
 +
 +      if (iter.hasNext())
 +        throw new Exception("Scanner returned too much");
 +
 +    }
 +
 +    bw.close();
 +
 +    // test adding a mutation to a closed batch writer
 +    boolean caught = false;
 +    try {
 +      bw.addMutation(new Mutation(new Text("foobar")));
 +    } catch (IllegalStateException ise) {
 +      caught = true;
 +    }
 +
 +    if (!caught) {
 +      throw new Exception("Adding to closed batch writer did not fail");
 +    }
 +  }
 +
 +  private void verifyEntry(int row, Entry<Key,Value> entry) throws Exception {
 +    if (!entry.getKey().getRow().toString().equals(String.format("r_%10d", row))) {
 +      throw new Exception("Unexpected key returned, expected " + row + " got " + entry.getKey());
 +    }
 +
 +    if (!entry.getValue().toString().equals("" + row)) {
 +      throw new Exception("Unexpected value, expected " + row + " got " + entry.getValue());
 +    }
 +  }
 +
 +}
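
BatchWriterFlushIT leans on BatchWriterConfig.setMaxLatency(): buffered mutations are sent to the tablet servers once they have been held for the configured time, even if the memory buffer is not full. A hedged sketch of the writer lifecycle the two sub-tests exercise:

    BatchWriterConfig cfg = new BatchWriterConfig().setMaxLatency(1000, TimeUnit.MILLISECONDS);
    BatchWriter bw = conn.createBatchWriter(tableName, cfg);
    bw.addMutation(m); // buffered; sent automatically within roughly one second
    bw.flush();        // or push everything buffered right now
    bw.close();        // final flush; a later addMutation() throws
                       // IllegalStateException, which the flush test asserts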

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java
index 3a0eeda,0000000..5d68155
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java
@@@ -1,114 -1,0 +1,115 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import java.util.Iterator;
 +import java.util.Map.Entry;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.file.FileSKVWriter;
 +import org.apache.accumulo.core.file.FileUtil;
 +import org.apache.accumulo.core.file.rfile.RFile;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.trace.TraceFileSystem;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +public class BulkFileIT extends SimpleMacIT {
 +
 +  @Test(timeout = 2 * 60 * 1000)
 +  public void testBulkFile() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getTableNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    SortedSet<Text> splits = new TreeSet<Text>();
 +    for (String split : "0333 0666 0999 1333 1666".split(" "))
 +      splits.add(new Text(split));
 +    c.tableOperations().addSplits(tableName, splits);
 +    Configuration conf = new Configuration();
 +    AccumuloConfiguration aconf = ServerConfiguration.getDefaultConfiguration();
 +    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, aconf));
 +
 +    String dir = rootPath() + "/bulk_test_diff_files_89723987592_" + getTableNames(1)[0];
 +
 +    fs.delete(new Path(dir), true);
 +
 +    FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, aconf);
 +    writer1.startDefaultLocalityGroup();
 +    writeData(writer1, 0, 333);
 +    writer1.close();
 +
 +    FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, aconf);
 +    writer2.startDefaultLocalityGroup();
 +    writeData(writer2, 334, 999);
 +    writer2.close();
 +
 +    FileSKVWriter writer3 = FileOperations.getInstance().openWriter(dir + "/f3." + RFile.EXTENSION, fs, conf, aconf);
 +    writer3.startDefaultLocalityGroup();
 +    writeData(writer3, 1000, 1999);
 +    writer3.close();
 +
 +    FunctionalTestUtils.bulkImport(c, fs, tableName, dir);
 +
 +    FunctionalTestUtils.checkRFiles(c, tableName, 6, 6, 1, 1);
 +
 +    verifyData(tableName, 0, 1999);
 +
 +  }
 +
 +  private void verifyData(String table, int s, int e) throws Exception {
 +    Scanner scanner = getConnector().createScanner(table, Authorizations.EMPTY);
 +
 +    Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +
 +    for (int i = s; i <= e; i++) {
 +      if (!iter.hasNext())
 +        throw new Exception("row " + i + " not found");
 +
 +      Entry<Key,Value> entry = iter.next();
 +
 +      String row = String.format("%04d", i);
 +
 +      if (!entry.getKey().getRow().equals(new Text(row)))
 +        throw new Exception("unexpected row " + entry.getKey() + " " + i);
 +
 +      if (Integer.parseInt(entry.getValue().toString()) != i)
 +        throw new Exception("unexpected value " + entry + " " + i);
 +    }
 +
 +    if (iter.hasNext())
 +      throw new Exception("found more than expected " + iter.next());
 +  }
 +
 +  private void writeData(FileSKVWriter w, int s, int e) throws Exception {
 +    for (int i = s; i <= e; i++) {
-       w.append(new Key(new Text(String.format("%04d", i))), new Value(("" + i).getBytes()));
++      w.append(new Key(new Text(String.format("%04d", i))), new Value(Integer.toString(i).getBytes(Constants.UTF8)));
 +    }
 +  }
 +
 +}
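
Worth spelling out for writeData() in BulkFileIT: an RFile writer expects keys appended in sorted order, and the fixed-width, zero-padded row format is what keeps numeric order aligned with the lexicographic order of the keys. A tiny illustration:

    // without padding, lexicographic and numeric order diverge:
    // "10" < "2" as strings, even though 10 > 2 as numbers
    String unpadded = Integer.toString(10);        // "10"
    String padded = String.format("%04d", 10);     // "0010", which sorts after "0002"
    w.append(new Key(new Text(padded)), new Value(padded.getBytes(Constants.UTF8)));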

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java
index 305d300,0000000..c09216f
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java
@@@ -1,146 -1,0 +1,147 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.Map.Entry;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableExistsException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +public class ConcurrencyIT extends ConfigurableMacIT {
 +  
 +  static class ScanTask extends Thread {
 +    
 +    int count = 0;
 +    Scanner scanner;
 +    
 +    ScanTask(Connector conn, long time) throws Exception {
 +      scanner = conn.createScanner("cct", Authorizations.EMPTY);
 +      IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
 +      SlowIterator.setSleepTime(slow, time);
 +      scanner.addScanIterator(slow);
 +    }
 +    
 +    @Override
 +    public void run() {
 +      for (@SuppressWarnings("unused")
 +      Entry<Key,Value> entry : scanner) {
 +        count++;
 +      }
 +      
 +    }
 +    
 +  }
 +  
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg) {
 +    cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_MAJC_DELAY.getKey(), "1"));
 +  }
 +  
 +  /*
 +   * Below is a diagram of the operations in this test over time.
 +   * 
 +   * Scan 0 |------------------------------|
 +   * Scan 1 |----------|
 +   * Minc 1 |-----|
 +   * Scan 2 |----------|
 +   * Scan 3 |---------------|
 +   * Minc 2 |-----|
 +   * Majc 1 |-----|
 +   */
 +  
 +  @Test(timeout = 2 * 60 * 1000)
 +  public void run() throws Exception {
 +    Connector c = getConnector();
 +    runTest(c);
 +  }
 +
 +  static void runTest(Connector c) throws Exception {
 +    c.tableOperations().create("cct");
 +    IteratorSetting is = new IteratorSetting(10, SlowIterator.class);
 +    SlowIterator.setSleepTime(is, 50);
 +    c.tableOperations().attachIterator("cct", is, EnumSet.of(IteratorScope.minc, IteratorScope.majc));
 +    c.tableOperations().setProperty("cct", Property.TABLE_MAJC_RATIO.getKey(), "1.0");
 +    
 +    BatchWriter bw = c.createBatchWriter("cct", new BatchWriterConfig());
 +    for (int i = 0; i < 50; i++) {
 +      Mutation m = new Mutation(new Text(String.format("%06d", i)));
-       m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes()));
++      m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(Constants.UTF8)));
 +      bw.addMutation(m);
 +    }
 +    bw.flush();
 +    
 +    ScanTask st0 = new ScanTask(c, 300);
 +    st0.start();
 +    
 +    ScanTask st1 = new ScanTask(c, 100);
 +    st1.start();
 +    
 +    UtilWaitThread.sleep(50);
 +    c.tableOperations().flush("cct", null, null, true);
 +    
 +    for (int i = 0; i < 50; i++) {
 +      Mutation m = new Mutation(new Text(String.format("%06d", i)));
-       m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes()));
++      m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(Constants.UTF8)));
 +      bw.addMutation(m);
 +    }
 +    
 +    bw.flush();
 +    
 +    ScanTask st2 = new ScanTask(c, 100);
 +    st2.start();
 +    
 +    st1.join();
 +    st2.join();
 +    if (st1.count != 50)
 +      throw new Exception("Thread 1 did not see 50, saw " + st1.count);
 +    
 +    if (st2.count != 50)
 +      throw new Exception("Thread 2 did not see 50, saw " + st2.count);
 +    
 +    ScanTask st3 = new ScanTask(c, 150);
 +    st3.start();
 +    
 +    UtilWaitThread.sleep(50);
 +    c.tableOperations().flush("cct", null, null, false);
 +    
 +    st3.join();
 +    if (st3.count != 50)
 +      throw new Exception("Thread 3 did not see 50, saw " + st3.count);
 +    
 +    st0.join();
 +    if (st0.count != 50)
 +      throw new Exception("Thread 0 did not see 50, saw " + st0.count);
 +    
 +    bw.close();
 +  }
 +}
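
Finally, ConcurrencyIT configures SlowIterator at two different levels, and the distinction is easy to miss: attachIterator() installs it on the table for the named scopes (here compactions), while addScanIterator() applies only to one Scanner instance. Side by side (conn and scanner as in the test):

    // table-level: slows minor/major compactions for every client
    IteratorSetting compactionSlow = new IteratorSetting(10, SlowIterator.class);
    SlowIterator.setSleepTime(compactionSlow, 50);
    conn.tableOperations().attachIterator("cct", compactionSlow, EnumSet.of(IteratorScope.minc, IteratorScope.majc));

    // scan-level: slows only this scanner's reads
    IteratorSetting scanSlow = new IteratorSetting(30, "slow", SlowIterator.class);
    SlowIterator.setSleepTime(scanSlow, 100);
    scanner.addScanIterator(scanSlow);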

