Return-Path: Delivered-To: apmail-incubator-cassandra-commits-archive@minotaur.apache.org Received: (qmail 62196 invoked from network); 1 Aug 2009 22:31:44 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 1 Aug 2009 22:31:44 -0000 Received: (qmail 57380 invoked by uid 500); 1 Aug 2009 22:31:48 -0000 Delivered-To: apmail-incubator-cassandra-commits-archive@incubator.apache.org Received: (qmail 57345 invoked by uid 500); 1 Aug 2009 22:31:48 -0000 Mailing-List: contact cassandra-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cassandra-dev@incubator.apache.org Delivered-To: mailing list cassandra-commits@incubator.apache.org Received: (qmail 57335 invoked by uid 99); 1 Aug 2009 22:31:47 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 01 Aug 2009 22:31:47 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 01 Aug 2009 22:31:37 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B2B73238889B; Sat, 1 Aug 2009 22:31:16 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r799949 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ db/filter/ io/ Date: Sat, 01 Aug 2009 22:31:16 -0000 To: cassandra-commits@incubator.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090801223116.B2B73238889B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jbellis Date: Sat Aug 1 22:31:15 2009 New Revision: 799949 URL: http://svn.apache.org/viewvc?rev=799949&view=rev Log: r/m SequenceFile. ColumnGroupReader moved to SSTableSliceIterator mostly unchanged for now. (finish cleaning this up in #332) patch by jbellis; reviewed by Stu Hood for CASSANDRA-330 Removed: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=799949&r1=799948&r2=799949&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Sat Aug 1 22:31:15 2009 @@ -481,7 +481,7 @@ */ public void serialize(ColumnFamily columnFamily, DataOutput dos) throws IOException { - // TODO whenever we change this we need to change the code in SequenceFile to match in two places. + // TODO whenever we change this we need to change the code in SSTableSliceIterator to match. // This SUCKS and is inefficient to boot. let's fix this ASAP. Collection columns = columnFamily.getSortedColumns(); Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=799949&r1=799948&r2=799949&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Sat Aug 1 22:31:15 2009 @@ -1,15 +1,17 @@ package org.apache.cassandra.db.filter; import java.util.ArrayList; +import java.util.List; +import java.util.Arrays; +import java.util.Collections; import java.io.IOException; +import org.apache.commons.lang.ArrayUtils; + import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.io.DataOutputBuffer; -import org.apache.cassandra.io.DataInputBuffer; -import org.apache.cassandra.io.SequenceFile; -import org.apache.cassandra.io.SSTableReader; +import org.apache.cassandra.io.*; import org.apache.cassandra.config.DatabaseDescriptor; import com.google.common.collect.AbstractIterator; @@ -25,7 +27,7 @@ private int curColumnIndex; private ColumnFamily curCF = null; private ArrayList curColumns = new ArrayList(); - private SequenceFile.ColumnGroupReader reader; + private ColumnGroupReader reader; private AbstractType comparator; public SSTableSliceIterator(String filename, String key, String cfName, AbstractType comparator, byte[] startColumn, boolean isAscending) @@ -39,7 +41,7 @@ AbstractType comparator1 = DatabaseDescriptor.getComparator(ssTable.getTableName(), cfName); long position = ssTable.getPosition(decoratedKey); if (position >= 0) - reader = new SequenceFile.ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position); + reader = new ColumnGroupReader(ssTable.getFilename(), decoratedKey, cfName, comparator1, startColumn, isAscending, position); this.comparator = comparator; this.startColumn = startColumn; curColumnIndex = isAscending ? 0 : -1; @@ -127,4 +129,162 @@ { reader.close(); } + + /** + * This is a reader that finds the block for a starting column and returns + * blocks before/after it for each next call. This function assumes that + * the CF is sorted by name and exploits the name index. + */ + public static class ColumnGroupReader + { + private String key_; + private String cfName_; + private String cfType_; + private AbstractType comparator_; + private String subComparatorName_; + private boolean isAscending_; + + private List columnIndexList_; + private long columnStartPosition_; + private int curRangeIndex_; + private int allColumnsSize_; + private int localDeletionTime_; + private long markedForDeleteAt_; + private BufferedRandomAccessFile file_; + + public ColumnGroupReader(String filename, String key, String cfName, AbstractType comparator, byte[] startColumn, boolean isAscending, long position) throws IOException + { + this.file_ = new BufferedRandomAccessFile(filename, "r"); + this.cfName_ = cfName; + this.comparator_ = comparator; + this.subComparatorName_ = DatabaseDescriptor.getSubComparator(SSTableReader.parseTableName(filename), cfName).getClass().getCanonicalName(); + this.key_ = key; + this.isAscending_ = isAscending; + init(startColumn, position); + } + + /** + * Build a list of index entries ready for search. + */ + private List getFullColumnIndexList(List columnIndexList, int totalNumCols) + { + if (columnIndexList.size() == 0) + { + /* if there is no column index, add an index entry that covers the full space. */ + return Arrays.asList(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, totalNumCols, comparator_)); + } + + List fullColIndexList = new ArrayList(); + int accumulatededCols = 0; + for (IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList) + accumulatededCols += colPosInfo.count(); + int remainingCols = totalNumCols - accumulatededCols; + + fullColIndexList.add(new IndexHelper.ColumnIndexInfo(ArrayUtils.EMPTY_BYTE_ARRAY, 0, columnIndexList.get(0).count(), comparator_)); + for (int i = 0; i < columnIndexList.size() - 1; i++) + { + IndexHelper.ColumnIndexInfo colPosInfo = columnIndexList.get(i); + fullColIndexList.add(new IndexHelper.ColumnIndexInfo(colPosInfo.name(), + colPosInfo.position(), + columnIndexList.get(i + 1).count(), + comparator_)); + } + byte[] columnName = columnIndexList.get(columnIndexList.size() - 1).name(); + fullColIndexList.add(new IndexHelper.ColumnIndexInfo(columnName, + columnIndexList.get(columnIndexList.size() - 1).position(), + remainingCols, + comparator_)); + return fullColIndexList; + } + + private void init(byte[] startColumn, long position) throws IOException + { + file_.seek(position); + String keyInDisk = file_.readUTF(); + assert keyInDisk.equals(key_); + + /* read off the size of this row */ + int dataSize = file_.readInt(); + /* skip the bloomfilter */ + int totalBytesRead = IndexHelper.skipBloomFilter(file_); + /* read off the index flag, it has to be true */ + boolean hasColumnIndexes = file_.readBoolean(); + totalBytesRead += 1; + + /* read the index */ + List colIndexList = new ArrayList(); + if (hasColumnIndexes) + totalBytesRead += IndexHelper.deserializeIndex(SSTableReader.parseTableName(file_.getPath()), cfName_, file_, colIndexList); + + /* need to do two things here. + * 1. move the file pointer to the beginning of the list of stored columns + * 2. calculate the size of all columns */ + String cfName = file_.readUTF(); + cfType_ = file_.readUTF(); + String comparatorName = file_.readUTF(); + assert comparatorName.equals(comparator_.getClass().getCanonicalName()); + String subComparatorName = file_.readUTF(); // subcomparator + localDeletionTime_ = file_.readInt(); + markedForDeleteAt_ = file_.readLong(); + int totalNumCols = file_.readInt(); + allColumnsSize_ = dataSize - (totalBytesRead + 4 * 2 + cfName.length() + cfType_.length() + comparatorName.length() + subComparatorName.length() + 4 + 8 + 4); + + columnStartPosition_ = file_.getFilePointer(); + columnIndexList_ = getFullColumnIndexList(colIndexList, totalNumCols); + + if (startColumn.length == 0 && !isAscending_) + { + /* in this case, we assume that we want to scan from the largest column in descending order. */ + curRangeIndex_ = columnIndexList_.size() - 1; + } + else + { + int index = Collections.binarySearch(columnIndexList_, new IndexHelper.ColumnIndexInfo(startColumn, 0, 0, comparator_)); + curRangeIndex_ = index < 0 ? (++index) * (-1) - 1 : index; + } + } + + private boolean getBlockFromCurIndex(DataOutputBuffer bufOut) throws IOException + { + if (curRangeIndex_ < 0 || curRangeIndex_ >= columnIndexList_.size()) + return false; + IndexHelper.ColumnIndexInfo curColPostion = columnIndexList_.get(curRangeIndex_); + long start = curColPostion.position(); + long end = curRangeIndex_ < columnIndexList_.size() - 1 + ? columnIndexList_.get(curRangeIndex_+1).position() + : allColumnsSize_; + + /* seek to the correct offset to the data, and calculate the data size */ + file_.seek(columnStartPosition_ + start); + long dataSize = end - start; + + bufOut.reset(); + // write CF info + bufOut.writeUTF(cfName_); + bufOut.writeUTF(cfType_); + bufOut.writeUTF(comparator_.getClass().getCanonicalName()); + bufOut.writeUTF(subComparatorName_); + bufOut.writeInt(localDeletionTime_); + bufOut.writeLong(markedForDeleteAt_); + // now write the columns + bufOut.writeInt(curColPostion.count()); + bufOut.write(file_, (int)dataSize); + return true; + } + + public boolean getNextBlock(DataOutputBuffer outBuf) throws IOException + { + boolean result = getBlockFromCurIndex(outBuf); + if (isAscending_) + curRangeIndex_++; + else + curRangeIndex_--; + return result; + } + + public void close() throws IOException + { + file_.close(); + } + } } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java?rev=799949&r1=799948&r2=799949&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java Sat Aug 1 22:31:15 2009 @@ -19,11 +19,8 @@ package org.apache.cassandra.io; import java.io.IOException; -import java.io.DataInput; import java.util.Iterator; -import org.apache.cassandra.io.IFileReader; -import org.apache.cassandra.io.SSTableReader; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.ColumnFamily; Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=799949&r1=799948&r2=799949&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Sat Aug 1 22:31:15 2009 @@ -133,7 +133,7 @@ * @param columnIndexList the structure which is filled in with the deserialized index @return number of bytes read from the input * @throws IOException */ - static int deserializeIndex(String tableName, String cfName, DataInput in, List columnIndexList) throws IOException + public static int deserializeIndex(String tableName, String cfName, DataInput in, List columnIndexList) throws IOException { /* read only the column index list */ int columnIndexSize = in.readInt(); @@ -343,7 +343,7 @@ position_ = position; } - int count() + public int count() { return columnCount_; } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=799949&r1=799948&r2=799949&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Sat Aug 1 22:31:15 2009 @@ -67,7 +67,7 @@ return dataFile; } - static String parseTableName(String filename) + public static String parseTableName(String filename) { return new File(filename).getParentFile().getName(); } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=799949&r1=799948&r2=799949&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Sat Aug 1 22:31:15 2009 @@ -23,9 +23,7 @@ import org.apache.log4j.Logger; -import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.SequenceFile.ColumnGroupReader; import org.apache.cassandra.utils.BloomFilter; import org.apache.cassandra.utils.FileUtils; import org.apache.cassandra.service.StorageService; Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=799949&r1=799948&r2=799949&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java Sat Aug 1 22:31:15 2009 @@ -26,7 +26,7 @@ public SSTableWriter(String filename, int keyCount, IPartitioner partitioner) throws IOException { super(filename, partitioner); - dataWriter = SequenceFile.bufferedWriter(dataFile, 4 * 1024 * 1024); + dataWriter = new AbstractWriter.BufferWriter(dataFile, 4 * 1024 * 1024); indexRAF = new BufferedRandomAccessFile(indexFilename(), "rw", 1024 * 1024); bf = new BloomFilter(keyCount, 15); }