Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9C9FD200CCE for ; Sun, 23 Jul 2017 21:25:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 99561164216; Sun, 23 Jul 2017 19:25:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6D96D164219 for ; Sun, 23 Jul 2017 21:25:10 +0200 (CEST) Received: (qmail 30883 invoked by uid 500); 23 Jul 2017 19:25:08 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 30794 invoked by uid 99); 23 Jul 2017 19:25:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 23 Jul 2017 19:25:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 18F8FE0896; Sun, 23 Jul 2017 19:25:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ibella@apache.org To: commits@accumulo.apache.org Date: Sun, 23 Jul 2017 19:25:07 -0000 Message-Id: In-Reply-To: <7cb25b3e59b946f6987d8293d70d1ef0@git.apache.org> References: <7cb25b3e59b946f6987d8293d70d1ef0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/4] accumulo git commit: ACCUMULO-4667 Inefficient LocalityGroupIterator archived-at: Sun, 23 Jul 2017 19:25:11 -0000 ACCUMULO-4667 Inefficient LocalityGroupIterator Reworked the LocalityGroupIterator to more efficiently determine which groups to search when seeking. Closes #275 Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6c21a8f0 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6c21a8f0 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6c21a8f0 Branch: refs/heads/master Commit: 6c21a8f0ad342692ad010865db699f4bb985a91d Parents: ef78423 Author: Ivan Bella Authored: Sun Jul 23 15:23:38 2017 -0400 Committer: Ivan Bella Committed: Sun Jul 23 15:23:38 2017 -0400 ---------------------------------------------------------------------- .../apache/accumulo/core/file/rfile/RFile.java | 35 ++- .../iterators/system/LocalityGroupIterator.java | 232 +++++++++++++++---- .../accumulo/core/util/LocalityGroupUtil.java | 23 +- .../apache/accumulo/tserver/InMemoryMap.java | 5 +- 4 files changed, 219 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/6c21a8f0/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 2f08d24..4539392 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -67,6 +67,8 @@ import org.apache.accumulo.core.iterators.system.HeapIterator; import org.apache.accumulo.core.iterators.system.InterruptibleIterator; import org.apache.accumulo.core.iterators.system.LocalityGroupIterator; import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup; +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroupContext; +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroupSeekCache; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.MutableByteSequence; @@ -1050,16 +1052,16 @@ public class RFile { public static class Reader extends HeapIterator implements FileSKVIterator { - private BlockFileReader reader; - - private ArrayList localityGroups = new ArrayList<>(); - private ArrayList sampleGroups = new ArrayList<>(); + private final BlockFileReader reader; - private LocalityGroupReader currentReaders[]; - private LocalityGroupReader readers[]; - private LocalityGroupReader sampleReaders[]; + private final ArrayList localityGroups = new ArrayList<>(); + private final ArrayList sampleGroups = new ArrayList<>(); - private HashSet nonDefaultColumnFamilies; + private final LocalityGroupReader currentReaders[]; + private final LocalityGroupReader readers[]; + private final LocalityGroupReader sampleReaders[]; + private final LocalityGroupContext lgContext; + private LocalityGroupSeekCache lgCache; private List deepCopies; private boolean deepCopy = false; @@ -1120,11 +1122,7 @@ public class RFile { mb.close(); } - nonDefaultColumnFamilies = new HashSet<>(); - for (LocalityGroupMetadata lgm : localityGroups) { - if (!lgm.isDefaultLG) - nonDefaultColumnFamilies.addAll(lgm.columnFamilies.keySet()); - } + lgContext = new LocalityGroupContext(currentReaders); createHeap(currentReaders.length); } @@ -1132,7 +1130,6 @@ public class RFile { private Reader(Reader r, LocalityGroupReader sampleReaders[]) { super(sampleReaders.length); this.reader = r.reader; - this.nonDefaultColumnFamilies = r.nonDefaultColumnFamilies; this.currentReaders = new LocalityGroupReader[sampleReaders.length]; this.deepCopies = r.deepCopies; this.deepCopy = false; @@ -1144,12 +1141,12 @@ public class RFile { this.currentReaders[i] = sampleReaders[i]; this.currentReaders[i].setInterruptFlag(r.interruptFlag); } + this.lgContext = new LocalityGroupContext(currentReaders); } private Reader(Reader r, boolean useSample) { super(r.currentReaders.length); this.reader = r.reader; - this.nonDefaultColumnFamilies = r.nonDefaultColumnFamilies; this.currentReaders = new LocalityGroupReader[r.currentReaders.length]; this.deepCopies = r.deepCopies; this.deepCopy = true; @@ -1168,7 +1165,7 @@ public class RFile { } } - + this.lgContext = new LocalityGroupContext(currentReaders); } private void closeLocalityGroupReaders() { @@ -1324,8 +1321,6 @@ public class RFile { return cf; } - private int numLGSeeked = 0; - /** * Method that registers the given MetricsGatherer. You can only register one as it will clobber any previously set. The MetricsGatherer should be * registered before iterating through the LocalityGroups. @@ -1348,11 +1343,11 @@ public class RFile { @Override public void seek(Range range, Collection columnFamilies, boolean inclusive) throws IOException { - numLGSeeked = LocalityGroupIterator.seek(this, currentReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive); + lgCache = LocalityGroupIterator.seek(this, lgContext, range, columnFamilies, inclusive, lgCache); } int getNumLocalityGroupsSeeked() { - return numLGSeeked; + return (lgCache == null ? 0 : lgCache.getNumLGSeeked()); } public FileSKVIterator getIndex() throws IOException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6c21a8f0/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java index ac8355b..a1b4969 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/LocalityGroupIterator.java @@ -17,14 +17,18 @@ package org.apache.accumulo.core.iterators.system; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.ImmutableSet; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; @@ -65,14 +69,69 @@ public class LocalityGroupIterator extends HeapIterator implements Interruptible private InterruptibleIterator iterator; } - private LocalityGroup groups[]; - private Set nonDefaultColumnFamilies; + public static class LocalityGroupContext { + final List groups; + final LocalityGroup defaultGroup; + final Map groupByCf; + + public LocalityGroupContext(LocalityGroup[] groups) { + this.groups = Collections.unmodifiableList(Arrays.asList(groups)); + this.groupByCf = new HashMap(); + LocalityGroup foundDefault = null; + + for (LocalityGroup group : groups) { + if (group.isDefaultLocalityGroup && group.columnFamilies == null) { + if (foundDefault != null) { + throw new IllegalStateException("Found multiple default locality groups"); + } + foundDefault = group; + } else { + for (Entry entry : group.columnFamilies.entrySet()) { + if (entry.getValue().longValue() > 0) { + if (groupByCf.containsKey(entry.getKey())) { + throw new IllegalStateException("Found the same cf in multiple locality groups"); + } + groupByCf.put(entry.getKey(), group); + } + } + } + } + defaultGroup = foundDefault; + } + } + + /** + * This will cache the arguments used in the seek call along with the locality groups seeked. + */ + public static class LocalityGroupSeekCache { + private ImmutableSet lastColumnFamilies; + private volatile boolean lastInclusive; + private Collection lastUsed; + + public ImmutableSet getLastColumnFamilies() { + return lastColumnFamilies; + } + + public boolean isLastInclusive() { + return lastInclusive; + } + + public Collection getLastUsed() { + return lastUsed; + } + + public int getNumLGSeeked() { + return (lastUsed == null ? 0 : lastUsed.size()); + } + } + + private final LocalityGroupContext lgContext; + private LocalityGroupSeekCache lgCache; private AtomicBoolean interruptFlag; - public LocalityGroupIterator(LocalityGroup groups[], Set nonDefaultColumnFamilies) { + public LocalityGroupIterator(LocalityGroup groups[]) { super(groups.length); - this.groups = groups; - this.nonDefaultColumnFamilies = nonDefaultColumnFamilies; + this.lgContext = new LocalityGroupContext(groups); } @Override @@ -80,12 +139,28 @@ public class LocalityGroupIterator extends HeapIterator implements Interruptible throw new UnsupportedOperationException(); } - public static final int seek(HeapIterator hiter, LocalityGroup[] groups, Set nonDefaultColumnFamilies, Range range, - Collection columnFamilies, boolean inclusive) throws IOException { + /** + * This is the seek work horse for a HeapIterator with locality groups (uses by the InMemory and RFile mechanisms). This method will find the locality groups + * to use in the LocalityGroupContext, and will seek those groups. + * + * @param hiter + * The heap iterator + * @param lgContext + * The locality groups + * @param range + * The range to seek + * @param columnFamilies + * The column fams to seek + * @param inclusive + * The inclusiveness of the column fams + * @return The locality groups seeked + * @throws IOException + * thrown if an locality group seek fails + */ + static final Collection _seek(HeapIterator hiter, LocalityGroupContext lgContext, Range range, Collection columnFamilies, + boolean inclusive) throws IOException { hiter.clear(); - int numLGSeeked = 0; - Set cfSet; if (columnFamilies.size() > 0) if (columnFamilies instanceof Set) { @@ -97,75 +172,140 @@ public class LocalityGroupIterator extends HeapIterator implements Interruptible else cfSet = Collections.emptySet(); - for (LocalityGroup lgr : groups) { - // when include is set to true it means this locality groups contains - // wanted column families - boolean include = false; + // determine the set of groups to use + Collection groups = Collections.emptyList(); - if (cfSet.size() == 0) { - include = !inclusive; - } else if (lgr.isDefaultLocalityGroup && lgr.columnFamilies == null) { - // do not know what column families are in the default locality group, - // only know what column families are not in it + // if no column families specified, then include all groups unless !inclusive + if (cfSet.size() == 0) { + if (!inclusive) { + groups = lgContext.groups; + } + } else { + groups = new HashSet(); + // do not know what column families are in the default locality group, + // only know what column families are not in it + if (lgContext.defaultGroup != null) { if (inclusive) { - if (!nonDefaultColumnFamilies.containsAll(cfSet)) { + if (!lgContext.groupByCf.keySet().containsAll(cfSet)) { // default LG may contain wanted and unwanted column families - include = true; + groups.add(lgContext.defaultGroup); }// else - everything wanted is in other locality groups, so nothing to do } else { - // must include, if all excluded column families are in other locality groups - // then there are not unwanted column families in default LG - include = true; + // must include the default group as it may include cfs not in our cfSet + groups.add(lgContext.defaultGroup); + } + } + + /* + * Need to consider the following cases for inclusive and exclusive (lgcf:locality group column family set, cf:column family set) lgcf and cf are disjoint + * lgcf and cf are the same cf contains lgcf lgcf contains cf lgccf and cf intersect but neither is a subset of the other + */ + if (!inclusive) { + for (Entry entry : lgContext.groupByCf.entrySet()) { + if (!cfSet.contains(entry.getKey())) { + groups.add(entry.getValue()); + } + } + } else if (lgContext.groupByCf.size() <= cfSet.size()) { + for (Entry entry : lgContext.groupByCf.entrySet()) { + if (cfSet.contains(entry.getKey())) { + groups.add(entry.getValue()); + } } } else { - /* - * Need to consider the following cases for inclusive and exclusive (lgcf:locality group column family set, cf:column family set) lgcf and cf are - * disjoint lgcf and cf are the same cf contains lgcf lgcf contains cf lgccf and cf intersect but neither is a subset of the other - */ - - for (Entry entry : lgr.columnFamilies.entrySet()) - if (entry.getValue().longValue() > 0) - if (cfSet.contains(entry.getKey())) { - if (inclusive) - include = true; - } else if (!inclusive) { - include = true; - } + for (ByteSequence cf : cfSet) { + LocalityGroup group = lgContext.groupByCf.get(cf); + if (group != null) { + groups.add(group); + } + } + } + } + + for (LocalityGroup lgr : groups) { + lgr.getIterator().seek(range, EMPTY_CF_SET, false); + hiter.addSource(lgr.getIterator()); + } + + return groups; + } + + /** + * This seek method will reuse the supplied LocalityGroupSeekCache if it can. Otherwise it will delegate to the _seek method. + * + * @param hiter + * The heap iterator + * @param lgContext + * The locality groups + * @param range + * The range to seek + * @param columnFamilies + * The column fams to seek + * @param inclusive + * The inclusiveness of the column fams + * @param lgSeekCache + * A cache returned by the previous call to this method + * @return A cache for this seek call + * @throws IOException + * thrown if an locality group seek fails + */ + public static LocalityGroupSeekCache seek(HeapIterator hiter, LocalityGroupContext lgContext, Range range, Collection columnFamilies, + boolean inclusive, LocalityGroupSeekCache lgSeekCache) throws IOException { + if (lgSeekCache == null) + lgSeekCache = new LocalityGroupSeekCache(); + + // determine if the arguments have changed since the last time + boolean sameArgs = false; + ImmutableSet cfSet = null; + if (lgSeekCache.lastUsed != null && inclusive == lgSeekCache.lastInclusive) { + if (columnFamilies instanceof Set) { + sameArgs = lgSeekCache.lastColumnFamilies.equals(columnFamilies); + } else { + cfSet = ImmutableSet.copyOf(columnFamilies); + sameArgs = lgSeekCache.lastColumnFamilies.equals(cfSet); } + } - if (include) { + // if the column families and inclusiveness have not changed, then we can simply re-seek the + // locality groups we discovered last round and rebuild the heap. + if (sameArgs) { + hiter.clear(); + for (LocalityGroup lgr : lgSeekCache.lastUsed) { lgr.getIterator().seek(range, EMPTY_CF_SET, false); hiter.addSource(lgr.getIterator()); - numLGSeeked++; - }// every column family is excluded, zero count, or not present + } + } else { // otherwise capture the parameters, and use the static seek method to locate the locality groups to use. + lgSeekCache.lastColumnFamilies = (cfSet == null ? ImmutableSet.copyOf(columnFamilies) : cfSet); + lgSeekCache.lastInclusive = inclusive; + lgSeekCache.lastUsed = _seek(hiter, lgContext, range, columnFamilies, inclusive); } - return numLGSeeked; + return lgSeekCache; } @Override public void seek(Range range, Collection columnFamilies, boolean inclusive) throws IOException { - seek(this, groups, nonDefaultColumnFamilies, range, columnFamilies, inclusive); + lgCache = seek(this, lgContext, range, columnFamilies, inclusive, lgCache); } @Override public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { - LocalityGroup[] groupsCopy = new LocalityGroup[groups.length]; + LocalityGroup[] groupsCopy = new LocalityGroup[lgContext.groups.size()]; - for (int i = 0; i < groups.length; i++) { - groupsCopy[i] = new LocalityGroup(groups[i], env); + for (int i = 0; i < lgContext.groups.size(); i++) { + groupsCopy[i] = new LocalityGroup(lgContext.groups.get(i), env); if (interruptFlag != null) groupsCopy[i].getIterator().setInterruptFlag(interruptFlag); } - return new LocalityGroupIterator(groupsCopy, nonDefaultColumnFamilies); + return new LocalityGroupIterator(groupsCopy); } @Override public void setInterruptFlag(AtomicBoolean flag) { this.interruptFlag = flag; - for (LocalityGroup lgr : groups) { + for (LocalityGroup lgr : lgContext.groups) { lgr.getIterator().setInterruptFlag(flag); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6c21a8f0/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java index 0c658d5..4fa47c3 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java @@ -29,6 +29,8 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSet.Builder; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -50,16 +52,25 @@ public class LocalityGroupUtil { // private static final Logger log = Logger.getLogger(ColumnFamilySet.class); - public static final Set EMPTY_CF_SET = Collections.emptySet(); + // using an ImmutableSet here for more efficient comparisons in LocalityGroupIterator + public static final ImmutableSet EMPTY_CF_SET = ImmutableSet.of(); - public static Set families(Collection columns) { + /** + * Create a set of families to be passed into the SortedKeyValueIterator seek call from a supplied set of columns. We are using the ImmutableSet to enable + * faster comparisons down in the LocalityGroupIterator. + * + * @param columns + * The set of columns + * @return An immutable set of columns + */ + public static ImmutableSet families(Collection columns) { if (columns.size() == 0) return EMPTY_CF_SET; - Set result = new HashSet<>(columns.size()); - for (Column col : columns) { - result.add(new ArrayByteSequence(col.getColumnFamily())); + Builder builder = ImmutableSet.builder(); + for (Column c : columns) { + builder.add(new ArrayByteSequence(c.getColumnFamily())); } - return result; + return builder.build(); } @SuppressWarnings("serial") http://git-wip-us.apache.org/repos/asf/accumulo/blob/6c21a8f0/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index 462174f..a5f2f97 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@ -291,13 +291,11 @@ public class InMemoryMap { private SimpleMap maps[]; private Partitioner partitioner; private PreAllocatedArray> partitioned; - private Set nonDefaultColumnFamilies; LocalityGroupMap(Map> groups, boolean useNativeMap) { this.groupFams = new PreAllocatedArray<>(groups.size()); this.maps = new SimpleMap[groups.size() + 1]; this.partitioned = new PreAllocatedArray<>(groups.size() + 1); - this.nonDefaultColumnFamilies = new HashSet<>(); for (int i = 0; i < maps.length; i++) { maps[i] = newMap(useNativeMap); @@ -309,7 +307,6 @@ public class InMemoryMap { for (ByteSequence bs : cfset) map.put(bs, new MutableLong(1)); this.groupFams.set(count++, map); - nonDefaultColumnFamilies.addAll(cfset); } partitioner = new LocalityGroupUtil.Partitioner(this.groupFams); @@ -350,7 +347,7 @@ public class InMemoryMap { groups[i] = new LocalityGroup(maps[i].skvIterator(null), null, true); } - return new LocalityGroupIterator(groups, nonDefaultColumnFamilies); + return new LocalityGroupIterator(groups); } @Override