Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 87F1318B84 for ; Mon, 27 Apr 2015 06:57:06 +0000 (UTC) Received: (qmail 63337 invoked by uid 500); 27 Apr 2015 06:57:06 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 63248 invoked by uid 500); 27 Apr 2015 06:57:06 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 63053 invoked by uid 99); 27 Apr 2015 06:57:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Apr 2015 06:57:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 14A90E042F; Mon, 27 Apr 2015 06:57:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marcuse@apache.org To: commits@cassandra.apache.org Date: Mon, 27 Apr 2015 06:57:08 -0000 Message-Id: <63132b73f3f94ef6ad674add3f512637@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk Merge branch 'cassandra-2.1' into trunk Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d3b53aa4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d3b53aa4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d3b53aa4 Branch: refs/heads/trunk Commit: d3b53aa424d0fc70ab4e845165011508d3318de6 Parents: 246f07b 1262889 Author: Marcus Eriksson Authored: Mon Apr 27 08:54:29 2015 +0200 Committer: Marcus Eriksson Committed: Mon Apr 27 08:54:29 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/io/sstable/format/big/BigTableScanner.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d3b53aa4/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d3b53aa4/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java index 01b1f23,0000000..cea9cfa mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@@ -1,366 -1,0 +1,366 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.big; + +import java.io.IOException; +import java.util.*; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.RateLimiter; + +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.columniterator.IColumnIteratorFactory; +import org.apache.cassandra.db.columniterator.LazyColumnIterator; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.AbstractBounds.Boundary; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableIdentityIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; + +import static org.apache.cassandra.dht.AbstractBounds.isEmpty; +import static org.apache.cassandra.dht.AbstractBounds.maxLeft; +import static org.apache.cassandra.dht.AbstractBounds.minRight; + +public class BigTableScanner implements ISSTableScanner +{ + protected final RandomAccessReader dfile; + protected final RandomAccessReader ifile; + public final SSTableReader sstable; + + private final Iterator> rangeIterator; + private AbstractBounds currentRange; + + private final DataRange dataRange; + private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; + + protected Iterator iterator; + + public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) + { + return new BigTableScanner(sstable, dataRange, limiter); + } + public static ISSTableScanner getScanner(SSTableReader sstable, Collection> tokenRanges, RateLimiter limiter) + { + // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249) + List> positions = sstable.getPositionsForRanges(tokenRanges); + if (positions.isEmpty()) + return new EmptySSTableScanner(sstable.getFilename()); + + return new BigTableScanner(sstable, tokenRanges, limiter); + } + + /** + * @param sstable SSTable to scan; must not be null + * @param dataRange a single range to scan; must not be null + * @param limiter background i/o RateLimiter; may be null + */ + private BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) + { + assert sstable != null; + + this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter); + this.ifile = sstable.openIndexReader(); + this.sstable = sstable; + this.dataRange = dataRange; + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); + + List> boundsList = new ArrayList<>(2); + addRange(dataRange.keyRange(), boundsList); + this.rangeIterator = boundsList.iterator(); + } + + /** + * @param sstable SSTable to scan; must not be null + * @param tokenRanges A set of token ranges to scan + * @param limiter background i/o RateLimiter; may be null + */ + private BigTableScanner(SSTableReader sstable, Collection> tokenRanges, RateLimiter limiter) + { + assert sstable != null; + + this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter); + this.ifile = sstable.openIndexReader(); + this.sstable = sstable; + this.dataRange = null; + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); + + List> boundsList = new ArrayList<>(tokenRanges.size()); + for (Range range : Range.normalize(tokenRanges)) + addRange(Range.makeRowRange(range), boundsList); + + this.rangeIterator = boundsList.iterator(); + } + + private void addRange(AbstractBounds requested, List> boundsList) + { + if (requested instanceof Range && ((Range)requested).isWrapAround()) + { + if (requested.right.compareTo(sstable.first) >= 0) + { + // since we wrap, we must contain the whole sstable prior to stopKey() + Boundary left = new Boundary(sstable.first, true); + Boundary right; + right = requested.rightBoundary(); + right = minRight(right, sstable.last, true); + if (!isEmpty(left, right)) + boundsList.add(AbstractBounds.bounds(left, right)); + } + if (requested.left.compareTo(sstable.last) <= 0) + { + // since we wrap, we must contain the whole sstable after dataRange.startKey() + Boundary right = new Boundary(sstable.last, true); + Boundary left; + left = requested.leftBoundary(); + left = maxLeft(left, sstable.first, true); + if (!isEmpty(left, right)) + boundsList.add(AbstractBounds.bounds(left, right)); + } + } + else + { + assert requested.left.compareTo(requested.right) <= 0 || requested.right.isMinimum(); + Boundary left, right; + left = requested.leftBoundary(); + right = requested.rightBoundary(); + left = maxLeft(left, sstable.first, true); + // apparently isWrapAround() doesn't count Bounds that extend to the limit (min) as wrapping + right = requested.right.isMinimum() ? new Boundary(sstable.last, true) + : minRight(right, sstable.last, true); + if (!isEmpty(left, right)) + boundsList.add(AbstractBounds.bounds(left, right)); + } + } + + private void seekToCurrentRangeStart() + { + long indexPosition = sstable.getIndexScanPosition(currentRange.left); + ifile.seek(indexPosition); + try + { + + while (!ifile.isEOF()) + { + indexPosition = ifile.getFilePointer(); + DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); + if (indexDecoratedKey.compareTo(currentRange.left) > 0 || currentRange.contains(indexDecoratedKey)) + { + // Found, just read the dataPosition and seek into index and data files + long dataPosition = ifile.readLong(); + ifile.seek(indexPosition); + dfile.seek(dataPosition); + break; + } + else + { + RowIndexEntry.Serializer.skip(ifile); + } + } + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } + } + + public void close() throws IOException + { + FileUtils.close(dfile, ifile); + } + + public long getLengthInBytes() + { + return dfile.length(); + } + + public long getCurrentPosition() + { + return dfile.getFilePointer(); + } + + public String getBackingFiles() + { + return sstable.toString(); + } + + public boolean hasNext() + { + if (iterator == null) + iterator = createIterator(); + return iterator.hasNext(); + } + + public OnDiskAtomIterator next() + { + if (iterator == null) + iterator = createIterator(); + return iterator.next(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + private Iterator createIterator() + { + return new KeyScanningIterator(); + } + + protected class KeyScanningIterator extends AbstractIterator + { + private DecoratedKey nextKey; + private RowIndexEntry nextEntry; + private DecoratedKey currentKey; + private RowIndexEntry currentEntry; + + protected OnDiskAtomIterator computeNext() + { + try + { + if (nextEntry == null) + { + do + { + // we're starting the first range or we just passed the end of the previous range + if (!rangeIterator.hasNext()) + return endOfData(); + + currentRange = rangeIterator.next(); + seekToCurrentRangeStart(); + + if (ifile.isEOF()) + return endOfData(); + + currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); + currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version); + } while (!currentRange.contains(currentKey)); + } + else + { + // we're in the middle of a range + currentKey = nextKey; + currentEntry = nextEntry; + } + + if (ifile.isEOF()) + { + nextEntry = null; + nextKey = null; + } + else + { + // we need the position of the start of the next key, regardless of whether it falls in the current range + nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); + nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version); + + if (!currentRange.contains(nextKey)) + { + nextKey = null; + nextEntry = null; + } + } + + if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey())) + { + dfile.seek(currentEntry.position + currentEntry.headerOffset()); + ByteBufferUtil.readWithShortLength(dfile); // key + return new SSTableIdentityIterator(sstable, dfile, currentKey); + } + + return new LazyColumnIterator(currentKey, new IColumnIteratorFactory() + { + public OnDiskAtomIterator create() + { + return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry); + } + }); + + } - catch (IOException e) ++ catch (CorruptSSTableException | IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } + } + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "(" + + "dfile=" + dfile + + " ifile=" + ifile + + " sstable=" + sstable + + ")"; + } + + public static class EmptySSTableScanner implements ISSTableScanner + { + private final String filename; + + public EmptySSTableScanner(String filename) + { + this.filename = filename; + } + + public long getLengthInBytes() + { + return 0; + } + + public long getCurrentPosition() + { + return 0; + } + + public String getBackingFiles() + { + return filename; + } + + public boolean hasNext() + { + return false; + } + + public OnDiskAtomIterator next() + { + return null; + } + + public void close() throws IOException { } + + public void remove() { } + } + + +}