accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ibe...@apache.org
Subject [1/4] accumulo git commit: ACCUMULO-4667 Inefficient LocalityGroupIterator
Date Sun, 23 Jul 2017 19:25:06 GMT
Repository: accumulo
Updated Branches:
  refs/heads/1.8 ef78423e2 -> 6c21a8f0a
  refs/heads/master 66d8956bf -> 867c2de35


ACCUMULO-4667 Inefficient LocalityGroupIterator

Reworked the LocalityGroupIterator to more efficiently determine which groups to search when
seeking.

Closes #275


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4a40b88f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4a40b88f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4a40b88f

Branch: refs/heads/master
Commit: 4a40b88f6c7e4c59796b84212bf1fff53bc95224
Parents: 66d8956
Author: Ivan Bella <ivan@bella.name>
Authored: Wed Jun 28 10:00:27 2017 -0400
Committer: Ivan Bella <ivan@bella.name>
Committed: Sun Jul 23 14:01:31 2017 -0400

----------------------------------------------------------------------
 .../apache/accumulo/core/file/rfile/RFile.java  |  35 ++-
 .../iterators/system/LocalityGroupIterator.java | 232 +++++++++++++++----
 .../accumulo/core/util/LocalityGroupUtil.java   |  23 +-
 .../apache/accumulo/tserver/InMemoryMap.java    |   5 +-
 4 files changed, 218 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a40b88f/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 2e26fb0..c1931da 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -67,6 +67,8 @@ import org.apache.accumulo.core.iterators.system.HeapIterator;
 import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
 import org.apache.accumulo.core.iterators.system.LocalityGroupIterator;
 import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroupContext;
+import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroupSeekCache;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.util.MutableByteSequence;
@@ -1050,16 +1052,16 @@ public class RFile {
 
   public static class Reader extends HeapIterator implements FileSKVIterator {
 
-    private BlockFileReader reader;
-
-    private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<>();
-    private ArrayList<LocalityGroupMetadata> sampleGroups = new ArrayList<>();
+    private final BlockFileReader reader;
 
-    private LocalityGroupReader currentReaders[];
-    private LocalityGroupReader readers[];
-    private LocalityGroupReader sampleReaders[];
+    private final ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<>();
+    private final ArrayList<LocalityGroupMetadata> sampleGroups = new ArrayList<>();
 
-    private HashSet<ByteSequence> nonDefaultColumnFamilies;
+    private final LocalityGroupReader currentReaders[];
+    private final LocalityGroupReader readers[];
+    private final LocalityGroupReader sampleReaders[];
+    private final LocalityGroupContext lgContext;
+    private LocalityGroupSeekCache lgCache;
 
     private List<Reader> deepCopies;
     private boolean deepCopy = false;
@@ -1120,11 +1122,7 @@ public class RFile {
         mb.close();
       }
 
-      nonDefaultColumnFamilies = new HashSet<>();
-      for (LocalityGroupMetadata lgm : localityGroups) {
-        if (!lgm.isDefaultLG)
-          nonDefaultColumnFamilies.addAll(lgm.columnFamilies.keySet());
-      }
+      lgContext = new LocalityGroupContext(currentReaders);
 
       createHeap(currentReaders.length);
     }
@@ -1132,7 +1130,6 @@ public class RFile {
     private Reader(Reader r, LocalityGroupReader sampleReaders[]) {
       super(sampleReaders.length);
       this.reader = r.reader;
-      this.nonDefaultColumnFamilies = r.nonDefaultColumnFamilies;
       this.currentReaders = new LocalityGroupReader[sampleReaders.length];
       this.deepCopies = r.deepCopies;
       this.deepCopy = false;
@@ -1144,12 +1141,12 @@ public class RFile {
         this.currentReaders[i] = sampleReaders[i];
         this.currentReaders[i].setInterruptFlag(r.interruptFlag);
       }
+      this.lgContext = new LocalityGroupContext(currentReaders);
     }
 
     private Reader(Reader r, boolean useSample) {
       super(r.currentReaders.length);
       this.reader = r.reader;
-      this.nonDefaultColumnFamilies = r.nonDefaultColumnFamilies;
       this.currentReaders = new LocalityGroupReader[r.currentReaders.length];
       this.deepCopies = r.deepCopies;
       this.deepCopy = true;
@@ -1168,7 +1165,7 @@ public class RFile {
         }
 
       }
-
+      this.lgContext = new LocalityGroupContext(currentReaders);
     }
 
     private void closeLocalityGroupReaders() {
@@ -1324,8 +1321,6 @@ public class RFile {
       return cf;
     }
 
-    private int numLGSeeked = 0;
-
     /**
      * Method that registers the given MetricsGatherer. You can only register one as it will
clobber any previously set. The MetricsGatherer should be
      * registered before iterating through the LocalityGroups.
@@ -1348,11 +1343,11 @@ public class RFile {
 
     @Override
     public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean
inclusive) throws IOException {
-      numLGSeeked = LocalityGroupIterator.seek(this, currentReaders, nonDefaultColumnFamilies,
range, columnFamilies, inclusive);
+      lgCache = LocalityGroupIterator.seek(this, lgContext, range, columnFamilies, inclusive,
lgCache);
     }
 
     int getNumLocalityGroupsSeeked() {
-      return numLGSeeked;
+      return (lgCache == null ? 0 : lgCache.getNumLGSeeked());
     }
 
     public FileSKVIterator getIndex() throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a40b88f/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
b/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
index ac8355b..a1b4969 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java
@@ -17,14 +17,18 @@
 package org.apache.accumulo.core.iterators.system;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
@@ -65,14 +69,69 @@ public class LocalityGroupIterator extends HeapIterator implements Interruptible
     private InterruptibleIterator iterator;
   }
 
-  private LocalityGroup groups[];
-  private Set<ByteSequence> nonDefaultColumnFamilies;
+  public static class LocalityGroupContext {
+    final List<LocalityGroup> groups;
+    final LocalityGroup defaultGroup;
+    final Map<ByteSequence,LocalityGroup> groupByCf;
+
+    public LocalityGroupContext(LocalityGroup[] groups) {
+      this.groups = Collections.unmodifiableList(Arrays.asList(groups));
+      this.groupByCf = new HashMap<ByteSequence,LocalityGroup>();
+      LocalityGroup foundDefault = null;
+
+      for (LocalityGroup group : groups) {
+        if (group.isDefaultLocalityGroup && group.columnFamilies == null) {
+          if (foundDefault != null) {
+            throw new IllegalStateException("Found multiple default locality groups");
+          }
+          foundDefault = group;
+        } else {
+          for (Entry<ByteSequence,MutableLong> entry : group.columnFamilies.entrySet())
{
+            if (entry.getValue().longValue() > 0) {
+              if (groupByCf.containsKey(entry.getKey())) {
+                throw new IllegalStateException("Found the same cf in multiple locality groups");
+              }
+              groupByCf.put(entry.getKey(), group);
+            }
+          }
+        }
+      }
+      defaultGroup = foundDefault;
+    }
+  }
+
+  /**
+   * This will cache the arguments used in the seek call along with the locality groups seeked.
+   */
+  public static class LocalityGroupSeekCache {
+    private ImmutableSet<ByteSequence> lastColumnFamilies;
+    private volatile boolean lastInclusive;
+    private Collection<LocalityGroup> lastUsed;
+
+    public ImmutableSet<ByteSequence> getLastColumnFamilies() {
+      return lastColumnFamilies;
+    }
+
+    public boolean isLastInclusive() {
+      return lastInclusive;
+    }
+
+    public Collection<LocalityGroup> getLastUsed() {
+      return lastUsed;
+    }
+
+    public int getNumLGSeeked() {
+      return (lastUsed == null ? 0 : lastUsed.size());
+    }
+  }
+
+  private final LocalityGroupContext lgContext;
+  private LocalityGroupSeekCache lgCache;
   private AtomicBoolean interruptFlag;
 
-  public LocalityGroupIterator(LocalityGroup groups[], Set<ByteSequence> nonDefaultColumnFamilies)
{
+  public LocalityGroupIterator(LocalityGroup groups[]) {
     super(groups.length);
-    this.groups = groups;
-    this.nonDefaultColumnFamilies = nonDefaultColumnFamilies;
+    this.lgContext = new LocalityGroupContext(groups);
   }
 
   @Override
@@ -80,12 +139,28 @@ public class LocalityGroupIterator extends HeapIterator implements Interruptible
     throw new UnsupportedOperationException();
   }
 
-  public static final int seek(HeapIterator hiter, LocalityGroup[] groups, Set<ByteSequence>
nonDefaultColumnFamilies, Range range,
-      Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException
{
+  /**
+   * This is the seek workhorse for a HeapIterator with locality groups (used by the InMemory and RFile mechanisms). This method will find the locality groups
+   * to use in the LocalityGroupContext, and will seek those groups.
+   *
+   * @param hiter
+   *          The heap iterator
+   * @param lgContext
+   *          The locality groups
+   * @param range
+   *          The range to seek
+   * @param columnFamilies
+   *          The column fams to seek
+   * @param inclusive
+   *          The inclusiveness of the column fams
+   * @return The locality groups seeked
+   * @throws IOException
+   *           thrown if a locality group seek fails
+   */
+  static final Collection<LocalityGroup> _seek(HeapIterator hiter, LocalityGroupContext
lgContext, Range range, Collection<ByteSequence> columnFamilies,
+      boolean inclusive) throws IOException {
     hiter.clear();
 
-    int numLGSeeked = 0;
-
     Set<ByteSequence> cfSet;
     if (columnFamilies.size() > 0)
       if (columnFamilies instanceof Set<?>) {
@@ -97,75 +172,140 @@ public class LocalityGroupIterator extends HeapIterator implements Interruptible
     else
       cfSet = Collections.emptySet();
 
-    for (LocalityGroup lgr : groups) {
-      // when include is set to true it means this locality groups contains
-      // wanted column families
-      boolean include = false;
+    // determine the set of groups to use
+    Collection<LocalityGroup> groups = Collections.emptyList();
 
-      if (cfSet.size() == 0) {
-        include = !inclusive;
-      } else if (lgr.isDefaultLocalityGroup && lgr.columnFamilies == null) {
-        // do not know what column families are in the default locality group,
-        // only know what column families are not in it
+    // if no column families specified, then include all groups unless !inclusive
+    if (cfSet.size() == 0) {
+      if (!inclusive) {
+        groups = lgContext.groups;
+      }
+    } else {
+      groups = new HashSet<LocalityGroup>();
 
+      // do not know what column families are in the default locality group,
+      // only know what column families are not in it
+      if (lgContext.defaultGroup != null) {
         if (inclusive) {
-          if (!nonDefaultColumnFamilies.containsAll(cfSet)) {
+          if (!lgContext.groupByCf.keySet().containsAll(cfSet)) {
             // default LG may contain wanted and unwanted column families
-            include = true;
+            groups.add(lgContext.defaultGroup);
           }// else - everything wanted is in other locality groups, so nothing to do
         } else {
-          // must include, if all excluded column families are in other locality groups
-          // then there are not unwanted column families in default LG
-          include = true;
+          // must include the default group as it may include cfs not in our cfSet
+          groups.add(lgContext.defaultGroup);
+        }
+      }
+
+      /*
+       * Need to consider the following cases for inclusive and exclusive (lgcf: locality group column family set, cf: column family set): lgcf and cf are disjoint;
+       * lgcf and cf are the same; cf contains lgcf; lgcf contains cf; lgcf and cf intersect but neither is a subset of the other.
+       */
+      if (!inclusive) {
+        for (Entry<ByteSequence,LocalityGroup> entry : lgContext.groupByCf.entrySet())
{
+          if (!cfSet.contains(entry.getKey())) {
+            groups.add(entry.getValue());
+          }
+        }
+      } else if (lgContext.groupByCf.size() <= cfSet.size()) {
+        for (Entry<ByteSequence,LocalityGroup> entry : lgContext.groupByCf.entrySet())
{
+          if (cfSet.contains(entry.getKey())) {
+            groups.add(entry.getValue());
+          }
         }
       } else {
-        /*
-         * Need to consider the following cases for inclusive and exclusive (lgcf:locality
group column family set, cf:column family set) lgcf and cf are
-         * disjoint lgcf and cf are the same cf contains lgcf lgcf contains cf lgccf and
cf intersect but neither is a subset of the other
-         */
-
-        for (Entry<ByteSequence,MutableLong> entry : lgr.columnFamilies.entrySet())
-          if (entry.getValue().longValue() > 0)
-            if (cfSet.contains(entry.getKey())) {
-              if (inclusive)
-                include = true;
-            } else if (!inclusive) {
-              include = true;
-            }
+        for (ByteSequence cf : cfSet) {
+          LocalityGroup group = lgContext.groupByCf.get(cf);
+          if (group != null) {
+            groups.add(group);
+          }
+        }
+      }
+    }
+
+    for (LocalityGroup lgr : groups) {
+      lgr.getIterator().seek(range, EMPTY_CF_SET, false);
+      hiter.addSource(lgr.getIterator());
+    }
+
+    return groups;
+  }
+
+  /**
+   * This seek method will reuse the supplied LocalityGroupSeekCache if it can. Otherwise
it will delegate to the _seek method.
+   *
+   * @param hiter
+   *          The heap iterator
+   * @param lgContext
+   *          The locality groups
+   * @param range
+   *          The range to seek
+   * @param columnFamilies
+   *          The column fams to seek
+   * @param inclusive
+   *          The inclusiveness of the column fams
+   * @param lgSeekCache
+   *          A cache returned by the previous call to this method
+   * @return A cache for this seek call
+   * @throws IOException
+   *           thrown if a locality group seek fails
+   */
+  public static LocalityGroupSeekCache seek(HeapIterator hiter, LocalityGroupContext lgContext,
Range range, Collection<ByteSequence> columnFamilies,
+      boolean inclusive, LocalityGroupSeekCache lgSeekCache) throws IOException {
+    if (lgSeekCache == null)
+      lgSeekCache = new LocalityGroupSeekCache();
+
+    // determine if the arguments have changed since the last time
+    boolean sameArgs = false;
+    ImmutableSet<ByteSequence> cfSet = null;
+    if (lgSeekCache.lastUsed != null && inclusive == lgSeekCache.lastInclusive) {
+      if (columnFamilies instanceof Set) {
+        sameArgs = lgSeekCache.lastColumnFamilies.equals(columnFamilies);
+      } else {
+        cfSet = ImmutableSet.copyOf(columnFamilies);
+        sameArgs = lgSeekCache.lastColumnFamilies.equals(cfSet);
       }
+    }
 
-      if (include) {
+    // if the column families and inclusiveness have not changed, then we can simply re-seek
the
+    // locality groups we discovered last round and rebuild the heap.
+    if (sameArgs) {
+      hiter.clear();
+      for (LocalityGroup lgr : lgSeekCache.lastUsed) {
         lgr.getIterator().seek(range, EMPTY_CF_SET, false);
         hiter.addSource(lgr.getIterator());
-        numLGSeeked++;
-      }// every column family is excluded, zero count, or not present
+      }
+    } else { // otherwise capture the parameters, and use the static seek method to locate
the locality groups to use.
+      lgSeekCache.lastColumnFamilies = (cfSet == null ? ImmutableSet.copyOf(columnFamilies)
: cfSet);
+      lgSeekCache.lastInclusive = inclusive;
+      lgSeekCache.lastUsed = _seek(hiter, lgContext, range, columnFamilies, inclusive);
     }
 
-    return numLGSeeked;
+    return lgSeekCache;
   }
 
   @Override
   public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
-    seek(this, groups, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
+    lgCache = seek(this, lgContext, range, columnFamilies, inclusive, lgCache);
   }
 
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
-    LocalityGroup[] groupsCopy = new LocalityGroup[groups.length];
+    LocalityGroup[] groupsCopy = new LocalityGroup[lgContext.groups.size()];
 
-    for (int i = 0; i < groups.length; i++) {
-      groupsCopy[i] = new LocalityGroup(groups[i], env);
+    for (int i = 0; i < lgContext.groups.size(); i++) {
+      groupsCopy[i] = new LocalityGroup(lgContext.groups.get(i), env);
       if (interruptFlag != null)
         groupsCopy[i].getIterator().setInterruptFlag(interruptFlag);
     }
 
-    return new LocalityGroupIterator(groupsCopy, nonDefaultColumnFamilies);
+    return new LocalityGroupIterator(groupsCopy);
   }
 
   @Override
   public void setInterruptFlag(AtomicBoolean flag) {
     this.interruptFlag = flag;
-    for (LocalityGroup lgr : groups) {
+    for (LocalityGroup lgr : lgContext.groups) {
       lgr.getIterator().setInterruptFlag(flag);
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a40b88f/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
index 0c658d5..422fc4a 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
@@ -29,6 +29,8 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -50,16 +52,23 @@ public class LocalityGroupUtil {
 
   // private static final Logger log = Logger.getLogger(ColumnFamilySet.class);
 
-  public static final Set<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
+  // using an ImmutableSet here for more efficient comparisons in LocalityGroupIterator
+  public static final ImmutableSet<ByteSequence> EMPTY_CF_SET = ImmutableSet.of();
 
-  public static Set<ByteSequence> families(Collection<Column> columns) {
+  /**
+   * Create a set of families to be passed into the SortedKeyValueIterator seek call from
a supplied set of columns. We are using the ImmutableSet to enable
+   * faster comparisons down in the LocalityGroupIterator.
+   *
+   * @param columns
+   *          The set of columns
+   * @return An immutable set of the column families of the supplied columns
+   */
+  public static ImmutableSet<ByteSequence> families(Collection<Column> columns)
{
     if (columns.size() == 0)
       return EMPTY_CF_SET;
-    Set<ByteSequence> result = new HashSet<>(columns.size());
-    for (Column col : columns) {
-      result.add(new ArrayByteSequence(col.getColumnFamily()));
-    }
-    return result;
+    Builder<ByteSequence> builder = ImmutableSet.builder();
+    columns.forEach(c -> builder.add(new ArrayByteSequence(c.getColumnFamily())));
+    return builder.build();
   }
 
   @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a40b88f/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index 29d5ca0..293b274 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -290,13 +290,11 @@ public class InMemoryMap {
     private SimpleMap maps[];
     private Partitioner partitioner;
     private PreAllocatedArray<List<Mutation>> partitioned;
-    private Set<ByteSequence> nonDefaultColumnFamilies;
 
     LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap)
{
       this.groupFams = new PreAllocatedArray<>(groups.size());
       this.maps = new SimpleMap[groups.size() + 1];
       this.partitioned = new PreAllocatedArray<>(groups.size() + 1);
-      this.nonDefaultColumnFamilies = new HashSet<>();
 
       for (int i = 0; i < maps.length; i++) {
         maps[i] = newMap(useNativeMap);
@@ -308,7 +306,6 @@ public class InMemoryMap {
         for (ByteSequence bs : cfset)
           map.put(bs, new MutableLong(1));
         this.groupFams.set(count++, map);
-        nonDefaultColumnFamilies.addAll(cfset);
       }
 
       partitioner = new LocalityGroupUtil.Partitioner(this.groupFams);
@@ -349,7 +346,7 @@ public class InMemoryMap {
           groups[i] = new LocalityGroup(maps[i].skvIterator(null), null, true);
       }
 
-      return new LocalityGroupIterator(groups, nonDefaultColumnFamilies);
+      return new LocalityGroupIterator(groups);
     }
 
     @Override


Mime
View raw message