accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [31/53] [abbrv] ACCUMULO-658 consistent package names to avoid overlapped sealed jars
Date Fri, 06 Sep 2013 01:48:59 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/fc9363a0/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
new file mode 100644
index 0000000..9988a9e
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Semaphore;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReportingIterator;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class FileManager {
+  
+  private static final Logger log = Logger.getLogger(FileManager.class);
+  
+  int maxOpen;
+  
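+  // An open reader that is not currently reserved. Instances are ordered by
+  // release time so the idle file closer and takeLRUOpenFiles() always evict
+  // the least recently used readers first.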
+  private static class OpenReader implements Comparable<OpenReader> {
+    long releaseTime;
+    FileSKVIterator reader;
+    String fileName;
+    
+    public OpenReader(String fileName, FileSKVIterator reader) {
+      this.fileName = fileName;
+      this.reader = reader;
+      this.releaseTime = System.currentTimeMillis();
+    }
+    
+    @Override
+    public int compareTo(OpenReader o) {
+      if (releaseTime < o.releaseTime) {
+        return -1;
+      } else if (releaseTime > o.releaseTime) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+    
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof OpenReader) {
+        return compareTo((OpenReader) obj) == 0;
+      }
+      return false;
+    }
+    
+    @Override
+    public int hashCode() {
+      return fileName.hashCode();
+    }
+  }
+  
+  private Map<String,List<OpenReader>> openFiles;
+  private HashMap<FileSKVIterator,String> reservedReaders;
+  
+  private Semaphore filePermits;
+  
+  private VolumeManager fs;
+  
+  // the data cache and index cache are allocated in
+  // TabletResourceManager and passed through the file opener to
+  // CachableBlockFile which can handle the caches being
+  // null if unallocated
+  private BlockCache dataCache = null;
+  private BlockCache indexCache = null;
+  
+  private long maxIdleTime;
+  
+  private final ServerConfiguration conf;
+  
+  private class IdleFileCloser implements Runnable {
+    
+    @Override
+    public void run() {
+      
+      long curTime = System.currentTimeMillis();
+      
+      ArrayList<FileSKVIterator> filesToClose = new ArrayList<FileSKVIterator>();
+      
+      // determine which files to close in a sync block, and then close the
+      // files outside of the sync block
+      synchronized (FileManager.this) {
+        Iterator<Entry<String,List<OpenReader>>> iter = openFiles.entrySet().iterator();
+        while (iter.hasNext()) {
+          Entry<String,List<OpenReader>> entry = iter.next();
+          List<OpenReader> ofl = entry.getValue();
+          
+          for (Iterator<OpenReader> oflIter = ofl.iterator(); oflIter.hasNext();) {
+            OpenReader openReader = oflIter.next();
+            
+            if (curTime - openReader.releaseTime > maxIdleTime) {
+              
+              filesToClose.add(openReader.reader);
+              oflIter.remove();
+            }
+          }
+          
+          if (ofl.size() == 0) {
+            iter.remove();
+          }
+        }
+      }
+      
+      closeReaders(filesToClose);
+      
+    }
+    
+  }
+  
+  /**
+   * @param dataCache
+   *          may be null; the underlying file must be able to handle a null cache
+   * @param indexCache
+   *          may be null; the underlying file must be able to handle a null cache
+   */
+  FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) {
+    
+    if (maxOpen <= 0)
+      throw new IllegalArgumentException("maxOpen <= 0");
+    this.conf = conf;
+    this.dataCache = dataCache;
+    this.indexCache = indexCache;
+    
+    this.filePermits = new Semaphore(maxOpen, true);
+    this.maxOpen = maxOpen;
+    this.fs = fs;
+    
+    this.openFiles = new HashMap<String,List<OpenReader>>();
+    this.reservedReaders = new HashMap<FileSKVIterator,String>();
+    
+    this.maxIdleTime = conf.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
+    SimpleTimer.getInstance().schedule(new IdleFileCloser(), maxIdleTime, maxIdleTime / 2);
+    
+  }
+  
+  private static int countReaders(Map<String,List<OpenReader>> files) {
+    int count = 0;
+    
+    for (List<OpenReader> list : files.values()) {
+      count += list.size();
+    }
+    
+    return count;
+  }
+  
+  private List<FileSKVIterator> takeLRUOpenFiles(int numToTake) {
+    
+    ArrayList<OpenReader> openReaders = new ArrayList<OpenReader>();
+    
+    for (Entry<String,List<OpenReader>> entry : openFiles.entrySet()) {
+      openReaders.addAll(entry.getValue());
+    }
+    
+    Collections.sort(openReaders);
+    
+    ArrayList<FileSKVIterator> ret = new ArrayList<FileSKVIterator>();
+    
+    for (int i = 0; i < numToTake; i++) {
+      OpenReader or = openReaders.get(i);
+      
+      List<OpenReader> ofl = openFiles.get(or.fileName);
+      if (!ofl.remove(or)) {
+        throw new RuntimeException("Failed to remove open reader that should have been there");
+      }
+      
+      if (ofl.size() == 0) {
+        openFiles.remove(or.fileName);
+      }
+      
+      ret.add(or.reader);
+    }
+    
+    return ret;
+  }
+  
+  private static <T> List<T> getFileList(String file, Map<String,List<T>> files) {
+    List<T> ofl = files.get(file);
+    if (ofl == null) {
+      ofl = new ArrayList<T>();
+      files.put(file, ofl);
+    }
+    
+    return ofl;
+  }
+  
+  private void closeReaders(List<FileSKVIterator> filesToClose) {
+    for (FileSKVIterator reader : filesToClose) {
+      try {
+        reader.close();
+      } catch (Exception e) {
+        log.error("Failed to close file " + e.getMessage(), e);
+      }
+    }
+  }
+  
+  private List<String> takeOpenFiles(Collection<String> files, List<FileSKVIterator> reservedFiles, Map<FileSKVIterator,String> readersReserved) {
+    List<String> filesToOpen = new LinkedList<String>(files);
+    for (Iterator<String> iterator = filesToOpen.iterator(); iterator.hasNext();) {
+      String file = iterator.next();
+      
+      List<OpenReader> ofl = openFiles.get(file);
+      if (ofl != null && ofl.size() > 0) {
+        OpenReader openReader = ofl.remove(ofl.size() - 1);
+        reservedFiles.add(openReader.reader);
+        readersReserved.put(openReader.reader, file);
+        if (ofl.size() == 0) {
+          openFiles.remove(file);
+        }
+        iterator.remove();
+      }
+      
+    }
+    return filesToOpen;
+  }
+  
+  private synchronized String getReservedReaderFilename(FileSKVIterator reader) {
+    return reservedReaders.get(reader);
+  }
+  
+  private List<FileSKVIterator> reserveReaders(Text table, Collection<String> files, boolean continueOnFailure) throws IOException {
+    
+    if (files.size() >= maxOpen) {
+      throw new IllegalArgumentException("requested files exceed max open");
+    }
+    
+    if (files.size() == 0) {
+      return Collections.emptyList();
+    }
+    
+    List<String> filesToOpen = null;
+    List<FileSKVIterator> filesToClose = Collections.emptyList();
+    List<FileSKVIterator> reservedFiles = new ArrayList<FileSKVIterator>();
+    Map<FileSKVIterator,String> readersReserved = new HashMap<FileSKVIterator,String>();
+    
+    filePermits.acquireUninterruptibly(files.size());
+    
+    // now that we are past the semaphore, we have the authority
+    // to open files.size() files
+    
+    // determine what work needs to be done in a sync block,
+    // but do the work of opening and closing files outside
+    // the sync block
+    synchronized (this) {
+      
+      filesToOpen = takeOpenFiles(files, reservedFiles, readersReserved);
+      
+      int numOpen = countReaders(openFiles);
+      
+      if (filesToOpen.size() + numOpen + reservedReaders.size() > maxOpen) {
+        filesToClose = takeLRUOpenFiles((filesToOpen.size() + numOpen + reservedReaders.size()) - maxOpen);
+      }
+    }
+    
+    // close files before opening files to ensure we stay under resource
+    // limitations
+    closeReaders(filesToClose);
+    
+    // open any files that need to be opened
+    for (String file : filesToOpen) {
+      try {
+        Path path = fs.getFullPath(ServerConstants.getTablesDirs(), file);
+        FileSystem ns = fs.getFileSystemByPath(path);
+        //log.debug("Opening "+file + " path " + path);
+        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
+            dataCache, indexCache);
+        reservedFiles.add(reader);
+        readersReserved.put(reader, file);
+      } catch (Exception e) {
+        
+        ProblemReports.getInstance().report(new ProblemReport(table.toString(), ProblemType.FILE_READ, file, e));
+        
+        if (continueOnFailure) {
+          // release the permit for the file that failed to open
+          filePermits.release(1);
+          log.warn("Failed to open file " + file + " " + e.getMessage() + " continuing...");
+        } else {
+          // close whatever files were opened
+          closeReaders(reservedFiles);
+          
+          filePermits.release(files.size());
+          
+          log.error("Failed to open file " + file + " " + e.getMessage());
+          throw new IOException("Failed to open " + file, e);
+        }
+      }
+    }
+    
+    synchronized (this) {
+      // update set of reserved readers
+      reservedReaders.putAll(readersReserved);
+    }
+    
+    return reservedFiles;
+  }
+  
+  private void releaseReaders(List<FileSKVIterator> readers, boolean sawIOException) {
+    // put files in openFiles
+    
+    synchronized (this) {
+      
+      // check that readers were actually reserved ... want to make sure a thread does
+      // not try to release readers they never reserved
+      if (!reservedReaders.keySet().containsAll(readers)) {
+        throw new IllegalArgumentException("Asked to release readers that were never reserved ");
+      }
+      
+      for (FileSKVIterator reader : readers) {
+        try {
+          reader.closeDeepCopies();
+        } catch (IOException e) {
+          log.warn(e, e);
+          sawIOException = true;
+        }
+      }
+      
+      for (FileSKVIterator reader : readers) {
+        String fileName = reservedReaders.remove(reader);
+        if (!sawIOException)
+          getFileList(fileName, openFiles).add(new OpenReader(fileName, reader));
+      }
+    }
+    
+    if (sawIOException)
+      closeReaders(readers);
+    
+    // decrement the semaphore
+    filePermits.release(readers.size());
+    
+  }
+  
+  static class FileDataSource implements DataSource {
+    
+    private SortedKeyValueIterator<Key,Value> iter;
+    private ArrayList<FileDataSource> deepCopies;
+    private boolean current = true;
+    private IteratorEnvironment env;
+    private String file;
+    
+    FileDataSource(String file, SortedKeyValueIterator<Key,Value> iter) {
+      this.file = file;
+      this.iter = iter;
+      this.deepCopies = new ArrayList<FileManager.FileDataSource>();
+    }
+    
+    public FileDataSource(IteratorEnvironment env, SortedKeyValueIterator<Key,Value> deepCopy, ArrayList<FileDataSource> deepCopies) {
+      this.iter = deepCopy;
+      this.env = env;
+      this.deepCopies = deepCopies;
+      deepCopies.add(this);
+    }
+    
+    @Override
+    public boolean isCurrent() {
+      return current;
+    }
+    
+    @Override
+    public DataSource getNewDataSource() {
+      current = true;
+      return this;
+    }
+    
+    @Override
+    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
+      return new FileDataSource(env, iter.deepCopy(env), deepCopies);
+    }
+    
+    @Override
+    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+      return iter;
+    }
+    
+    void unsetIterator() {
+      current = false;
+      iter = null;
+      for (FileDataSource fds : deepCopies) {
+        fds.current = false;
+        fds.iter = null;
+      }
+    }
+    
+    void setIterator(SortedKeyValueIterator<Key,Value> iter) {
+      current = false;
+      this.iter = iter;
+      for (FileDataSource fds : deepCopies) {
+        fds.current = false;
+        fds.iter = iter.deepCopy(fds.env);
+      }
+    }
+    
+  }
+  
+  public class ScanFileManager {
+    
+    private ArrayList<FileDataSource> dataSources;
+    private ArrayList<FileSKVIterator> tabletReservedReaders;
+    private KeyExtent tablet;
+    private boolean continueOnFailure;
+    
+    ScanFileManager(KeyExtent tablet) {
+      tabletReservedReaders = new ArrayList<FileSKVIterator>();
+      dataSources = new ArrayList<FileDataSource>();
+      this.tablet = tablet;
+      
+      continueOnFailure = conf.getTableConfiguration(tablet).getBoolean(Property.TABLE_FAILURES_IGNORE);
+      
+      if (tablet.isMeta()) {
+        continueOnFailure = false;
+      }
+    }
+    
+    private List<FileSKVIterator> openFileRefs(Collection<FileRef> files) throws TooManyFilesException, IOException {
+      List<String> strings = new ArrayList<String>(files.size());
+      for (FileRef ref : files)
+        strings.add(ref.path().toString());
+      return openFiles(strings);
+    }
+    
+    private List<FileSKVIterator> openFiles(Collection<String> files) throws TooManyFilesException, IOException {
+      // one tablet cannot open more than maxOpen files, otherwise it could get
+      // stuck forever waiting on itself to release files
+      
+      if (tabletReservedReaders.size() + files.size() >= maxOpen) {
+        throw new TooManyFilesException("Request to open files would exceed max open files reservedReaders.size()=" + tabletReservedReaders.size()
+            + " files.size()=" + files.size() + " maxOpen=" + maxOpen + " tablet = " + tablet);
+      }
+      
+      List<FileSKVIterator> newlyReservedReaders = reserveReaders(tablet.getTableId(), files, continueOnFailure);
+      
+      tabletReservedReaders.addAll(newlyReservedReaders);
+      return newlyReservedReaders;
+    }
+    
+    synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException {
+      
+      List<FileSKVIterator> newlyReservedReaders = openFileRefs(files.keySet());
+      
+      ArrayList<InterruptibleIterator> iters = new ArrayList<InterruptibleIterator>();
+      
+      for (FileSKVIterator reader : newlyReservedReaders) {
+        String filename = getReservedReaderFilename(reader);
+        InterruptibleIterator iter;
+        if (detachable) {
+          FileDataSource fds = new FileDataSource(filename, reader);
+          dataSources.add(fds);
+          SourceSwitchingIterator ssi = new SourceSwitchingIterator(fds);
+          iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, ssi);
+        } else {
+          iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, reader);
+        }
+        DataFileValue value = files.get(new FileRef(filename));
+        if (value.isTimeSet()) {
+          iter = new TimeSettingIterator(iter, value.getTime());
+        }
+        
+        iters.add(iter);
+      }
+      
+      return iters;
+    }
+    
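+    // detach() returns this scan's readers to the pool, releasing their file
+    // permits, and invalidates the data sources; reattach() later reopens the
+    // same files and swaps fresh readers back into those sources.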
+    synchronized void detach() {
+      
+      releaseReaders(tabletReservedReaders, false);
+      tabletReservedReaders.clear();
+      
+      for (FileDataSource fds : dataSources)
+        fds.unsetIterator();
+    }
+    
+    synchronized void reattach() throws IOException {
+      if (tabletReservedReaders.size() != 0)
+        throw new IllegalStateException();
+      
+      Collection<String> files = new ArrayList<String>();
+      for (FileDataSource fds : dataSources)
+        files.add(fds.file);
+      
+      List<FileSKVIterator> newlyReservedReaders = openFiles(files);
+      Map<String,List<FileSKVIterator>> map = new HashMap<String,List<FileSKVIterator>>();
+      for (FileSKVIterator reader : newlyReservedReaders) {
+        String fileName = getReservedReaderFilename(reader);
+        List<FileSKVIterator> list = map.get(fileName);
+        if (list == null) {
+          list = new LinkedList<FileSKVIterator>();
+          map.put(fileName, list);
+        }
+        
+        list.add(reader);
+      }
+      
+      for (FileDataSource fds : dataSources) {
+        FileSKVIterator reader = map.get(fds.file).remove(0);
+        fds.setIterator(reader);
+      }
+    }
+    
+    synchronized void releaseOpenFiles(boolean sawIOException) {
+      releaseReaders(tabletReservedReaders, sawIOException);
+      tabletReservedReaders.clear();
+      dataSources.clear();
+    }
+    
+    synchronized int getNumOpenFiles() {
+      return tabletReservedReaders.size();
+    }
+  }
+  
+  public ScanFileManager newScanFileManager(KeyExtent tablet) {
+    return new ScanFileManager(tablet);
+  }
+}
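
The heart of the class is the permit accounting in reserveReaders() and
releaseReaders(): a fair semaphore bounds the number of open readers across
all scans. A minimal standalone sketch of that pattern (class name and values
here are hypothetical, not part of the commit):

    import java.util.concurrent.Semaphore;

    // Hypothetical sketch of FileManager's permit accounting.
    public class FilePermitSketch {
      public static void main(String[] args) throws InterruptedException {
        Semaphore filePermits = new Semaphore(4, true); // maxOpen = 4, fair
        int filesRequested = 3;

        filePermits.acquire(filesRequested);   // blocks until 3 permits are free
        try {
          // ... open and use up to 3 readers here ...
          System.out.println("available: " + filePermits.availablePermits()); // 1
        } finally {
          filePermits.release(filesRequested); // returned when readers are released
        }
      }
    }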

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fc9363a0/server/tserver/src/main/java/org/apache/accumulo/tserver/HoldTimeoutException.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/HoldTimeoutException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/HoldTimeoutException.java
new file mode 100644
index 0000000..2175851
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/HoldTimeoutException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+public class HoldTimeoutException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+  
+  public HoldTimeoutException(String why) {
+    super(why);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fc9363a0/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
new file mode 100644
index 0000000..abce00c
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -0,0 +1,762 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
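+// Ties on the key fields are broken by kvCount in descending order, so the most
+// recently written version of a key sorts first; a plain Key with equal fields
+// sorts before any MemKey.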
+class MemKeyComparator implements Comparator<Key> {
+  
+  @Override
+  public int compare(Key k1, Key k2) {
+    int cmp = k1.compareTo(k2);
+    
+    if (cmp == 0) {
+      if (k1 instanceof MemKey)
+        if (k2 instanceof MemKey)
+          cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
+        else
+          cmp = 1;
+      else if (k2 instanceof MemKey)
+        cmp = -1;
+    }
+    
+    return cmp;
+  }
+}
+
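+// Skips entries whose kvCount is greater than the snapshot taken when the
+// iterator was created, so readers never see a partially applied mutation.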
+class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator {
+  
+  int kvCount;
+  
+  public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) {
+    setSource(source);
+    this.kvCount = maxKVCount;
+  }
+  
+  @Override
+  protected void consume() throws IOException {
+    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
+      getSource().next();
+  }
+  
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
+  }
+  
+  @Override
+  public void setInterruptFlag(AtomicBoolean flag) {
+    ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+  }
+  
+}
+
+class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator {
+  MemKey currKey = null;
+  Value currVal = null;
+
+  public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source) {
+    super();
+    setSource(source);
+  }
+
+  public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source, MemKey startKey) {
+    this(source);
+    try {
+      if (currKey != null)
+        currKey = (MemKey) startKey.clone();
+    } catch (CloneNotSupportedException e) {
+      // MemKey is supported
+    }
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new MemKeyConversionIterator(getSource().deepCopy(env), currKey);
+  }
+  
+  @Override
+  public Key getTopKey() {
+    return currKey;
+  }
+  
+  @Override
+  public Value getTopValue() {
+    return currVal;
+  }
+  
+  private void getTopKeyVal() {
+    Key k = super.getTopKey();
+    Value v = super.getTopValue();
+    if (k instanceof MemKey || k == null) {
+      currKey = (MemKey) k;
+      currVal = v;
+      return;
+    }
+    currVal = new Value(v);
+    int mc = MemValue.splitKVCount(currVal);
+    currKey = new MemKey(k, mc);
+
+  }
+  
+  @Override
+  public void next() throws IOException {
+    super.next();
+    if (hasTop())
+      getTopKeyVal();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    super.seek(range, columnFamilies, inclusive);
+    
+    if (hasTop())
+      getTopKeyVal();
+
+    Key k = range.getStartKey();
+    if (k instanceof MemKey && hasTop()) {
+      while (hasTop() && currKey.compareTo(k) < 0)
+        next();
+    }
+  }
+
+  @Override
+  public void setInterruptFlag(AtomicBoolean flag) {
+    ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+  }
+
+}
+
+public class InMemoryMap {
+  MutationLog mutationLog;
+  
+  private SimpleMap map = null;
+  
+  private static final Logger log = Logger.getLogger(InMemoryMap.class);
+  
+  private volatile String memDumpFile = null;
+  private final String memDumpDir;
+
+  private Map<String,Set<ByteSequence>> lggroups;
+  
+  public InMemoryMap(boolean useNativeMap, String memDumpDir) {
+    this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
+  }
+
+  public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
+    this.memDumpDir = memDumpDir;
+    this.lggroups = lggroups;
+    
+    if (lggroups.size() == 0)
+      map = newMap(useNativeMap);
+    else
+      map = new LocalityGroupMap(lggroups, useNativeMap);
+  }
+  
+  public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
+    this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
+  }
+  
+  private static SimpleMap newMap(boolean useNativeMap) {
+    if (useNativeMap && NativeMap.loadedNativeLibraries()) {
+      try {
+        return new NativeMapWrapper();
+      } catch (Throwable t) {
+        log.error("Failed to create native map", t);
+      }
+    }
+    
+    return new DefaultMap();
+  }
+  
+  private interface SimpleMap {
+    public Value get(Key key);
+    
+    public Iterator<Entry<Key,Value>> iterator(Key startKey);
+    
+    public int size();
+    
+    public InterruptibleIterator skvIterator();
+    
+    public void delete();
+    
+    public long getMemoryUsed();
+    
+    public void mutate(List<Mutation> mutations, int kvCount);
+  }
+  
+  private static class LocalityGroupMap implements SimpleMap {
+    
+    private Map<ByteSequence,MutableLong> groupFams[];
+    
+    // the last map in the array is the default locality group
+    private SimpleMap maps[];
+    private Partitioner partitioner;
+    private List<Mutation>[] partitioned;
+    private Set<ByteSequence> nonDefaultColumnFamilies;
+    
+    @SuppressWarnings("unchecked")
+    LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap) {
+      this.groupFams = new Map[groups.size()];
+      this.maps = new SimpleMap[groups.size() + 1];
+      this.partitioned = new List[groups.size() + 1];
+      this.nonDefaultColumnFamilies = new HashSet<ByteSequence>();
+      
+      for (int i = 0; i < maps.length; i++) {
+        maps[i] = newMap(useNativeMap);
+      }
+
+      int count = 0;
+      for (Set<ByteSequence> cfset : groups.values()) {
+        HashMap<ByteSequence,MutableLong> map = new HashMap<ByteSequence,MutableLong>();
+        for (ByteSequence bs : cfset)
+          map.put(bs, new MutableLong(1));
+        this.groupFams[count++] = map;
+        nonDefaultColumnFamilies.addAll(cfset);
+      }
+      
+      partitioner = new LocalityGroupUtil.Partitioner(this.groupFams);
+      
+      for (int i = 0; i < partitioned.length; i++) {
+        partitioned[i] = new ArrayList<Mutation>();
+      }
+    }
+
+    @Override
+    public Value get(Key key) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public int size() {
+      int sum = 0;
+      for (SimpleMap map : maps)
+        sum += map.size();
+      return sum;
+    }
+    
+    @Override
+    public InterruptibleIterator skvIterator() {
+      LocalityGroup groups[] = new LocalityGroup[maps.length];
+      for (int i = 0; i < groups.length; i++) {
+        if (i < groupFams.length)
+          groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams[i], false);
+        else
+          groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true);
+      }
+
+      return new LocalityGroupIterator(groups, nonDefaultColumnFamilies);
+    }
+    
+    @Override
+    public void delete() {
+      for (SimpleMap map : maps)
+        map.delete();
+    }
+    
+    @Override
+    public long getMemoryUsed() {
+      long sum = 0;
+      for (SimpleMap map : maps)
+        sum += map.getMemoryUsed();
+      return sum;
+    }
+    
+    @Override
+    public synchronized void mutate(List<Mutation> mutations, int kvCount) {
+      // this method is synchronized because it reuses objects to avoid allocation;
+      // the caller is currently synchronized as well, so no parallelism is lost,
+      // and the synchronization here is future proofing
+
+      try {
+        partitioner.partition(mutations, partitioned);
+        
+        for (int i = 0; i < partitioned.length; i++) {
+          if (partitioned[i].size() > 0) {
+            maps[i].mutate(partitioned[i], kvCount);
+            for (Mutation m : partitioned[i])
+              kvCount += m.getUpdates().size();
+          }
+        }
+      } finally {
+        // clear immediately so mutations can be garbage collected
+        for (List<Mutation> list : partitioned) {
+          list.clear();
+        }
+      }
+    }
+    
+  }
+
+  private static class DefaultMap implements SimpleMap {
+    private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new MemKeyComparator());
+    private AtomicLong bytesInMemory = new AtomicLong();
+    private AtomicInteger size = new AtomicInteger();
+    
+    public void put(Key key, Value value) {
+      // Always a MemKey, so account for the kvCount int
+      bytesInMemory.addAndGet(key.getLength() + 4);
+      bytesInMemory.addAndGet(value.getSize());
+      if (map.put(key, value) == null)
+        size.incrementAndGet();
+    }
+    
+    public Value get(Key key) {
+      return map.get(key);
+    }
+    
+    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+      Key lk = new Key(startKey);
+      SortedMap<Key,Value> tm = map.tailMap(lk);
+      return tm.entrySet().iterator();
+    }
+    
+    public int size() {
+      return size.get();
+    }
+    
+    public synchronized InterruptibleIterator skvIterator() {
+      if (map == null)
+        throw new IllegalStateException();
+      
+      return new SortedMapIterator(map);
+    }
+    
+    public synchronized void delete() {
+      map = null;
+    }
+    
+    public long getOverheadPerEntry() {
+      // All of the java objects used to hold the data and make it searchable
+      // carry overhead. This per-entry overhead, in bytes, was estimated with
+      // test.EstimateInMemMapOverhead running Java 6u16 in 64-bit server mode.
+      
+      return 200;
+    }
+    
+    @Override
+    public void mutate(List<Mutation> mutations, int kvCount) {
+      for (Mutation m : mutations) {
+        for (ColumnUpdate cvp : m.getUpdates()) {
+          Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(),
+              false, kvCount++);
+          Value value = new Value(cvp.getValue());
+          put(newKey, value);
+        }
+      }
+    }
+    
+    @Override
+    public long getMemoryUsed() {
+      return bytesInMemory.get() + (size() * getOverheadPerEntry());
+    }
+  }
+  
+  private static class NativeMapWrapper implements SimpleMap {
+    private NativeMap nativeMap;
+    
+    NativeMapWrapper() {
+      nativeMap = new NativeMap();
+    }
+    
+    public Value get(Key key) {
+      return nativeMap.get(key);
+    }
+    
+    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
+      return nativeMap.iterator(startKey);
+    }
+    
+    public int size() {
+      return nativeMap.size();
+    }
+    
+    public InterruptibleIterator skvIterator() {
+      return (InterruptibleIterator) nativeMap.skvIterator();
+    }
+    
+    public void delete() {
+      nativeMap.delete();
+    }
+    
+    public long getMemoryUsed() {
+      return nativeMap.getMemoryUsed();
+    }
+    
+    @Override
+    public void mutate(List<Mutation> mutations, int kvCount) {
+      nativeMap.mutate(mutations, kvCount);
+    }
+  }
+  
+  private AtomicInteger nextKVCount = new AtomicInteger(1);
+  private AtomicInteger kvCount = new AtomicInteger(0);
+
+  private Object writeSerializer = new Object();
+  
+  /**
+   * Applies the given mutations to the map, assigning each key/value a unique,
+   * monotonically increasing kv count.
+   */
+  public void mutate(List<Mutation> mutations) {
+    int numKVs = 0;
+    for (int i = 0; i < mutations.size(); i++)
+      numKVs += mutations.get(i).size();
+    
+    // Cannot update the mutation count while writes that started earlier are
+    // still in progress, because that would make partial mutations visible.
+    // Likewise, cannot return until the mutation count is updated, because a
+    // read might otherwise miss a successful write. Therefore writes must wait
+    // for earlier writes to finish.
+    //
+    // using a separate lock from this map, to allow reads and writes in parallel
+    synchronized (writeSerializer) {
+      int kv = nextKVCount.getAndAdd(numKVs);
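+      // e.g. if numKVs is 3 and nextKVCount was 10, this batch owns counts
+      // 10, 11, and 12, and kvCount becomes 12 once the batch is applied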
+      try {
+        map.mutate(mutations, kv);
+      } finally {
+        kvCount.set(kv + numKVs - 1);
+      }
+    }
+  }
+  
+  /**
+   * Returns an estimate of the memory used by this map, in bytes, including
+   * estimated per-entry overhead.
+   * 
+   * @return estimated size in bytes
+   */
+  public synchronized long estimatedSizeInBytes() {
+    if (map == null)
+      return 0;
+    
+    return map.getMemoryUsed();
+  }
+  
+  Iterator<Map.Entry<Key,Value>> iterator(Key startKey) {
+    return map.iterator(startKey);
+  }
+  
+  public long getNumEntries() {
+    return map.size();
+  }
+  
+  private final Set<MemoryIterator> activeIters = Collections.synchronizedSet(new HashSet<MemoryIterator>());
+  
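+  // While the map is live, iterators read it directly; once delete() has dumped
+  // the map to a local RFile, getNewDataSource() switches this source to the
+  // dump file so long-running scans can outlive the in-memory map.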
+  class MemoryDataSource implements DataSource {
+    
+    boolean switched = false;
+    private InterruptibleIterator iter;
+    private List<FileSKVIterator> readers;
+    
+    MemoryDataSource() {
+      this(new ArrayList<FileSKVIterator>());
+    }
+    
+    public MemoryDataSource(List<FileSKVIterator> readers) {
+      this.readers = readers;
+    }
+    
+    @Override
+    public boolean isCurrent() {
+      if (switched)
+        return true;
+      else
+        return memDumpFile == null;
+    }
+    
+    @Override
+    public DataSource getNewDataSource() {
+      if (switched)
+        throw new IllegalStateException();
+      
+      if (!isCurrent()) {
+        switched = true;
+        iter = null;
+      }
+      
+      return this;
+    }
+    
+    @Override
+    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+      if (iter == null) {
+        if (!switched) {
+          iter = map.skvIterator();
+        } else {
+          Configuration conf = CachedConfiguration.getInstance();
+          FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+          
+          FileSKVIterator reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
+          
+          readers.add(reader);
+          
+          iter = new MemKeyConversionIterator(reader);
+        }
+      }
+      
+      return iter;
+    }
+    
+    @Override
+    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
+      return new MemoryDataSource(readers);
+    }
+    
+  }
+  
+  class MemoryIterator extends WrappingIterator implements InterruptibleIterator {
+    
+    private AtomicBoolean closed;
+    private SourceSwitchingIterator ssi;
+    private MemoryDataSource mds;
+    
+    protected SortedKeyValueIterator<Key,Value> getSource() {
+      if (closed.get())
+        throw new IllegalStateException("Memory iterator is closed");
+      return super.getSource();
+    }
+    
+    private MemoryIterator(InterruptibleIterator source) {
+      this(source, new AtomicBoolean(false));
+    }
+    
+    private MemoryIterator(SortedKeyValueIterator<Key,Value> source, AtomicBoolean closed) {
+      setSource(source);
+      this.closed = closed;
+    }
+    
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      return new MemoryIterator(getSource().deepCopy(env), closed);
+    }
+    
+    public void close() {
+      
+      synchronized (this) {
+        if (closed.compareAndSet(false, true)) {
+          
+          for (FileSKVIterator reader : mds.readers)
+            try {
+              reader.close();
+            } catch (IOException e) {
+              log.warn(e, e);
+            }
+        }
+      }
+      
+      // remove outside of sync to avoid deadlock
+      activeIters.remove(this);
+    }
+    
+    private synchronized boolean switchNow() throws IOException {
+      if (closed.get())
+        return false;
+      
+      ssi.switchNow();
+      return true;
+    }
+    
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+    }
+    
+    private void setSSI(SourceSwitchingIterator ssi) {
+      this.ssi = ssi;
+    }
+    
+    public void setMDS(MemoryDataSource mds) {
+      this.mds = mds;
+    }
+    
+  }
+  
+  public synchronized MemoryIterator skvIterator() {
+    if (map == null)
+      throw new NullPointerException();
+    
+    if (deleted)
+      throw new IllegalStateException("Can not obtain iterator after map deleted");
+    
+    int mc = kvCount.get();
+    MemoryDataSource mds = new MemoryDataSource();
+    // the iterator must wrap the same data source handed to setMDS(), otherwise
+    // close() would miss any dump-file readers opened after a switch
+    SourceSwitchingIterator ssi = new SourceSwitchingIterator(mds);
+    MemoryIterator mi = new MemoryIterator(new PartialMutationSkippingIterator(ssi, mc));
+    mi.setSSI(ssi);
+    mi.setMDS(mds);
+    activeIters.add(mi);
+    return mi;
+  }
+  
+  public SortedKeyValueIterator<Key,Value> compactionIterator() {
+    
+    if (nextKVCount.get() - 1 != kvCount.get())
+      throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = "
+          + kvCount.get());
+    
+    return map.skvIterator();
+  }
+  
+  private boolean deleted = false;
+  
+  public void delete(long waitTime) {
+    
+    synchronized (this) {
+      if (deleted)
+        throw new IllegalStateException("Double delete");
+      
+      deleted = true;
+    }
+    
+    long t1 = System.currentTimeMillis();
+    
+    while (activeIters.size() > 0 && System.currentTimeMillis() - t1 < waitTime) {
+      UtilWaitThread.sleep(50);
+    }
+    
+    if (activeIters.size() > 0) {
+      // dump memmap exactly as is to a tmp file on disk, and switch scans to that temp file
+      try {
+        Configuration conf = CachedConfiguration.getInstance();
+        FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+        
+        String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION;
+        
+        Configuration newConf = new Configuration(conf);
+        newConf.setInt("io.seqfile.compress.blocksize", 100000);
+        
+        FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration());
+        
+        InterruptibleIterator iter = map.skvIterator();
+       
+        HashSet<ByteSequence> allfams= new HashSet<ByteSequence>();
+        
+        for(Entry<String, Set<ByteSequence>> entry : lggroups.entrySet()){
+          allfams.addAll(entry.getValue());
+          out.startNewLocalityGroup(entry.getKey(), entry.getValue());
+          iter.seek(new Range(), entry.getValue(), true);
+          dumpLocalityGroup(out, iter);
+        }
+        
+        out.startDefaultLocalityGroup();
+        iter.seek(new Range(), allfams, false);
+       
+        dumpLocalityGroup(out, iter);
+        
+        out.close();
+        
+        log.debug("Created mem dump file " + tmpFile);
+        
+        memDumpFile = tmpFile;
+        
+        synchronized (activeIters) {
+          for (MemoryIterator mi : activeIters) {
+            mi.switchNow();
+          }
+        }
+        
+        // rely on the unix behavior that the file will be deleted when the
+        // last reader closes it
+        fs.delete(new Path(memDumpFile), true);
+        
+      } catch (IOException ioe) {
+        log.error("Failed to create mem dump file ", ioe);
+        
+        while (activeIters.size() > 0) {
+          UtilWaitThread.sleep(100);
+        }
+      }
+      
+    }
+    
+    SimpleMap tmpMap = map;
+    
+    synchronized (this) {
+      map = null;
+    }
+    
+    tmpMap.delete();
+  }
+
+  private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException {
+    while (iter.hasTop() && activeIters.size() > 0) {
+      // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
+      // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
+      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
+      out.append(iter.getTopKey(), newValue);
+      iter.next();
+
+    }
+  }
+}
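
The dump-file handoff above depends on the kvCount framing implemented by
MemValue (shown later in this commit): dumpLocalityGroup() moves each entry's
kvCount into the value because RFile keys cannot carry it, and
MemKeyConversionIterator recovers it on read via MemValue.splitKVCount(). A
standalone sketch of that 4-byte big-endian round trip (demo class name
hypothetical):

    // Mirrors the framing in MemValue.write(): kvCount is prepended to the value
    // bytes as 4 big-endian bytes and peeled back off when the dump file is read.
    public class KvCountFramingDemo {
      static byte[] embed(byte[] value, int kvCount) {
        byte[] combined = new byte[value.length + 4];
        System.arraycopy(value, 0, combined, 4, value.length);
        combined[0] = (byte) (kvCount >>> 24);
        combined[1] = (byte) (kvCount >>> 16);
        combined[2] = (byte) (kvCount >>> 8);
        combined[3] = (byte) kvCount;
        return combined;
      }

      static int extract(byte[] combined) {
        return ((combined[0] & 0xFF) << 24) | ((combined[1] & 0xFF) << 16)
            | ((combined[2] & 0xFF) << 8) | (combined[3] & 0xFF);
      }

      public static void main(String[] args) {
        byte[] framed = embed("abc".getBytes(), 42);
        System.out.println(extract(framed)); // prints 42
      }
    }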

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fc9363a0/server/tserver/src/main/java/org/apache/accumulo/tserver/LargestFirstMemoryManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/LargestFirstMemoryManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/LargestFirstMemoryManager.java
new file mode 100644
index 0000000..f8605ba
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/LargestFirstMemoryManager.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class LargestFirstMemoryManager implements MemoryManager {
+  
+  private static final Logger log = Logger.getLogger(LargestFirstMemoryManager.class);
+  private static final int TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER = 2;
+  
+  private long maxMemory = -1;
+  private int maxConcurrentMincs;
+  private int numWaitingMultiplier;
+  private long prevIngestMemory;
+  private double compactionThreshold;
+  private long maxObserved;
+  private HashMap<Text,Long> mincIdleThresholds;
+  private static final long zerotime = System.currentTimeMillis();
+  private ServerConfiguration config = null;
+  
+  LargestFirstMemoryManager(long maxMemory, int maxConcurrentMincs, int numWaitingMultiplier) {
+    this();
+    this.maxMemory = maxMemory;
+    this.maxConcurrentMincs = maxConcurrentMincs;
+    this.numWaitingMultiplier = numWaitingMultiplier;
+  }
+  
+  @Override
+  public void init(ServerConfiguration conf) {
+    this.config = conf;
+    maxMemory = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM);
+    maxConcurrentMincs = conf.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT);
+    numWaitingMultiplier = TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER;
+  }
+  
+  LargestFirstMemoryManager() {
+    prevIngestMemory = 0;
+    compactionThreshold = 0.5;
+    maxObserved = 0;
+    mincIdleThresholds = new HashMap<Text,Long>();
+  }
+  
+  @Override
+  public MemoryManagementActions getMemoryManagementActions(List<TabletState> tablets) {
+    if (maxMemory < 0)
+      throw new IllegalStateException("need to initialize Largst");
+    mincIdleThresholds.clear();
+    long ingestMemory = 0;
+    long compactionMemory = 0;
+    KeyExtent largestMemTablet = null;
+    long largestMemTableLoad = 0;
+    KeyExtent largestIdleMemTablet = null;
+    long largestIdleMemTableLoad = 0;
+    long mts;
+    long mcmts;
+    int numWaitingMincs = 0;
+    long idleTime;
+    long tml;
+    long ct = System.currentTimeMillis();
+    
+    long largestMemTableIdleTime = -1, largestMemTableSize = -1;
+    long largestIdleMemTableIdleTime = -1, largestIdleMemTableSize = -1;
+    
+    for (TabletState ts : tablets) {
+      mts = ts.getMemTableSize();
+      mcmts = ts.getMinorCompactingMemTableSize();
+      if (ts.getLastCommitTime() > 0)
+        idleTime = ct - ts.getLastCommitTime();
+      else
+        idleTime = ct - zerotime;
+      ingestMemory += mts;
+      tml = timeMemoryLoad(mts, idleTime);
+      if (mcmts == 0 && mts > 0) {
+        if (tml > largestMemTableLoad) {
+          largestMemTableLoad = tml;
+          largestMemTablet = ts.getExtent();
+          largestMemTableSize = mts;
+          largestMemTableIdleTime = idleTime;
+        }
+        Text tableId = ts.getExtent().getTableId();
+        if (!mincIdleThresholds.containsKey(tableId))
+          mincIdleThresholds.put(tableId, config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME));
+        if (idleTime > mincIdleThresholds.get(tableId) && tml > largestIdleMemTableLoad) {
+          largestIdleMemTableLoad = tml;
+          largestIdleMemTablet = ts.getExtent();
+          largestIdleMemTableSize = mts;
+          largestIdleMemTableIdleTime = idleTime;
+        }
+        // log.debug("extent: "+ts.getExtent()+" idle threshold: "+mincIdleThresholds.get(tableId)+" idle time: "+idleTime+" memtable: "+mts+" compacting: "+mcmts);
+      }
+      // else {
+      // log.debug("skipping extent "+ts.getExtent()+", nothing in memory");
+      // }
+      
+      compactionMemory += mcmts;
+      if (mcmts > 0)
+        numWaitingMincs++;
+    }
+    
+    if (ingestMemory + compactionMemory > maxObserved) {
+      maxObserved = ingestMemory + compactionMemory;
+    }
+    
+    long memoryChange = ingestMemory - prevIngestMemory;
+    prevIngestMemory = ingestMemory;
+    
+    MemoryManagementActions mma = new MemoryManagementActions();
+    mma.tabletsToMinorCompact = new ArrayList<KeyExtent>();
+    
+    boolean startMinC = false;
+    
+    if (numWaitingMincs < maxConcurrentMincs * numWaitingMultiplier) {
+      // based on previous ingest memory increase, if we think that the next increase will
+      // take us over the threshold for non-compacting memory, then start a minor compaction
+      // or if the idle time of the chosen tablet is greater than the threshold, start a minor compaction
+      if (memoryChange >= 0 && ingestMemory + memoryChange > compactionThreshold * maxMemory) {
+        startMinC = true;
+      } else if (largestIdleMemTablet != null) {
+        startMinC = true;
+        // switch largestMemTablet to largestIdleMemTablet
+        largestMemTablet = largestIdleMemTablet;
+        largestMemTableLoad = largestIdleMemTableLoad;
+        largestMemTableSize = largestIdleMemTableSize;
+        largestMemTableIdleTime = largestIdleMemTableIdleTime;
+        log.debug("IDLE minor compaction chosen");
+      }
+    }
+    
+    if (startMinC && largestMemTablet != null) {
+      mma.tabletsToMinorCompact.add(largestMemTablet);
+      log.debug(String.format("COMPACTING %s  total = %,d ingestMemory = %,d", largestMemTablet.toString(), (ingestMemory + compactionMemory), ingestMemory));
+      log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", largestMemTableSize, largestMemTableIdleTime / 1000.0, largestMemTableLoad));
+    } else if (memoryChange < 0) {
+      // Before idle mincs existed, starting a minor compaction implied that
+      // memoryChange >= 0, and we considered removing this "else" once that
+      // changed. Performing idle compactions should not make the threshold
+      // change more often, though, so the "else" stays for now. Note that
+      // memoryChange can now be < 0 due to an idle compaction while we still
+      // adjust the threshold; should that be tracked and prevented?
+      
+      // memory change < 0 means a minor compaction occurred
+      // we want to see how full the memory got during the compaction
+      // (the goal is for it to have between 80% and 90% memory utilization)
+      // and adjust the compactionThreshold accordingly
+      
+      log.debug(String.format("BEFORE compactionThreshold = %.3f maxObserved = %,d", compactionThreshold, maxObserved));
+      
+      if (compactionThreshold < 0.82 && maxObserved < 0.8 * maxMemory) {
+        // 0.82 * 1.1 is about 0.9, which is our desired max threshold
+        compactionThreshold *= 1.1;
+      } else if (compactionThreshold > 0.056 && maxObserved > 0.9 * maxMemory) {
+        // 0.056 * 0.9 is about 0.05, which is our desired min threshold
+        compactionThreshold *= 0.9;
+      }
+      maxObserved = 0;
+      
+      log.debug(String.format("AFTER compactionThreshold = %.3f", compactionThreshold));
+    }
+    
+    return mma;
+  }
+  
+  @Override
+  public void tabletClosed(KeyExtent extent) {}
+  
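+  // Weights a memtable's size by its idle time, doubling every 15 idle minutes;
+  // for example, 30 minutes of idleness makes a memtable count 4x its size.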
+  static long timeMemoryLoad(long mem, long time) {
+    double minutesIdle = time / 60000.0;
+    
+    return (long) (mem * Math.pow(2, minutesIdle / 15.0));
+  }
+  
+  public static void main(String[] args) {
+    for (int i = 0; i < 62; i++) {
+      System.out.printf("%d\t%d%n", i, timeMemoryLoad(1, i * 60000L));
+    }
+  }
+}
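
The weighting used above can be sanity-checked in isolation; this standalone
copy of the formula (demo class name hypothetical) shows the load doubling
every 15 idle minutes:

    public class TimeMemoryLoadCheck {
      // same formula as LargestFirstMemoryManager.timeMemoryLoad
      static long timeMemoryLoad(long mem, long timeMillis) {
        double minutesIdle = timeMillis / 60000.0;
        return (long) (mem * Math.pow(2, minutesIdle / 15.0));
      }

      public static void main(String[] args) {
        System.out.println(timeMemoryLoad(1000, 0));            // 1000
        System.out.println(timeMemoryLoad(1000, 15 * 60000L));  // 2000
        System.out.println(timeMemoryLoad(1000, 30 * 60000L));  // 4000
      }
    }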

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fc9363a0/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
new file mode 100644
index 0000000..4bc8891
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Key;
+
+class MemKey extends Key {
+  
+  int kvCount;
+  
+  public MemKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean del, boolean copy, int mc) {
+    super(row, cf, cq, cv, ts, del, copy);
+    this.kvCount = mc;
+  }
+  
+  public MemKey() {
+    super();
+    this.kvCount = Integer.MAX_VALUE;
+  }
+  
+  public MemKey(Key key, int mc) {
+    super(key);
+    this.kvCount = mc;
+  }
+  
+  public String toString() {
+    return super.toString() + " mc=" + kvCount;
+  }
+  
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    return super.clone();
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(kvCount);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    kvCount = in.readInt();
+  }
+  
+  @Override
+  public int compareTo(Key k) {
+    
+    int cmp = super.compareTo(k);
+    
+    // ties are broken by kvCount, descending, so the most recently inserted
+    // entry for an identical key sorts first
+    if (cmp == 0 && k instanceof MemKey) {
+      cmp = ((MemKey) k).kvCount - kvCount;
+    }
+    
+    return cmp;
+  }
+  
+}
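
To see the tie-break in MemKey.compareTo in action: for otherwise identical keys, the entry with the larger kvCount (the more recent insert into the in-memory map) sorts first. A hypothetical demo, not part of this patch; it would have to live in org.apache.accumulo.tserver since MemKey is package-private:

package org.apache.accumulo.tserver;

// Hypothetical demo class; not part of this patch.
class MemKeyOrderDemo {
  public static void main(String[] args) {
    byte[] row = "r".getBytes(), cf = "cf".getBytes(), cq = "cq".getBytes(), cv = new byte[0];
    MemKey older = new MemKey(row, cf, cq, cv, 42L, false, true, 7);
    MemKey newer = new MemKey(row, cf, cq, cv, 42L, false, true, 8);
    // the key fields are identical, so ordering falls to kvCount, descending
    System.out.println(newer.compareTo(older) < 0); // prints true: newer sorts first
  }
}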

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fc9363a0/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java
new file mode 100644
index 0000000..13bcdbe
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Value;
+
+/**
+ * A Value that carries the kv count assigned when the key/value pair was inserted into the
+ * in-memory map. On write the count is embedded into the first four bytes of the value
+ * data; {@link #splitKVCount(Value)} recovers it on the read side.
+ */
+public class MemValue extends Value {
+  int kvCount;
+  boolean merged = false;
+  
+  /**
+   * @param value
+   *          Value
+   * @param kv
+   *          kv count
+   */
+  public MemValue(byte[] value, int kv) {
+    super(value);
+    this.kvCount = kv;
+  }
+  
+  public MemValue() {
+    super();
+    this.kvCount = Integer.MAX_VALUE;
+  }
+  
+  public MemValue(Value value, int kv) {
+    super(value);
+    this.kvCount = kv;
+  }
+  
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    if (!merged) {
+      byte[] combinedBytes = new byte[getSize() + 4];
+      System.arraycopy(value, 0, combinedBytes, 4, getSize());
+      combinedBytes[0] = (byte) (kvCount >>> 24);
+      combinedBytes[1] = (byte) (kvCount >>> 16);
+      combinedBytes[2] = (byte) (kvCount >>> 8);
+      combinedBytes[3] = (byte) (kvCount);
+      value = combinedBytes;
+      merged = true;
+    }
+    super.write(out);
+  }
+  
+  @Override
+  public void set(final byte[] b) {
+    super.set(b);
+    merged = false;
+  }
+
+  @Override
+  public void copy(byte[] b) {
+    super.copy(b);
+    merged = false;
+  }
+  
+  /**
+   * Extracts the kvCount embedded in a Value, replacing the Value's contents with the
+   * original, unembedded bytes. If v is already a MemValue, the count is returned directly
+   * and the bytes are left untouched.
+   * 
+   * @param v
+   *          a Value that either is a MemValue or carries an embedded count
+   * @return the kvCount embedded in v
+   */
+  public static int splitKVCount(Value v) {
+    if (v instanceof MemValue)
+      return ((MemValue) v).kvCount;
+    
+    byte[] originalBytes = new byte[v.getSize() - 4];
+    byte[] combined = v.get();
+    System.arraycopy(combined, 4, originalBytes, 0, originalBytes.length);
+    v.set(originalBytes);
+    return (combined[0] << 24) + ((combined[1] & 0xFF) << 16) + ((combined[2] & 0xFF) << 8) + (combined[3] & 0xFF);
+  }
+}
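
A hypothetical round trip through the embedding scheme above: write() prepends the 4-byte big-endian kvCount to the value bytes exactly once (the merged flag prevents double-embedding on repeated writes), and splitKVCount() strips it back off on the read side:

package org.apache.accumulo.tserver;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.accumulo.core.data.Value;

// Hypothetical demo class; not part of this patch.
class MemValueDemo {
  public static void main(String[] args) throws IOException {
    MemValue mv = new MemValue("payload".getBytes(), 123);
    
    // serialization embeds the count in front of the value bytes
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    mv.write(new DataOutputStream(baos));
    
    // deserialize as a plain Value, then split the count back out
    Value v = new Value();
    v.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
    int kv = MemValue.splitKVCount(v);
    System.out.println(kv + " " + new String(v.get())); // prints "123 payload"
  }
}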

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fc9363a0/server/tserver/src/main/java/org/apache/accumulo/tserver/MemoryManagementActions.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemoryManagementActions.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemoryManagementActions.java
new file mode 100644
index 0000000..31fc431
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemoryManagementActions.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.List;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+public class MemoryManagementActions {
+  // tablets the tablet server should schedule for minor compaction
+  public List<KeyExtent> tabletsToMinorCompact;
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fc9363a0/server/tserver/src/main/java/org/apache/accumulo/tserver/MemoryManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemoryManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemoryManager.java
new file mode 100644
index 0000000..45e02b8
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemoryManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.List;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+
+/**
+ * A MemoryManager in Accumulo determines when minor compactions should occur and when ingest should be put on hold. The goal of a memory manager
+ * implementation is to maximize ingest throughput and minimize the number of minor compactions.
+ */
+public interface MemoryManager {
+  
+  /**
+   * Initialize the memory manager.
+   * 
+   * @param conf
+   *          the server configuration the manager should tune itself from
+   */
+  void init(ServerConfiguration conf);
+
+  /**
+   * This method is called periodically by Accumulo and should return a list of tablets to minor compact.
+   * 
+   * Instructing a tablet that is already minor compacting (this can be inferred from the TabletState) to minor compact again has no effect.
+   * 
+   * Holding all ingest does not affect metadata tablets.
+   */
+  MemoryManagementActions getMemoryManagementActions(List<TabletState> tablets);
+  
+  /**
+   * This method is called when a tablet is closed. A memory manager can clean up any per-tablet state it is keeping when this is called.
+   */
+  void tabletClosed(KeyExtent extent);
+}
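
As a sketch of the contract, a trivial implementation might minor compact any tablet whose in-memory map exceeds a fixed cutoff. This class is hypothetical, not part of this patch, and assumes TabletState (not shown here) exposes getExtent() and getMemTableSize():

package org.apache.accumulo.tserver;

import java.util.ArrayList;
import java.util.List;

import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.server.conf.ServerConfiguration;

// Hypothetical example implementation; not part of this patch.
public class FixedCutoffMemoryManager implements MemoryManager {
  
  private static final long CUTOFF = 64 * 1024 * 1024; // compact anything over 64 MB
  
  @Override
  public void init(ServerConfiguration conf) {}
  
  @Override
  public MemoryManagementActions getMemoryManagementActions(List<TabletState> tablets) {
    MemoryManagementActions actions = new MemoryManagementActions();
    actions.tabletsToMinorCompact = new ArrayList<KeyExtent>();
    for (TabletState ts : tablets) {
      if (ts.getMemTableSize() > CUTOFF) // assumed TabletState accessor
        actions.tabletsToMinorCompact.add(ts.getExtent());
    }
    return actions;
  }
  
  @Override
  public void tabletClosed(KeyExtent extent) {} // no per-tablet state to clean up
}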

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fc9363a0/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java
new file mode 100644
index 0000000..9e4c5cd
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+public class MinorCompactor extends Compactor {
+  
+  private static final Logger log = Logger.getLogger(MinorCompactor.class);
+  
+  private static final Map<FileRef,DataFileValue> EMPTY_MAP = Collections.emptyMap();
+  
+  private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, DataFileValue dfv) {
+    if (mergeFile == null)
+      return EMPTY_MAP;
+    
+    return Collections.singletonMap(mergeFile, dfv);
+  }
+  
+  MinorCompactor(Configuration conf, VolumeManager fs, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, TableConfiguration acuTableConf,
+      KeyExtent extent, MinorCompactionReason mincReason) {
+    super(conf, fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new CompactionEnv() {
+      
+      @Override
+      public boolean isCompactionEnabled() {
+        return true;
+      }
+      
+      @Override
+      public IteratorScope getIteratorScope() {
+        return IteratorScope.minc;
+      }
+    });
+    
+    super.mincReason = mincReason;
+  }
+  
+  private boolean isTableDeleting() {
+    try {
+      return Tables.getTableState(HdfsZooInstance.getInstance(), extent.getTableId().toString()) == TableState.DELETING;
+    } catch (Exception e) {
+      log.warn("Failed to determine if table " + extent.getTableId() + " was deleting ", e);
+      return false; // can not get positive confirmation that its deleting.
+    }
+  }
+  
+  @Override
+  public CompactionStats call() {
+    log.debug("Begin minor compaction " + getOutputFile() + " " + getExtent());
+    
+    // output to a new file with a temporary name
+    int sleepTime = 100;
+    double growthFactor = 4;
+    int maxSleepTime = 1000 * 60 * 3; // 3 minutes
+    boolean reportedProblem = false;
+    Random random = new Random();
+    
+    runningCompactions.add(this);
+    try {
+      do {
+        try {
+          CompactionStats ret = super.call();
+          
+          // log.debug(String.format("MinC %,d recs in | %,d recs out | %,d recs/sec | %6.3f secs | %,d bytes ",map.size(), entriesCompacted,
+          // (int)(map.size()/((t2 - t1)/1000.0)), (t2 - t1)/1000.0, estimatedSizeInBytes()));
+          
+          if (reportedProblem) {
+            ProblemReports.getInstance().deleteProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile());
+          }
+          
+          return ret;
+        } catch (IOException e) {
+          log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...");
+          ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e));
+          reportedProblem = true;
+        } catch (RuntimeException e) {
+          // if this is coming from a user iterator, it is possible that the user could change the iterator config and that the
+          // minor compaction would succeed
+          log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...", e);
+          ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e));
+          reportedProblem = true;
+        } catch (CompactionCanceledException e) {
+          throw new IllegalStateException(e);
+        }
+        
+        // sleep for a jittered, exponentially growing interval before retrying
+        int sleep = sleepTime + random.nextInt(sleepTime);
+        log.debug("MinC failed sleeping " + sleep + " ms before retrying");
+        UtilWaitThread.sleep(sleep);
+        sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor));
+        
+        // clean up
+        try {
+          if (getFileSystem().exists(new Path(getOutputFile()))) {
+            getFileSystem().deleteRecursively(new Path(getOutputFile()));
+          }
+        } catch (IOException e) {
+          log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());
+        }
+        
+        // if the table is being deleted there is no point in retrying
+        if (isTableDeleting())
+          return new CompactionStats(0, 0);
+        
+      } while (true);
+    } finally {
+      runningCompactions.remove(this);
+    }
+  }
+  
+}
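
The retry loop above sleeps for a jittered interval drawn from [sleepTime, 2 * sleepTime) and grows sleepTime fourfold per failure, capped at three minutes. A small hypothetical sketch of the base schedule, not part of this patch:

// Hypothetical demo of the backoff schedule; not part of this patch.
class BackoffDemo {
  public static void main(String[] args) {
    int sleepTime = 100;
    double growthFactor = 4;
    int maxSleepTime = 1000 * 60 * 3;
    for (int i = 0; i < 8; i++) {
      // prints 100, 400, 1600, 6400, 25600, 102400, 180000, 180000
      System.out.println(sleepTime);
      sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor));
    }
  }
}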

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fc9363a0/server/tserver/src/main/java/org/apache/accumulo/tserver/MutationLog.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MutationLog.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MutationLog.java
new file mode 100644
index 0000000..9ce2146
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MutationLog.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class MutationLog {
+  private FSDataOutputStream logout;
+  
+  public static final byte MUTATION_EVENT = 1;
+  public static final byte CLOSE_EVENT = 2;
+  
+  public MutationLog(Path logfile) throws IOException {
+    
+    Configuration conf = CachedConfiguration.getInstance();
+    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration()));
+    
+    // fail fast if the log already exists; otherwise logout would remain null
+    // and the first call to log() would throw a NullPointerException
+    if (fs.exists(logfile))
+      throw new IOException("mutation log " + logfile + " already exists");
+    
+    logout = fs.create(logfile);
+  }
+  
+  public void log(Mutation m) throws IOException {
+    // write event type
+    logout.writeByte(MUTATION_EVENT);
+    
+    // write event
+    m.write(logout);
+    logout.flush();
+  }
+  
+  public void close() throws IOException {
+    logout.writeByte(CLOSE_EVENT);
+    logout.close();
+  }
+  
+  public static Iterator<Mutation> replay(Path logfile, Tablet t, long min_timestamp) throws IOException {
+    Configuration conf = CachedConfiguration.getInstance();
+    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration()));
+    
+    final FSDataInputStream login = fs.open(logfile);
+    
+    final Mutation mutation = new ServerMutation();
+    
+    return new Iterator<Mutation>() {
+      
+      byte eventType;
+      
+      {
+        // read the first event type up front so hasNext() can be answered
+        eventType = login.readByte();
+      }
+      
+      public boolean hasNext() {
+        return eventType != CLOSE_EVENT;
+      }
+      
+      public Mutation next() {
+        try {
+          mutation.readFields(login);
+          eventType = login.readByte();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        
+        return mutation;
+      }
+      
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}
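
Intended usage of the format above: each record is one event-type byte, with MUTATION_EVENT followed by a serialized mutation, and the stream ends with CLOSE_EVENT, which is what terminates replay(). A hypothetical round trip, not part of this patch; the path is made up, and the tablet and timestamp arguments are ignored by the implementation above:

package org.apache.accumulo.tserver;

import java.io.IOException;
import java.util.Iterator;

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

// Hypothetical demo class; not part of this patch.
class MutationLogDemo {
  public static void main(String[] args) throws IOException {
    Path logfile = new Path("/tmp/example.mlog"); // made-up location
    
    MutationLog mlog = new MutationLog(logfile);
    // in the tserver the logged mutations are ServerMutations, matching what
    // replay() deserializes into; a plain Mutation only sketches the call sequence
    Mutation m = new Mutation(new Text("row1"));
    m.put(new Text("cf"), new Text("cq"), new Value("v".getBytes()));
    mlog.log(m);  // writes MUTATION_EVENT, then the serialized mutation
    mlog.close(); // writes CLOSE_EVENT
    
    Iterator<Mutation> it = MutationLog.replay(logfile, null, 0);
    while (it.hasNext())
      System.out.println(new String(it.next().getRow()));
  }
}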

