accumulo-commits mailing list archives

From: els...@apache.org
Subject: [25/48] Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Date: Tue, 04 Feb 2014 17:54:54 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
index d85efa5,0000000..31a8f0f
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java
@@@ -1,104 -1,0 +1,105 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.monitor.servlets.trace;
 +
 +import java.util.Date;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import javax.servlet.http.HttpServletRequest;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.trace.TraceFormatter;
 +import org.apache.accumulo.monitor.Monitor;
 +import org.apache.accumulo.monitor.servlets.BasicServlet;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +
 +abstract class Basic extends BasicServlet {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  public static String getStringParameter(HttpServletRequest req, String name, String defaultValue) {
 +    String result = req.getParameter(name);
 +    if (result == null) {
 +      return defaultValue;
 +    }
 +    return result;
 +  }
 +
 +  public static int getIntParameter(HttpServletRequest req, String name, int defaultMinutes) {
 +    String valueString = req.getParameter(name);
 +    if (valueString == null)
 +      return defaultMinutes;
 +    int result = 0;
 +    try {
 +      result = Integer.parseInt(valueString);
 +    } catch (NumberFormatException ex) {
 +      return defaultMinutes;
 +    }
 +    return result;
 +  }
 +
 +  public static String dateString(long millis) {
 +    return TraceFormatter.formatDate(new Date(millis));
 +  }
 +
 +  protected Scanner getScanner(StringBuilder sb) throws AccumuloException, AccumuloSecurityException {
 +    AccumuloConfiguration conf = Monitor.getSystemConfiguration();
 +    String principal = conf.get(Property.TRACE_USER);
 +    AuthenticationToken at;
 +    Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
 +    if (loginMap.isEmpty()) {
 +      Property p = Property.TRACE_PASSWORD;
-       at = new PasswordToken(conf.get(p).getBytes());
++      at = new PasswordToken(conf.get(p).getBytes(Constants.UTF8));
 +    } else {
 +      Properties props = new Properties();
 +      int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length() + 1;
 +      for (Entry<String,String> entry : loginMap.entrySet()) {
 +        props.put(entry.getKey().substring(prefixLength), entry.getValue());
 +      }
 +
 +      AuthenticationToken token = Property.createInstanceFromPropertyName(conf, Property.TRACE_TOKEN_TYPE, AuthenticationToken.class, new PasswordToken());
 +      token.init(props);
 +      at = token;
 +    }
 +
 +    String table = conf.get(Property.TRACE_TABLE);
 +    try {
 +      Connector conn = HdfsZooInstance.getInstance().getConnector(principal, at);
 +      if (!conn.tableOperations().exists(table)) {
 +        return new NullScanner();
 +      }
 +      Scanner scanner = conn.createScanner(table, conn.securityOperations().getUserAuthorizations(principal));
 +      return scanner;
 +    } catch (AccumuloSecurityException ex) {
 +      sb.append("<h2>Unable to read trace table: check trace username and password configuration.</h2>\n");
 +      return null;
 +    } catch (TableNotFoundException ex) {
 +      return new NullScanner();
 +    }
 +  }
 +}
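
The conflict resolution above (the "-"/"++" pair at the PasswordToken construction) replaces platform-default String.getBytes() with an explicit UTF-8 charset, a pattern repeated in the files below. A minimal standalone sketch of what the explicit charset guards against, using the JDK's StandardCharsets.UTF_8 as a stand-in for Accumulo's Constants.UTF8 (assumed here to be an equivalent Charset constant):

import java.nio.charset.StandardCharsets;

public class Utf8BytesSketch {
  public static void main(String[] args) {
    String secret = "sécrét";
    // Platform-default charset: the byte layout depends on the JVM's file.encoding.
    byte[] defaultBytes = secret.getBytes();
    // Explicit UTF-8 (what Constants.UTF8 pins down): identical bytes on every JVM.
    byte[] utf8Bytes = secret.getBytes(StandardCharsets.UTF_8);
    System.out.println(defaultBytes.length + " (default) vs " + utf8Bytes.length + " (UTF-8)");
  }
}

On a JVM whose default charset is not UTF-8 (e.g. ISO-8859-1), the two arrays differ, which is why the merge pins the encoding wherever trace credentials and paths are turned into bytes.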

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 32898f4,0000000..0f4cd3a
mode 100644,000000..100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@@ -1,322 -1,0 +1,322 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tracer;
 +
 +import java.net.InetSocketAddress;
 +import java.net.ServerSocket;
 +import java.nio.channels.ServerSocketChannel;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.user.AgeOffFilter;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.trace.TraceFormatter;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.start.classloader.AccumuloClassLoader;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.thrift.RemoteSpan;
 +import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
 +import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TByteArrayOutputStream;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.protocol.TCompactProtocol;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.server.TThreadPoolServer;
 +import org.apache.thrift.transport.TServerSocket;
 +import org.apache.thrift.transport.TServerTransport;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.Watcher.Event.EventType;
 +import org.apache.zookeeper.Watcher.Event.KeeperState;
 +
 +public class TraceServer implements Watcher {
 +  
 +  final private static Logger log = Logger.getLogger(TraceServer.class);
 +  final private ServerConfiguration serverConfiguration;
 +  final private TServer server;
 +  final private AtomicReference<BatchWriter> writer;
 +  final private Connector connector;
 +  final String table;
 +  
 +  private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
 +    m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
 +  }
 +  
 +  static class ByteArrayTransport extends TTransport {
 +    TByteArrayOutputStream out = new TByteArrayOutputStream();
 +    
 +    @Override
 +    public boolean isOpen() {
 +      return true;
 +    }
 +    
 +    @Override
 +    public void open() throws TTransportException {}
 +    
 +    @Override
 +    public void close() {}
 +    
 +    @Override
 +    public int read(byte[] buf, int off, int len) {
 +      return 0;
 +    }
 +    
 +    @Override
 +    public void write(byte[] buf, int off, int len) throws TTransportException {
 +      out.write(buf, off, len);
 +    }
 +    
 +    public byte[] get() {
 +      return out.get();
 +    }
 +    
 +    public int len() {
 +      return out.len();
 +    }
 +  }
 +  
 +  class Receiver implements Iface {
 +    @Override
 +    public void span(RemoteSpan s) throws TException {
 +      String idString = Long.toHexString(s.traceId);
 +      String startString = Long.toHexString(s.start);
 +      Mutation spanMutation = new Mutation(new Text(idString));
 +      Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString));
 +      long diff = s.stop - s.start;
-       indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes()));
++      indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes(Constants.UTF8)));
 +      ByteArrayTransport transport = new ByteArrayTransport();
 +      TCompactProtocol protocol = new TCompactProtocol(transport);
 +      s.write(protocol);
 +      String parentString = Long.toHexString(s.parentId);
 +      if (s.parentId == Span.ROOT_SPAN_ID)
 +        parentString = "";
 +      put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(), transport.len());
 +      // Map the root span to time so we can look up traces by time
 +      Mutation timeMutation = null;
 +      if (s.parentId == Span.ROOT_SPAN_ID) {
 +        timeMutation = new Mutation(new Text("start:" + startString));
 +        put(timeMutation, "id", idString, transport.get(), transport.len());
 +      }
 +      try {
 +        final BatchWriter writer = TraceServer.this.writer.get();
 +        /* Check for null, because we expect spans to come in much faster than flush calls.
 +           In the case of failure, we'd rather avoid logging tons of NPEs.
 +         */
 +        if (null == writer) {
 +          log.warn("writer is not ready; discarding span.");
 +          return;
 +        }
 +        writer.addMutation(spanMutation);
 +        writer.addMutation(indexMutation);
 +        if (timeMutation != null)
 +          writer.addMutation(timeMutation);
 +      } catch (MutationsRejectedException exception) {
 +        log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for span information and stacktrace. cause: " + exception);
 +        if (log.isDebugEnabled()) {
 +          log.debug("discarded span due to rejection of mutation: " + spanMutation, exception);
 +        }
 +      /* XXX this could be e.g. an IllegalArgumentException if we're trying to write this mutation to a writer that has been closed since we retrieved it */
 +      } catch (RuntimeException exception) {
 +        log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. cause: " + exception);
 +        log.debug("unable to write mutation to table due to exception.", exception);
 +      }
 +    }
 +    
 +  }
 +  
 +  public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception {
 +    this.serverConfiguration = serverConfiguration;
 +    AccumuloConfiguration conf = serverConfiguration.getConfiguration();
 +    table = conf.get(Property.TRACE_TABLE);
 +    Connector connector = null;
 +    while (true) {
 +      try {
 +        String principal = conf.get(Property.TRACE_USER);
 +        AuthenticationToken at;
 +        Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
 +        if (loginMap.isEmpty()) {
 +          Property p = Property.TRACE_PASSWORD;
-           at = new PasswordToken(conf.get(p).getBytes());
++          at = new PasswordToken(conf.get(p).getBytes(Constants.UTF8));
 +        } else {
 +          Properties props = new Properties();
 +          AuthenticationToken token = AccumuloClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class)
 +              .newInstance();
 +
 +          int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length() + 1;
 +          for (Entry<String,String> entry : loginMap.entrySet()) {
 +            props.put(entry.getKey().substring(prefixLength), entry.getValue());
 +          }
 +
 +          token.init(props);
 +          
 +          at = token;
 +        }
 +        
 +        connector = serverConfiguration.getInstance().getConnector(principal, at);
 +        if (!connector.tableOperations().exists(table)) {
 +          connector.tableOperations().create(table);
 +          IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
 +          AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l);
 +          connector.tableOperations().attachIterator(table, setting);
 +        }
 +        connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
 +        break;
 +      } catch (Exception ex) {
 +        log.info("Waiting to checking/create the trace table.", ex);
 +        UtilWaitThread.sleep(1000);
 +      }
 +    }
 +    this.connector = connector;
 +    // make sure we refer to the final variable from now on.
 +    connector = null;
 +    
 +    int port = conf.getPort(Property.TRACE_PORT);
 +    final ServerSocket sock = ServerSocketChannel.open().socket();
 +    sock.setReuseAddress(true);
 +    sock.bind(new InetSocketAddress(hostname, port));
 +    final TServerTransport transport = new TServerSocket(sock);
 +    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
 +    options.processor(new Processor<Iface>(new Receiver()));
 +    server = new TThreadPoolServer(options);
 +    registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort());
 +    writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS)));
 +  }
 +  
 +  public void run() throws Exception {
 +    SimpleTimer.getInstance().schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        flush();
 +      }
 +    }, 1000, 1000);
 +    server.serve();
 +  }
 +  
 +  private void flush() {
 +    try {
 +      final BatchWriter writer = this.writer.get();
 +      if (null != writer) {
 +        writer.flush();
 +      }
 +    } catch (MutationsRejectedException exception) {
 +      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
 +      log.debug("flushing traces failed due to exception", exception);
 +      resetWriter();
 +    /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. */
 +    } catch (RuntimeException exception) {
 +      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
 +      log.debug("flushing traces failed due to exception", exception);
 +      resetWriter();
 +    }
 +  }
 +  
 +  private void resetWriter() {
 +    BatchWriter writer = null;
 +    try {
 +      writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS));
 +    } catch (Exception ex) {
 +      log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see stacktrace. cause: " + ex);
 +      log.debug("batch writer creation failed with exception.", ex);
 +    } finally {
 +      /* Trade in the new writer (even if null) for the one we need to close. */
 +      writer = this.writer.getAndSet(writer);
 +      try {
 +        if (null != writer) {
 +          writer.close();
 +        }
 +      } catch (Exception ex) {
 +        log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + ex);
 +        log.debug("batch writer close failed with exception", ex);
 +      }
 +    }
 +  }
 +  
 +  private void registerInZooKeeper(String name) throws Exception {
 +    String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS;
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-     String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes());
++    String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes(Constants.UTF8));
 +    zoo.exists(path, this);
 +  }
 +  
 +  public static void main(String[] args) throws Exception {
 +    SecurityUtil.serverLogin();
 +    ServerOpts opts = new ServerOpts();
 +    opts.parseArgs("tracer", args);
 +    Instance instance = HdfsZooInstance.getInstance();
 +    ServerConfiguration conf = new ServerConfiguration(instance);
 +    VolumeManager fs = VolumeManagerImpl.get();
 +    Accumulo.init(fs, conf, "tracer");
 +    String hostname = opts.getAddress();
 +    TraceServer server = new TraceServer(conf, hostname);
 +    Accumulo.enableTracing(hostname, "tserver");
 +    server.run();
 +    log.info("tracer stopping");
 +  }
 +  
 +  @Override
 +  public void process(WatchedEvent event) {
 +    log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
 +    if (event.getState() == KeeperState.Expired) {
 +      log.warn("Trace server lost zookeeper registration at " + event.getPath());
 +      server.stop();
 +    } else if (event.getType() == EventType.NodeDeleted) {
 +      log.warn("Trace server zookeeper entry lost " + event.getPath());
 +      server.stop();
 +    }
 +    if (event.getPath() != null) {
 +      try {
 +        if (ZooReaderWriter.getInstance().exists(event.getPath(), this))
 +          return;
 +      } catch (Exception ex) {
 +        log.error(ex, ex);
 +      }
 +      log.warn("Trace server unable to reset watch on zookeeper registration");
 +      server.stop();
 +    }
 +  }
 +  
 +}
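
TraceServer holds its BatchWriter in an AtomicReference and, in resetWriter(), installs a replacement before closing the writer it displaces. A minimal sketch of that swap-then-close shape, with a hypothetical Sink interface standing in for BatchWriter:

import java.util.concurrent.atomic.AtomicReference;

public class WriterSwapSketch {
  /** Hypothetical resource standing in for BatchWriter. */
  interface Sink {
    void close();
  }

  private final AtomicReference<Sink> current = new AtomicReference<Sink>();

  /**
   * Mirrors the shape of TraceServer.resetWriter(): install the replacement
   * (possibly null) first, then close whatever was previously in use, so a
   * failed replacement never leaks the old resource.
   */
  void reset(Sink replacement) {
    Sink old = current.getAndSet(replacement);
    if (old != null) {
      try {
        old.close();
      } catch (RuntimeException ex) {
        // TraceServer logs and swallows close failures at this point.
      }
    }
  }
}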

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
index 97fb7db,0000000..891bea4
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
@@@ -1,72 -1,0 +1,73 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import java.io.IOException;
 +
++import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.trace.TraceFileSystem;
 +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.FileUtil;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * Copy failed bulk imports.
 + */
 +public class BulkFailedCopyProcessor implements Processor {
 +  
 +  private static final Logger log = Logger.getLogger(BulkFailedCopyProcessor.class);
 +  
 +  @Override
 +  public Processor newProcessor() {
 +    return new BulkFailedCopyProcessor();
 +  }
 +  
 +  @Override
 +  public void process(String workID, byte[] data) {
 +    
-     String paths[] = new String(data).split(",");
++    String paths[] = new String(data, Constants.UTF8).split(",");
 +    
 +    Path orig = new Path(paths[0]);
 +    Path dest = new Path(paths[1]);
 +    Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp");
 +    
 +    try {
 +      FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
 +          ServerConfiguration.getSiteConfiguration()));
 +      
 +      FileUtil.copy(fs, orig, fs, tmp, false, true, CachedConfiguration.getInstance());
 +      fs.rename(tmp, dest);
 +      log.debug("copied " + orig + " to " + dest);
 +    } catch (IOException ex) {
 +      try {
 +        FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
 +            ServerConfiguration.getSiteConfiguration()));
 +        
 +        fs.create(dest).close();
 +        log.warn(" marked " + dest + " failed", ex);
 +      } catch (IOException e) {
 +        log.error("Unable to create failure flag file " + dest, e);
 +      }
 +    }
 +
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java
index df476ec,0000000..151db6e
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Compactor.java
@@@ -1,548 -1,0 +1,548 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import java.io.IOException;
 +import java.text.DateFormat;
 +import java.text.SimpleDateFormat;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Date;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.thrift.IterInfo;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.file.FileSKVWriter;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.IteratorUtil;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.WrappingIterator;
 +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 +import org.apache.accumulo.core.iterators.system.DeletingIterator;
 +import org.apache.accumulo.core.iterators.system.MultiIterator;
 +import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 +import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
 +import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
 +import org.apache.accumulo.core.util.LocalityGroupUtil;
 +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 +import org.apache.accumulo.server.fs.FileRef;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.problems.ProblemReport;
 +import org.apache.accumulo.server.problems.ProblemReportingIterator;
 +import org.apache.accumulo.server.problems.ProblemReports;
 +import org.apache.accumulo.server.problems.ProblemType;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
 +import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.log4j.Logger;
 +
 +public class Compactor implements Callable<CompactionStats> {
 +
 +  public static class CountingIterator extends WrappingIterator {
 +
 +    private long count;
 +    private ArrayList<CountingIterator> deepCopies;
 +    private AtomicLong entriesRead;
 +
 +    @Override
 +    public CountingIterator deepCopy(IteratorEnvironment env) {
 +      return new CountingIterator(this, env);
 +    }
 +
 +    private CountingIterator(CountingIterator other, IteratorEnvironment env) {
 +      setSource(other.getSource().deepCopy(env));
 +      count = 0;
 +      this.deepCopies = other.deepCopies;
 +      this.entriesRead = other.entriesRead;
 +      deepCopies.add(this);
 +    }
 +
 +    public CountingIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong entriesRead) {
 +      deepCopies = new ArrayList<Compactor.CountingIterator>();
 +      this.setSource(source);
 +      count = 0;
 +      this.entriesRead = entriesRead;
 +    }
 +
 +    @Override
 +    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void next() throws IOException {
 +      super.next();
 +      count++;
 +      if (count % 1024 == 0) {
 +        entriesRead.addAndGet(1024);
 +      }
 +    }
 +
 +    public long getCount() {
 +      long sum = 0;
 +      for (CountingIterator dc : deepCopies) {
 +        sum += dc.count;
 +      }
 +
 +      return count + sum;
 +    }
 +  }
 +
 +  private static final Logger log = Logger.getLogger(Compactor.class);
 +
 +  static class CompactionCanceledException extends Exception {
 +    private static final long serialVersionUID = 1L;
 +  }
 +
 +  interface CompactionEnv {
 +    boolean isCompactionEnabled();
 +
 +    IteratorScope getIteratorScope();
 +  }
 +
 +  private Map<FileRef,DataFileValue> filesToCompact;
 +  private InMemoryMap imm;
 +  private FileRef outputFile;
 +  private boolean propogateDeletes;
 +  private AccumuloConfiguration acuTableConf;
 +  private CompactionEnv env;
 +  private Configuration conf;
 +  private VolumeManager fs;
 +  protected KeyExtent extent;
 +  private List<IteratorSetting> iterators;
 +
 +  // things to report
 +  private String currentLocalityGroup = "";
 +  private long startTime;
 +
 +  private MajorCompactionReason reason;
 +  protected MinorCompactionReason mincReason;
 +
 +  private AtomicLong entriesRead = new AtomicLong(0);
 +  private AtomicLong entriesWritten = new AtomicLong(0);
 +  private DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
 +
 +  private static AtomicLong nextCompactorID = new AtomicLong(0);
 +
 +  // a unique id to identify a compactor
 +  private long compactorID = nextCompactorID.getAndIncrement();
 +
 +  protected volatile Thread thread;
 +
 +  private synchronized void setLocalityGroup(String name) {
 +    this.currentLocalityGroup = name;
 +  }
 +
 +  private void clearStats() {
 +    entriesRead.set(0);
 +    entriesWritten.set(0);
 +  }
 +
-   protected static Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>());
++  protected static final Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>());
 +
 +  public static class CompactionInfo {
 +
 +    private Compactor compactor;
 +    private String localityGroup;
 +    private long entriesRead;
 +    private long entriesWritten;
 +
 +    CompactionInfo(Compactor compactor) {
 +      this.localityGroup = compactor.currentLocalityGroup;
 +      this.entriesRead = compactor.entriesRead.get();
 +      this.entriesWritten = compactor.entriesWritten.get();
 +      this.compactor = compactor;
 +    }
 +
 +    public long getID() {
 +      return compactor.compactorID;
 +    }
 +
 +    public KeyExtent getExtent() {
 +      return compactor.getExtent();
 +    }
 +
 +    public long getEntriesRead() {
 +      return entriesRead;
 +    }
 +
 +    public long getEntriesWritten() {
 +      return entriesWritten;
 +    }
 +
 +    public Thread getThread() {
 +      return compactor.thread;
 +    }
 +
 +    public String getOutputFile() {
 +      return compactor.getOutputFile();
 +    }
 +
 +    public ActiveCompaction toThrift() {
 +
 +      CompactionType type;
 +
 +      if (compactor.imm != null)
 +        if (compactor.filesToCompact.size() > 0)
 +          type = CompactionType.MERGE;
 +        else
 +          type = CompactionType.MINOR;
 +      else if (!compactor.propogateDeletes)
 +        type = CompactionType.FULL;
 +      else
 +        type = CompactionType.MAJOR;
 +
 +      CompactionReason reason;
 +
 +      if (compactor.imm != null)
 +        switch (compactor.mincReason) {
 +          case USER:
 +            reason = CompactionReason.USER;
 +            break;
 +          case CLOSE:
 +            reason = CompactionReason.CLOSE;
 +            break;
 +          case SYSTEM:
 +          default:
 +            reason = CompactionReason.SYSTEM;
 +            break;
 +        }
 +      else
 +        switch (compactor.reason) {
 +          case USER:
 +            reason = CompactionReason.USER;
 +            break;
 +          case CHOP:
 +            reason = CompactionReason.CHOP;
 +            break;
 +          case IDLE:
 +            reason = CompactionReason.IDLE;
 +            break;
 +          case NORMAL:
 +          default:
 +            reason = CompactionReason.SYSTEM;
 +            break;
 +        }
 +
 +      List<IterInfo> iiList = new ArrayList<IterInfo>();
 +      Map<String,Map<String,String>> iterOptions = new HashMap<String,Map<String,String>>();
 +
 +      for (IteratorSetting iterSetting : compactor.iterators) {
 +        iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName()));
 +        iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
 +      }
 +      List<String> filesToCompact = new ArrayList<String>();
 +      for (FileRef ref : compactor.filesToCompact.keySet())
 +        filesToCompact.add(ref.toString());
 +      return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, filesToCompact,
 +          compactor.outputFile.toString(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
 +    }
 +  }
 +
 +  public static List<CompactionInfo> getRunningCompactions() {
 +    ArrayList<CompactionInfo> compactions = new ArrayList<Compactor.CompactionInfo>();
 +
 +    synchronized (runningCompactions) {
 +      for (Compactor compactor : runningCompactions) {
 +        compactions.add(new CompactionInfo(compactor));
 +      }
 +    }
 +
 +    return compactions;
 +  }
 +
 +  Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
 +      AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) {
 +    this.extent = extent;
 +    this.conf = conf;
 +    this.fs = fs;
 +    this.filesToCompact = files;
 +    this.imm = imm;
 +    this.outputFile = outputFile;
 +    this.propogateDeletes = propogateDeletes;
 +    this.acuTableConf = acuTableConf;
 +    this.env = env;
 +    this.iterators = iterators;
 +    this.reason = reason;
 +
 +    startTime = System.currentTimeMillis();
 +  }
 +
 +  Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
 +      AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
 +    this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null);
 +  }
 +
 +  public VolumeManager getFileSystem() {
 +    return fs;
 +  }
 +
 +  KeyExtent getExtent() {
 +    return extent;
 +  }
 +
 +  String getOutputFile() {
 +    return outputFile.toString();
 +  }
 +
 +  @Override
 +  public CompactionStats call() throws IOException, CompactionCanceledException {
 +
 +    FileSKVWriter mfw = null;
 +
 +    CompactionStats majCStats = new CompactionStats();
 +
 +    boolean remove = runningCompactions.add(this);
 +
 +    clearStats();
 +
 +    String oldThreadName = Thread.currentThread().getName();
 +    String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new Date()) + " file: " + outputFile;
 +    Thread.currentThread().setName(newThreadName);
 +    thread = Thread.currentThread();
 +    try {
 +      FileOperations fileFactory = FileOperations.getInstance();
 +      FileSystem ns = this.fs.getFileSystemByPath(outputFile.path());
 +      mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf);
 +
 +      Map<String,Set<ByteSequence>> lGroups;
 +      try {
 +        lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf);
 +      } catch (LocalityGroupConfigurationError e) {
 +        throw new IOException(e);
 +      }
 +
 +      long t1 = System.currentTimeMillis();
 +
 +      HashSet<ByteSequence> allColumnFamilies = new HashSet<ByteSequence>();
 +
 +      if (mfw.supportsLocalityGroups()) {
 +        for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
 +          setLocalityGroup(entry.getKey());
 +          compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats);
 +          allColumnFamilies.addAll(entry.getValue());
 +        }
 +      }
 +
 +      setLocalityGroup("");
 +      compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats);
 +
 +      long t2 = System.currentTimeMillis();
 +
 +      FileSKVWriter mfwTmp = mfw;
 +      mfw = null; // set this to null so we do not try to close it again in finally if the close fails
 +      mfwTmp.close(); // if the close fails it will cause the compaction to fail
 +
 +      // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
 +      try {
 +        FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf);
 +        openReader.close();
 +      } catch (IOException ex) {
 +        log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex);
 +        throw ex;
 +      }
 +
 +      log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
 +          majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
 +
 +      majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf));
 +      return majCStats;
 +    } catch (IOException e) {
 +      log.error(e, e);
 +      throw e;
 +    } catch (RuntimeException e) {
 +      log.error(e, e);
 +      throw e;
 +    } finally {
 +      Thread.currentThread().setName(oldThreadName);
 +      if (remove) {
 +        thread = null;
 +        runningCompactions.remove(this);
 +      }
 +
 +      try {
 +        if (mfw != null) {
 +          // compaction must not have finished successfully, so close its output file
 +          try {
 +            mfw.close();
 +          } finally {
 +            if (!fs.deleteRecursively(outputFile.path()))
 +              if (fs.exists(outputFile.path()))
 +                log.error("Unable to delete " + outputFile);
 +          }
 +        }
 +      } catch (IOException e) {
 +        log.warn(e, e);
 +      } catch (RuntimeException exception) {
 +        log.warn(exception, exception);
 +      }
 +    }
 +  }
 +
 +  private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName, ArrayList<FileSKVIterator> readers) throws IOException {
 +
 +    List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
 +
 +    for (FileRef mapFile : filesToCompact.keySet()) {
 +      try {
 +
 +        FileOperations fileFactory = FileOperations.getInstance();
 +        FileSystem fs = this.fs.getFileSystemByPath(mapFile.path());
 +        FileSKVIterator reader;
 +
 +        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, conf, acuTableConf);
 +
 +        readers.add(reader);
 +
 +        SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader);
 +
 +        if (filesToCompact.get(mapFile).isTimeSet()) {
 +          iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
 +        }
 +
 +        iters.add(iter);
 +
 +      } catch (Throwable e) {
 +
 +        ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e));
 +
 +        log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e);
 +        // failed to open some map file... close the ones that were opened
 +        for (FileSKVIterator reader : readers) {
 +          try {
 +            reader.close();
 +          } catch (Throwable e2) {
 +            log.warn("Failed to close map file", e2);
 +          }
 +        }
 +
 +        readers.clear();
 +
 +        if (e instanceof IOException)
 +          throw (IOException) e;
 +        throw new IOException("Failed to open map data files", e);
 +      }
 +    }
 +
 +    return iters;
 +  }
 +
 +  private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats)
 +      throws IOException, CompactionCanceledException {
 +    ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size());
 +    Span span = Trace.start("compact");
 +    try {
 +      long entriesCompacted = 0;
 +      List<SortedKeyValueIterator<Key,Value>> iters = openMapDataFiles(lgName, readers);
 +
 +      if (imm != null) {
 +        iters.add(imm.compactionIterator());
 +      }
 +
 +      CountingIterator citr = new CountingIterator(new MultiIterator(iters, extent.toDataRange()), entriesRead);
 +      DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes);
 +      ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
 +
 +      // if(env.getIteratorScope() )
 +
 +      TabletIteratorEnvironment iterEnv;
 +      if (env.getIteratorScope() == IteratorScope.majc)
 +        iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, !propogateDeletes, acuTableConf);
 +      else if (env.getIteratorScope() == IteratorScope.minc)
 +        iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, acuTableConf);
 +      else
 +        throw new IllegalArgumentException();
 +
 +      SortedKeyValueIterator<Key,Value> itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), cfsi, extent, acuTableConf,
 +          iterators, iterEnv));
 +
 +      itr.seek(extent.toDataRange(), columnFamilies, inclusive);
 +
 +      if (!inclusive) {
 +        mfw.startDefaultLocalityGroup();
 +      } else {
 +        mfw.startNewLocalityGroup(lgName, columnFamilies);
 +      }
 +
 +      Span write = Trace.start("write");
 +      try {
 +        while (itr.hasTop() && env.isCompactionEnabled()) {
 +          mfw.append(itr.getTopKey(), itr.getTopValue());
 +          itr.next();
 +          entriesCompacted++;
 +
 +          if (entriesCompacted % 1024 == 0) {
 +            // Periodically update stats; do not want to do this too often since it's volatile
 +            entriesWritten.addAndGet(1024);
 +          }
 +        }
 +
 +        if (itr.hasTop() && !env.isCompactionEnabled()) {
 +          // cancel major compaction operation
 +          try {
 +            try {
 +              mfw.close();
 +            } catch (IOException e) {
 +              log.error(e, e);
 +            }
 +            fs.deleteRecursively(outputFile.path());
 +          } catch (Exception e) {
 +            log.warn("Failed to delete Canceled compaction output file " + outputFile, e);
 +          }
 +          throw new CompactionCanceledException();
 +        }
 +
 +      } finally {
 +        CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted);
 +        majCStats.add(lgMajcStats);
 +        write.stop();
 +      }
 +
 +    } finally {
 +      // close sequence files opened
 +      for (FileSKVIterator reader : readers) {
 +        try {
 +          reader.close();
 +        } catch (Throwable e) {
 +          log.warn("Failed to close map file", e);
 +        }
 +      }
 +      span.stop();
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index 5ee62c6,0000000..0bd8daf
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@@ -1,760 -1,0 +1,763 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import java.io.IOException;
++import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentSkipListMap;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.ColumnUpdate;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.file.FileSKVWriter;
 +import org.apache.accumulo.core.file.rfile.RFile;
 +import org.apache.accumulo.core.file.rfile.RFileOperations;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SkippingIterator;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.SortedMapIterator;
 +import org.apache.accumulo.core.iterators.WrappingIterator;
 +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
 +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
 +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
 +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
 +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.LocalityGroupUtil;
 +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 +import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.trace.TraceFileSystem;
 +import org.apache.commons.lang.mutable.MutableLong;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
- class MemKeyComparator implements Comparator<Key> {
++class MemKeyComparator implements Comparator<Key>, Serializable {
 +  
++  private static final long serialVersionUID = 1L;
++
 +  @Override
 +  public int compare(Key k1, Key k2) {
 +    int cmp = k1.compareTo(k2);
 +    
 +    if (cmp == 0) {
 +      if (k1 instanceof MemKey)
 +        if (k2 instanceof MemKey)
 +          cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
 +        else
 +          cmp = 1;
 +      else if (k2 instanceof MemKey)
 +        cmp = -1;
 +    }
 +    
 +    return cmp;
 +  }
 +}
 +
 +class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator {
 +  
 +  int kvCount;
 +  
 +  public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) {
 +    setSource(source);
 +    this.kvCount = maxKVCount;
 +  }
 +  
 +  @Override
 +  protected void consume() throws IOException {
 +    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
 +      getSource().next();
 +  }
 +  
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
 +  }
 +  
 +  @Override
 +  public void setInterruptFlag(AtomicBoolean flag) {
 +    ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
 +  }
 +  
 +}
 +
 +class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator {
 +  MemKey currKey = null;
 +  Value currVal = null;
 +
 +  public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source) {
 +    super();
 +    setSource(source);
 +  }
 +
 +  public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source, MemKey startKey) {
 +    this(source);
 +    try {
 +      if (currKey != null)
 +        currKey = (MemKey) startKey.clone();
 +    } catch (CloneNotSupportedException e) {
 +      // MemKey is supported
 +    }
 +  }
 +
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    return new MemKeyConversionIterator(getSource().deepCopy(env), currKey);
 +  }
 +  
 +  @Override
 +  public Key getTopKey() {
 +    return currKey;
 +  }
 +  
 +  @Override
 +  public Value getTopValue() {
 +    return currVal;
 +  }
 +  
 +  private void getTopKeyVal() {
 +    Key k = super.getTopKey();
 +    Value v = super.getTopValue();
 +    if (k instanceof MemKey || k == null) {
 +      currKey = (MemKey) k;
 +      currVal = v;
 +      return;
 +    }
 +    currVal = new Value(v);
 +    int mc = MemValue.splitKVCount(currVal);
 +    currKey = new MemKey(k, mc);
 +
 +  }
 +  
 +  public void next() throws IOException {
 +    super.next();
 +    if (hasTop())
 +      getTopKeyVal();
 +  }
 +
 +  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
 +    super.seek(range, columnFamilies, inclusive);
 +    
 +    if (hasTop())
 +      getTopKeyVal();
 +
 +    Key k = range.getStartKey();
 +    if (k instanceof MemKey && hasTop()) {
 +      while (hasTop() && currKey.compareTo(k) < 0)
 +        next();
 +    }
 +  }
 +
 +  @Override
 +  public void setInterruptFlag(AtomicBoolean flag) {
 +    ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
 +  }
 +
 +}
 +
 +public class InMemoryMap {
 +  private SimpleMap map = null;
 +  
 +  private static final Logger log = Logger.getLogger(InMemoryMap.class);
 +  
 +  private volatile String memDumpFile = null;
 +  private final String memDumpDir;
 +
 +  private Map<String,Set<ByteSequence>> lggroups;
 +  
 +  public InMemoryMap(boolean useNativeMap, String memDumpDir) {
 +    this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
 +  }
 +
 +  public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
 +    this.memDumpDir = memDumpDir;
 +    this.lggroups = lggroups;
 +    
 +    if (lggroups.size() == 0)
 +      map = newMap(useNativeMap);
 +    else
 +      map = new LocalityGroupMap(lggroups, useNativeMap);
 +  }
 +  
 +  public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
 +    this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
 +  }
 +  
 +  private static SimpleMap newMap(boolean useNativeMap) {
 +    if (useNativeMap && NativeMap.isLoaded()) {
 +      try {
 +        return new NativeMapWrapper();
 +      } catch (Throwable t) {
 +        log.error("Failed to create native map", t);
 +      }
 +    }
 +    
 +    return new DefaultMap();
 +  }
 +  
 +  private interface SimpleMap {
 +    Value get(Key key);
 +    
 +    Iterator<Entry<Key,Value>> iterator(Key startKey);
 +    
 +    int size();
 +    
 +    InterruptibleIterator skvIterator();
 +    
 +    void delete();
 +    
 +    long getMemoryUsed();
 +    
 +    void mutate(List<Mutation> mutations, int kvCount);
 +  }
 +  
 +  private static class LocalityGroupMap implements SimpleMap {
 +    
 +    private Map<ByteSequence,MutableLong> groupFams[];
 +    
 +    // the last map in the array is the default locality group
 +    private SimpleMap maps[];
 +    private Partitioner partitioner;
 +    private List<Mutation>[] partitioned;
 +    private Set<ByteSequence> nonDefaultColumnFamilies;
 +    
 +    @SuppressWarnings("unchecked")
 +    LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap) {
 +      this.groupFams = new Map[groups.size()];
 +      this.maps = new SimpleMap[groups.size() + 1];
 +      this.partitioned = new List[groups.size() + 1];
 +      this.nonDefaultColumnFamilies = new HashSet<ByteSequence>();
 +      
 +      for (int i = 0; i < maps.length; i++) {
 +        maps[i] = newMap(useNativeMap);
 +      }
 +
 +      int count = 0;
 +      for (Set<ByteSequence> cfset : groups.values()) {
 +        HashMap<ByteSequence,MutableLong> map = new HashMap<ByteSequence,MutableLong>();
 +        for (ByteSequence bs : cfset)
 +          map.put(bs, new MutableLong(1));
 +        this.groupFams[count++] = map;
 +        nonDefaultColumnFamilies.addAll(cfset);
 +      }
 +      
 +      partitioner = new LocalityGroupUtil.Partitioner(this.groupFams);
 +      
 +      for (int i = 0; i < partitioned.length; i++) {
 +        partitioned[i] = new ArrayList<Mutation>();
 +      }
 +    }
 +
 +    @Override
 +    public Value get(Key key) {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    @Override
 +    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    @Override
 +    public int size() {
 +      int sum = 0;
 +      for (SimpleMap map : maps)
 +        sum += map.size();
 +      return sum;
 +    }
 +    
 +    @Override
 +    public InterruptibleIterator skvIterator() {
 +      LocalityGroup groups[] = new LocalityGroup[maps.length];
 +      for (int i = 0; i < groups.length; i++) {
 +        if (i < groupFams.length)
 +          groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams[i], false);
 +        else
 +          groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true);
 +      }
 +
 +
 +      return new LocalityGroupIterator(groups, nonDefaultColumnFamilies);
 +    }
 +    
 +    @Override
 +    public void delete() {
 +      for (SimpleMap map : maps)
 +        map.delete();
 +    }
 +    
 +    @Override
 +    public long getMemoryUsed() {
 +      long sum = 0;
 +      for (SimpleMap map : maps)
 +        sum += map.getMemoryUsed();
 +      return sum;
 +    }
 +    
 +    @Override
 +    public synchronized void mutate(List<Mutation> mutations, int kvCount) {
 +      // This method is synchronized because it reuses objects to avoid allocation.
 +      // Currently the method that calls this is also synchronized, so there is no
 +      // loss in parallelism; the synchronization here is for future proofing.
 +      
 +      try {
 +        partitioner.partition(mutations, partitioned);
 +        
 +        for (int i = 0; i < partitioned.length; i++) {
 +          if (partitioned[i].size() > 0) {
 +            maps[i].mutate(partitioned[i], kvCount);
 +            for (Mutation m : partitioned[i])
 +              kvCount += m.getUpdates().size();
 +          }
 +        }
 +      } finally {
 +        // clear immediately so mutations can be garbage collected
 +        for (List<Mutation> list : partitioned) {
 +          list.clear();
 +        }
 +      }
 +    }
 +    
 +  }
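 +
 +  // Worked example (illustrative, not from the original patch): with two configured
 +  // groups, say {"meta" -> {family "m"}, "data" -> {family "d"}}, groupFams.length is 2
 +  // and maps.length is 3. maps[0] and maps[1] hold the two named groups (in the
 +  // iteration order of the groups map) and maps[2] is the default locality group for
 +  // every other column family. mutate() partitions each batch with
 +  // LocalityGroupUtil.Partitioner and forwards each non-empty slice to the matching map.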
 +
 +  private static class DefaultMap implements SimpleMap {
 +    private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new MemKeyComparator());
 +    private AtomicLong bytesInMemory = new AtomicLong();
 +    private AtomicInteger size = new AtomicInteger();
 +    
 +    public void put(Key key, Value value) {
 +      // Always a MemKey, so account for the kvCount int
 +      bytesInMemory.addAndGet(key.getLength() + 4);
 +      bytesInMemory.addAndGet(value.getSize());
 +      if (map.put(key, value) == null)
 +        size.incrementAndGet();
 +    }
 +    
 +    public Value get(Key key) {
 +      return map.get(key);
 +    }
 +    
 +    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
 +      Key lk = new Key(startKey);
 +      SortedMap<Key,Value> tm = map.tailMap(lk);
 +      return tm.entrySet().iterator();
 +    }
 +    
 +    public int size() {
 +      return size.get();
 +    }
 +    
 +    public synchronized InterruptibleIterator skvIterator() {
 +      if (map == null)
 +        throw new IllegalStateException();
 +      
 +      return new SortedMapIterator(map);
 +    }
 +    
 +    public synchronized void delete() {
 +      map = null;
 +    }
 +    
 +    public long getOverheadPerEntry() {
 +      // All of the Java objects used to hold the data and make it searchable
 +      // have overhead. This overhead, in bytes, is estimated using
 +      // test.EstimateInMemMapOverhead; the estimates were obtained by running
 +      // Java 6u16 in 64-bit server mode.
 +      
 +      return 200;
 +    }
 +    
 +    @Override
 +    public void mutate(List<Mutation> mutations, int kvCount) {
 +      for (Mutation m : mutations) {
 +        for (ColumnUpdate cvp : m.getUpdates()) {
 +          Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(),
 +              false, kvCount++);
 +          Value value = new Value(cvp.getValue());
 +          put(newKey, value);
 +        }
 +      }
 +    }
 +    
 +    @Override
 +    public long getMemoryUsed() {
 +      return bytesInMemory.get() + (size() * getOverheadPerEntry());
 +    }
 +  }
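 +  
 +  // Worked example (illustrative, not from the original patch): an entry with a
 +  // 20-byte key and a 10-byte value is accounted as 20 + 4 (kvCount int) + 10 = 34
 +  // bytes in bytesInMemory, so getMemoryUsed() reports 34 + 200 = 234 bytes for it
 +  // once the estimated per-entry object overhead is added.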
 +  
 +  private static class NativeMapWrapper implements SimpleMap {
 +    private NativeMap nativeMap;
 +    
 +    NativeMapWrapper() {
 +      nativeMap = new NativeMap();
 +    }
 +    
 +    public Value get(Key key) {
 +      return nativeMap.get(key);
 +    }
 +    
 +    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
 +      return nativeMap.iterator(startKey);
 +    }
 +    
 +    public int size() {
 +      return nativeMap.size();
 +    }
 +    
 +    public InterruptibleIterator skvIterator() {
 +      return (InterruptibleIterator) nativeMap.skvIterator();
 +    }
 +    
 +    public void delete() {
 +      nativeMap.delete();
 +    }
 +    
 +    public long getMemoryUsed() {
 +      return nativeMap.getMemoryUsed();
 +    }
 +    
 +    @Override
 +    public void mutate(List<Mutation> mutations, int kvCount) {
 +      nativeMap.mutate(mutations, kvCount);
 +    }
 +  }
 +  
 +  private AtomicInteger nextKVCount = new AtomicInteger(1);
 +  private AtomicInteger kvCount = new AtomicInteger(0);
 +
 +  private Object writeSerializer = new Object();
 +  
 +  /**
 +   * Applies a batch of mutations to the InMemoryMap.
 +   * 
 +   */
 +  public void mutate(List<Mutation> mutations) {
 +    int numKVs = 0;
 +    for (int i = 0; i < mutations.size(); i++)
 +      numKVs += mutations.get(i).size();
 +    
 +    // The kv count can not be updated while writes that started earlier are still
 +    // in progress, because that would allow partial mutations to be seen. A write
 +    // also can not return before the kv count is updated, because a read might then
 +    // miss a successful write. Therefore each write must wait for the writes that
 +    // started before it to finish.
 +    //
 +    // A lock separate from this map is used so reads and writes can proceed in parallel.
 +    synchronized (writeSerializer) {
 +      int kv = nextKVCount.getAndAdd(numKVs);
 +      try {
 +        map.mutate(mutations, kv);
 +      } finally {
 +        kvCount.set(kv + numKVs - 1);
 +      }
 +    }
 +  }
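 +  
 +  // Worked example (illustrative, not from the original patch): if writer A reserves
 +  // kv counts 1..3 and writer B then reserves 4..5, kvCount only advances to 3 once
 +  // A's write finishes and to 5 once B's does. A reader that captured kvCount == 3
 +  // will skip any of B's entries it encounters, so it never observes a partially
 +  // applied mutation.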
 +  
 +  /**
 +   * Returns an estimate of the number of bytes of memory used by the InMemoryMap.
 +   * 
 +   * @return estimated size in bytes
 +   */
 +  public synchronized long estimatedSizeInBytes() {
 +    if (map == null)
 +      return 0;
 +    
 +    return map.getMemoryUsed();
 +  }
 +  
 +  Iterator<Map.Entry<Key,Value>> iterator(Key startKey) {
 +    return map.iterator(startKey);
 +  }
 +  
 +  public long getNumEntries() {
 +    return map.size();
 +  }
 +  
 +  private final Set<MemoryIterator> activeIters = Collections.synchronizedSet(new HashSet<MemoryIterator>());
 +  
 +  class MemoryDataSource implements DataSource {
 +    
 +    boolean switched = false;
 +    private InterruptibleIterator iter;
 +    private List<FileSKVIterator> readers;
 +    
 +    MemoryDataSource() {
 +      this(new ArrayList<FileSKVIterator>());
 +    }
 +    
 +    public MemoryDataSource(List<FileSKVIterator> readers) {
 +      this.readers = readers;
 +    }
 +    
 +    @Override
 +    public boolean isCurrent() {
 +      if (switched)
 +        return true;
 +      else
 +        return memDumpFile == null;
 +    }
 +    
 +    @Override
 +    public DataSource getNewDataSource() {
 +      if (switched)
 +        throw new IllegalStateException();
 +      
 +      if (!isCurrent()) {
 +        switched = true;
 +        iter = null;
 +      }
 +      
 +      return this;
 +    }
 +    
 +    @Override
 +    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
 +      if (iter == null)
 +        if (!switched)
 +          iter = map.skvIterator();
 +        else {
 +          
 +          Configuration conf = CachedConfiguration.getInstance();
 +          FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
 +          
 +          FileSKVIterator reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
 +
 +          readers.add(reader);
 +          
 +          iter = new MemKeyConversionIterator(reader);
 +        }
 +      
 +      return iter;
 +    }
 +    
 +    @Override
 +    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
 +      return new MemoryDataSource(readers);
 +    }
 +    
 +  }
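 +  
 +  // Descriptive note (not from the original patch): a MemoryDataSource serves entries
 +  // directly from the in-memory map until delete() writes the memory dump file; after
 +  // the switch, iterator() opens the dump file and wraps the reader in a
 +  // MemKeyConversionIterator so the kv counts stored in the values are restored into
 +  // MemKeys. Readers opened this way are tracked in the readers list so that
 +  // MemoryIterator.close() can release them.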
 +  
 +  class MemoryIterator extends WrappingIterator implements InterruptibleIterator {
 +    
 +    private AtomicBoolean closed;
 +    private SourceSwitchingIterator ssi;
 +    private MemoryDataSource mds;
 +    
 +    protected SortedKeyValueIterator<Key,Value> getSource() {
 +      if (closed.get())
 +        throw new IllegalStateException("Memory iterator is closed");
 +      return super.getSource();
 +    }
 +    
 +    private MemoryIterator(InterruptibleIterator source) {
 +      this(source, new AtomicBoolean(false));
 +    }
 +    
 +    private MemoryIterator(SortedKeyValueIterator<Key,Value> source, AtomicBoolean closed) {
 +      setSource(source);
 +      this.closed = closed;
 +    }
 +    
 +    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +      return new MemoryIterator(getSource().deepCopy(env), closed);
 +    }
 +    
 +    public void close() {
 +      
 +      synchronized (this) {
 +        if (closed.compareAndSet(false, true)) {
 +          
 +          for (FileSKVIterator reader : mds.readers)
 +            try {
 +              reader.close();
 +            } catch (IOException e) {
 +              log.warn(e, e);
 +            }
 +        }
 +      }
 +      
 +      // remove outside of sync to avoid deadlock
 +      activeIters.remove(this);
 +    }
 +    
 +    private synchronized boolean switchNow() throws IOException {
 +      if (closed.get())
 +        return false;
 +      
 +      ssi.switchNow();
 +      return true;
 +    }
 +    
 +    @Override
 +    public void setInterruptFlag(AtomicBoolean flag) {
 +      ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
 +    }
 +    
 +    private void setSSI(SourceSwitchingIterator ssi) {
 +      this.ssi = ssi;
 +    }
 +    
 +    public void setMDS(MemoryDataSource mds) {
 +      this.mds = mds;
 +    }
 +    
 +  }
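 +  
 +  // Descriptive note (not from the original patch): deep copies of a MemoryIterator
 +  // share the same AtomicBoolean, so a single close() invalidates the original and
 +  // every copy; the compareAndSet guard also makes close() idempotent, ensuring the
 +  // mem-dump file readers are closed at most once.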
 +  
 +  public synchronized MemoryIterator skvIterator() {
 +    if (map == null)
 +      throw new NullPointerException();
 +    
 +    if (deleted)
 +      throw new IllegalStateException("Can not obtain iterator after map deleted");
 +    
 +    int mc = kvCount.get();
 +    MemoryDataSource mds = new MemoryDataSource();
 +    // pass the same MemoryDataSource to the SourceSwitchingIterator so that close()
 +    // can release any readers opened after a switch to the memory dump file
 +    SourceSwitchingIterator ssi = new SourceSwitchingIterator(mds);
 +    MemoryIterator mi = new MemoryIterator(new PartialMutationSkippingIterator(ssi, mc));
 +    mi.setSSI(ssi);
 +    mi.setMDS(mds);
 +    activeIters.add(mi);
 +    return mi;
 +  }
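 +  
 +  // Illustrative sketch, not from the original patch: the expected consumer pattern
 +  // for skvIterator(). The iterator should always be closed so it is removed from
 +  // activeIters and any mem-dump file readers opened after a switch are released.
 +  private void exampleScan() throws IOException {
 +    MemoryIterator mi = skvIterator();
 +    try {
 +      mi.seek(new Range(), Collections.<ByteSequence> emptySet(), false);
 +      while (mi.hasTop()) {
 +        // process mi.getTopKey() and mi.getTopValue() here
 +        mi.next();
 +      }
 +    } finally {
 +      mi.close();
 +    }
 +  }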
 +  
 +  public SortedKeyValueIterator<Key,Value> compactionIterator() {
 +    
 +    if (nextKVCount.get() - 1 != kvCount.get())
 +      throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = "
 +          + kvCount.get());
 +    
 +    return map.skvIterator();
 +  }
 +  
 +  private boolean deleted = false;
 +  
 +  public void delete(long waitTime) {
 +    
 +    synchronized (this) {
 +      if (deleted)
 +        throw new IllegalStateException("Double delete");
 +      
 +      deleted = true;
 +    }
 +    
 +    long t1 = System.currentTimeMillis();
 +    
 +    while (activeIters.size() > 0 && System.currentTimeMillis() - t1 < waitTime) {
 +      UtilWaitThread.sleep(50);
 +    }
 +    
 +    if (activeIters.size() > 0) {
 +      // dump memmap exactly as is to a tmp file on disk, and switch scans to that temp file
 +      try {
 +        Configuration conf = CachedConfiguration.getInstance();
 +        FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
 +        
 +        String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION;
 +        
 +        Configuration newConf = new Configuration(conf);
 +        newConf.setInt("io.seqfile.compress.blocksize", 100000);
 +        
 +        FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration());
 +        
 +        InterruptibleIterator iter = map.skvIterator();
 +       
 +        HashSet<ByteSequence> allfams = new HashSet<ByteSequence>();
 +        
 +        for (Entry<String,Set<ByteSequence>> entry : lggroups.entrySet()) {
 +          allfams.addAll(entry.getValue());
 +          out.startNewLocalityGroup(entry.getKey(), entry.getValue());
 +          iter.seek(new Range(), entry.getValue(), true);
 +          dumpLocalityGroup(out, iter);
 +        }
 +        
 +        out.startDefaultLocalityGroup();
 +        iter.seek(new Range(), allfams, false);
 +       
 +        dumpLocalityGroup(out, iter);
 +        
 +        out.close();
 +        
 +        log.debug("Created mem dump file " + tmpFile);
 +        
 +        memDumpFile = tmpFile;
 +        
 +        synchronized (activeIters) {
 +          for (MemoryIterator mi : activeIters) {
 +            mi.switchNow();
 +          }
 +        }
 +        
 +        // Rely on the UNIX behavior that the file is not actually removed until the
 +        // last reader closes it.
 +        fs.delete(new Path(memDumpFile), true);
 +        
 +      } catch (IOException ioe) {
 +        log.error("Failed to create mem dump file ", ioe);
 +        
 +        while (activeIters.size() > 0) {
 +          UtilWaitThread.sleep(100);
 +        }
 +      }
 +      
 +    }
 +    
 +    SimpleMap tmpMap = map;
 +    
 +    synchronized (this) {
 +      map = null;
 +    }
 +    
 +    tmpMap.delete();
 +  }
 +
 +  private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException {
 +    while (iter.hasTop() && activeIters.size() > 0) {
 +      // RFile does not support MemKey, so the kv count is moved into the value for the RFile only.
 +      // There is no need to convert the MemKey to a normal Key, because the kvCount info is lost when the key is written.
 +      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
 +      out.append(iter.getTopKey(), newValue);
 +      iter.next();
 +
 +    }
 +  }
 +}

