Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9EC911830C for ; Mon, 21 Sep 2015 13:51:49 +0000 (UTC) Received: (qmail 87970 invoked by uid 500); 21 Sep 2015 13:51:27 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 87873 invoked by uid 500); 21 Sep 2015 13:51:27 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 87812 invoked by uid 99); 21 Sep 2015 13:51:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Sep 2015 13:51:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F2C86E0332; Mon, 21 Sep 2015 13:51:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Mon, 21 Sep 2015 13:51:27 -0000 Message-Id: <6f02d1d7c4934b7bb55464df6598bfd8@git.apache.org> In-Reply-To: <2f4784ce17f14c0a9e4b92c0d1b54c3d@git.apache.org> References: <2f4784ce17f14c0a9e4b92c0d1b54c3d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/7] accumulo git commit: ACCUMULO-3913 Added per table sampling http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java index 1c4676e..2227b25 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -43,6 +44,7 @@ import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator; import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource; import org.apache.accumulo.core.iterators.system.TimeSettingIterator; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.VolumeManager; @@ -458,7 +460,6 @@ public class FileManager { this.iflag = flag; ((InterruptibleIterator) this.iter).setInterruptFlag(iflag); } - } public class ScanFileManager { @@ -502,7 +503,8 @@ public class FileManager { return newlyReservedReaders; } - public synchronized List openFiles(Map files, boolean detachable) throws IOException { + public synchronized List openFiles(Map files, boolean detachable, SamplerConfigurationImpl samplerConfig) + throws IOException { List newlyReservedReaders = openFileRefs(files.keySet()); @@ -511,13 +513,22 @@ public class FileManager { for (FileSKVIterator reader : newlyReservedReaders) { String filename = getReservedReadeFilename(reader); InterruptibleIterator iter; + + FileSKVIterator source = reader; + if (samplerConfig != null) { + source = source.getSample(samplerConfig); + if (source == null) { + throw new SampleNotPresentException(); + } + } + if (detachable) { - FileDataSource fds = new FileDataSource(filename, reader); + FileDataSource fds = new FileDataSource(filename, source); dataSources.add(fds); SourceSwitchingIterator ssi = new SourceSwitchingIterator(fds); iter = new ProblemReportingIterator(context, tablet.getTableId().toString(), filename, continueOnFailure, ssi); } else { - iter = new ProblemReportingIterator(context, tablet.getTableId().toString(), filename, continueOnFailure, reader); + iter = new ProblemReportingIterator(context, tablet.getTableId().toString(), filename, continueOnFailure, source); } DataFileValue value = files.get(new FileRef(filename)); if (value.isTimeSet()) { @@ -539,7 +550,7 @@ public class FileManager { fds.unsetIterator(); } - public synchronized void reattach() throws IOException { + public synchronized void reattach(SamplerConfigurationImpl samplerConfig) throws IOException { if (tabletReservedReaders.size() != 0) throw new IllegalStateException(); @@ -562,7 +573,14 @@ public class FileManager { for (FileDataSource fds : dataSources) { FileSKVIterator reader = map.get(fds.file).remove(0); - fds.setIterator(reader); + FileSKVIterator source = reader; + if (samplerConfig != null) { + source = source.getSample(samplerConfig); + if (source == null) { + throw new SampleNotPresentException(); + } + } + fds.setIterator(source); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index 2274eea..f5141ff 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@ -16,6 +16,8 @@ */ package org.apache.accumulo.tserver; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -33,8 +35,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.ByteSequence; @@ -51,15 +56,20 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.SortedMapIterator; import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.iterators.system.EmptyIterator; import org.apache.accumulo.core.iterators.system.InterruptibleIterator; import org.apache.accumulo.core.iterators.system.LocalityGroupIterator; import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup; import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator; import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource; +import org.apache.accumulo.core.sample.Sampler; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.core.sample.impl.SamplerFactory; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.PreAllocatedArray; import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; @@ -68,7 +78,8 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; public class InMemoryMap { private SimpleMap map = null; @@ -80,22 +91,58 @@ public class InMemoryMap { private Map> lggroups; - public InMemoryMap(boolean useNativeMap, String memDumpDir) { - this(new HashMap>(), useNativeMap, memDumpDir); + private static Pair getSampler(AccumuloConfiguration config) { + try { + SamplerConfigurationImpl sampleConfig = SamplerConfigurationImpl.newSamplerConfig(config); + if (sampleConfig == null) { + return new Pair<>(null, null); + } + + return new Pair<>(sampleConfig, SamplerFactory.newSampler(sampleConfig, config)); + } catch (IOException e) { + throw new RuntimeException(e); + } } - public InMemoryMap(Map> lggroups, boolean useNativeMap, String memDumpDir) { - this.memDumpDir = memDumpDir; - this.lggroups = lggroups; + private AtomicReference> samplerRef = new AtomicReference<>(null); - if (lggroups.size() == 0) - map = newMap(useNativeMap); - else - map = new LocalityGroupMap(lggroups, useNativeMap); + private AccumuloConfiguration config; + + // defer creating sampler until first write. This was done because an empty sample map configured with no sampler will not flush after a user changes sample + // config. + private Sampler getOrCreateSampler() { + Pair pair = samplerRef.get(); + if (pair == null) { + pair = getSampler(config); + if (!samplerRef.compareAndSet(null, pair)) { + pair = samplerRef.get(); + } + } + + return pair.getSecond(); } public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError { - this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR)); + + boolean useNativeMap = config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED); + + this.memDumpDir = config.get(Property.TSERV_MEMDUMP_DIR); + this.lggroups = LocalityGroupUtil.getLocalityGroups(config); + + this.config = config; + + SimpleMap allMap; + SimpleMap sampleMap; + + if (lggroups.size() == 0) { + allMap = newMap(useNativeMap); + sampleMap = newMap(useNativeMap); + } else { + allMap = new LocalityGroupMap(lggroups, useNativeMap); + sampleMap = new LocalityGroupMap(lggroups, useNativeMap); + } + + map = new SampleMap(allMap, sampleMap); } private static SimpleMap newMap(boolean useNativeMap) { @@ -117,7 +164,7 @@ public class InMemoryMap { int size(); - InterruptibleIterator skvIterator(); + InterruptibleIterator skvIterator(SamplerConfigurationImpl samplerConfig); void delete(); @@ -126,6 +173,95 @@ public class InMemoryMap { void mutate(List mutations, int kvCount); } + private class SampleMap implements SimpleMap { + + private SimpleMap map; + private SimpleMap sample; + + public SampleMap(SimpleMap map, SimpleMap sampleMap) { + this.map = map; + this.sample = sampleMap; + } + + @Override + public Value get(Key key) { + return map.get(key); + } + + @Override + public Iterator> iterator(Key startKey) { + throw new UnsupportedOperationException(); + } + + @Override + public int size() { + return map.size(); + } + + @Override + public InterruptibleIterator skvIterator(SamplerConfigurationImpl samplerConfig) { + if (samplerConfig == null) + return map.skvIterator(null); + else { + Pair samplerAndConf = samplerRef.get(); + if (samplerAndConf == null) { + return EmptyIterator.EMPTY_ITERATOR; + } else if (samplerAndConf.getFirst() != null && samplerAndConf.getFirst().equals(samplerConfig)) { + return sample.skvIterator(null); + } else { + throw new SampleNotPresentException(); + } + } + } + + @Override + public void delete() { + map.delete(); + sample.delete(); + } + + @Override + public long getMemoryUsed() { + return map.getMemoryUsed() + sample.getMemoryUsed(); + } + + @Override + public void mutate(List mutations, int kvCount) { + map.mutate(mutations, kvCount); + + Sampler sampler = getOrCreateSampler(); + if (sampler != null) { + List sampleMutations = null; + + for (Mutation m : mutations) { + List colUpdates = m.getUpdates(); + List sampleColUpdates = null; + for (ColumnUpdate cvp : colUpdates) { + Key k = new Key(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(), false); + if (sampler.accept(k)) { + if (sampleColUpdates == null) { + sampleColUpdates = new ArrayList<>(); + } + sampleColUpdates.add(cvp); + } + } + + if (sampleColUpdates != null) { + if (sampleMutations == null) { + sampleMutations = new ArrayList<>(); + } + + sampleMutations.add(new LocalityGroupUtil.PartitionedMutation(m.getRow(), sampleColUpdates)); + } + } + + if (sampleMutations != null) { + sample.mutate(sampleMutations, kvCount); + } + } + } + } + private static class LocalityGroupMap implements SimpleMap { private PreAllocatedArray> groupFams; @@ -181,13 +317,16 @@ public class InMemoryMap { } @Override - public InterruptibleIterator skvIterator() { + public InterruptibleIterator skvIterator(SamplerConfigurationImpl samplerConfig) { + if (samplerConfig != null) + throw new SampleNotPresentException(); + LocalityGroup groups[] = new LocalityGroup[maps.length]; for (int i = 0; i < groups.length; i++) { if (i < groupFams.length) - groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams.get(i), false); + groups[i] = new LocalityGroup(maps[i].skvIterator(null), groupFams.get(i), false); else - groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true); + groups[i] = new LocalityGroup(maps[i].skvIterator(null), null, true); } return new LocalityGroupIterator(groups, nonDefaultColumnFamilies); @@ -264,7 +403,9 @@ public class InMemoryMap { } @Override - public synchronized InterruptibleIterator skvIterator() { + public InterruptibleIterator skvIterator(SamplerConfigurationImpl samplerConfig) { + if (samplerConfig != null) + throw new SampleNotPresentException(); if (map == null) throw new IllegalStateException(); @@ -327,7 +468,9 @@ public class InMemoryMap { } @Override - public InterruptibleIterator skvIterator() { + public InterruptibleIterator skvIterator(SamplerConfigurationImpl samplerConfig) { + if (samplerConfig != null) + throw new SampleNotPresentException(); return (InterruptibleIterator) nativeMap.skvIterator(); } @@ -410,16 +553,30 @@ public class InMemoryMap { private MemoryDataSource parent; private IteratorEnvironment env; private AtomicBoolean iflag; + private SamplerConfigurationImpl iteratorSamplerConfig; + + private SamplerConfigurationImpl getSamplerConfig() { + if (env != null) { + if (env.isSamplingEnabled()) { + return new SamplerConfigurationImpl(env.getSamplerConfiguration()); + } else { + return null; + } + } else { + return iteratorSamplerConfig; + } + } - MemoryDataSource() { - this(null, false, null, null); + MemoryDataSource(SamplerConfigurationImpl samplerConfig) { + this(null, false, null, null, samplerConfig); } - public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env, AtomicBoolean iflag) { + public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env, AtomicBoolean iflag, SamplerConfigurationImpl samplerConfig) { this.parent = parent; this.switched = switched; this.env = env; this.iflag = iflag; + this.iteratorSamplerConfig = samplerConfig; } @Override @@ -457,6 +614,10 @@ public class InMemoryMap { reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, SiteConfiguration.getInstance()); if (iflag != null) reader.setInterruptFlag(iflag); + + if (getSamplerConfig() != null) { + reader = reader.getSample(getSamplerConfig()); + } } return reader; @@ -466,7 +627,7 @@ public class InMemoryMap { public SortedKeyValueIterator iterator() throws IOException { if (iter == null) if (!switched) { - iter = map.skvIterator(); + iter = map.skvIterator(getSamplerConfig()); if (iflag != null) iter.setInterruptFlag(iflag); } else { @@ -485,7 +646,7 @@ public class InMemoryMap { @Override public DataSource getDeepCopyDataSource(IteratorEnvironment env) { - return new MemoryDataSource(parent == null ? this : parent, switched, env, iflag); + return new MemoryDataSource(parent == null ? this : parent, switched, env, iflag, iteratorSamplerConfig); } @Override @@ -562,7 +723,7 @@ public class InMemoryMap { } - public synchronized MemoryIterator skvIterator() { + public synchronized MemoryIterator skvIterator(SamplerConfigurationImpl iteratorSamplerConfig) { if (map == null) throw new NullPointerException(); @@ -570,8 +731,9 @@ public class InMemoryMap { throw new IllegalStateException("Can not obtain iterator after map deleted"); int mc = kvCount.get(); - MemoryDataSource mds = new MemoryDataSource(); - SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource()); + MemoryDataSource mds = new MemoryDataSource(iteratorSamplerConfig); + // TODO seems like a bug that two MemoryDataSources are created... may need to fix in older branches + SourceSwitchingIterator ssi = new SourceSwitchingIterator(mds); MemoryIterator mi = new MemoryIterator(new PartialMutationSkippingIterator(ssi, mc)); mi.setSSI(ssi); mi.setMDS(mds); @@ -584,7 +746,7 @@ public class InMemoryMap { if (nextKVCount.get() - 1 != kvCount.get()) throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = " + kvCount.get()); - return map.skvIterator(); + return map.skvIterator(null); } private boolean deleted = false; @@ -615,9 +777,15 @@ public class InMemoryMap { Configuration newConf = new Configuration(conf); newConf.setInt("io.seqfile.compress.blocksize", 100000); - FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, SiteConfiguration.getInstance()); + AccumuloConfiguration siteConf = SiteConfiguration.getInstance(); - InterruptibleIterator iter = map.skvIterator(); + if (getOrCreateSampler() != null) { + siteConf = createSampleConfig(siteConf); + } + + FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, siteConf); + + InterruptibleIterator iter = map.skvIterator(null); HashSet allfams = new HashSet(); @@ -668,14 +836,28 @@ public class InMemoryMap { tmpMap.delete(); } + private AccumuloConfiguration createSampleConfig(AccumuloConfiguration siteConf) { + ConfigurationCopy confCopy = new ConfigurationCopy(Iterables.filter(siteConf, new Predicate>() { + @Override + public boolean apply(Entry input) { + return !input.getKey().startsWith(Property.TABLE_SAMPLER.getKey()); + } + })); + + for (Entry entry : samplerRef.get().getFirst().toTablePropertiesMap().entrySet()) { + confCopy.set(entry.getKey(), entry.getValue()); + } + + siteConf = confCopy; + return siteConf; + } + private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException { while (iter.hasTop() && activeIters.size() > 0) { // RFile does not support MemKey, so we move the kv count into the value only for the RFile. // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written - Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount); - out.append(iter.getTopKey(), newValue); + out.append(iter.getTopKey(), MemValue.encode(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount)); iter.next(); - } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java index 00c8be9..71a4cbd 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java @@ -61,10 +61,10 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible currVal = v; return; } - currVal = new Value(v); - int mc = MemValue.splitKVCount(currVal); - currKey = new MemKey(k, mc); + MemValue mv = MemValue.decode(v); + currVal = mv.value; + currKey = new MemKey(k, mv.kvCount); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java index bc44459..af6f2f1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java @@ -16,69 +16,38 @@ */ package org.apache.accumulo.tserver; -import java.io.DataOutput; -import java.io.IOException; - import org.apache.accumulo.core.data.Value; /** * */ -public class MemValue extends Value { - int kvCount; - boolean merged = false; +public class MemValue { - public MemValue() { - super(); - this.kvCount = Integer.MAX_VALUE; - } + Value value; + int kvCount; public MemValue(Value value, int kv) { - super(value); + this.value = value; this.kvCount = kv; } - // Override - @Override - public void write(final DataOutput out) throws IOException { - if (!merged) { - byte[] combinedBytes = new byte[getSize() + 4]; - System.arraycopy(value, 0, combinedBytes, 4, getSize()); - combinedBytes[0] = (byte) (kvCount >>> 24); - combinedBytes[1] = (byte) (kvCount >>> 16); - combinedBytes[2] = (byte) (kvCount >>> 8); - combinedBytes[3] = (byte) (kvCount); - value = combinedBytes; - merged = true; - } - super.write(out); - } - - @Override - public void set(final byte[] b) { - super.set(b); - merged = false; - } - - @Override - public void copy(byte[] b) { - super.copy(b); - merged = false; + public static Value encode(Value value, int kv) { + byte[] combinedBytes = new byte[value.getSize() + 4]; + System.arraycopy(value.get(), 0, combinedBytes, 4, value.getSize()); + combinedBytes[0] = (byte) (kv >>> 24); + combinedBytes[1] = (byte) (kv >>> 16); + combinedBytes[2] = (byte) (kv >>> 8); + combinedBytes[3] = (byte) (kv); + return new Value(combinedBytes); } - /** - * Takes a Value and will take out the embedded kvCount, and then return that value while replacing the Value with the original unembedded version - * - * @return The kvCount embedded in v. - */ - public static int splitKVCount(Value v) { - if (v instanceof MemValue) - return ((MemValue) v).kvCount; - + public static MemValue decode(Value v) { byte[] originalBytes = new byte[v.getSize() - 4]; byte[] combined = v.get(); System.arraycopy(combined, 4, originalBytes, 0, originalBytes.length); v.set(originalBytes); - return (combined[0] << 24) + ((combined[1] & 0xFF) << 16) + ((combined[2] & 0xFF) << 8) + (combined[3] & 0xFF); + int kv = (combined[0] << 24) + ((combined[1] & 0xFF) << 16) + ((combined[2] & 0xFF) << 8) + (combined[3] & 0xFF); + + return new MemValue(new Value(originalBytes), kv); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java index cf01dd3..3cb4d40 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java @@ -34,6 +34,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.Key; @@ -749,6 +750,9 @@ public class NativeMap implements Iterable> { @Override public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + if (env != null && env.isSamplingEnabled()) { + throw new SampleNotPresentException(); + } return new NMSKVIter(map, interruptFlag); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java index 6c5b63d..73adec3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Map; +import org.apache.accumulo.core.client.SampleNotPresentException; +import org.apache.accumulo.core.client.admin.SamplerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -29,6 +31,7 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.system.MultiIterator; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.tserver.FileManager.ScanFileManager; @@ -40,10 +43,12 @@ public class TabletIteratorEnvironment implements IteratorEnvironment { private final IteratorScope scope; private final boolean fullMajorCompaction; private final AccumuloConfiguration config; - private final ArrayList> topLevelIterators = new ArrayList>(); + private final ArrayList> topLevelIterators; private Map files; private final Authorizations authorizations; // these will only be supplied during scan scope + private SamplerConfiguration samplerConfig; + private boolean enableSampleForDeepCopy; public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config) { if (scope == IteratorScope.majc) @@ -54,10 +59,11 @@ public class TabletIteratorEnvironment implements IteratorEnvironment { this.config = config; this.fullMajorCompaction = false; this.authorizations = Authorizations.EMPTY; + this.topLevelIterators = new ArrayList<>(); } - public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map files, - Authorizations authorizations) { + private TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map files, + Authorizations authorizations, SamplerConfigurationImpl samplerConfig, ArrayList> topLevelIterators) { if (scope == IteratorScope.majc) throw new IllegalArgumentException("must set if compaction is full"); @@ -67,6 +73,19 @@ public class TabletIteratorEnvironment implements IteratorEnvironment { this.fullMajorCompaction = false; this.files = files; this.authorizations = authorizations; + if (samplerConfig != null) { + enableSampleForDeepCopy = true; + this.samplerConfig = samplerConfig.toSamplerConfiguration(); + } else { + enableSampleForDeepCopy = false; + } + + this.topLevelIterators = topLevelIterators; + } + + public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map files, + Authorizations authorizations, SamplerConfigurationImpl samplerConfig) { + this(scope, config, trm, files, authorizations, samplerConfig, new ArrayList>()); } public TabletIteratorEnvironment(IteratorScope scope, boolean fullMajC, AccumuloConfiguration config) { @@ -78,6 +97,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment { this.config = config; this.fullMajorCompaction = fullMajC; this.authorizations = Authorizations.EMPTY; + this.topLevelIterators = new ArrayList>(); } @Override @@ -100,7 +120,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment { @Override public SortedKeyValueIterator reserveMapFileReader(String mapFileName) throws IOException { FileRef ref = new FileRef(mapFileName, new Path(mapFileName)); - return trm.openFiles(Collections.singletonMap(ref, files.get(ref)), false).get(0); + return trm.openFiles(Collections.singletonMap(ref, files.get(ref)), false, null).get(0); } @Override @@ -122,4 +142,37 @@ public class TabletIteratorEnvironment implements IteratorEnvironment { allIters.add(iter); return new MultiIterator(allIters, false); } + + @Override + public boolean isSamplingEnabled() { + return enableSampleForDeepCopy; + } + + @Override + public SamplerConfiguration getSamplerConfiguration() { + if (samplerConfig == null) { + // only create this once so that it stays the same, even if config changes + SamplerConfigurationImpl sci = SamplerConfigurationImpl.newSamplerConfig(config); + if (sci == null) { + return null; + } + samplerConfig = sci.toSamplerConfiguration(); + } + return samplerConfig; + } + + @Override + public IteratorEnvironment cloneWithSamplingEnabled() { + if (!scope.equals(IteratorScope.scan)) { + throw new UnsupportedOperationException(); + } + + SamplerConfigurationImpl sci = SamplerConfigurationImpl.newSamplerConfig(config); + if (sci == null) { + throw new SampleNotPresentException(); + } + + TabletIteratorEnvironment te = new TabletIteratorEnvironment(scope, config, trm, files, authorizations, sci, topLevelIterators); + return te; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index de89b50..d35e6af 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -61,6 +61,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.impl.CompressedIterators; import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig; import org.apache.accumulo.core.client.impl.DurabilityImpl; @@ -114,6 +115,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.replication.ReplicationConstants; import org.apache.accumulo.core.replication.thrift.ReplicationServicer; import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.tabletserver.log.LogEntry; @@ -123,6 +125,8 @@ import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; import org.apache.accumulo.core.tabletserver.thrift.TDurability; +import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException; +import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor; @@ -447,8 +451,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable { @Override public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List columns, int batchSize, List ssiList, Map> ssio, List authorizations, boolean waitForWrites, boolean isolated, - long readaheadThreshold, long batchTimeOut) throws NotServingTabletException, ThriftSecurityException, - org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { + long readaheadThreshold, TSamplerConfiguration tSamplerConfig, long batchTimeOut) throws NotServingTabletException, ThriftSecurityException, + org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException, TSampleNotPresentException { String tableId = new String(textent.getTable(), UTF_8); if (!security.canScan(credentials, tableId, Tables.getNamespaceId(getInstance(), tableId), range, columns, ssiList, ssio, authorizations)) @@ -480,10 +484,11 @@ public class TabletServer extends AccumuloServerContext implements Runnable { for (TColumn tcolumn : columns) { columnSet.add(new Column(tcolumn)); } + final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio, new Authorizations(authorizations), readaheadThreshold, batchTimeOut); scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated, - scanSession.interruptFlag, scanSession.batchTimeOut); + scanSession.interruptFlag, SamplerConfigurationImpl.fromThrift(tSamplerConfig), scanSession.batchTimeOut); long sid = sessionManager.createSession(scanSession, true); @@ -502,7 +507,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { @Override public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException, - org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { + org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException, TSampleNotPresentException { ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID); if (scanSession == null) { throw new NoSuchScanIDException(); @@ -516,7 +521,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException, - org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { + org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException, TSampleNotPresentException { if (scanSession.nextBatchTask == null) { scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag); @@ -533,6 +538,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable { throw (NotServingTabletException) e.getCause(); else if (e.getCause() instanceof TooManyFilesException) throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(scanSession.extent.toThrift()); + else if (e.getCause() instanceof SampleNotPresentException) + throw new TSampleNotPresentException(scanSession.extent.toThrift()); else if (e.getCause() instanceof IOException) { sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); List empty = Collections.emptyList(); @@ -595,8 +602,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable { @Override public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map> tbatch, List tcolumns, - List ssiList, Map> ssio, List authorizations, boolean waitForWrites, long batchTimeOut) - throws ThriftSecurityException { + List ssiList, Map> ssio, List authorizations, boolean waitForWrites, + TSamplerConfiguration tSamplerConfig, long batchTimeOut) throws ThriftSecurityException, TSampleNotPresentException { // find all of the tables that need to be scanned final HashSet tables = new HashSet(); for (TKeyExtent keyExtent : tbatch.keySet()) { @@ -627,7 +634,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable { if (waitForWrites) writeTracker.waitForWrites(TabletType.type(batch.keySet())); - final MultiScanSession mss = new MultiScanSession(credentials, threadPoolExtent, batch, ssiList, ssio, new Authorizations(authorizations), batchTimeOut); + final MultiScanSession mss = new MultiScanSession(credentials, threadPoolExtent, batch, ssiList, ssio, new Authorizations(authorizations), + SamplerConfigurationImpl.fromThrift(tSamplerConfig), batchTimeOut); mss.numTablets = batch.size(); for (List ranges : batch.values()) { @@ -653,7 +661,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } @Override - public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException { + public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, TSampleNotPresentException { MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID); @@ -668,7 +676,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } } - private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException { + private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException, TSampleNotPresentException { if (session.lookupTask == null) { session.lookupTask = new LookupTask(TabletServer.this, scanID); @@ -679,6 +687,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable { MultiScanResult scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); session.lookupTask = null; return scanResult; + } catch (ExecutionException e) { + sessionManager.removeSession(scanID); + if (e.getCause() instanceof SampleNotPresentException) { + throw new TSampleNotPresentException(); + } else { + log.warn("Failed to get multiscan result", e); + throw new RuntimeException(e); + } } catch (TimeoutException e1) { long timeout = TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT); sessionManager.removeIfNotAccessed(scanID, timeout); @@ -1116,7 +1132,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { IterConfig ic = compressedIters.decompress(tc.iterators); - Scanner scanner = tablet.createScanner(range, 1, EMPTY_COLUMNS, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag, 0); + Scanner scanner = tablet.createScanner(range, 1, EMPTY_COLUMNS, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag, null, 0); try { ScanBatch batch = scanner.read(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java index b97b88b..04915ef 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategy.java @@ -26,7 +26,10 @@ import java.util.Set; import java.util.regex.Pattern; import org.apache.accumulo.core.compaction.CompactionSettings; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.tserver.compaction.CompactionPlan; import org.apache.accumulo.tserver.compaction.CompactionStrategy; @@ -40,6 +43,22 @@ public class ConfigurableCompactionStrategy extends CompactionStrategy { boolean shouldCompact(Entry file, MajorCompactionRequest request); } + private static class NoSampleTest implements Test { + + @Override + public boolean shouldCompact(Entry file, MajorCompactionRequest request) { + try (FileSKVIterator reader = request.openReader(file.getKey())) { + SamplerConfigurationImpl sc = SamplerConfigurationImpl.newSamplerConfig(new ConfigurationCopy(request.getTableProperties())); + if (sc == null) { + return false; + } + return reader.getSample(sc) == null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + private static abstract class FileSizeTest implements Test { private final long esize; @@ -83,6 +102,9 @@ public class ConfigurableCompactionStrategy extends CompactionStrategy { for (Entry entry : es) { switch (CompactionSettings.valueOf(entry.getKey())) { + case SF_NO_SAMPLE: + tests.add(new NoSampleTest()); + break; case SF_LT_ESIZE_OPT: tests.add(new FileSizeTest(entry.getValue()) { @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java index 57a09ce..2d745cb 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.impl.Translator; import org.apache.accumulo.core.client.impl.Translators; import org.apache.accumulo.core.conf.Property; @@ -111,7 +112,7 @@ public class LookupTask extends ScanTask { interruptFlag.set(true); lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList, session.ssio, - interruptFlag, session.batchTimeOut); + interruptFlag, session.samplerConfig, session.batchTimeOut); // if the tablet was closed it it possible that the // interrupt flag was set.... do not want it set for @@ -163,6 +164,8 @@ public class LookupTask extends ScanTask { log.warn("Iteration interrupted, when scan not cancelled", iie); addResult(iie); } + } catch (SampleNotPresentException e) { + addResult(e); } catch (Throwable e) { log.warn("exception while doing multi-scan ", e); addResult(e); http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java index e3f4146..ec28367 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java @@ -18,6 +18,7 @@ package org.apache.accumulo.tserver.scan; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.iterators.IterationInterruptedException; import org.apache.accumulo.server.util.Halt; import org.apache.accumulo.tserver.TabletServer; @@ -84,8 +85,8 @@ public class NextBatchTask extends ScanTask { log.warn("Iteration interrupted, when scan not cancelled", iie); addResult(iie); } - } catch (TooManyFilesException tmfe) { - addResult(tmfe); + } catch (TooManyFilesException | SampleNotPresentException e) { + addResult(e); } catch (OutOfMemoryError ome) { Halt.halt("Ran out of memory scanning " + scanSession.extent + " for " + scanSession.client); addResult(ome); http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java index fccac47..16fc218 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java @@ -20,6 +20,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import org.apache.accumulo.core.client.admin.SamplerConfiguration; import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.impl.KeyExtent; @@ -36,6 +37,7 @@ public class MultiScanSession extends Session { public final List ssiList; public final Map> ssio; public final Authorizations auths; + public final SamplerConfiguration samplerConfig; public final long batchTimeOut; // stats @@ -47,13 +49,14 @@ public class MultiScanSession extends Session { public volatile ScanTask lookupTask; public MultiScanSession(TCredentials credentials, KeyExtent threadPoolExtent, Map> queries, List ssiList, - Map> ssio, Authorizations authorizations, long batchTimeOut) { + Map> ssio, Authorizations authorizations, SamplerConfiguration samplerConfig, long batchTimeOut) { super(credentials); this.queries = queries; this.ssiList = ssiList; this.ssio = ssio; this.auths = authorizations; this.threadPoolExtent = threadPoolExtent; + this.samplerConfig = samplerConfig; this.batchTimeOut = batchTimeOut; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 853714a..72c289c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.accumulo.core.client.admin.SamplerConfiguration; import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -42,6 +43,7 @@ import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSou import org.apache.accumulo.core.iterators.system.StatsIterator; import org.apache.accumulo.core.iterators.system.VisibilityFilter; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.fs.FileRef; @@ -50,6 +52,8 @@ import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator; import org.apache.accumulo.tserver.TabletIteratorEnvironment; import org.apache.accumulo.tserver.TabletServer; +import com.google.common.collect.Iterables; + class ScanDataSource implements DataSource { // data source state @@ -65,10 +69,10 @@ class ScanDataSource implements DataSource { private final ScanOptions options; ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, HashSet columnSet, List ssiList, - Map> ssio, AtomicBoolean interruptFlag, long batchTimeOut) { + Map> ssio, AtomicBoolean interruptFlag, SamplerConfiguration samplerConfig, long batchTimeOut) { this.tablet = tablet; expectedDeletionCount = tablet.getDataSourceDeletions(); - this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false, batchTimeOut); + this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false, samplerConfig, batchTimeOut); this.interruptFlag = interruptFlag; } @@ -117,6 +121,8 @@ class ScanDataSource implements DataSource { Map files; + SamplerConfigurationImpl samplerConfig = options.getSamplerConfigurationImpl(); + synchronized (tablet) { if (memIters != null) @@ -141,26 +147,26 @@ class ScanDataSource implements DataSource { // getIterators() throws an exception expectedDeletionCount = tablet.getDataSourceDeletions(); - memIters = tablet.getTabletMemory().getIterators(); + memIters = tablet.getTabletMemory().getIterators(samplerConfig); Pair> reservation = tablet.getDatafileManager().reserveFilesForScan(); fileReservationId = reservation.getFirst(); files = reservation.getSecond(); } - Collection mapfiles = fileManager.openFiles(files, options.isIsolated()); + Collection mapfiles = fileManager.openFiles(files, options.isIsolated(), samplerConfig); + + for (SortedKeyValueIterator skvi : Iterables.concat(mapfiles, memIters)) + ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag); List> iters = new ArrayList>(mapfiles.size() + memIters.size()); iters.addAll(mapfiles); iters.addAll(memIters); - for (SortedKeyValueIterator skvi : iters) - ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag); - MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent()); TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, tablet.getTableConfiguration(), fileManager, files, - options.getAuthorizations()); + options.getAuthorizations(), samplerConfig); statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter()); @@ -212,7 +218,7 @@ class ScanDataSource implements DataSource { public void reattachFileManager() throws IOException { if (fileManager != null) - fileManager.reattach(); + fileManager.reattach(options.getSamplerConfigurationImpl()); } public void detachFileManager() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java index 2a38fbd..c97f3ac 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java @@ -21,8 +21,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.accumulo.core.client.admin.SamplerConfiguration; import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; final class ScanOptions { @@ -35,10 +37,11 @@ final class ScanOptions { private final AtomicBoolean interruptFlag; private final int num; private final boolean isolated; + private SamplerConfiguration samplerConfig; private final long batchTimeOut; ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set columnSet, List ssiList, Map> ssio, - AtomicBoolean interruptFlag, boolean isolated, long batchTimeOut) { + AtomicBoolean interruptFlag, boolean isolated, SamplerConfiguration samplerConfig, long batchTimeOut) { this.num = num; this.authorizations = authorizations; this.defaultLabels = defaultLabels; @@ -47,6 +50,7 @@ final class ScanOptions { this.ssio = ssio; this.interruptFlag = interruptFlag; this.isolated = isolated; + this.samplerConfig = samplerConfig; this.batchTimeOut = batchTimeOut; } @@ -82,6 +86,16 @@ final class ScanOptions { return isolated; } + public SamplerConfiguration getSamplerConfiguration() { + return samplerConfig; + } + + public SamplerConfigurationImpl getSamplerConfigurationImpl() { + if (samplerConfig == null) + return null; + return new SamplerConfigurationImpl(samplerConfig); + } + public long getBatchTimeOut() { return batchTimeOut; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index b8c260d..1f66302 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -51,6 +51,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; +import org.apache.accumulo.core.client.admin.SamplerConfiguration; import org.apache.accumulo.core.client.impl.DurabilityImpl; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -632,7 +633,8 @@ public class Tablet implements TabletCommitter { } public LookupResult lookup(List ranges, HashSet columns, Authorizations authorizations, List results, long maxResultSize, - List ssiList, Map> ssio, AtomicBoolean interruptFlag, long batchTimeOut) throws IOException { + List ssiList, Map> ssio, AtomicBoolean interruptFlag, SamplerConfiguration samplerConfig, long batchTimeOut) + throws IOException { if (ranges.size() == 0) { return new LookupResult(); @@ -650,7 +652,8 @@ public class Tablet implements TabletCommitter { tabletRange.clip(range); } - ScanDataSource dataSource = new ScanDataSource(this, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, batchTimeOut); + ScanDataSource dataSource = new ScanDataSource(this, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, samplerConfig, + batchTimeOut); LookupResult result = null; @@ -754,12 +757,13 @@ public class Tablet implements TabletCommitter { } public Scanner createScanner(Range range, int num, Set columns, Authorizations authorizations, List ssiList, - Map> ssio, boolean isolated, AtomicBoolean interruptFlag, long batchTimeOut) { + Map> ssio, boolean isolated, AtomicBoolean interruptFlag, SamplerConfiguration samplerConfig, long batchTimeOut) { // do a test to see if this range falls within the tablet, if it does not // then clip will throw an exception extent.toDataRange().clip(range); - ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated, batchTimeOut); + ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated, samplerConfig, + batchTimeOut); return new Scanner(this, range, opts); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java index 0b39d40..86cc262 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; import org.apache.accumulo.tserver.InMemoryMap; import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator; @@ -156,11 +157,11 @@ class TabletMemory implements Closeable { tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other); } - public List getIterators() { + public List getIterators(SamplerConfigurationImpl samplerConfig) { List toReturn = new ArrayList(2); - toReturn.add(memTable.skvIterator()); + toReturn.add(memTable.skvIterator(samplerConfig)); if (otherMemTable != null) - toReturn.add(otherMemTable.skvIterator()); + toReturn.add(otherMemTable.skvIterator(samplerConfig)); return toReturn; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java index da7157a..7b4d447 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java @@ -26,16 +26,22 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.accumulo.core.client.SampleNotPresentException; +import org.apache.accumulo.core.client.admin.SamplerConfiguration; +import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; @@ -45,21 +51,56 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IterationInterruptedException; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; +import org.apache.accumulo.core.sample.RowSampler; +import org.apache.accumulo.core.sample.Sampler; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.core.sample.impl.SamplerFactory; import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ZooConfiguration; import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator; import org.apache.hadoop.io.Text; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import com.google.common.collect.ImmutableMap; + public class InMemoryMapTest { + private static class SampleIE extends BaseIteratorEnvironment { + + private final SamplerConfiguration sampleConfig; + + public SampleIE() { + this.sampleConfig = null; + } + + public SampleIE(SamplerConfigurationImpl sampleConfig) { + this.sampleConfig = sampleConfig.toSamplerConfiguration(); + } + + @Override + public boolean isSamplingEnabled() { + return sampleConfig != null; + } + + @Override + public SamplerConfiguration getSamplerConfiguration() { + return sampleConfig; + } + } + + @Rule + public ExpectedException thrown = ExpectedException.none(); + @BeforeClass public static void setUp() throws Exception { // suppress log messages having to do with not having an instance @@ -101,20 +142,42 @@ public class InMemoryMapTest { } static Set newCFSet(String... cfs) { - HashSet cfSet = new HashSet(); + HashSet cfSet = new HashSet<>(); for (String cf : cfs) { cfSet.add(new ArrayByteSequence(cf)); } return cfSet; } + static Set toTextSet(String... cfs) { + HashSet cfSet = new HashSet<>(); + for (String cf : cfs) { + cfSet.add(new Text(cf)); + } + return cfSet; + } + + static ConfigurationCopy newConfig(String memDumpDir) { + ConfigurationCopy config = new ConfigurationCopy(DefaultConfiguration.getInstance()); + config.set(Property.TSERV_NATIVEMAP_ENABLED, "" + false); + config.set(Property.TSERV_MEMDUMP_DIR, memDumpDir); + return config; + } + + static InMemoryMap newInMemoryMap(boolean useNative, String memDumpDir) throws LocalityGroupConfigurationError { + ConfigurationCopy config = new ConfigurationCopy(DefaultConfiguration.getInstance()); + config.set(Property.TSERV_NATIVEMAP_ENABLED, "" + useNative); + config.set(Property.TSERV_MEMDUMP_DIR, memDumpDir); + return new InMemoryMap(config); + } + @Test public void test2() throws Exception { - InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); - MemoryIterator ski1 = imm.skvIterator(); + MemoryIterator ski1 = imm.skvIterator(null); mutate(imm, "r1", "foo:cq1", 3, "bar1"); - MemoryIterator ski2 = imm.skvIterator(); + MemoryIterator ski2 = imm.skvIterator(null); ski1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); assertFalse(ski1.hasTop()); @@ -128,17 +191,17 @@ public class InMemoryMapTest { @Test public void test3() throws Exception { - InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); mutate(imm, "r1", "foo:cq1", 3, "bar1"); mutate(imm, "r1", "foo:cq1", 3, "bar2"); - MemoryIterator ski1 = imm.skvIterator(); + MemoryIterator ski1 = imm.skvIterator(null); mutate(imm, "r1", "foo:cq1", 3, "bar3"); mutate(imm, "r3", "foo:cq1", 3, "bar9"); mutate(imm, "r3", "foo:cq1", 3, "bara"); - MemoryIterator ski2 = imm.skvIterator(); + MemoryIterator ski2 = imm.skvIterator(null); ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false); ae(ski1, "r1", "foo:cq1", 3, "bar2"); @@ -154,11 +217,11 @@ public class InMemoryMapTest { @Test public void test4() throws Exception { - InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); mutate(imm, "r1", "foo:cq1", 3, "bar1"); mutate(imm, "r1", "foo:cq1", 3, "bar2"); - MemoryIterator ski1 = imm.skvIterator(); + MemoryIterator ski1 = imm.skvIterator(null); mutate(imm, "r1", "foo:cq1", 3, "bar3"); imm.delete(0); @@ -186,13 +249,13 @@ public class InMemoryMapTest { @Test public void test5() throws Exception { - InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); mutate(imm, "r1", "foo:cq1", 3, "bar1"); mutate(imm, "r1", "foo:cq1", 3, "bar2"); mutate(imm, "r1", "foo:cq1", 3, "bar3"); - MemoryIterator ski1 = imm.skvIterator(); + MemoryIterator ski1 = imm.skvIterator(null); ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false); ae(ski1, "r1", "foo:cq1", 3, "bar3"); @@ -204,13 +267,13 @@ public class InMemoryMapTest { ski1.close(); - imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); mutate(imm, "r1", "foo:cq1", 3, "bar1"); mutate(imm, "r1", "foo:cq2", 3, "bar2"); mutate(imm, "r1", "foo:cq3", 3, "bar3"); - ski1 = imm.skvIterator(); + ski1 = imm.skvIterator(null); ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false); ae(ski1, "r1", "foo:cq1", 3, "bar1"); @@ -225,18 +288,18 @@ public class InMemoryMapTest { @Test public void test6() throws Exception { - InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); mutate(imm, "r1", "foo:cq1", 3, "bar1"); mutate(imm, "r1", "foo:cq2", 3, "bar2"); mutate(imm, "r1", "foo:cq3", 3, "bar3"); mutate(imm, "r1", "foo:cq4", 3, "bar4"); - MemoryIterator ski1 = imm.skvIterator(); + MemoryIterator ski1 = imm.skvIterator(null); mutate(imm, "r1", "foo:cq5", 3, "bar5"); - SortedKeyValueIterator dc = ski1.deepCopy(null); + SortedKeyValueIterator dc = ski1.deepCopy(new SampleIE()); ski1.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false); ae(ski1, "r1", "foo:cq1", 3, "bar1"); @@ -271,12 +334,12 @@ public class InMemoryMapTest { private void deepCopyAndDelete(int interleaving, boolean interrupt) throws Exception { // interleaving == 0 intentionally omitted, this runs the test w/o deleting in mem map - InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); mutate(imm, "r1", "foo:cq1", 3, "bar1"); mutate(imm, "r1", "foo:cq2", 3, "bar2"); - MemoryIterator ski1 = imm.skvIterator(); + MemoryIterator ski1 = imm.skvIterator(null); AtomicBoolean iflag = new AtomicBoolean(false); ski1.setInterruptFlag(iflag); @@ -287,7 +350,7 @@ public class InMemoryMapTest { iflag.set(true); } - SortedKeyValueIterator dc = ski1.deepCopy(null); + SortedKeyValueIterator dc = ski1.deepCopy(new SampleIE()); if (interleaving == 2) { imm.delete(0); @@ -338,7 +401,7 @@ public class InMemoryMapTest { @Test public void testBug1() throws Exception { - InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); for (int i = 0; i < 20; i++) { mutate(imm, "r1", "foo:cq" + i, 3, "bar" + i); @@ -348,7 +411,7 @@ public class InMemoryMapTest { mutate(imm, "r2", "foo:cq" + i, 3, "bar" + i); } - MemoryIterator ski1 = imm.skvIterator(); + MemoryIterator ski1 = imm.skvIterator(null); ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(ski1); imm.delete(0); @@ -366,14 +429,14 @@ public class InMemoryMapTest { @Test public void testSeekBackWards() throws Exception { - InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); mutate(imm, "r1", "foo:cq1", 3, "bar1"); mutate(imm, "r1", "foo:cq2", 3, "bar2"); mutate(imm, "r1", "foo:cq3", 3, "bar3"); mutate(imm, "r1", "foo:cq4", 3, "bar4"); - MemoryIterator skvi1 = imm.skvIterator(); + MemoryIterator skvi1 = imm.skvIterator(null); skvi1.seek(new Range(nk("r1", "foo:cq3", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false); ae(skvi1, "r1", "foo:cq3", 3, "bar3"); @@ -385,14 +448,14 @@ public class InMemoryMapTest { @Test public void testDuplicateKey() throws Exception { - InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); Mutation m = new Mutation(new Text("r1")); m.put(new Text("foo"), new Text("cq"), 3, new Value("v1".getBytes())); m.put(new Text("foo"), new Text("cq"), 3, new Value("v2".getBytes())); imm.mutate(Collections.singletonList(m)); - MemoryIterator skvi1 = imm.skvIterator(); + MemoryIterator skvi1 = imm.skvIterator(null); skvi1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); ae(skvi1, "r1", "foo:cq", 3, "v2"); ae(skvi1, "r1", "foo:cq", 3, "v1"); @@ -410,12 +473,12 @@ public class InMemoryMapTest { // - hard to get this timing test to run well on apache build machines @Test @Ignore - public void parallelWriteSpeed() throws InterruptedException, IOException { + public void parallelWriteSpeed() throws Exception { List timings = new ArrayList(); for (int threads : new int[] {1, 2, 16, /* 64, 256 */}) { final long now = System.currentTimeMillis(); final long counts[] = new long[threads]; - final InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + final InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); ExecutorService e = Executors.newFixedThreadPool(threads); for (int j = 0; j < threads; j++) { final int threadId = j; @@ -451,12 +514,12 @@ public class InMemoryMapTest { @Test public void testLocalityGroups() throws Exception { + ConfigurationCopy config = newConfig(tempFolder.newFolder().getAbsolutePath()); + config.set(Property.TABLE_LOCALITY_GROUP_PREFIX + "lg1", LocalityGroupUtil.encodeColumnFamilies(toTextSet("cf1", "cf2"))); + config.set(Property.TABLE_LOCALITY_GROUP_PREFIX + "lg2", LocalityGroupUtil.encodeColumnFamilies(toTextSet("cf3", "cf4"))); + config.set(Property.TABLE_LOCALITY_GROUPS.getKey(), "lg1,lg2"); - Map> lggroups1 = new HashMap>(); - lggroups1.put("lg1", newCFSet("cf1", "cf2")); - lggroups1.put("lg2", newCFSet("cf3", "cf4")); - - InMemoryMap imm = new InMemoryMap(lggroups1, false, tempFolder.newFolder().getAbsolutePath()); + InMemoryMap imm = new InMemoryMap(config); Mutation m1 = new Mutation("r1"); m1.put("cf1", "x", 2, "1"); @@ -480,10 +543,10 @@ public class InMemoryMapTest { imm.mutate(Arrays.asList(m1, m2, m3, m4, m5)); - MemoryIterator iter1 = imm.skvIterator(); + MemoryIterator iter1 = imm.skvIterator(null); seekLocalityGroups(iter1); - SortedKeyValueIterator dc1 = iter1.deepCopy(null); + SortedKeyValueIterator dc1 = iter1.deepCopy(new SampleIE()); seekLocalityGroups(dc1); assertTrue(imm.getNumEntries() == 10); @@ -497,6 +560,254 @@ public class InMemoryMapTest { // seekLocalityGroups(iter1.deepCopy(null)); } + @Test + public void testSample() throws Exception { + + SamplerConfigurationImpl sampleConfig = new SamplerConfigurationImpl(RowSampler.class.getName(), ImmutableMap.of("hasher", "murmur3_32", "modulus", "7")); + Sampler sampler = SamplerFactory.newSampler(sampleConfig, DefaultConfiguration.getInstance()); + + ConfigurationCopy config1 = newConfig(tempFolder.newFolder().getAbsolutePath()); + for (Entry entry : sampleConfig.toTablePropertiesMap().entrySet()) { + config1.set(entry.getKey(), entry.getValue()); + } + + ConfigurationCopy config2 = newConfig(tempFolder.newFolder().getAbsolutePath()); + config2.set(Property.TABLE_LOCALITY_GROUP_PREFIX + "lg1", LocalityGroupUtil.encodeColumnFamilies(toTextSet("cf2"))); + config2.set(Property.TABLE_LOCALITY_GROUPS.getKey(), "lg1"); + for (Entry entry : sampleConfig.toTablePropertiesMap().entrySet()) { + config2.set(entry.getKey(), entry.getValue()); + } + + for (ConfigurationCopy config : Arrays.asList(config1, config2)) { + + InMemoryMap imm = new InMemoryMap(config); + + TreeMap expectedSample = new TreeMap<>(); + TreeMap expectedAll = new TreeMap<>(); + TreeMap expectedNone = new TreeMap<>(); + + MemoryIterator iter0 = imm.skvIterator(sampleConfig); + + for (int r = 0; r < 100; r++) { + String row = String.format("r%06d", r); + mutate(imm, row, "cf1:cq1", 5, "v" + (2 * r), sampler, expectedSample, expectedAll); + mutate(imm, row, "cf2:cq2", 5, "v" + ((2 * r) + 1), sampler, expectedSample, expectedAll); + } + + assertTrue(expectedSample.size() > 0); + + MemoryIterator iter1 = imm.skvIterator(sampleConfig); + MemoryIterator iter2 = imm.skvIterator(null); + SortedKeyValueIterator iter0dc1 = iter0.deepCopy(new SampleIE()); + SortedKeyValueIterator iter0dc2 = iter0.deepCopy(new SampleIE(sampleConfig)); + SortedKeyValueIterator iter1dc1 = iter1.deepCopy(new SampleIE()); + SortedKeyValueIterator iter1dc2 = iter1.deepCopy(new SampleIE(sampleConfig)); + SortedKeyValueIterator iter2dc1 = iter2.deepCopy(new SampleIE()); + SortedKeyValueIterator iter2dc2 = iter2.deepCopy(new SampleIE(sampleConfig)); + + assertEquals(expectedNone, readAll(iter0)); + assertEquals(expectedNone, readAll(iter0dc1)); + assertEquals(expectedNone, readAll(iter0dc2)); + assertEquals(expectedSample, readAll(iter1)); + assertEquals(expectedAll, readAll(iter2)); + assertEquals(expectedAll, readAll(iter1dc1)); + assertEquals(expectedAll, readAll(iter2dc1)); + assertEquals(expectedSample, readAll(iter1dc2)); + assertEquals(expectedSample, readAll(iter2dc2)); + + imm.delete(0); + + assertEquals(expectedNone, readAll(iter0)); + assertEquals(expectedNone, readAll(iter0dc1)); + assertEquals(expectedNone, readAll(iter0dc2)); + assertEquals(expectedSample, readAll(iter1)); + assertEquals(expectedAll, readAll(iter2)); + assertEquals(expectedAll, readAll(iter1dc1)); + assertEquals(expectedAll, readAll(iter2dc1)); + assertEquals(expectedSample, readAll(iter1dc2)); + assertEquals(expectedSample, readAll(iter2dc2)); + + SortedKeyValueIterator iter0dc3 = iter0.deepCopy(new SampleIE()); + SortedKeyValueIterator iter0dc4 = iter0.deepCopy(new SampleIE(sampleConfig)); + SortedKeyValueIterator iter1dc3 = iter1.deepCopy(new SampleIE()); + SortedKeyValueIterator iter1dc4 = iter1.deepCopy(new SampleIE(sampleConfig)); + SortedKeyValueIterator iter2dc3 = iter2.deepCopy(new SampleIE()); + SortedKeyValueIterator iter2dc4 = iter2.deepCopy(new SampleIE(sampleConfig)); + + assertEquals(expectedNone, readAll(iter0dc3)); + assertEquals(expectedNone, readAll(iter0dc4)); + assertEquals(expectedAll, readAll(iter1dc3)); + assertEquals(expectedAll, readAll(iter2dc3)); + assertEquals(expectedSample, readAll(iter1dc4)); + assertEquals(expectedSample, readAll(iter2dc4)); + + iter1.close(); + iter2.close(); + } + } + + @Test + public void testInterruptingSample() throws Exception { + runInterruptSampleTest(false, false, false); + runInterruptSampleTest(false, true, false); + runInterruptSampleTest(true, false, false); + runInterruptSampleTest(true, true, false); + runInterruptSampleTest(true, true, true); + } + + private void runInterruptSampleTest(boolean deepCopy, boolean delete, boolean dcAfterDelete) throws Exception { + SamplerConfigurationImpl sampleConfig1 = new SamplerConfigurationImpl(RowSampler.class.getName(), ImmutableMap.of("hasher", "murmur3_32", "modulus", "2")); + Sampler sampler = SamplerFactory.newSampler(sampleConfig1, DefaultConfiguration.getInstance()); + + ConfigurationCopy config1 = newConfig(tempFolder.newFolder().getAbsolutePath()); + for (Entry entry : sampleConfig1.toTablePropertiesMap().entrySet()) { + config1.set(entry.getKey(), entry.getValue()); + } + + InMemoryMap imm = new InMemoryMap(config1); + + TreeMap expectedSample = new TreeMap<>(); + TreeMap expectedAll = new TreeMap<>(); + + for (int r = 0; r < 1000; r++) { + String row = String.format("r%06d", r); + mutate(imm, row, "cf1:cq1", 5, "v" + (2 * r), sampler, expectedSample, expectedAll); + mutate(imm, row, "cf2:cq2", 5, "v" + ((2 * r) + 1), sampler, expectedSample, expectedAll); + } + + assertTrue(expectedSample.size() > 0); + + MemoryIterator miter = imm.skvIterator(sampleConfig1); + AtomicBoolean iFlag = new AtomicBoolean(false); + miter.setInterruptFlag(iFlag); + SortedKeyValueIterator iter = miter; + + if (delete && !dcAfterDelete) { + imm.delete(0); + } + + if (deepCopy) { + iter = iter.deepCopy(new SampleIE(sampleConfig1)); + } + + if (delete && dcAfterDelete) { + imm.delete(0); + } + + assertEquals(expectedSample, readAll(iter)); + iFlag.set(true); + try { + readAll(iter); + Assert.fail(); + } catch (IterationInterruptedException iie) {} + + miter.close(); + } + + private void mutate(InMemoryMap imm, String row, String cols, int ts, String val, Sampler sampler, TreeMap expectedSample, + TreeMap expectedAll) { + mutate(imm, row, cols, ts, val); + Key k1 = nk(row, cols, ts); + if (sampler.accept(k1)) { + expectedSample.put(k1, new Value(val.getBytes())); + } + expectedAll.put(k1, new Value(val.getBytes())); + } + + @Test(expected = SampleNotPresentException.class) + public void testDifferentSampleConfig() throws Exception { + SamplerConfigurationImpl sampleConfig = new SamplerConfigurationImpl(RowSampler.class.getName(), ImmutableMap.of("hasher", "murmur3_32", "modulus", "7")); + + ConfigurationCopy config1 = newConfig(tempFolder.newFolder().getAbsolutePath()); + for (Entry entry : sampleConfig.toTablePropertiesMap().entrySet()) { + config1.set(entry.getKey(), entry.getValue()); + } + + InMemoryMap imm = new InMemoryMap(config1); + + mutate(imm, "r", "cf:cq", 5, "b"); + + SamplerConfigurationImpl sampleConfig2 = new SamplerConfigurationImpl(RowSampler.class.getName(), ImmutableMap.of("hasher", "murmur3_32", "modulus", "9")); + MemoryIterator iter = imm.skvIterator(sampleConfig2); + iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + } + + @Test(expected = SampleNotPresentException.class) + public void testNoSampleConfig() throws Exception { + InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + + mutate(imm, "r", "cf:cq", 5, "b"); + + SamplerConfigurationImpl sampleConfig2 = new SamplerConfigurationImpl(RowSampler.class.getName(), ImmutableMap.of("hasher", "murmur3_32", "modulus", "9")); + MemoryIterator iter = imm.skvIterator(sampleConfig2); + iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + } + + @Test + public void testEmptyNoSampleConfig() throws Exception { + InMemoryMap imm = newInMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + + SamplerConfigurationImpl sampleConfig2 = new SamplerConfigurationImpl(RowSampler.class.getName(), ImmutableMap.of("hasher", "murmur3_32", "modulus", "9")); + + // when in mem map is empty should be able to get sample iterator with any sample config + MemoryIterator iter = imm.skvIterator(sampleConfig2); + iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + Assert.assertFalse(iter.hasTop()); + } + + @Test + public void testDeferredSamplerCreation() throws Exception { + SamplerConfigurationImpl sampleConfig1 = new SamplerConfigurationImpl(RowSampler.class.getName(), ImmutableMap.of("hasher", "murmur3_32", "modulus", "9")); + + ConfigurationCopy config1 = newConfig(tempFolder.newFolder().getAbsolutePath()); + for (Entry entry : sampleConfig1.toTablePropertiesMap().entrySet()) { + config1.set(entry.getKey(), entry.getValue()); + } + + InMemoryMap imm = new InMemoryMap(config1); + + // change sampler config after creating in mem map. + SamplerConfigurationImpl sampleConfig2 = new SamplerConfigurationImpl(RowSampler.class.getName(), ImmutableMap.of("hasher", "murmur3_32", "modulus", "7")); + for (Entry entry : sampleConfig2.toTablePropertiesMap().entrySet()) { + config1.set(entry.getKey(), entry.getValue()); + } + + TreeMap expectedSample = new TreeMap<>(); + TreeMap expectedAll = new TreeMap<>(); + Sampler sampler = SamplerFactory.newSampler(sampleConfig2, config1); + + for (int i = 0; i < 100; i++) { + mutate(imm, "r" + i, "cf:cq", 5, "v" + i, sampler, expectedSample, expectedAll); + } + + MemoryIterator iter = imm.skvIterator(sampleConfig2); + iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + Assert.assertEquals(expectedSample, readAll(iter)); + + SortedKeyValueIterator dc = iter.deepCopy(new SampleIE(sampleConfig2)); + dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + Assert.assertEquals(expectedSample, readAll(dc)); + + iter = imm.skvIterator(null); + iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + Assert.assertEquals(expectedAll, readAll(iter)); + + iter = imm.skvIterator(sampleConfig1); + thrown.expect(SampleNotPresentException.class); + iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + } + + private TreeMap readAll(SortedKeyValueIterator iter) throws IOException { + iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + + TreeMap actual = new TreeMap<>(); + while (iter.hasTop()) { + actual.put(iter.getTopKey(), iter.getTopValue()); + iter.next(); + } + return actual; + } + private void seekLocalityGroups(SortedKeyValueIterator iter1) throws IOException { iter1.seek(new Range(), newCFSet("cf1"), true); ae(iter1, "r1", "cf1:x", 2, "1"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java index 55226fb..0388c1f 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java @@ -41,6 +41,7 @@ import org.apache.accumulo.core.file.NoSuchMetaStoreException; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.fs.FileRef; import org.apache.hadoop.io.Text; @@ -133,6 +134,11 @@ public class DefaultCompactionStrategyTest { @Override public void close() throws IOException {} + @Override + public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) { + return null; + } + } static final DefaultConfiguration dfault = AccumuloConfiguration.getDefaultConfiguration();