accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [2/2] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT
Date Mon, 25 Aug 2014 16:10:04 GMT
Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT

Conflicts:
	server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3d3d301f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3d3d301f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3d3d301f

Branch: refs/heads/1.6.1-SNAPSHOT
Commit: 3d3d301f4ab39efd27d1deb1df438a1df0b9c1e8
Parents: 23c74cd 30a0ca3
Author: Keith Turner <kturner@apache.org>
Authored: Mon Aug 25 11:51:38 2014 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Mon Aug 25 11:51:38 2014 -0400

----------------------------------------------------------------------
 .../system/SourceSwitchingIterator.java         |  2 +-
 .../apache/accumulo/tserver/InMemoryMap.java    | 61 +++++++++++++-------
 .../accumulo/tserver/InMemoryMapTest.java       | 44 ++++++++++++++
 3 files changed, 85 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3d3d301f/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index dc36718,0000000..5f6d9ce
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@@ -1,753 -1,0 +1,772 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentSkipListMap;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.ColumnUpdate;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.file.FileSKVWriter;
 +import org.apache.accumulo.core.file.rfile.RFile;
 +import org.apache.accumulo.core.file.rfile.RFileOperations;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SkippingIterator;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.SortedMapIterator;
 +import org.apache.accumulo.core.iterators.WrappingIterator;
 +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
 +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
 +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
 +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
 +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.LocalityGroupUtil;
 +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 +import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.trace.TraceFileSystem;
 +import org.apache.commons.lang.mutable.MutableLong;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * Comparator for the in-memory map. Keys are ordered by {@code Key.compareTo}; when that reports equality the tie is
 + * broken with the {@code kvCount} carried by {@link MemKey}: of two equal MemKeys the one with the larger kvCount sorts
 + * first, and a MemKey sorts after an equal key that is not a MemKey.
 + */
 +class MemKeyComparator implements Comparator<Key>, Serializable {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  @Override
 +  public int compare(Key k1, Key k2) {
 +    final int keyOrder = k1.compareTo(k2);
 +    if (keyOrder != 0)
 +      return keyOrder;
 +
 +    final boolean firstIsMemKey = k1 instanceof MemKey;
 +    final boolean secondIsMemKey = k2 instanceof MemKey;
 +
 +    if (firstIsMemKey && secondIsMemKey)
 +      return ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
 +    if (firstIsMemKey)
 +      return 1;
 +    if (secondIsMemKey)
 +      return -1;
 +    return 0;
 +  }
 +}
 +
 +class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator
{
 +  
 +  int kvCount;
 +  
 +  public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source,
int maxKVCount) {
 +    setSource(source);
 +    this.kvCount = maxKVCount;
 +  }
 +  
 +  @Override
 +  protected void consume() throws IOException {
 +    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount >
kvCount)
 +      getSource().next();
 +  }
 +  
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
 +  }
 +  
 +  @Override
 +  public void setInterruptFlag(AtomicBoolean flag) {
 +    ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
 +  }
 +  
 +}
 +
 +/**
 + * Wrapping iterator whose top key is always exposed as a {@link MemKey}. When the source already yields MemKeys they are
 + * passed through; otherwise the kv count is obtained from a copy of the value via {@code MemValue.splitKVCount} and a
 + * new MemKey is built from the source key and that count.
 + */
 +class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator {
 +  MemKey currKey = null;
 +  Value currVal = null;
 +
 +  public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source) {
 +    super();
 +    setSource(source);
 +  }
 +
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    SortedKeyValueIterator<Key,Value> sourceCopy = getSource().deepCopy(env);
 +    return new MemKeyConversionIterator(sourceCopy);
 +  }
 +
 +  @Override
 +  public Key getTopKey() {
 +    return currKey;
 +  }
 +
 +  @Override
 +  public Value getTopValue() {
 +    return currVal;
 +  }
 +
 +  /**
 +   * Refreshes {@code currKey} and {@code currVal} from the wrapped source's current top entry.
 +   */
 +  private void getTopKeyVal() {
 +    Key sourceKey = super.getTopKey();
 +    Value sourceValue = super.getTopValue();
 +
 +    if (sourceKey == null || sourceKey instanceof MemKey) {
 +      // nothing to convert, hand the source entry through unchanged
 +      currKey = (MemKey) sourceKey;
 +      currVal = sourceValue;
 +    } else {
 +      // work on a copy of the value; presumably splitKVCount strips the count out of it - confirm in MemValue
 +      currVal = new Value(sourceValue);
 +      currKey = new MemKey(sourceKey, MemValue.splitKVCount(currVal));
 +    }
 +  }
 +
 +  @Override
 +  public void next() throws IOException {
 +    super.next();
 +    if (hasTop())
 +      getTopKeyVal();
 +  }
 +
 +  @Override
 +  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
 +    super.seek(range, columnFamilies, inclusive);
 +
 +    if (hasTop())
 +      getTopKeyVal();
 +
 +    Key startKey = range.getStartKey();
 +    if (!(startKey instanceof MemKey))
 +      return;
 +
 +    // the range started at a MemKey: advance past converted keys that compare lower than that start key
 +    while (hasTop() && currKey.compareTo(startKey) < 0)
 +      next();
 +  }
 +
 +  @Override
 +  public void setInterruptFlag(AtomicBoolean flag) {
 +    InterruptibleIterator interruptible = (InterruptibleIterator) getSource();
 +    interruptible.setInterruptFlag(flag);
 +  }
 +
 +}
 +
 +public class InMemoryMap {
 +  private SimpleMap map = null;
 +  
 +  private static final Logger log = Logger.getLogger(InMemoryMap.class);
 +  
 +  private volatile String memDumpFile = null;
 +  private final String memDumpDir;
 +
 +  private Map<String,Set<ByteSequence>> lggroups;
 +  
 +  /**
 +   * Creates a map with no locality groups configured.
 +   */
 +  public InMemoryMap(boolean useNativeMap, String memDumpDir) {
 +    this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
 +  }
 +
 +  /**
 +   * Creates a map for the given locality groups. With no groups a single map is used, otherwise a LocalityGroupMap is
 +   * created.
 +   *
 +   * @param lggroups
 +   *          locality group name to column families
 +   * @param useNativeMap
 +   *          whether the native map implementation should be attempted
 +   * @param memDumpDir
 +   *          directory in which {@code delete} creates its temporary dump file
 +   */
 +  public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
 +    this.memDumpDir = memDumpDir;
 +    this.lggroups = lggroups;
 +    this.map = lggroups.size() > 0 ? new LocalityGroupMap(lggroups, useNativeMap) : newMap(useNativeMap);
 +  }
 +
 +  /**
 +   * Creates a map whose locality groups, native map setting and dump directory are read from the configuration.
 +   */
 +  public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
 +    this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
 +  }
 +  
 +  /**
 +   * Creates the backing map: a NativeMapWrapper when requested and the native map is loaded, otherwise (or when creating
 +   * the native map fails) a DefaultMap.
 +   */
 +  private static SimpleMap newMap(boolean useNativeMap) {
 +    if (!useNativeMap || !NativeMap.isLoaded())
 +      return new DefaultMap();
 +
 +    try {
 +      return new NativeMapWrapper();
 +    } catch (Throwable t) {
 +      // deliberate best effort: log and fall back to the java map
 +      log.error("Failed to create native map", t);
 +      return new DefaultMap();
 +    }
 +  }
 +  
 +  /**
 +   * Storage abstraction used by InMemoryMap; implemented in this file by LocalityGroupMap, DefaultMap and
 +   * NativeMapWrapper.
 +   */
 +  private interface SimpleMap {
 +    // NOTE: LocalityGroupMap throws UnsupportedOperationException for get and iterator
 +    Value get(Key key);
 +    
 +    Iterator<Entry<Key,Value>> iterator(Key startKey);
 +    
 +    int size();
 +    
 +    InterruptibleIterator skvIterator();
 +    
 +    void delete();
 +    
 +    long getMemoryUsed();
 +    
 +    // kvCount is the count given to the first key/value written; DefaultMap increments it once per column update
 +    void mutate(List<Mutation> mutations, int kvCount);
 +  }
 +  
 +  /**
 +   * SimpleMap that keeps one underlying map per configured locality group plus one extra map for the default group.
 +   * Mutations are split per group with a LocalityGroupUtil.Partitioner before being applied.
 +   */
 +  private static class LocalityGroupMap implements SimpleMap {
 +    
 +    // one entry per configured group: the column families of that group
 +    private Map<ByteSequence,MutableLong> groupFams[];
 +    
 +    // the last map in the array is the default locality group
 +    private SimpleMap maps[];
 +    private Partitioner partitioner;
 +    // reusable per-group buffers filled by the partitioner in mutate()
 +    private List<Mutation>[] partitioned;
 +    // union of the column families of all configured groups
 +    private Set<ByteSequence> nonDefaultColumnFamilies;
 +    
 +    @SuppressWarnings("unchecked")
 +    LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap)
{
 +      this.groupFams = new Map[groups.size()];
 +      this.maps = new SimpleMap[groups.size() + 1];
 +      this.partitioned = new List[groups.size() + 1];
 +      this.nonDefaultColumnFamilies = new HashSet<ByteSequence>();
 +      
 +      for (int i = 0; i < maps.length; i++) {
 +        maps[i] = newMap(useNativeMap);
 +      }
 +
 +      int count = 0;
 +      for (Set<ByteSequence> cfset : groups.values()) {
 +        HashMap<ByteSequence,MutableLong> map = new HashMap<ByteSequence,MutableLong>();
 +        for (ByteSequence bs : cfset)
 +          map.put(bs, new MutableLong(1));
 +        this.groupFams[count++] = map;
 +        nonDefaultColumnFamilies.addAll(cfset);
 +      }
 +      
 +      partitioner = new LocalityGroupUtil.Partitioner(this.groupFams);
 +      
 +      for (int i = 0; i < partitioned.length; i++) {
 +        partitioned[i] = new ArrayList<Mutation>();
 +      }
 +    }
 +
 +    @Override
 +    public Value get(Key key) {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    @Override
 +    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +    // total number of entries across all group maps
 +    @Override
 +    public int size() {
 +      int sum = 0;
 +      for (SimpleMap map : maps)
 +        sum += map.size();
 +      return sum;
 +    }
 +    
 +    // builds one LocalityGroup per underlying map; the extra, last map is passed with null families and true
 +    @Override
 +    public InterruptibleIterator skvIterator() {
 +      LocalityGroup groups[] = new LocalityGroup[maps.length];
 +      for (int i = 0; i < groups.length; i++) {
 +        if (i < groupFams.length)
 +          groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams[i], false);
 +        else
 +          groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true);
 +      }
 +
 +
 +      return new LocalityGroupIterator(groups, nonDefaultColumnFamilies);
 +    }
 +    
 +    @Override
 +    public void delete() {
 +      for (SimpleMap map : maps)
 +        map.delete();
 +    }
 +    
 +    // sum of the memory reported by every group map
 +    @Override
 +    public long getMemoryUsed() {
 +      long sum = 0;
 +      for (SimpleMap map : maps)
 +        sum += map.getMemoryUsed();
 +      return sum;
 +    }
 +    
 +    @Override
 +    public synchronized void mutate(List<Mutation> mutations, int kvCount) {
 +      // this method is synchronized because it reuses objects to avoid allocation,
 +      // currently, the method that calls this is synchronized so there is no
 +      // loss in parallelism.... synchronization was added here for future proofing
 +      
 +      try{
 +        partitioner.partition(mutations, partitioned);
 +        
 +        for (int i = 0; i < partitioned.length; i++) {
 +          if (partitioned[i].size() > 0) {
 +            maps[i].mutate(partitioned[i], kvCount);
 +            // advance the running count by the number of updates just handed to group i
 +            for (Mutation m : partitioned[i])
 +              kvCount += m.getUpdates().size();
 +          }
 +        }
 +      } finally {
 +        // clear immediately so mutations can be garbage collected
 +        for (List<Mutation> list : partitioned) {
 +          list.clear();
 +        }
 +      }
 +    }
 +    
 +  }
 +
 +  private static class DefaultMap implements SimpleMap {
 +    private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new
MemKeyComparator());
 +    private AtomicLong bytesInMemory = new AtomicLong();
 +    private AtomicInteger size = new AtomicInteger();
 +    
 +    public void put(Key key, Value value) {
 +      // Always a MemKey, so account for the kvCount int
 +      bytesInMemory.addAndGet(key.getLength() + 4);
 +      bytesInMemory.addAndGet(value.getSize());
 +      if (map.put(key, value) == null)
 +        size.incrementAndGet();
 +    }
 +    
 +    public Value get(Key key) {
 +      return map.get(key);
 +    }
 +    
 +    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
 +      Key lk = new Key(startKey);
 +      SortedMap<Key,Value> tm = map.tailMap(lk);
 +      return tm.entrySet().iterator();
 +    }
 +    
 +    public int size() {
 +      return size.get();
 +    }
 +    
 +    public synchronized InterruptibleIterator skvIterator() {
 +      if (map == null)
 +        throw new IllegalStateException();
 +      
 +      return new SortedMapIterator(map);
 +    }
 +    
 +    public synchronized void delete() {
 +      map = null;
 +    }
 +    
 +    public long getOverheadPerEntry() {
 +      // all of the java objects that are used to hold the
 +      // data and make it searchable have overhead... this
 +      // overhead is estimated using test.EstimateInMemMapOverhead
 +      // and is in bytes.. the estimates were obtained by running
 +      // java 6_16 in 64 bit server mode
 +      
 +      return 200;
 +    }
 +    
 +    @Override
 +    public void mutate(List<Mutation> mutations, int kvCount) {
 +      for (Mutation m : mutations) {
 +        for (ColumnUpdate cvp : m.getUpdates()) {
 +          Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(),
cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(),
 +              false, kvCount++);
 +          Value value = new Value(cvp.getValue());
 +          put(newKey, value);
 +        }
 +      }
 +    }
 +    
 +    @Override
 +    public long getMemoryUsed() {
 +      return bytesInMemory.get() + (size() * getOverheadPerEntry());
 +    }
 +  }
 +  
 +  /**
 +   * SimpleMap that forwards every operation to a NativeMap instance.
 +   */
 +  private static class NativeMapWrapper implements SimpleMap {
 +    private NativeMap nativeMap;
 +
 +    NativeMapWrapper() {
 +      this.nativeMap = new NativeMap();
 +    }
 +
 +    @Override
 +    public Value get(Key key) {
 +      return nativeMap.get(key);
 +    }
 +
 +    @Override
 +    public Iterator<Entry<Key,Value>> iterator(Key startKey) {
 +      return nativeMap.iterator(startKey);
 +    }
 +
 +    @Override
 +    public int size() {
 +      return nativeMap.size();
 +    }
 +
 +    @Override
 +    public InterruptibleIterator skvIterator() {
 +      return (InterruptibleIterator) nativeMap.skvIterator();
 +    }
 +
 +    @Override
 +    public void delete() {
 +      nativeMap.delete();
 +    }
 +
 +    @Override
 +    public long getMemoryUsed() {
 +      return nativeMap.getMemoryUsed();
 +    }
 +
 +    @Override
 +    public void mutate(List<Mutation> mutations, int kvCount) {
 +      nativeMap.mutate(mutations, kvCount);
 +    }
 +  }
 +  
 +  private AtomicInteger nextKVCount = new AtomicInteger(1);
 +  private AtomicInteger kvCount = new AtomicInteger(0);
 +
 +  private Object writeSerializer = new Object();
 +  
 +  /**
 +   * Applies a batch of mutations to the InMemoryMap. Writes are serialized on a dedicated lock; the visible kv count is
 +   * advanced only after the batch has been handed to the underlying map.
 +   *
 +   * @param mutations
 +   *          the mutations to apply
 +   */
 +  public void mutate(List<Mutation> mutations) {
 +    int numKVs = 0;
 +    for (Mutation mutation : mutations)
 +      numKVs += mutation.size();
 +
 +    // Can not update mutationCount while writes that started before
 +    // are in progress, this would cause partial mutations to be seen.
 +    // Also, can not continue until mutation count is updated, because
 +    // a read may not see a successful write. Therefore writes must
 +    // wait for writes that started before to finish.
 +    //
 +    // using separate lock from this map, to allow read/write in parallel
 +    synchronized (writeSerializer) {
 +      final int firstKV = nextKVCount.getAndAdd(numKVs);
 +      final int lastKV = firstKV + numKVs - 1;
 +      try {
 +        map.mutate(mutations, firstKV);
 +      } finally {
 +        // published even when the write throws
 +        kvCount.set(lastKV);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Returns the memory used as reported by the underlying map.
 +   *
 +   * @return the underlying map's memory usage, or 0 when the map has already been released
 +   */
 +  public synchronized long estimatedSizeInBytes() {
 +    return map == null ? 0 : map.getMemoryUsed();
 +  }
 +  
 +  /**
 +   * Returns the underlying map's entry iterator for the given start key.
 +   */
 +  Iterator<Map.Entry<Key,Value>> iterator(Key startKey) {
 +    Iterator<Map.Entry<Key,Value>> entries = map.iterator(startKey);
 +    return entries;
 +  }
 +  
 +  /**
 +   * Returns the number of entries reported by the underlying map.
 +   */
 +  public long getNumEntries() {
 +    final int entryCount = map.size();
 +    return entryCount;
 +  }
 +  
 +  private final Set<MemoryIterator> activeIters = Collections.synchronizedSet(new
HashSet<MemoryIterator>());
 +  
 +  class MemoryDataSource implements DataSource {
 +    
 +    boolean switched = false;
 +    private InterruptibleIterator iter;
-     private List<FileSKVIterator> readers;
++    private FileSKVIterator reader;
++    private MemoryDataSource parent;
++    private IteratorEnvironment env;
 +    
 +    MemoryDataSource() {
-       this(new ArrayList<FileSKVIterator>());
++      this(null, false, null);
 +    }
 +    
-     public MemoryDataSource(List<FileSKVIterator> readers) {
-       this.readers = readers;
++    public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment
env) {
++      this.parent = parent;
++      this.switched = switched;
++      this.env = env;
 +    }
 +    
 +    @Override
 +    public boolean isCurrent() {
 +      if (switched)
 +        return true;
 +      else
 +        return memDumpFile == null;
 +    }
 +    
 +    /**
 +     * Switches this data source to the dump file when one has been written. The switch happens in place, so the method
 +     * always returns {@code this}.
 +     *
 +     * @throws IllegalStateException
 +     *           if this source has already switched
 +     * @throws RuntimeException
 +     *           wrapping the IOException raised while opening the iterator over the dump file
 +     */
 +    @Override
 +    public DataSource getNewDataSource() {
 +      if (switched)
 +        throw new IllegalStateException();
 +      
 +      if (!isCurrent()) {
 +        switched = true;
 +        iter = null;
++        try {
++          // ensure files are referenced even if iterator was never seeked before
++          iterator();
++        } catch (IOException e) {
++          // keep the original exception as the cause, otherwise the reason for the failure is lost
++          throw new RuntimeException(e);
++        }
 +      }
 +      
 +      return this;
 +    }
 +    
++    /**
++     * Opens the reader over the dump file on first use and returns the same reader on later calls.
++     */
++    private synchronized FileSKVIterator getReader() throws IOException {
++      if (reader != null)
++        return reader;
++
++      Configuration hadoopConf = CachedConfiguration.getInstance();
++      FileSystem localFs = TraceFileSystem.wrap(FileSystem.getLocal(hadoopConf));
++      reader = new RFileOperations().openReader(memDumpFile, true, localFs, hadoopConf, ServerConfiguration.getSiteConfiguration());
++      return reader;
++    }
++
++
 +    @Override
 +    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
 +      if (iter == null)
 +        if (!switched)
 +          iter = map.skvIterator();
 +        else {
-           
-           Configuration conf = CachedConfiguration.getInstance();
-           FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
-           
-           FileSKVIterator reader = new RFileOperations().openReader(memDumpFile, true, fs,
conf, ServerConfiguration.getSiteConfiguration());
- 
-           readers.add(reader);
-           
-           iter = new MemKeyConversionIterator(reader);
++          if (parent == null)
++            iter = new MemKeyConversionIterator(getReader());
++          else
++            synchronized (parent) {
++              // synchronize the deep copy operation on the parent; this prevents multiple threads from deep copying the rfile shared
++              // from the parent at the same time. It is possible that the thread deleting an InMemoryMap and a scan thread could be
++              // switching different deep copies.
++              iter = new MemKeyConversionIterator(parent.getReader().deepCopy(env));
++            }
 +        }
 +      
 +      return iter;
 +    }
 +    
 +    @Override
 +    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
-       return new MemoryDataSource(readers);
++      return new MemoryDataSource(parent == null ? this : parent, switched, env);
 +    }
 +    
 +  }
 +  
 +  class MemoryIterator extends WrappingIterator implements InterruptibleIterator {
 +    
 +    private AtomicBoolean closed;
 +    private SourceSwitchingIterator ssi;
 +    private MemoryDataSource mds;
 +    
 +    /**
 +     * Returns the wrapped source, refusing access once this iterator has been closed.
 +     */
 +    protected SortedKeyValueIterator<Key,Value> getSource() {
 +      if (!closed.get())
 +        return super.getSource();
 +
 +      throw new IllegalStateException("Memory iterator is closed");
 +    }
 +    
 +    // creates an iterator with its own, initially false, closed flag
 +    private MemoryIterator(InterruptibleIterator source) {
 +      this(source, new AtomicBoolean(false));
 +    }
 +    
 +    // used by deepCopy so that copies share the closed flag of the iterator they were copied from
 +    private MemoryIterator(SortedKeyValueIterator<Key,Value> source, AtomicBoolean
closed) {
 +      setSource(source);
 +      this.closed = closed;
 +    }
 +    
 +    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +      return new MemoryIterator(getSource().deepCopy(env), closed);
 +    }
 +    
 +    public void close() {
 +      
 +      synchronized (this) {
 +        if (closed.compareAndSet(false, true)) {
-           
-           for (FileSKVIterator reader : mds.readers)
-             try {
-               reader.close();
-             } catch (IOException e) {
-               log.warn(e, e);
-             }
++          try {
++            if (mds.reader != null)
++              mds.reader.close();
++          } catch (IOException e) {
++            log.warn(e, e);
++          }
 +        }
 +      }
 +      
 +      // remove outside of sync to avoid deadlock
 +      activeIters.remove(this);
 +    }
 +    
 +    private synchronized boolean switchNow() throws IOException {
 +      if (closed.get())
 +        return false;
 +      
 +      ssi.switchNow();
 +      return true;
 +    }
 +    
 +    @Override
 +    public void setInterruptFlag(AtomicBoolean flag) {
 +      ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
 +    }
 +    
 +    // remembers the switching iterator that switchNow() triggers
 +    private void setSSI(SourceSwitchingIterator ssi) {
 +      this.ssi = ssi;
 +    }
 +    
 +    // remembers the data source whose reader close() closes
 +    public void setMDS(MemoryDataSource mds) {
 +      this.mds = mds;
 +    }
 +    
 +  }
 +  
 +  /**
 +   * Creates an iterator over this map that only shows entries written before the call: the current kv count is captured
 +   * and entries with a larger count are skipped. The iterator is registered as active until it is closed.
 +   *
 +   * @return a new, registered MemoryIterator
 +   * @throws NullPointerException
 +   *           if the underlying map has already been released
 +   * @throws IllegalStateException
 +   *           if the map has been deleted
 +   */
 +  public synchronized MemoryIterator skvIterator() {
 +    if (map == null)
 +      throw new NullPointerException();
 +    
 +    if (deleted)
 +      throw new IllegalStateException("Can not obtain iterator after map deleted");
 +    
 +    int mc = kvCount.get();
 +    MemoryDataSource mds = new MemoryDataSource();
 +    // the switching iterator must use the same data source that is handed to the MemoryIterator below; with a second
 +    // instance, close() would look at a source whose reader is never opened and the dump file reader would stay open
 +    SourceSwitchingIterator ssi = new SourceSwitchingIterator(mds);
 +    MemoryIterator mi = new MemoryIterator(new PartialMutationSkippingIterator(ssi, mc));
 +    mi.setSSI(ssi);
 +    mi.setMDS(mds);
 +    activeIters.add(mi);
 +    return mi;
 +  }
 +  
 +  /**
 +   * Returns an iterator straight over the underlying map, without the kv count filtering applied to scan iterators.
 +   *
 +   * @throws IllegalStateException
 +   *           if the published kv count is not exactly one less than the next count to hand out
 +   */
 +  public SortedKeyValueIterator<Key,Value> compactionIterator() {
 +    if (nextKVCount.get() - 1 == kvCount.get())
 +      return map.skvIterator();
 +
 +    throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = " + kvCount.get());
 +  }
 +  
 +  private boolean deleted = false;
 +  
 +  /**
 +   * Deletes this map. Waits up to waitTime milliseconds for active iterators to be closed; if some remain, the map is
 +   * dumped to a temporary RFile under memDumpDir and the remaining iterators are switched to that file before the
 +   * underlying map is released.
 +   *
 +   * @param waitTime
 +   *          maximum time in milliseconds to wait for active iterators before dumping
 +   * @throws IllegalStateException
 +   *           if delete was already called
 +   */
 +  public void delete(long waitTime) {
 +    
 +    synchronized (this) {
 +      if (deleted)
 +        throw new IllegalStateException("Double delete");
 +      
 +      deleted = true;
 +    }
 +    
 +    long t1 = System.currentTimeMillis();
 +    
 +    // poll until the iterators are gone or the wait time is used up
 +    while (activeIters.size() > 0 && System.currentTimeMillis() - t1 < waitTime)
{
 +      UtilWaitThread.sleep(50);
 +    }
 +    
 +    if (activeIters.size() > 0) {
 +      // dump memmap exactly as is to a tmp file on disk, and switch scans to that temp
file
 +      try {
 +        Configuration conf = CachedConfiguration.getInstance();
 +        FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
 +        
 +        String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION;
 +        
 +        Configuration newConf = new Configuration(conf);
 +        newConf.setInt("io.seqfile.compress.blocksize", 100000);
 +        
 +        // NOTE(review): out is not closed if an IOException is thrown before out.close() below
 +        FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration());
 +        
 +        InterruptibleIterator iter = map.skvIterator();
 +       
 +        HashSet<ByteSequence> allfams= new HashSet<ByteSequence>();
 +        
 +        // write each configured locality group, seeking the iterator to just that group's families
 +        for(Entry<String, Set<ByteSequence>> entry : lggroups.entrySet()){
 +          allfams.addAll(entry.getValue());
 +          out.startNewLocalityGroup(entry.getKey(), entry.getValue());
 +          iter.seek(new Range(), entry.getValue(), true);
 +          dumpLocalityGroup(out, iter);
 +        }
 +        
 +        // everything not in a configured group goes to the default group (families excluded, not included)
 +        out.startDefaultLocalityGroup();
 +        iter.seek(new Range(), allfams, false);
 +       
 +        dumpLocalityGroup(out, iter);
 +        
 +        out.close();
 +        
 +        log.debug("Created mem dump file " + tmpFile);
 +        
 +        // publishing the file name makes MemoryDataSource.isCurrent() return false for unswitched sources
 +        memDumpFile = tmpFile;
 +        
 +        synchronized (activeIters) {
 +          for (MemoryIterator mi : activeIters) {
 +            mi.switchNow();
 +          }
 +        }
 +        
 +        // rely on unix behavior that file will be deleted when last
 +        // reader closes it
 +        fs.delete(new Path(memDumpFile), true);
 +        
 +      } catch (IOException ioe) {
 +        log.error("Failed to create mem dump file ", ioe);
 +        
 +        // the dump failed, so wait (without a time limit) until every active iterator is closed
 +        while (activeIters.size() > 0) {
 +          UtilWaitThread.sleep(100);
 +        }
 +      }
 +      
 +    }
 +    
 +    SimpleMap tmpMap = map;
 +    
 +    synchronized (this) {
 +      map = null;
 +    }
 +    
 +    tmpMap.delete();
 +  }
 +
 +  /**
 +   * Appends the entries the iterator is positioned on to the writer, stopping early once no active iterators remain.
 +   */
 +  private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException {
 +    for (; iter.hasTop() && activeIters.size() > 0; iter.next()) {
 +      Key topKey = iter.getTopKey();
 +      // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
 +      // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
 +      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) topKey).kvCount);
 +      out.append(topKey, newValue);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3d3d301f/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
----------------------------------------------------------------------
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
index dc7ee99,0000000..3932552
mode 100644,000000..100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
@@@ -1,513 -1,0 +1,557 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.data.ArrayByteSequence;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 +import org.apache.accumulo.core.util.LocalityGroupUtil;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ZooConfiguration;
 +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +import org.junit.BeforeClass;
 +import org.junit.Ignore;
 +import org.junit.Rule;
 +import org.junit.Test;
 +import org.junit.rules.TemporaryFolder;
 +
 +public class InMemoryMapTest {
 +
 +  @BeforeClass
 +  public static void setUp() throws Exception {
 +    // suppress log messages having to do with not having an instance
 +    Logger.getLogger(ZooConfiguration.class).setLevel(Level.OFF);
 +    Logger.getLogger(HdfsZooInstance.class).setLevel(Level.OFF);
 +  }
 +
 +  @Rule
 +  public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir")
+ "/target"));
 +
 +  /**
 +   * Writes a delete for the given row and "family:qualifier" column at timestamp ts.
 +   */
 +  public void mutate(InMemoryMap imm, String row, String column, long ts) {
 +    String[] familyAndQualifier = column.split(":");
 +    Mutation deletion = new Mutation(new Text(row));
 +    deletion.putDelete(new Text(familyAndQualifier[0]), new Text(familyAndQualifier[1]), ts);
 +
 +    imm.mutate(Collections.singletonList(deletion));
 +  }
 +
 +  /**
 +   * Writes value to the given row and "family:qualifier" column at timestamp ts.
 +   */
 +  public void mutate(InMemoryMap imm, String row, String column, long ts, String value) {
 +    String[] familyAndQualifier = column.split(":");
 +    Mutation update = new Mutation(new Text(row));
 +    update.put(new Text(familyAndQualifier[0]), new Text(familyAndQualifier[1]), ts, new Value(value.getBytes()));
 +
 +    imm.mutate(Collections.singletonList(update));
 +  }
 +
 +  /**
 +   * Builds a key from a row, a "family:qualifier" column and a timestamp.
 +   */
 +  static Key nk(String row, String column, long ts) {
 +    String[] familyAndQualifier = column.split(":");
 +    return new Key(new Text(row), new Text(familyAndQualifier[0]), new Text(familyAndQualifier[1]), ts);
 +  }
 +
 +  /**
 +   * Asserts that the iterator's top entry has the expected key and value, then advances the iterator.
 +   */
 +  static void ae(SortedKeyValueIterator<Key,Value> dc, String row, String column, int ts, String val) throws IOException {
 +    assertTrue(dc.hasTop());
 +
 +    Key expectedKey = nk(row, column, ts);
 +    Value expectedValue = new Value(val.getBytes());
 +    assertEquals(expectedKey, dc.getTopKey());
 +    assertEquals(expectedValue, dc.getTopValue());
 +
 +    dc.next();
 +  }
 +
 +  /**
 +   * Wraps each given column family name in an {@code ArrayByteSequence} and collects them in a set.
 +   */
 +  static Set<ByteSequence> newCFSet(String... cfs) {
 +    Set<ByteSequence> families = new HashSet<ByteSequence>();
 +    for (int i = 0; i < cfs.length; i++) {
 +      families.add(new ArrayByteSequence(cfs[i]));
 +    }
 +    return families;
 +  }
 +
 +  /**
 +   * An iterator obtained before a write is expected to be empty; one obtained after it is expected
 +   * to contain exactly that write.
 +   */
 +  @Test
 +  public void test2() throws Exception {
 +    InMemoryMap map = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
 +
 +    MemoryIterator beforeWrite = map.skvIterator();
 +    mutate(map, "r1", "foo:cq1", 3, "bar1");
 +    MemoryIterator afterWrite = map.skvIterator();
 +
 +    // created before the mutation
 +    beforeWrite.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    assertFalse(beforeWrite.hasTop());
 +
 +    // created after the mutation
 +    afterWrite.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    assertTrue(afterWrite.hasTop());
 +    ae(afterWrite, "r1", "foo:cq1", 3, "bar1");
 +    assertFalse(afterWrite.hasTop());
 +  }
 +
 +  /**
 +   * Checks two iterators created at different points while the same key is overwritten several
 +   * times: each is expected to return only the writes made before it was created, with the most
 +   * recent write first.
 +   */
 +  @Test
 +  public void test3() throws Exception {
 +    InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
 +
 +    mutate(imm, "r1", "foo:cq1", 3, "bar1");
 +    mutate(imm, "r1", "foo:cq1", 3, "bar2");
 +    // ski1 is obtained before bar3 is written; the assertions below expect it not to return bar3
 +    MemoryIterator ski1 = imm.skvIterator();
 +    mutate(imm, "r1", "foo:cq1", 3, "bar3");
 +
 +    mutate(imm, "r3", "foo:cq1", 3, "bar9");
 +    mutate(imm, "r3", "foo:cq1", 3, "bara");
 +
 +    MemoryIterator ski2 = imm.skvIterator();
 +
 +    ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(ski1, "r1", "foo:cq1", 3, "bar2");
 +    ae(ski1, "r1", "foo:cq1", 3, "bar1");
 +    assertFalse(ski1.hasTop());
 +
 +    ski2.seek(new Range(new Text("r3")), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(ski2, "r3", "foo:cq1", 3, "bara");
 +    ae(ski2, "r3", "foo:cq1", 3, "bar9");
 +    // verify the iterator that was just consumed; this used to re-check ski1 by mistake
 +    assertFalse(ski2.hasTop());
 +  }
 +
 +  /**
 +   * Repeatedly seeks an iterator that was obtained before the map was deleted and checks that it
 +   * keeps returning the entries written before the iterator was created.
 +   */
 +  @Test
 +  public void test4() throws Exception {
 +    InMemoryMap map = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
 +
 +    mutate(map, "r1", "foo:cq1", 3, "bar1");
 +    mutate(map, "r1", "foo:cq1", 3, "bar2");
 +    MemoryIterator snapshot = map.skvIterator();
 +    // written after the iterator was obtained; none of the checks below expect to see it
 +    mutate(map, "r1", "foo:cq1", 3, "bar3");
 +
 +    // all seeks below happen after the delete
 +    map.delete(0);
 +
 +    // first pass over row r1
 +    snapshot.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(snapshot, "r1", "foo:cq1", 3, "bar2");
 +    ae(snapshot, "r1", "foo:cq1", 3, "bar1");
 +    assertFalse(snapshot.hasTop());
 +
 +    // seeking the same row again must give the same entries
 +    snapshot.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(snapshot, "r1", "foo:cq1", 3, "bar2");
 +    ae(snapshot, "r1", "foo:cq1", 3, "bar1");
 +    assertFalse(snapshot.hasTop());
 +
 +    // a row that was never written
 +    snapshot.seek(new Range(new Text("r2")), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    assertFalse(snapshot.hasTop());
 +
 +    // range starting at the exact key with no end key
 +    snapshot.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(snapshot, "r1", "foo:cq1", 3, "bar2");
 +    ae(snapshot, "r1", "foo:cq1", 3, "bar1");
 +    assertFalse(snapshot.hasTop());
 +
 +    snapshot.close();
 +  }
 +
 +  /**
 +   * Deletes the map while an iterator is part-way through its entries and checks that the
 +   * iterator still returns the remaining ones. Runs once with three versions of one key and once
 +   * with three distinct keys.
 +   */
 +  @Test
 +  public void test5() throws Exception {
 +    // scenario 1: three writes to the same key
 +    InMemoryMap map = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
 +
 +    mutate(map, "r1", "foo:cq1", 3, "bar1");
 +    mutate(map, "r1", "foo:cq1", 3, "bar2");
 +    mutate(map, "r1", "foo:cq1", 3, "bar3");
 +
 +    MemoryIterator reader = map.skvIterator();
 +    reader.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(reader, "r1", "foo:cq1", 3, "bar3");
 +
 +    // delete in the middle of the read
 +    map.delete(0);
 +
 +    ae(reader, "r1", "foo:cq1", 3, "bar2");
 +    ae(reader, "r1", "foo:cq1", 3, "bar1");
 +    assertFalse(reader.hasTop());
 +
 +    reader.close();
 +
 +    // scenario 2: three different column qualifiers
 +    map = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
 +
 +    mutate(map, "r1", "foo:cq1", 3, "bar1");
 +    mutate(map, "r1", "foo:cq2", 3, "bar2");
 +    mutate(map, "r1", "foo:cq3", 3, "bar3");
 +
 +    reader = map.skvIterator();
 +    reader.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(reader, "r1", "foo:cq1", 3, "bar1");
 +
 +    // delete in the middle of the read
 +    map.delete(0);
 +
 +    ae(reader, "r1", "foo:cq2", 3, "bar2");
 +    ae(reader, "r1", "foo:cq3", 3, "bar3");
 +    assertFalse(reader.hasTop());
 +
 +    reader.close();
 +  }
 +
 +  /**
 +   * Interleaves reads on an iterator and its deep copy around a delete of the map, then re-seeks
 +   * both and checks each still returns the entries written before the iterator was obtained.
 +   */
 +  @Test
 +  public void test6() throws Exception {
 +    InMemoryMap map = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
 +
 +    mutate(map, "r1", "foo:cq1", 3, "bar1");
 +    mutate(map, "r1", "foo:cq2", 3, "bar2");
 +    mutate(map, "r1", "foo:cq3", 3, "bar3");
 +    mutate(map, "r1", "foo:cq4", 3, "bar4");
 +
 +    MemoryIterator source = map.skvIterator();
 +
 +    // written after the iterator was obtained; the checks below never expect cq5
 +    mutate(map, "r1", "foo:cq5", 3, "bar5");
 +
 +    SortedKeyValueIterator<Key,Value> copy = source.deepCopy(null);
 +
 +    source.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(source, "r1", "foo:cq1", 3, "bar1");
 +
 +    copy.seek(new Range(nk("r1", "foo:cq2", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(copy, "r1", "foo:cq2", 3, "bar2");
 +
 +    // delete while both iterators are part-way through
 +    map.delete(0);
 +
 +    ae(source, "r1", "foo:cq2", 3, "bar2");
 +    ae(copy, "r1", "foo:cq3", 3, "bar3");
 +    ae(source, "r1", "foo:cq3", 3, "bar3");
 +    ae(copy, "r1", "foo:cq4", 3, "bar4");
 +    ae(source, "r1", "foo:cq4", 3, "bar4");
 +    assertFalse(source.hasTop());
 +    assertFalse(copy.hasTop());
 +
 +    // re-seek both after the delete; the copy is read to the end before the source is touched
 +    source.seek(new Range(nk("r1", "foo:cq3", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
 +
 +    copy.seek(new Range(nk("r1", "foo:cq4", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(copy, "r1", "foo:cq4", 3, "bar4");
 +    assertFalse(copy.hasTop());
 +
 +    ae(source, "r1", "foo:cq3", 3, "bar3");
 +    ae(source, "r1", "foo:cq4", 3, "bar4");
 +    assertFalse(source.hasTop());
 +    assertFalse(copy.hasTop());
 +
 +    source.close();
 +  }
 +
++  /**
++   * Reads two entries through an iterator and its deep copy, deleting the map at the step
++   * selected by {@code interleaving} (1 to 4).
++   */
++  private void deepCopyAndDelete(int interleaving) throws Exception {
++    // interleaving == 0 intentionally omitted, this runs the test w/o deleting in mem map
++
++    InMemoryMap map = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
++
++    mutate(map, "r1", "foo:cq1", 3, "bar1");
++    mutate(map, "r1", "foo:cq2", 3, "bar2");
++
++    MemoryIterator source = map.skvIterator();
++
++    // step 1: before the deep copy exists
++    if (interleaving == 1) {
++      map.delete(0);
++    }
++
++    SortedKeyValueIterator<Key,Value> copy = source.deepCopy(null);
++
++    // step 2: after the deep copy, before any seek
++    if (interleaving == 2) {
++      map.delete(0);
++    }
++
++    copy.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
++    source.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
++
++    // step 3: after both seeks, before any read
++    if (interleaving == 3) {
++      map.delete(0);
++    }
++
++    ae(copy, "r1", "foo:cq1", 3, "bar1");
++    ae(source, "r1", "foo:cq1", 3, "bar1");
++    copy.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
++
++    // step 4: after the copy was re-seeked, with the source part-way through
++    if (interleaving == 4) {
++      map.delete(0);
++    }
++
++    ae(source, "r1", "foo:cq2", 3, "bar2");
++    ae(copy, "r1", "foo:cq1", 3, "bar1");
++    ae(copy, "r1", "foo:cq2", 3, "bar2");
++    assertFalse(copy.hasTop());
++    assertFalse(source.hasTop());
++  }
++
++  /**
++   * Runs {@code deepCopyAndDelete} for every interleaving from 0 through 4.
++   */
++  @Test
++  public void testDeepCopyAndDelete() throws Exception {
++    final int lastInterleaving = 4;
++    for (int interleaving = 0; interleaving <= lastInterleaving; interleaving++) {
++      deepCopyAndDelete(interleaving);
++    }
++  }
++   
 +  /**
 +   * Regression test: seeking a column-family-skipping iterator for a family that is not present,
 +   * after the map was deleted, must terminate and return nothing.
 +   */
 +  @Test
 +  public void testBug1() throws Exception {
 +    InMemoryMap map = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
 +
 +    // 20 entries for r1 followed by 20 entries for r2
 +    for (String row : new String[] {"r1", "r2"}) {
 +      for (int i = 0; i < 20; i++) {
 +        mutate(map, row, "foo:cq" + i, 3, "bar" + i);
 +      }
 +    }
 +
 +    MemoryIterator memIter = map.skvIterator();
 +    ColumnFamilySkippingIterator skipping = new ColumnFamilySkippingIterator(memIter);
 +
 +    map.delete(0);
 +
 +    ArrayList<ByteSequence> wantedFamilies = new ArrayList<ByteSequence>();
 +    wantedFamilies.add(new ArrayByteSequence("bar"));
 +
 +    // this seek resulted in an infinite loop before a bug was fixed
 +    skipping.seek(new Range("r1"), wantedFamilies, true);
 +
 +    assertFalse(skipping.hasTop());
 +
 +    memIter.close();
 +  }
 +
 +  /**
 +   * Seeks an iterator forward to cq3 and then back to cq1, checking the top entry each time.
 +   */
 +  @Test
 +  public void testSeekBackWards() throws Exception {
 +    InMemoryMap map = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
 +
 +    mutate(map, "r1", "foo:cq1", 3, "bar1");
 +    mutate(map, "r1", "foo:cq2", 3, "bar2");
 +    mutate(map, "r1", "foo:cq3", 3, "bar3");
 +    mutate(map, "r1", "foo:cq4", 3, "bar4");
 +
 +    MemoryIterator iter = map.skvIterator();
 +
 +    // forward to the third entry
 +    iter.seek(new Range(nk("r1", "foo:cq3", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(iter, "r1", "foo:cq3", 3, "bar3");
 +
 +    // then back to the first one
 +    iter.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(iter, "r1", "foo:cq1", 3, "bar1");
 +  }
 +
 +  /**
 +   * Puts the same key twice in one mutation and expects both values back, the later put first.
 +   */
 +  @Test
 +  public void testDuplicateKey() throws Exception {
 +    InMemoryMap map = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
 +
 +    Mutation twoPuts = new Mutation(new Text("r1"));
 +    twoPuts.put(new Text("foo"), new Text("cq"), 3, new Value("v1".getBytes()));
 +    twoPuts.put(new Text("foo"), new Text("cq"), 3, new Value("v2".getBytes()));
 +    map.mutate(Collections.singletonList(twoPuts));
 +
 +    MemoryIterator iter = map.skvIterator();
 +    iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
 +    ae(iter, "r1", "foo:cq", 3, "v2");
 +    ae(iter, "r1", "foo:cq", 3, "v1");
 +  }
 +
 +  private static final Logger log = Logger.getLogger(InMemoryMapTest.class);
 +
 +  /**
 +   * Returns the total of all elements in {@code counts}; an empty array yields 0.
 +   */
 +  static long sum(long[] counts) {
 +    long total = 0;
 +    for (long count : counts) {
 +      total += count;
 +    }
 +    return total;
 +  }
 +
 +  /**
 +   * Measures mutations per second with 1, 2 and 16 concurrent writers, each writing for about
 +   * one second, and checks that no configuration is more than roughly three times faster or
 +   * slower than the single-writer run.
 +   */
 +  // - hard to get this timing test to run well on apache build machines
 +  @Test
 +  @Ignore
 +  public void parallelWriteSpeed() throws InterruptedException, IOException {
 +    List<Double> timings = new ArrayList<Double>();
 +    for (int threads : new int[] {1, 2, 16, /* 64, 256 */}) {
 +      final long now = System.currentTimeMillis();
 +      // one slot per writer so the writers never update the same element
 +      final long[] counts = new long[threads];
 +      final InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
 +      ExecutorService e = Executors.newFixedThreadPool(threads);
 +      for (int j = 0; j < threads; j++) {
 +        final int threadId = j;
 +        e.execute(new Runnable() {
 +          @Override
 +          public void run() {
 +            // write in batches of 1000 until one second has passed since the start
 +            while (System.currentTimeMillis() - now < 1000) {
 +              for (int k = 0; k < 1000; k++) {
 +                Mutation m = new Mutation("row");
 +                m.put("cf", "cq", new Value("v".getBytes()));
 +                List<Mutation> mutations = Collections.singletonList(m);
 +                imm.mutate(mutations);
 +                counts[threadId]++;
 +              }
 +            }
 +          }
 +        });
 +      }
 +      e.shutdown();
 +      // the counters must not be read while writers are still running, so fail instead of
 +      // silently continuing when the pool does not terminate in time
 +      assertTrue("writer threads did not finish within 10 seconds", e.awaitTermination(10, TimeUnit.SECONDS));
 +      imm.delete(10000);
 +      double mutationsPerSecond = sum(counts) / ((System.currentTimeMillis() - now) / 1000.);
 +      timings.add(mutationsPerSecond);
 +      log.info(String.format("%.1f mutations per second with %d threads", mutationsPerSecond, threads));
 +    }
 +    // verify that more threads doesn't go a lot faster, or a lot slower than one thread
 +    for (int i = 0; i < timings.size(); i++) {
 +      double ratioFirst = timings.get(0) / timings.get(i);
 +      assertTrue(ratioFirst < 3);
 +      assertTrue(ratioFirst > 0.3);
 +    }
 +  }
 +
 +  /**
 +   * Builds a map with two locality groups (lg1 = cf1, cf2 and lg2 = cf3, cf4), writes ten entries
 +   * across five rows, and runs {@code seekLocalityGroups} against an iterator and its deep copy
 +   * both before and after the map is deleted.
 +   */
 +  @Test
 +  public void testLocalityGroups() throws Exception {
 +
 +    Map<String,Set<ByteSequence>> lggroups1 = new HashMap<String,Set<ByteSequence>>();
 +    lggroups1.put("lg1", newCFSet("cf1", "cf2"));
 +    lggroups1.put("lg2", newCFSet("cf3", "cf4"));
 +
 +    InMemoryMap imm = new InMemoryMap(lggroups1, false, tempFolder.newFolder().getAbsolutePath());
 +
 +    // "foo" is not listed in either group configured above
 +    Mutation m1 = new Mutation("r1");
 +    m1.put("cf1", "x", 2, "1");
 +    m1.put("cf1", "y", 2, "2");
 +    m1.put("cf3", "z", 2, "3");
 +    m1.put("foo", "b", 2, "9");
 +
 +    Mutation m2 = new Mutation("r2");
 +    m2.put("cf2", "x", 3, "5");
 +
 +    Mutation m3 = new Mutation("r3");
 +    m3.put("foo", "b", 4, "6");
 +
 +    Mutation m4 = new Mutation("r4");
 +    m4.put("foo", "b", 5, "7");
 +    m4.put("cf4", "z", 5, "8");
 +
 +    Mutation m5 = new Mutation("r5");
 +    m5.put("cf3", "z", 6, "A");
 +    m5.put("cf4", "z", 6, "B");
 +
 +    imm.mutate(Arrays.asList(m1, m2, m3, m4, m5));
 +
 +    MemoryIterator iter1 = imm.skvIterator();
 +
 +    seekLocalityGroups(iter1);
 +    SortedKeyValueIterator<Key,Value> dc1 = iter1.deepCopy(null);
 +    seekLocalityGroups(dc1);
 +
 +    // 4 + 1 + 1 + 2 + 2 puts above; assertEquals reports the actual count on failure
 +    assertEquals(10, imm.getNumEntries());
 +    assertTrue(imm.estimatedSizeInBytes() > 0);
 +
 +    imm.delete(0);
 +
 +    // both iterators were obtained before the delete and are expected to behave as before
 +    seekLocalityGroups(iter1);
 +    seekLocalityGroups(dc1);
 +    // TODO uncomment following when ACCUMULO-1628 is fixed
 +    // seekLocalityGroups(iter1.deepCopy(null));
 +  }
 +
 +  private void seekLocalityGroups(SortedKeyValueIterator<Key,Value> iter1) throws
IOException { // runs a fixed series of seeks on iter1 and asserts the exact entries each one yields
 +    iter1.seek(new Range(), newCFSet("cf1"), true); // inclusive cf1; cf2:x is expected too (testLocalityGroups configures cf1 and cf2 as one group)
 +    ae(iter1, "r1", "cf1:x", 2, "1");
 +    ae(iter1, "r1", "cf1:y", 2, "2");
 +    ae(iter1, "r2", "cf2:x", 3, "5");
 +    assertFalse(iter1.hasTop());
 +
 +    iter1.seek(new Range("r2", "r4"), newCFSet("cf1"), true); // same families, restricted to rows r2 through r4
 +    ae(iter1, "r2", "cf2:x", 3, "5");
 +    assertFalse(iter1.hasTop());
 +
 +    iter1.seek(new Range(), newCFSet("cf3"), true); // inclusive cf3; cf4 entries are expected too (configured as one group with cf3)
 +    ae(iter1, "r1", "cf3:z", 2, "3");
 +    ae(iter1, "r4", "cf4:z", 5, "8");
 +    ae(iter1, "r5", "cf3:z", 6, "A");
 +    ae(iter1, "r5", "cf4:z", 6, "B");
 +    assertFalse(iter1.hasTop());
 +
 +    iter1.seek(new Range(), newCFSet("foo"), true); // foo is in neither configured group; only foo entries are expected
 +    ae(iter1, "r1", "foo:b", 2, "9");
 +    ae(iter1, "r3", "foo:b", 4, "6");
 +    ae(iter1, "r4", "foo:b", 5, "7");
 +    assertFalse(iter1.hasTop());
 +
 +    iter1.seek(new Range(), newCFSet("cf1", "cf3"), true); // one family from each configured group; everything except foo is expected
 +    ae(iter1, "r1", "cf1:x", 2, "1");
 +    ae(iter1, "r1", "cf1:y", 2, "2");
 +    ae(iter1, "r1", "cf3:z", 2, "3");
 +    ae(iter1, "r2", "cf2:x", 3, "5");
 +    ae(iter1, "r4", "cf4:z", 5, "8");
 +    ae(iter1, "r5", "cf3:z", 6, "A");
 +    ae(iter1, "r5", "cf4:z", 6, "B");
 +    assertFalse(iter1.hasTop());
 +
 +    iter1.seek(new Range("r2", "r4"), newCFSet("cf1", "cf3"), true); // same families, restricted to rows r2 through r4
 +    ae(iter1, "r2", "cf2:x", 3, "5");
 +    ae(iter1, "r4", "cf4:z", 5, "8");
 +    assertFalse(iter1.hasTop());
 +
 +    iter1.seek(new Range(), newCFSet("cf1", "cf3", "foo"), true); // all ten entries are expected
 +    assertAll(iter1);
 +
 +    iter1.seek(new Range("r1", "r2"), newCFSet("cf1", "cf3", "foo"), true); // same families, restricted to rows r1 through r2
 +    ae(iter1, "r1", "cf1:x", 2, "1");
 +    ae(iter1, "r1", "cf1:y", 2, "2");
 +    ae(iter1, "r1", "cf3:z", 2, "3");
 +    ae(iter1, "r1", "foo:b", 2, "9");
 +    ae(iter1, "r2", "cf2:x", 3, "5");
 +    assertFalse(iter1.hasTop());
 +
 +    iter1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); // empty family set with inclusive=false; all ten entries are expected
 +    assertAll(iter1);
 +
 +    iter1.seek(new Range(), newCFSet("cf1"), false); // inclusive=false with only cf1; all ten entries (cf1 included) are still expected
 +    assertAll(iter1);
 +
 +    iter1.seek(new Range(), newCFSet("cf1", "cf2"), false); // inclusive=false with cf1 and cf2; no cf1 or cf2 entries are expected
 +    ae(iter1, "r1", "cf3:z", 2, "3");
 +    ae(iter1, "r1", "foo:b", 2, "9");
 +    ae(iter1, "r3", "foo:b", 4, "6");
 +    ae(iter1, "r4", "cf4:z", 5, "8");
 +    ae(iter1, "r4", "foo:b", 5, "7");
 +    ae(iter1, "r5", "cf3:z", 6, "A");
 +    ae(iter1, "r5", "cf4:z", 6, "B");
 +    assertFalse(iter1.hasTop());
 +
 +    iter1.seek(new Range("r2"), newCFSet("cf1", "cf3", "foo"), true); // single-row range r2
 +    ae(iter1, "r2", "cf2:x", 3, "5");
 +    assertFalse(iter1.hasTop());
 +  }
 +
 +  private void assertAll(SortedKeyValueIterator<Key,Value> iter1) throws IOException
{
 +    ae(iter1, "r1", "cf1:x", 2, "1");
 +    ae(iter1, "r1", "cf1:y", 2, "2");
 +    ae(iter1, "r1", "cf3:z", 2, "3");
 +    ae(iter1, "r1", "foo:b", 2, "9");
 +    ae(iter1, "r2", "cf2:x", 3, "5");
 +    ae(iter1, "r3", "foo:b", 4, "6");
 +    ae(iter1, "r4", "cf4:z", 5, "8");
 +    ae(iter1, "r4", "foo:b", 5, "7");
 +    ae(iter1, "r5", "cf3:z", 6, "A");
 +    ae(iter1, "r5", "cf4:z", 6, "B");
 +    assertFalse(iter1.hasTop());
 +  }
 +}


Mime
View raw message