From: benedict@apache.org
To: commits@cassandra.apache.org
Date: Wed, 11 Feb 2015 15:23:41 -0000
Subject: [6/7] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

http://git-wip-us.apache.org/repos/asf/cassandra/blob/02c34893/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index de2bbc6,0000000..889ade3
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1,1908 -1,0 +1,2046 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.cassandra.io.sstable.format; + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; +import com.google.common.collect.Ordering; +import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.RateLimiter; + +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; +import com.clearspring.analytics.stream.cardinality.ICardinality; +import org.apache.cassandra.cache.CachingOptions; +import org.apache.cassandra.cache.InstrumentingCache; +import org.apache.cassandra.cache.KeyCacheKey; +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.*; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.index.SecondaryIndex; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.io.compress.CompressedRandomAccessReader; +import org.apache.cassandra.io.compress.CompressedThrottledReader; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.metadata.*; +import org.apache.cassandra.io.util.*; +import org.apache.cassandra.metrics.RestorableMeter; +import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.cassandra.utils.concurrent.Ref; +import org.apache.cassandra.utils.concurrent.RefCounted; + +import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR; + +/** - * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen. - * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead. ++ * An SSTableReader can be constructed in a number of places, but typically is either ++ * read from disk at startup, or constructed from a flushed memtable, or after compaction ++ * to replace some existing sstables. However once created, an sstablereader may also be modified. 
++ *
++ * A reader's OpenReason describes its current stage in its lifecycle, as follows:
++ *
++ * NORMAL
++ *     From:  None            => Reader has been read from disk, either at startup or from a flushed memtable
++ *            EARLY           => Reader is the final result of a compaction
++ *            MOVED_START     => Reader WAS being compacted, but this failed and it has been restored to NORMAL status
++ *
++ * EARLY
++ *     From:  None            => Reader is a compaction replacement that is either incomplete and has been opened
++ *                               to represent its partial result status, or has been finished but the compaction
++ *                               it is a part of has not yet completed fully
++ *            EARLY           => Same as from None, except that this is not the first time the replacement has been opened as EARLY
++ *
++ * MOVED_START
++ *     From:  NORMAL          => Reader is being compacted. This compaction has not finished, but the compaction result
++ *                               is either partially or fully opened, to either partially or fully replace this reader.
++ *                               This reader's start key has been updated to represent this, so that reads only hit
++ *                               one or the other reader.
++ *
++ * METADATA_CHANGE
++ *     From:  NORMAL          => Reader has seen low traffic and the amount of memory available for index summaries is
++ *                               constrained, so its index summary has been downsampled.
++ *            METADATA_CHANGE => Same as from NORMAL
++ *
++ * Note that in parallel to this, there are two different Descriptor types; TMPLINK and FINAL; the latter corresponds
++ * to NORMAL state readers and all readers that replace a NORMAL one. TMPLINK is used for EARLY state readers and
++ * no others.
++ *
++ * When a reader is being compacted, if the result is large its replacement may be opened as EARLY before compaction
++ * completes in order to present the result to consumers earlier. In this case the reader will itself be changed to
++ * a MOVED_START state, where its start no longer represents its on-disk minimum key. This is to permit reads to be
++ * directed to only one reader when the two represent the same data. The EARLY file can represent a compaction result
++ * that is either partially complete and still in-progress, or a complete and immutable sstable that is part of a larger
++ * macro compaction action that has not yet fully completed.
++ *
++ * Currently ALL compaction results at least briefly go through an EARLY open state prior to completion, regardless
++ * of whether early opening is enabled.
++ *
++ * Since a reader can be created multiple times over the same shared underlying resources, and the exact resources
++ * it shares between each instance differ subtly, we track the lifetime of any underlying resource with its own
++ * reference count, which each instance takes a Ref to. Each instance then tracks references to itself, and once these
++ * all expire it releases its Refs to these underlying resources.
++ *
++ * Some shared cleanup behaviour is needed only once all sstablereaders in a certain stage of their lifecycle
++ * (i.e. EARLY or NORMAL opening) have expired, and some must only occur once all readers of any kind over a single
++ * logical sstable have expired. These are managed by the DescriptorTypeTidy and GlobalTidy classes at the bottom,
++ * and are effectively managed as another resource that each instance tracks its own Ref to, so that all of these
++ * resources are cleaned up safely and, if they are not, can be debugged.
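The self-reference counting described above can be sketched in isolation. The toy class below is not Cassandra's Ref/RefCounted machinery (which also records leaks for debugging); the names are invented for illustration. It only demonstrates the invariant that matters here: once the last reference is released the tidy action runs exactly once, and any later tryRef fails, so a reader that has fully expired cannot be resurrected.

    import java.util.concurrent.atomic.AtomicInteger;

    // Toy self-reference count: starts at 1 ("the instance's own ref"),
    // tryRef() fails once the count has ever reached zero, and the tidy
    // action runs exactly once when the last reference is released.
    public class SimpleRef
    {
        private final AtomicInteger counts = new AtomicInteger(1);
        private final Runnable tidy;

        public SimpleRef(Runnable tidy) { this.tidy = tidy; }

        public boolean tryRef()
        {
            while (true)
            {
                int cur = counts.get();
                if (cur <= 0)
                    return false;                       // already expired
                if (counts.compareAndSet(cur, cur + 1))
                    return true;
            }
        }

        public void release()
        {
            if (counts.decrementAndGet() == 0)
                tidy.run();                              // last holder cleans up
        }

        public static void main(String[] args)
        {
            SimpleRef ref = new SimpleRef(() -> System.out.println("tidying shared resources"));
            ref.tryRef();                                // a second holder appears
            ref.release();                               // second holder done
            ref.release();                               // last ref gone -> tidy runs
            System.out.println("tryRef after release: " + ref.tryRef()); // false
        }
    }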
++ * ++ * TODO: fill in details about DataTracker and lifecycle interactions for tools, and for compaction strategies + */ - public abstract class SSTableReader extends SSTable implements RefCounted ++public abstract class SSTableReader extends SSTable implements RefCounted +{ + private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class); + + private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1); + private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0); + + public static final Comparator maxTimestampComparator = new Comparator() + { + public int compare(SSTableReader o1, SSTableReader o2) + { + long ts1 = o1.getMaxTimestamp(); + long ts2 = o2.getMaxTimestamp(); + return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1)); + } + }; + + public static final Comparator sstableComparator = new Comparator() + { + public int compare(SSTableReader o1, SSTableReader o2) + { + return o1.first.compareTo(o2.first); + } + }; + + public static final Ordering sstableOrdering = Ordering.from(sstableComparator); + + /** + * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound + * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created + * later than maxDataAge. + * + * The field is not serialized to disk, so relying on it for more than what truncate does is not advised. + * + * When a new sstable is flushed, maxDataAge is set to the time of creation. + * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables. + * + * The age is in milliseconds since epoc and is local to this host. + */ + public final long maxDataAge; + + public enum OpenReason + { + NORMAL, + EARLY, - METADATA_CHANGE ++ METADATA_CHANGE, ++ MOVED_START + } + + public final OpenReason openReason; + + // indexfile and datafile: might be null before a call to load() + protected SegmentedFile ifile; + protected SegmentedFile dfile; - + protected IndexSummary indexSummary; + protected IFilter bf; + + protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; + + protected InstrumentingCache keyCache; + + protected final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker(); + + // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted, + // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone - protected final AtomicBoolean isCompacted = new AtomicBoolean(false); + protected final AtomicBoolean isSuspect = new AtomicBoolean(false); + + // not final since we need to be able to change level on a file. + protected volatile StatsMetadata sstableMetadata; + + protected final AtomicLong keyCacheHit = new AtomicLong(0); + protected final AtomicLong keyCacheRequest = new AtomicLong(0); + - private final Tidier tidy = new Tidier(); - private final RefCounted refCounted = RefCounted.Impl.get(tidy); ++ private final InstanceTidier tidy = new InstanceTidier(descriptor, metadata); ++ private final Ref selfRef = new Ref<>(this, tidy); + - @VisibleForTesting - public RestorableMeter readMeter; - protected ScheduledFuture readMeterSyncFuture; ++ private RestorableMeter readMeter; + + /** + * Calculate approximate key count. + * If cardinality estimator is available on all given sstables, then this method use them to estimate + * key count. + * If not, then this uses index summaries. 
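The key-count estimate described above merges per-sstable cardinality sketches when they are available. A standalone illustration of that technique follows, reusing the stream-lib HyperLogLogPlus class already imported by this file and the (13, 25) parameters noted in mergeCardinalities below; the keys and counts are made up, and this is not the Cassandra code path itself. Because the sketches estimate distinct keys, the merged estimate discounts keys that appear in more than one sstable, which a simple sum of per-sstable index-summary estimates cannot do.

    import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
    import com.clearspring.analytics.stream.cardinality.ICardinality;

    // Each sstable carries a HyperLogLog sketch of its keys; merging the
    // sketches estimates the number of *distinct* keys across sstables.
    public class KeyCountSketch
    {
        public static void main(String[] args) throws CardinalityMergeException
        {
            HyperLogLogPlus sstableA = new HyperLogLogPlus(13, 25);
            HyperLogLogPlus sstableB = new HyperLogLogPlus(13, 25);

            for (int i = 0; i < 100_000; i++)
                sstableA.offer("key-" + i);
            for (int i = 50_000; i < 150_000; i++)      // 50k keys overlap with A
                sstableB.offer("key-" + i);

            ICardinality merged = sstableA.merge(sstableB);
            System.out.println("A ~ " + sstableA.cardinality());
            System.out.println("B ~ " + sstableB.cardinality());
            System.out.println("distinct across both ~ " + merged.cardinality()); // ~150k, not 200k
        }
    }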
+ * + * @param sstables SSTables to calculate key count + * @return estimated key count + */ + public static long getApproximateKeyCount(Collection sstables) + { + long count = -1; + + // check if cardinality estimator is available for all SSTables + boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate() + { + public boolean apply(SSTableReader sstable) + { + return sstable.descriptor.version.hasNewStatsFile(); + } + }); + + // if it is, load them to estimate key count + if (cardinalityAvailable) + { + boolean failed = false; + ICardinality cardinality = null; + for (SSTableReader sstable : sstables) + { + try + { + CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION); + assert metadata != null : sstable.getFilename(); + if (cardinality == null) + cardinality = metadata.cardinalityEstimator; + else + cardinality = cardinality.merge(metadata.cardinalityEstimator); + } + catch (IOException e) + { + logger.warn("Reading cardinality from Statistics.db failed.", e); + failed = true; + break; + } + catch (CardinalityMergeException e) + { + logger.warn("Cardinality merge failed.", e); + failed = true; + break; + } + } + if (cardinality != null && !failed) + count = cardinality.cardinality(); + } + + // if something went wrong above or cardinality is not available, calculate using index summary + if (count < 0) + { + for (SSTableReader sstable : sstables) + count += sstable.estimatedKeys(); + } + return count; + } + + /** + * Estimates how much of the keys we would keep if the sstables were compacted together + */ + public static double estimateCompactionGain(Set overlapping) + { + Set cardinalities = new HashSet<>(overlapping.size()); + for (SSTableReader sstable : overlapping) + { + try + { + ICardinality cardinality = ((CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION)).cardinalityEstimator; + if (cardinality != null) + cardinalities.add(cardinality); + else + logger.debug("Got a null cardinality estimator in: "+sstable.getFilename()); + } + catch (IOException e) + { + logger.warn("Could not read up compaction metadata for " + sstable, e); + } + } + long totalKeyCountBefore = 0; + for (ICardinality cardinality : cardinalities) + { + totalKeyCountBefore += cardinality.cardinality(); + } + if (totalKeyCountBefore == 0) + return 1; + + long totalKeyCountAfter = mergeCardinalities(cardinalities).cardinality(); + logger.debug("Estimated compaction gain: {}/{}={}", totalKeyCountAfter, totalKeyCountBefore, ((double)totalKeyCountAfter)/totalKeyCountBefore); + return ((double)totalKeyCountAfter)/totalKeyCountBefore; + } + + private static ICardinality mergeCardinalities(Collection cardinalities) + { + ICardinality base = new HyperLogLogPlus(13, 25); // see MetadataCollector.cardinality + try + { + base = base.merge(cardinalities.toArray(new ICardinality[cardinalities.size()])); + } + catch (CardinalityMergeException e) + { + logger.warn("Could not merge cardinalities", e); + } + return base; + } + + public static SSTableReader open(Descriptor descriptor) throws IOException + { + CFMetaData metadata; + if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)) + { + int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR); + String parentName = descriptor.cfname.substring(0, i); + CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName); + 
ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1)); + metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def)); + } + else + { + metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname); + } + return open(descriptor, metadata); + } + + public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException + { + IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR) + ? new LocalPartitioner(metadata.getKeyValidator()) + : StorageService.getPartitioner(); + return open(desc, componentsFor(desc), metadata, p); + } + + public static SSTableReader open(Descriptor descriptor, Set components, CFMetaData metadata, IPartitioner partitioner) throws IOException + { + return open(descriptor, components, metadata, partitioner, true); + } + + public static SSTableReader openNoValidation(Descriptor descriptor, Set components, CFMetaData metadata) throws IOException + { + return open(descriptor, components, metadata, StorageService.getPartitioner(), false); + } + + /** + * Open SSTable reader to be used in batch mode(such as sstableloader). + * + * @param descriptor + * @param components + * @param metadata + * @param partitioner + * @return opened SSTableReader + * @throws IOException + */ + public static SSTableReader openForBatch(Descriptor descriptor, Set components, CFMetaData metadata, IPartitioner partitioner) throws IOException + { + // Minimum components without which we can't do anything + assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor; + assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; + + Map sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, + EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS)); + ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION); + StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS); + + // Check if sstable is created using same partitioner. + // Partitioner can be null, which indicates older version of sstable or no stats available. + // In that case, we skip the check. + String partitionerName = partitioner.getClass().getCanonicalName(); + if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner)) + { + logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", + descriptor, validationMetadata.partitioner, partitionerName)); + System.exit(1); + } + + logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length()); + SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(), + statsMetadata, OpenReason.NORMAL); + + // special implementation of load to use non-pooled SegmentedFile builders + SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder(); + SegmentedFile.Builder dbuilder = sstable.compression + ? 
new CompressedSegmentedFile.Builder(null) + : new BufferedSegmentedFile.Builder(); + if (!sstable.loadSummary(ibuilder, dbuilder)) + sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL); + sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)); + sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA)); + sstable.bf = FilterFactory.AlwaysPresent; - sstable.tidy.setup(sstable); ++ sstable.setup(); + return sstable; + } + + private static SSTableReader open(Descriptor descriptor, + Set components, + CFMetaData metadata, + IPartitioner partitioner, + boolean validate) throws IOException + { + // Minimum components without which we can't do anything + assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor; + assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; + + Map sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, + EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS)); + ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION); + StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS); + + // Check if sstable is created using same partitioner. + // Partitioner can be null, which indicates older version of sstable or no stats available. + // In that case, we skip the check. + String partitionerName = partitioner.getClass().getCanonicalName(); + if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner)) + { + logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", + descriptor, validationMetadata.partitioner, partitionerName)); + System.exit(1); + } + + logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length()); + SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(), + statsMetadata, OpenReason.NORMAL); + + // load index and filter + long start = System.nanoTime(); + sstable.load(validationMetadata); + logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + ++ sstable.setup(); + if (validate) + sstable.validate(); + + if (sstable.getKeyCache() != null) + logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity()); + - sstable.tidy.setup(sstable); + return sstable; + } + + public static void logOpenException(Descriptor descriptor, IOException e) + { + if (e instanceof FileNotFoundException) + logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage()); + else + logger.error("Corrupt sstable {}; skipped", descriptor, e); + } + + public static Collection openAll(Set>> entries, + final CFMetaData metadata, + final IPartitioner partitioner) + { + final Collection sstables = new LinkedBlockingQueue<>(); + + ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors()); + for (final Map.Entry> entry : entries) + { + Runnable runnable = new Runnable() + { + public void run() + { + SSTableReader sstable; + try + { + sstable = 
open(entry.getKey(), entry.getValue(), metadata, partitioner); + } + catch (IOException ex) + { + logger.error("Corrupt sstable {}; skipped", entry, ex); + return; + } + sstables.add(sstable); + } + }; + executor.submit(runnable); + } + + executor.shutdown(); + try + { + executor.awaitTermination(7, TimeUnit.DAYS); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + + return sstables; + + } + + /** + * Open a RowIndexedReader which already has its state initialized (by SSTableWriter). + */ + public static SSTableReader internalOpen(Descriptor desc, + Set components, + CFMetaData metadata, + IPartitioner partitioner, + SegmentedFile ifile, + SegmentedFile dfile, + IndexSummary isummary, + IFilter bf, + long maxDataAge, + StatsMetadata sstableMetadata, + OpenReason openReason) + { + assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null; + + SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason); + + reader.bf = bf; + reader.ifile = ifile; + reader.dfile = dfile; + reader.indexSummary = isummary; - reader.tidy.setup(reader); ++ reader.setup(); + + return reader; + } + + + private static SSTableReader internalOpen(final Descriptor descriptor, + Set components, + CFMetaData metadata, + IPartitioner partitioner, + Long maxDataAge, + StatsMetadata sstableMetadata, + OpenReason openReason) + { + Factory readerFactory = descriptor.getFormat().getReaderFactory(); + + return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason); + } + + protected SSTableReader(final Descriptor desc, + Set components, + CFMetaData metadata, + IPartitioner partitioner, + long maxDataAge, + StatsMetadata sstableMetadata, + OpenReason openReason) + { + super(desc, components, metadata, partitioner); + this.sstableMetadata = sstableMetadata; + this.maxDataAge = maxDataAge; + this.openReason = openReason; - + this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata); - - tidy.deletingTask = new SSTableDeletingTask(this); - - // Don't track read rates for tables in the system keyspace. 
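The openAll method above follows a common fan-out/collect shape: submit blocking open() calls to a fixed-size pool, gather successes into a concurrent queue, skip corrupt entries, then shut the pool down and await completion. The same shape reduced to plain JDK types might look like the sketch below; the file names and the byte-reading stand-in are invented for illustration and have nothing to do with sstable components.

    import java.nio.file.*;
    import java.util.*;
    import java.util.concurrent.*;

    // Open many files in parallel, skip failures, keep whatever succeeded.
    public class BatchOpen
    {
        public static void main(String[] args) throws InterruptedException
        {
            List<Path> paths = Arrays.asList(Paths.get("a.db"), Paths.get("b.db"), Paths.get("missing.db"));
            Collection<byte[]> opened = new LinkedBlockingQueue<>();

            ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            for (Path p : paths)
            {
                executor.submit(() -> {
                    try
                    {
                        opened.add(Files.readAllBytes(p));   // stand-in for the expensive open()
                    }
                    catch (Exception e)
                    {
                        System.err.println("Skipping " + p + ": " + e);  // corrupt/missing entries are skipped
                    }
                });
            }

            executor.shutdown();                             // no new tasks; wait for in-flight ones
            executor.awaitTermination(1, TimeUnit.HOURS);
            System.out.println("opened " + opened.size() + " of " + paths.size());
        }
    }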
Also don't track reads for special operations (like early open) - // this is to avoid overflowing the executor queue (see CASSANDRA-8066) - if (SystemKeyspace.NAME.equals(desc.ksname) || openReason != OpenReason.NORMAL) - { - readMeter = null; - readMeterSyncFuture = null; - return; - } - - readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation); - // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now - readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable() - { - public void run() - { - if (!isCompacted.get()) - { - meterSyncThrottle.acquire(); - SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter); - } - } - }, 1, 5, TimeUnit.MINUTES); + } + + public static long getTotalBytes(Iterable sstables) + { + long sum = 0; + for (SSTableReader sstable : sstables) + { + sum += sstable.onDiskLength(); + } + return sum; + } + + public boolean equals(Object that) + { + return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor); + } + + public int hashCode() + { + return this.descriptor.hashCode(); + } + + public String getFilename() + { + return dfile.path; + } + + public String getIndexFilename() + { + return ifile.path; + } + + public void setTrackedBy(DataTracker tracker) + { - tidy.deletingTask.setTracker(tracker); ++ tidy.type.deletingTask.setTracker(tracker); + // under normal operation we can do this at any time, but SSTR is also used outside C* proper, + // e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache + // here when we know we're being wired into the rest of the server infrastructure. + keyCache = CacheService.instance.keyCache; + } + + private void load(ValidationMetadata validation) throws IOException + { + if (metadata.getBloomFilterFpChance() == 1.0) + { + // bf is disabled. + load(false, true); + bf = FilterFactory.AlwaysPresent; + } + else if (!components.contains(Component.FILTER) || validation == null) + { + // bf is enabled, but filter component is missing. + load(true, true); + } + else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance()) + { + // bf fp chance in sstable metadata and it has changed since compaction. + load(true, true); + } + else + { + // bf is enabled and fp chance matches the currently configured value. + load(false, true); + loadBloomFilter(); + } + } + + /** + * Load bloom filter from Filter.db file. + * + * @throws IOException + */ + private void loadBloomFilter() throws IOException + { + DataInputStream stream = null; + try + { + stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER)))); + bf = FilterFactory.deserialize(stream, true); + } + finally + { + FileUtils.closeQuietly(stream); + } + } + + /** + * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter. + * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can + * avoid persisting it to disk by setting this to false + */ + private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException + { + SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); + SegmentedFile.Builder dbuilder = compression + ? 
SegmentedFile.getCompressedBuilder() + : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + + boolean summaryLoaded = loadSummary(ibuilder, dbuilder); + if (recreateBloomFilter || !summaryLoaded) + buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL); + + ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); + dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); + if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk + saveSummary(ibuilder, dbuilder); - tidy.setup(this); + } + + /** + * Build index summary(and optionally bloom filter) by reading through Index.db file. + * + * @param recreateBloomFilter true if recreate bloom filter + * @param ibuilder + * @param dbuilder + * @param summaryLoaded true if index summary is already loaded and not need to build again + * @throws IOException + */ + private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException + { + // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary. + RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); + + try + { + long indexSize = primaryIndex.length(); + long histogramCount = sstableMetadata.estimatedRowSize.count(); + long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed() + ? histogramCount + : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional + + if (recreateBloomFilter) + bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true); + + IndexSummaryBuilder summaryBuilder = null; + if (!summaryLoaded) + summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel); + + long indexPosition; + RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata); + + while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) + { + ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); + RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version); + DecoratedKey decoratedKey = partitioner.decorateKey(key); + if (first == null) + first = decoratedKey; + last = decoratedKey; + + if (recreateBloomFilter) + bf.add(decoratedKey.getKey()); + + // if summary was already read from disk we don't want to re-populate it using primary index + if (!summaryLoaded) + { + summaryBuilder.maybeAddEntry(decoratedKey, indexPosition); + ibuilder.addPotentialBoundary(indexPosition); + dbuilder.addPotentialBoundary(indexEntry.position); + } + } + + if (!summaryLoaded) + indexSummary = summaryBuilder.build(partitioner); + } + finally + { + FileUtils.closeQuietly(primaryIndex); + } + + first = getMinimalKey(first); + last = getMinimalKey(last); + } + + /** + * Load index summary from Summary.db file if it exists. + * + * if loaded index summary has different index interval from current value stored in schema, + * then Summary.db file will be deleted and this returns false to rebuild summary. + * + * @param ibuilder + * @param dbuilder + * @return true if index summary is loaded successfully from Summary.db file. 
+ */ + public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder) + { + File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY)); + if (!summariesFile.exists()) + return false; + + DataInputStream iStream = null; + try + { + iStream = new DataInputStream(new FileInputStream(summariesFile)); + indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval()); + first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); + last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); + ibuilder.deserializeBounds(iStream); + dbuilder.deserializeBounds(iStream); + } + catch (IOException e) + { ++ if (indexSummary != null) ++ indexSummary.close(); + logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage()); + // corrupted; delete it and fall back to creating a new summary + FileUtils.closeQuietly(iStream); + // delete it and fall back to creating a new summary + FileUtils.deleteWithConfirm(summariesFile); + return false; + } + finally + { + FileUtils.closeQuietly(iStream); + } + + return true; + } + + /** + * Save index summary to Summary.db file. + * + * @param ibuilder + * @param dbuilder + */ + public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder) + { + saveSummary(ibuilder, dbuilder, indexSummary); + } + + private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary) + { + File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY)); + if (summariesFile.exists()) + FileUtils.deleteWithConfirm(summariesFile); + + DataOutputStreamAndChannel oStream = null; + try + { + oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile)); + IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel()); + ByteBufferUtil.writeWithLength(first.getKey(), oStream); + ByteBufferUtil.writeWithLength(last.getKey(), oStream); + ibuilder.serializeBounds(oStream); + dbuilder.serializeBounds(oStream); + } + catch (IOException e) + { + logger.debug("Cannot save SSTable Summary: ", e); + + // corrupted hence delete it and let it load it now. 
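loadSummary and saveSummary above follow a cache-file discipline: try to read Summary.db, and on any failure delete it and rebuild from the authoritative source (the primary index), optionally persisting the rebuilt copy so the next open is cheap. A minimal, generic version of that discipline is sketched below; the file name and the summed "summary" value are placeholders, not the real summary format.

    import java.io.*;

    // Load a cached artifact if possible; otherwise delete it, rebuild from the
    // authoritative data, and persist the rebuilt copy for next time.
    public class CachedSummary
    {
        static long loadOrRebuild(File cache, long[] authoritativeData)
        {
            if (cache.exists())
            {
                try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(cache))))
                {
                    return in.readLong();                    // the cached "summary"
                }
                catch (IOException e)
                {
                    System.err.println("Cannot read " + cache + ", rebuilding: " + e.getMessage());
                    cache.delete();                          // corrupted; fall back to rebuilding
                }
            }
            long rebuilt = 0;
            for (long v : authoritativeData)                 // stand-in for scanning the primary index
                rebuilt += v;
            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(cache)))
            {
                out.writeLong(rebuilt);                      // persist so the next open is cheap
            }
            catch (IOException e)
            {
                cache.delete();                              // don't leave a partial file behind
            }
            return rebuilt;
        }

        public static void main(String[] args)
        {
            long[] data = { 1, 2, 3, 4, 5 };
            File cache = new File("summary.cache");
            System.out.println(loadOrRebuild(cache, data));  // rebuilds the first time
            System.out.println(loadOrRebuild(cache, data));  // served from the cache file
        }
    }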
+ if (summariesFile.exists()) + FileUtils.deleteWithConfirm(summariesFile); + } + finally + { + FileUtils.closeQuietly(oStream); + } + } + + public void setReplacedBy(SSTableReader replacement) + { - synchronized (tidy.replaceLock) ++ synchronized (tidy.global) + { - assert tidy.replacedBy == null; - tidy.replacedBy = replacement; - replacement.tidy.replaces = this; - replacement.tidy.replaceLock = tidy.replaceLock; ++ assert replacement != null; ++ assert !tidy.isReplaced; ++ assert tidy.global.live == this; ++ tidy.isReplaced = true; ++ tidy.global.live = replacement; + } + } + + public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose) + { - synchronized (tidy.replaceLock) ++ synchronized (tidy.global) + { - assert tidy.replacedBy == null; ++ assert openReason != OpenReason.EARLY; + + if (newStart.compareTo(this.first) > 0) + { + if (newStart.compareTo(this.last) > 0) + { + this.tidy.runOnClose = new Runnable() + { + public void run() + { + CLibrary.trySkipCache(dfile.path, 0, 0); + CLibrary.trySkipCache(ifile.path, 0, 0); + runOnClose.run(); + } + }; + } + else + { + final long dataStart = getPosition(newStart, Operator.GE).position; + final long indexStart = getIndexScanPosition(newStart); + this.tidy.runOnClose = new Runnable() + { + public void run() + { + CLibrary.trySkipCache(dfile.path, 0, dataStart); + CLibrary.trySkipCache(ifile.path, 0, indexStart); + runOnClose.run(); + } + }; + } + } + - SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata, - openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE); - replacement.readMeterSyncFuture = this.readMeterSyncFuture; - replacement.readMeter = this.readMeter; ++ SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile.sharedCopy(), ++ dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(), ++ maxDataAge, sstableMetadata, OpenReason.MOVED_START); + replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last; + replacement.last = this.last; + setReplacedBy(replacement); + return replacement; + } + } + + /** + * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will + * be built at the target samplingLevel. This (original) SSTableReader instance will be marked as replaced, have + * its DeletingTask removed, and have its periodic read-meter sync task cancelled. + * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader + * @return a new SSTableReader + * @throws IOException + */ + public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException + { - synchronized (tidy.replaceLock) ++ synchronized (tidy.global) + { - assert tidy.replacedBy == null; ++ assert openReason != OpenReason.EARLY; + + int minIndexInterval = metadata.getMinIndexInterval(); + int maxIndexInterval = metadata.getMaxIndexInterval(); + double effectiveInterval = indexSummary.getEffectiveIndexInterval(); + + IndexSummary newSummary; + long oldSize = bytesOnDisk(); + + // We have to rebuild the summary from the on-disk primary index in three cases: + // 1. The sampling level went up, so we need to read more entries off disk + // 2. 
The min_index_interval changed (in either direction); this changes what entries would be in the summary + // at full sampling (and consequently at any other sampling level) + // 3. The max_index_interval was lowered, forcing us to raise the sampling level + if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval) + { + newSummary = buildSummaryAtLevel(samplingLevel); + } + else if (samplingLevel < indexSummary.getSamplingLevel()) + { + // we can use the existing index summary to make a smaller one + newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner); + + SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); + SegmentedFile.Builder dbuilder = compression + ? SegmentedFile.getCompressedBuilder() + : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + saveSummary(ibuilder, dbuilder, newSummary); + } + else + { + throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " + + "no adjustments to min/max_index_interval"); + } + + long newSize = bytesOnDisk(); + StorageMetrics.load.inc(newSize - oldSize); + parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize); + - SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata, - openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE); - replacement.readMeterSyncFuture = this.readMeterSyncFuture; - replacement.readMeter = this.readMeter; ++ SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile.sharedCopy(), ++ dfile.sharedCopy(), newSummary, bf.sharedCopy(), maxDataAge, ++ sstableMetadata, OpenReason.METADATA_CHANGE); + replacement.first = this.first; + replacement.last = this.last; + setReplacedBy(replacement); + return replacement; + } + } + + private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException + { + // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary. 
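The downsampling branch of cloneWithNewSummarySamplingLevel above can reuse the existing in-memory summary rather than re-reading the index. A toy model of why that works is sketched below, with invented names and no relation to IndexSummaryBuilder's actual layout: a summary is just every Nth key with its position, a coarser summary is a subset of a finer one, and a lookup still lands at or before the target key, it just leaves a little more to scan sequentially afterwards.

    import java.util.*;

    // Toy sampled index: every Nth key from a sorted key list, mapped to its position.
    public class ToySummary
    {
        static TreeMap<String, Integer> sample(List<String> sortedKeys, int interval)
        {
            TreeMap<String, Integer> summary = new TreeMap<>();
            for (int i = 0; i < sortedKeys.size(); i += interval)
                summary.put(sortedKeys.get(i), i);
            return summary;
        }

        public static void main(String[] args)
        {
            List<String> keys = new ArrayList<>();
            for (int i = 0; i < 1000; i++)
                keys.add(String.format("key%04d", i));

            TreeMap<String, Integer> fine = sample(keys, 8);     // "full" sampling
            TreeMap<String, Integer> coarse = sample(keys, 16);  // downsampled by 2

            String target = "key0500";
            System.out.println("fine   starts scan at index " + fine.floorEntry(target).getValue());
            System.out.println("coarse starts scan at index " + coarse.floorEntry(target).getValue());
            // Both positions are <= 500; the coarse one may simply be further back.
        }
    }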
+ RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); + try + { + long indexSize = primaryIndex.length(); + IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel); + + long indexPosition; + while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) + { + summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition); + RowIndexEntry.Serializer.skip(primaryIndex); + } + + return summaryBuilder.build(partitioner); + } + finally + { + FileUtils.closeQuietly(primaryIndex); + } + } + ++ public RestorableMeter getReadMeter() ++ { ++ return readMeter; ++ } ++ + public int getIndexSummarySamplingLevel() + { + return indexSummary.getSamplingLevel(); + } + + public long getIndexSummaryOffHeapSize() + { + return indexSummary.getOffHeapSize(); + } + + public int getMinIndexInterval() + { + return indexSummary.getMinIndexInterval(); + } + + public double getEffectiveIndexInterval() + { + return indexSummary.getEffectiveIndexInterval(); + } + + public void releaseSummary() + { - indexSummary.close(); ++ tidy.releaseSummary(); + indexSummary = null; + } + + private void validate() + { + if (this.first.compareTo(this.last) > 0) ++ { ++ selfRef().release(); + throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last)); ++ } + } + + /** + * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away, + * modulo downsampling of the index summary). + */ + public long getIndexScanPosition(RowPosition key) + { + return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary); + } + + protected static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary) + { + if (binarySearchResult == -1) + return -1; + else + return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult)); + } + + protected static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult) + { + if (binarySearchResult < 0) + { + // binary search gives us the first index _greater_ than the key searched for, + // i.e., its insertion position + int greaterThan = (binarySearchResult + 1) * -1; + if (greaterThan == 0) + return -1; + return greaterThan - 1; + } + else + { + return binarySearchResult; + } + } + + /** + * Returns the compression metadata for this sstable. + * @throws IllegalStateException if the sstable is not compressed + */ + public CompressionMetadata getCompressionMetadata() + { + if (!compression) + throw new IllegalStateException(this + " is not compressed"); + + CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata(); + + //We need the parent cf metadata + String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName; + cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName)); + + return cmd; + } + + /** + * Returns the amount of memory in bytes used off heap by the compression meta-data. + * @return the amount of memory in bytes used off heap by the compression meta-data + */ + public long getCompressionMetadataOffHeapSize() + { + if (!compression) + return 0; + + return getCompressionMetadata().offHeapSize(); + } + + /** + * For testing purposes only. 
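getIndexSummaryIndexFromBinarySearchResult above decodes the usual Java binary-search convention, where a miss is encoded as -(insertionPoint) - 1, into "index of the last sampled entry less than or equal to the key", with -1 meaning the key sorts before every entry. The same decoding applied to a plain sorted array, purely as illustration:

    import java.util.Arrays;

    // Decode Arrays.binarySearch results the way the summary lookup above does:
    // exact hits are used as-is, misses become the entry just before the
    // insertion point, and -1 means the key is before the first sampled entry.
    public class FloorLookup
    {
        static int floorIndex(int binarySearchResult)
        {
            if (binarySearchResult >= 0)
                return binarySearchResult;           // exact match
            int insertionPoint = -(binarySearchResult + 1);
            return insertionPoint - 1;               // -1 when key precedes everything
        }

        public static void main(String[] args)
        {
            long[] sampledKeys = { 10, 20, 30, 40 };
            for (long key : new long[]{ 5, 10, 25, 45 })
            {
                int idx = floorIndex(Arrays.binarySearch(sampledKeys, key));
                System.out.println("key " + key + " -> start scanning at summary entry " + idx);
            }
            // key 5 -> -1 (before first), key 10 -> 0, key 25 -> 1, key 45 -> 3
        }
    }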
+ */ + public void forceFilterFailures() + { + bf = FilterFactory.AlwaysPresent; + } + + public IFilter getBloomFilter() + { + return bf; + } + + public long getBloomFilterSerializedSize() + { + return bf.serializedSize(); + } + + /** + * Returns the amount of memory in bytes used off heap by the bloom filter. + * @return the amount of memory in bytes used off heap by the bloom filter + */ + public long getBloomFilterOffHeapSize() + { + return bf.offHeapSize(); + } + + /** + * @return An estimate of the number of keys in this SSTable based on the index summary. + */ + public long estimatedKeys() + { + return indexSummary.getEstimatedKeyCount(); + } + + /** + * @param ranges + * @return An estimate of the number of keys for given ranges in this SSTable. + */ + public long estimatedKeysForRanges(Collection> ranges) + { + long sampleKeyCount = 0; + List> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges); + for (Pair sampleIndexRange : sampleIndexes) + sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1); + + // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling + long estimatedKeys = sampleKeyCount * (Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel(); + return Math.max(1, estimatedKeys); + } + + /** + * Returns the number of entries in the IndexSummary. At full sampling, this is approximately 1/INDEX_INTERVALth of + * the keys in this SSTable. + */ + public int getIndexSummarySize() + { + return indexSummary.size(); + } + + /** + * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling. + */ + public int getMaxIndexSummarySize() + { + return indexSummary.getMaxNumberOfEntries(); + } + + /** + * Returns the key for the index summary entry at `index`. + */ + public byte[] getIndexSummaryKey(int index) + { + return indexSummary.getKey(index); + } + + private static List> getSampleIndexesForRanges(IndexSummary summary, Collection> ranges) + { + // use the index to determine a minimal section for each range + List> positions = new ArrayList<>(); + + for (Range range : Range.normalize(ranges)) + { + RowPosition leftPosition = range.left.maxKeyBound(); + RowPosition rightPosition = range.right.maxKeyBound(); + + int left = summary.binarySearch(leftPosition); + if (left < 0) + left = (left + 1) * -1; + else + // left range are start exclusive + left = left + 1; + if (left == summary.size()) + // left is past the end of the sampling + continue; + + int right = Range.isWrapAround(range.left, range.right) + ? 
summary.size() - 1 + : summary.binarySearch(rightPosition); + if (right < 0) + { + // range are end inclusive so we use the previous index from what binarySearch give us + // since that will be the last index we will return + right = (right + 1) * -1; + if (right == 0) + // Means the first key is already stricly greater that the right bound + continue; + right--; + } + + if (left > right) + // empty range + continue; + positions.add(Pair.create(left, right)); + } + return positions; + } + + public Iterable getKeySamples(final Range range) + { + final List> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range)); + + if (indexRanges.isEmpty()) + return Collections.emptyList(); + + return new Iterable() + { + public Iterator iterator() + { + return new Iterator() + { + private Iterator> rangeIter = indexRanges.iterator(); + private Pair current; + private int idx; + + public boolean hasNext() + { + if (current == null || idx > current.right) + { + if (rangeIter.hasNext()) + { + current = rangeIter.next(); + idx = current.left; + return true; + } + return false; + } + + return true; + } + + public DecoratedKey next() + { + byte[] bytes = indexSummary.getKey(idx++); + return partitioner.decorateKey(ByteBuffer.wrap(bytes)); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + + /** + * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges. + * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable. + */ + public List> getPositionsForRanges(Collection> ranges) + { + // use the index to determine a minimal section for each range + List> positions = new ArrayList<>(); + for (Range range : Range.normalize(ranges)) + { + assert !range.isWrapAround() || range.right.isMinimum(); + // truncate the range so it at most covers the sstable + AbstractBounds bounds = range.toRowBounds(); + RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound(); + RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right; + + if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0) + continue; + + long left = getPosition(leftBound, Operator.GT).position; + long right = (rightBound.compareTo(last) > 0) + ? (openReason == OpenReason.EARLY + // if opened early, we overlap with the old sstables by one key, so we know that the last + // (and further) key(s) will be streamed from these if necessary + ? 
getPosition(last.getToken().maxKeyBound(), Operator.GT).position + : uncompressedLength()) + : getPosition(rightBound, Operator.GT).position; + + if (left == right) + // empty range + continue; + + assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right); + positions.add(Pair.create(left, right)); + } + return positions; + } + + public void invalidateCacheKey(DecoratedKey key) + { + KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey()); + keyCache.remove(cacheKey); + } + + public void cacheKey(DecoratedKey key, RowIndexEntry info) + { + CachingOptions caching = metadata.getCaching(); + + if (!caching.keyCache.isEnabled() + || keyCache == null + || keyCache.getCapacity() == 0) + { + return; + } + + KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey()); + logger.trace("Adding cache entry for {} -> {}", cacheKey, info); + keyCache.put(cacheKey, info); + } + + public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats) + { + return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats); + } + + protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats) + { + if (keyCache != null && keyCache.getCapacity() > 0) { + if (updateStats) + { + RowIndexEntry cachedEntry = keyCache.get(unifiedKey); + keyCacheRequest.incrementAndGet(); + if (cachedEntry != null) + { + keyCacheHit.incrementAndGet(); + bloomFilterTracker.addTruePositive(); + } + return cachedEntry; + } + else + { + return keyCache.getInternal(unifiedKey); + } + } + return null; + } + + /** + * Get position updating key cache and stats. + * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean) + */ + public RowIndexEntry getPosition(RowPosition key, Operator op) + { + return getPosition(key, op, true); + } + + /** + * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to + * allow key selection by token bounds but only if op != * EQ + * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins. + * @param updateCacheAndStats true if updating stats and cache + * @return The index entry corresponding to the key, or null if the key is not present + */ + public abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats); + + //Corresponds to a name column + public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet columns); + public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet columns, RowIndexEntry indexEntry); + + //Corresponds to a slice query + public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse); + public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry); + + /** + * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists. 
+ */ + public DecoratedKey firstKeyBeyond(RowPosition token) + { + long sampledPosition = getIndexScanPosition(token); + if (sampledPosition == -1) + sampledPosition = 0; + + Iterator segments = ifile.iterator(sampledPosition); + while (segments.hasNext()) + { + FileDataInput in = segments.next(); + try + { + while (!in.isEOF()) + { + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); + DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey); + if (indexDecoratedKey.compareTo(token) > 0) + return indexDecoratedKey; + + RowIndexEntry.Serializer.skip(in); + } + } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, in.getPath()); + } + finally + { + FileUtils.closeQuietly(in); + } + } + + return null; + } + + /** + * @return The length in bytes of the data for this SSTable. For + * compressed files, this is not the same thing as the on disk size (see + * onDiskLength()) + */ + public long uncompressedLength() + { + return dfile.length; + } + + /** + * @return The length in bytes of the on disk size for this SSTable. For + * compressed files, this is not the same thing as the data length (see + * length()) + */ + public long onDiskLength() + { + return dfile.onDiskLength; + } + + /** + * Mark the sstable as obsolete, i.e., compacted into newer sstables. + * + * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere + * except for threads holding a reference. + * + * @return true if the this is the first time the file was marked obsolete. Calling this + * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize). + */ + public boolean markObsolete() + { + if (logger.isDebugEnabled()) + logger.debug("Marking {} compacted", getFilename()); + - synchronized (tidy.replaceLock) ++ synchronized (tidy.global) + { - assert tidy.replacedBy == null : getFilename(); ++ assert !tidy.isReplaced; + } - return !isCompacted.getAndSet(true); ++ return !tidy.global.isCompacted.getAndSet(true); + } + + public boolean isMarkedCompacted() + { - return isCompacted.get(); ++ return tidy.global.isCompacted.get(); + } + + public void markSuspect() + { + if (logger.isDebugEnabled()) + logger.debug("Marking {} as a suspect for blacklisting.", getFilename()); + + isSuspect.getAndSet(true); + } + + public boolean isMarkedSuspect() + { + return isSuspect.get(); + } + + + /** + * I/O SSTableScanner + * @return A Scanner for seeking over the rows of the SSTable. + */ + public ISSTableScanner getScanner() + { + return getScanner((RateLimiter) null); + } + + public ISSTableScanner getScanner(RateLimiter limiter) + { + return getScanner(DataRange.allData(partitioner), limiter); + } + + /** + * + * @param dataRange filter to use when reading the columns + * @return A Scanner for seeking over the rows of the SSTable. + */ + public ISSTableScanner getScanner(DataRange dataRange) + { + return getScanner(dataRange, null); + } + + /** + * Direct I/O SSTableScanner over a defined range of tokens. + * + * @param range the range of keys to cover + * @return A Scanner for seeking over the rows of the SSTable. + */ + public ISSTableScanner getScanner(Range range, RateLimiter limiter) + { + if (range == null) + return getScanner(limiter); + return getScanner(Collections.singletonList(range), limiter); + } + + /** + * Direct I/O SSTableScanner over a defined collection of ranges of tokens. 
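Several of the scanner and reader factory methods above accept a Guava RateLimiter (imported near the top of the file), presumably so callers can bound the read bandwidth of bulk operations. A minimal, self-contained sketch of that throttling idea, unrelated to the real ThrottledReader/CompressedThrottledReader classes; the 4 MB/s figure and the file argument are arbitrary:

    import com.google.common.util.concurrent.RateLimiter;
    import java.io.*;

    // Throttled copy loop: acquire one permit per byte before reading the next
    // chunk, so sustained throughput stays near the configured bytes/second.
    public class ThrottledCopy
    {
        public static void main(String[] args) throws IOException
        {
            RateLimiter limiter = RateLimiter.create(4 * 1024 * 1024);  // ~4 MB/s
            byte[] buffer = new byte[64 * 1024];

            try (InputStream in = new FileInputStream(args[0]))         // pass a file path as the argument
            {
                int read;
                while ((read = in.read(buffer)) != -1)
                {
                    limiter.acquire(read);   // one permit per byte read
                    // ... hand the chunk to whatever is consuming the stream ...
                }
            }
        }
    }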
+ * + * @param ranges the range of keys to cover + * @return A Scanner for seeking over the rows of the SSTable. + */ + public abstract ISSTableScanner getScanner(Collection> ranges, RateLimiter limiter); + + /** + * + * @param dataRange filter to use when reading the columns + * @return A Scanner for seeking over the rows of the SSTable. + */ + public abstract ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter); + + + + public FileDataInput getFileDataInput(long position) + { + return dfile.getSegment(position); + } + + /** + * Tests if the sstable contains data newer than the given age param (in localhost currentMilli time). + * This works in conjunction with maxDataAge which is an upper bound on the create of data in this sstable. + * @param age The age to compare the maxDataAre of this sstable. Measured in millisec since epoc on this host + * @return True iff this sstable contains data that's newer than the given age parameter. + */ + public boolean newSince(long age) + { + return maxDataAge > age; + } + + public void createLinks(String snapshotDirectoryPath) + { + for (Component component : components) + { + File sourceFile = new File(descriptor.filenameFor(component)); + File targetLink = new File(snapshotDirectoryPath, sourceFile.getName()); + FileUtils.createHardLink(sourceFile, targetLink); + } + } + + public boolean isRepaired() + { + return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE; + } + + public SSTableReader getCurrentReplacement() + { - synchronized (tidy.replaceLock) - { - SSTableReader cur = this, next = tidy.replacedBy; - while (next != null) - { - cur = next; - next = next.tidy.replacedBy; - } - return cur; - } ++ return tidy.global.live; + } + + /** + * TODO: Move someplace reusable + */ + public abstract static class Operator + { + public static final Operator EQ = new Equals(); + public static final Operator GE = new GreaterThanOrEqualTo(); + public static final Operator GT = new GreaterThan(); + + /** + * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs. + * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward. + */ + public abstract int apply(int comparison); + + final static class Equals extends Operator + { + public int apply(int comparison) { return -comparison; } + } + + final static class GreaterThanOrEqualTo extends Operator + { + public int apply(int comparison) { return comparison >= 0 ? 0 : 1; } + } + + final static class GreaterThan extends Operator + { + public int apply(int comparison) { return comparison > 0 ? 
0 : 1; } + } + } + + public long getBloomFilterFalsePositiveCount() + { + return bloomFilterTracker.getFalsePositiveCount(); + } + + public long getRecentBloomFilterFalsePositiveCount() + { + return bloomFilterTracker.getRecentFalsePositiveCount(); + } + + public long getBloomFilterTruePositiveCount() + { + return bloomFilterTracker.getTruePositiveCount(); + } + + public long getRecentBloomFilterTruePositiveCount() + { + return bloomFilterTracker.getRecentTruePositiveCount(); + } + + public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache() + { + return keyCache; + } + + public EstimatedHistogram getEstimatedRowSize() + { + return sstableMetadata.estimatedRowSize; + } + + public EstimatedHistogram getEstimatedColumnCount() + { + return sstableMetadata.estimatedColumnCount; + } + + public double getEstimatedDroppableTombstoneRatio(int gcBefore) + { + return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore); + } + + public double getDroppableTombstonesBefore(int gcBefore) + { + return sstableMetadata.getDroppableTombstonesBefore(gcBefore); + } + + public double getCompressionRatio() + { + return sstableMetadata.compressionRatio; + } + + public ReplayPosition getReplayPosition() + { + return sstableMetadata.replayPosition; + } + + public long getMinTimestamp() + { + return sstableMetadata.minTimestamp; + } + + public long getMaxTimestamp() + { + return sstableMetadata.maxTimestamp; + } + + public Set<Integer> getAncestors() + { + try + { + CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION); + return compactionMetadata.ancestors; + } + catch (IOException e) + { + SSTableReader.logOpenException(descriptor, e); + return Collections.emptySet(); + } + } + + public int getSSTableLevel() + { + return sstableMetadata.sstableLevel; + } + + /** + * Reloads the sstable metadata from disk. + * + * Called after the level is changed on the sstable, for example if the sstable is dropped to L0 + * + * Might be possible to remove in future versions + * + * @throws IOException + */ + public void reloadSSTableMetadata() throws IOException + { + this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS); + } + + public StatsMetadata getSSTableMetadata() + { + return sstableMetadata; + } + + public RandomAccessReader openDataReader(RateLimiter limiter) + { + assert limiter != null; + return compression + ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter) + : ThrottledReader.open(new File(getFilename()), limiter); + } + + public RandomAccessReader openDataReader() + { + return compression + ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata()) + : RandomAccessReader.open(new File(getFilename())); + } + + public RandomAccessReader openIndexReader() + { + return RandomAccessReader.open(new File(getIndexFilename())); + } + + /** + * @param component component to get the timestamp for. + * @return last modified time for given component. 0 if given component does not exist or IO error occurs. + */ + public long getCreationTimeFor(Component component) + { + return new File(descriptor.filenameFor(component)).lastModified(); + } + + /** + * @return Number of key cache hits + */ + public long getKeyCacheHit() + { + return keyCacheHit.get(); + } + + /** + * @return Number of key cache requests + */ + public long getKeyCacheRequest() + { + return keyCacheRequest.get(); + } + + /** + * Increment the total row read count and read rate for this SSTable.
This should not be incremented for range + * slice queries, row cache hits, or non-query reads, like compaction. + */ + public void incrementReadCount() + { + if (readMeter != null) + readMeter.mark(); + } + + public static class SizeComparator implements Comparator<SSTableReader> + { + public int compare(SSTableReader o1, SSTableReader o2) + { + return Longs.compare(o1.onDiskLength(), o2.onDiskLength()); + } + } + - public Ref<SSTableReader> tryRef() ++ public Ref<SSTableReader> tryRef() + { - return refCounted.tryRef(); ++ return selfRef.tryRef(); + } + - public Ref<SSTableReader> sharedRef() ++ public Ref<SSTableReader> selfRef() + { - return refCounted.sharedRef(); ++ return selfRef; + } + - private static final class Tidier implements Tidy ++ public Ref<SSTableReader> ref() + { - private String name; - private CFMetaData metadata; - // indexfile and datafile: might be null before a call to load() - private SegmentedFile ifile; - private SegmentedFile dfile; ++ return selfRef.ref(); ++ } + - private IndexSummary indexSummary; - private IFilter bf; ++ void setup() ++ { ++ tidy.setup(this); ++ this.readMeter = tidy.global.readMeter; ++ } + - private AtomicBoolean isCompacted; ++ @VisibleForTesting ++ public void overrideReadMeter(RestorableMeter readMeter) ++ { ++ this.readMeter = tidy.global.readMeter = readMeter; ++ } + - /** - * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources, - * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple. - * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed. - * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources. - */ - private Object replaceLock = new Object(); - private SSTableReader replacedBy; - private SSTableReader replaces; - private SSTableDeletingTask deletingTask; ++ /** ++ * One instance per SSTableReader we create. This references the type-shared tidy, which in turn references ++ * the globally shared tidy, i.e. ++ * ++ * InstanceTidier => DescriptorTypeTidy => GlobalTidy ++ * ++ * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example), but there can only be ++ * two DescriptorTypeTidy instances (FINAL and TEMPLINK) and only one GlobalTidy for one single logical sstable. ++ * ++ * When the InstanceTidier cleans up, it releases its reference to its DescriptorTypeTidy; when all InstanceTidiers ++ * for that type have run, the DescriptorTypeTidy cleans up. DescriptorTypeTidy behaves in the same way towards GlobalTidy.
++ * ++ * For ease, we stash a direct reference to both our type-shared and global tidier ++ */ ++ private static final class InstanceTidier implements Tidy ++ { ++ private final Descriptor descriptor; ++ private final CFMetaData metadata; ++ private IFilter bf; ++ private IndexSummary summary; ++ ++ private SegmentedFile dfile; ++ private SegmentedFile ifile; + private Runnable runOnClose; ++ private boolean isReplaced = false; + - @VisibleForTesting - public RestorableMeter readMeter; - private volatile ScheduledFuture readMeterSyncFuture; ++ // a reference to our shared per-Descriptor.Type tidy instance, that ++ // we will release when we are ourselves released ++ private Ref<DescriptorTypeTidy> typeRef; + - private void setup(SSTableReader reader) - { - name = reader.toString(); - metadata = reader.metadata; - ifile = reader.ifile; - dfile = reader.dfile; - indexSummary = reader.indexSummary; - bf = reader.bf; - isCompacted = reader.isCompacted; - readMeterSyncFuture = reader.readMeterSyncFuture; - } ++ // a convenience stashing of the shared per-descriptor-type tidy instance itself ++ // and the per-logical-sstable globally shared state that it is linked to ++ private DescriptorTypeTidy type; ++ private GlobalTidy global; + - public String name() ++ private boolean setup; ++ ++ void setup(SSTableReader reader) + { - return name; ++ this.setup = true; ++ this.bf = reader.bf; ++ this.summary = reader.indexSummary; ++ this.dfile = reader.dfile; ++ this.ifile = reader.ifile; ++ // get a new reference to the shared descriptor-type tidy ++ this.typeRef = DescriptorTypeTidy.get(reader); ++ this.type = typeRef.get(); ++ this.global = type.globalRef.get(); + } + - private void dropPageCache() ++ InstanceTidier(Descriptor descriptor, CFMetaData metadata) + { - dropPageCache(dfile.path); - dropPageCache(ifile.path); ++ this.descriptor = descriptor; ++ this.metadata = metadata; + } + - private void dropPageCache(String filePath) ++ public void tidy() + { - RandomAccessFile file = null; ++ // don't try to clean up if the sstablereader was never fully constructed ++ if (!setup) ++ return; + - try ++ final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId); ++ final OpOrder.Barrier barrier; ++ if (cfs != null) + { - file = new RandomAccessFile(filePath, "r"); - - int fd = CLibrary.getfd(file.getFD()); ++ barrier = cfs.readOrdering.newBarrier(); ++ barrier.issue(); ++ } ++ else ++ barrier = null; + - if (fd > 0) ++ ScheduledExecutors.nonPeriodicTasks.execute(new Runnable() ++ { ++ public void run() + { - if (logger.isDebugEnabled()) - logger.debug(String.format("Dropping page cache of file %s.", filePath)); - - CLibrary.trySkipCache(fd, 0, 0); ++ if (barrier != null) ++ barrier.await(); ++ bf.close(); ++ dfile.close(); ++ ifile.close(); ++ if (summary != null) ++ summary.close(); ++ if (runOnClose != null) ++ runOnClose.run(); ++ typeRef.release(); + } - } - catch (IOException e) - { - // we don't care if cache cleanup fails - } - finally - { - FileUtils.closeQuietly(file); - } ++ }); + } + - public void tidy() ++ public String name() + { - if (readMeterSyncFuture != null) - readMeterSyncFuture.cancel(false); ++ return descriptor.toString(); ++ } + - synchronized (replaceLock) - { - boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get(); ++ void releaseSummary() ++ { ++ summary.close(); ++ assert summary.isCleanedUp(); ++ summary = null; ++ } ++ } + - if (replacedBy != null) - { - closeBf = replacedBy.bf != bf; - closeSummary =
replacedBy.indexSummary != indexSummary; - closeFiles = replacedBy.dfile != dfile; - // if the replacement sstablereader uses a different path, clean up our paths - deleteFiles = !dfile.path.equals(replacedBy.dfile.path); - } ++ /** ++ * One shared between all instances of a given Descriptor.Type. ++ * Performs only two things: the deletion of the sstables for the type, ++ * if necessary; and the shared reference to the globally shared state. ++ * ++ * All InstanceTidiers, on setup(), ask the static get() method for their shared state, ++ * and stash a reference to it to be released when they are. Once all such references are ++ * released, the shared tidy will be performed. ++ */ ++ static final class DescriptorTypeTidy implements Tidy ++ { ++ // keyed by REAL descriptor (TMPLINK/FINAL), mapping to the shared DescriptorTypeTidy for that descriptor ++ static final ConcurrentMap<Descriptor, Ref<DescriptorTypeTidy>> lookup = new ConcurrentHashMap<>(); + - if (replaces != null) - { - closeBf &= replaces.bf != bf; - closeSummary &= replaces.indexSummary != indexSummary; - closeFiles &= replaces.dfile != dfile; - deleteFiles &= !dfile.path.equals(replaces.dfile.path); - } ++ private final Descriptor desc; ++ private final Ref<GlobalTidy> globalRef; ++ private final SSTableDeletingTask deletingTask; + - boolean deleteAll = false; - if (isCompacted.get()) - { - assert replacedBy == null; - if (replaces != null && !deleteFiles) - { - replaces.tidy.replacedBy = null; - replaces.tidy.deletingTask = deletingTask; - replaces.markObsolete(); - } - else - { - deleteAll = true; - } - } - else - { - closeSummary &= indexSummary != null; - if (replaces != null) - replaces.tidy.replacedBy = replacedBy; - if (replacedBy != null) - replacedBy.tidy.replaces = replaces; - } ++ DescriptorTypeTidy(Descriptor desc, SSTableReader sstable) ++ { ++ this.desc = desc; ++ this.deletingTask = new SSTableDeletingTask(desc, sstable); ++ // get a new reference to the shared global tidy ++ this.globalRef = GlobalTidy.get(sstable); ++ } + - scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll); ++ public void tidy() ++ { ++ lookup.remove(desc); ++ boolean isCompacted = globalRef.get().isCompacted.get(); ++ globalRef.release(); ++ switch (desc.type) ++ { ++ case FINAL: ++ if (isCompacted) ++ deletingTask.run(); ++ break; ++ case TEMPLINK: ++ deletingTask.run(); ++ break; ++ default: ++ throw new IllegalStateException(); + } + } + - private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll) ++ public String name() + { - final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId); - final OpOrder.Barrier barrier; - if (cfs != null) ++ return desc.toString(); ++ } ++ ++ // get a new reference to the shared DescriptorTypeTidy for this sstable ++ public static Ref<DescriptorTypeTidy> get(SSTableReader sstable) ++ { ++ Descriptor desc = sstable.descriptor; ++ if (sstable.openReason == OpenReason.EARLY) ++ desc = desc.asType(Descriptor.Type.TEMPLINK); ++ Ref<DescriptorTypeTidy> refc = lookup.get(desc); ++ if (refc != null) ++ return refc.ref(); ++ final DescriptorTypeTidy tidy = new DescriptorTypeTidy(desc, sstable); ++ refc = new Ref<>(tidy, tidy); ++ Ref<DescriptorTypeTidy> ex = lookup.putIfAbsent(desc, refc); ++ assert ex == null; ++ return refc; ++ } ++ } ++ ++ /** ++ * One instance per logical sstable. This both tracks shared cleanup and some shared state related ++ * to the sstable's lifecycle.
All DescriptorTypeTidy instances, on construction, obtain a reference to us ++ * via our static get(). There should only ever be at most two such references extant at any one time, ++ * since only TMPLINK and FINAL type descriptors should be open as readers. When all files of both ++ * kinds have been released, this shared tidy will be performed. ++ */ ++ static final class GlobalTidy implements Tidy ++ { ++ // keyed by FINAL descriptor, mapping to the shared GlobalTidy for that descriptor ++ static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>(); ++ ++ private final Descriptor desc; ++ // a single convenience property for getting the most recent version of an sstable, not related to tidying ++ private SSTableReader live; ++ // the readMeter that is shared between all instances of the sstable, and can be overridden in all of them ++ // at once also, for testing purposes ++ private RestorableMeter readMeter; ++ // the scheduled persistence of the readMeter, that we will cancel once all instances of this logical ++ // sstable have been released ++ private final ScheduledFuture readMeterSyncFuture; ++ // shared state managing if the logical sstable has been c