cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1149628 [1/2] - in /cassandra/trunk: src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/io/u...
Date Fri, 22 Jul 2011 14:56:29 GMT
Author: jbellis
Date: Fri Jul 22 14:56:25 2011
New Revision: 1149628

URL: http://svn.apache.org/viewvc?rev=1149628&view=rev
Log:
split BRAF into RandomAccessReader and SequentialWriter
patch by pyaskevich and jbellis for CASSANDRA-2921

Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/SequentialWriter.java
Removed:
    cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java
    cassandra/trunk/test/long/org/apache/cassandra/db/LongTableTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java Fri Jul 22 14:56:25 2011
@@ -37,8 +37,7 @@ import org.apache.cassandra.db.compactio
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.compaction.CompactionType;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -243,7 +242,7 @@ public abstract class AutoSavingCache<K,
             logger.debug("Saving {}", path);
             File tmpFile = File.createTempFile(path.getName(), null, path.getParentFile());
 
-            BufferedRandomAccessFile out = new BufferedRandomAccessFile(tmpFile, "rw", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true);
+            DataOutputStream out = SequentialWriter.open(tmpFile, true).stream;
             try
             {
                 for (K key : keys)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Fri Jul 22 14:56:25 2011
@@ -31,6 +31,7 @@ import java.util.zip.Checksum;
 import com.google.common.collect.Ordering;
 
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -42,8 +43,6 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.DeletionService;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -204,7 +203,7 @@ public class CommitLog implements Commit
             final long segment = CommitLogSegment.idFromFilename(file.getName());
 
             int bufferSize = (int) Math.min(Math.max(file.length(), 1), 32 * 1024 * 1024);
-            BufferedRandomAccessFile reader = new BufferedRandomAccessFile(new File(file.getAbsolutePath()), "r", bufferSize, true);
+            RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), bufferSize, true);
             assert reader.length() <= Integer.MAX_VALUE;
 
             try

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Fri Jul 22 14:56:25 2011
@@ -31,13 +31,13 @@ import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.net.MessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 
 public class CommitLogSegment
 {
@@ -45,7 +45,7 @@ public class CommitLogSegment
     private static Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile("CommitLog-(\\d+).log");
 
     public final long id;
-    private final BufferedRandomAccessFile logWriter;
+    private final SequentialWriter logWriter;
 
     // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we could delete this segment
     public final Set<Integer> cfDirty = new HashSet<Integer>();
@@ -88,9 +88,9 @@ public class CommitLogSegment
         return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
     }
 
-    private static BufferedRandomAccessFile createWriter(String file) throws IOException
+    private static SequentialWriter createWriter(String file) throws IOException
     {
-        return new BufferedRandomAccessFile(new File(file), "rw", 128 * 1024, true);
+        return SequentialWriter.open(new File(file), 128 * 1024, true);
     }
 
     public ReplayPosition write(RowMutation rowMutation) throws IOException
@@ -106,18 +106,18 @@ public class CommitLogSegment
             Checksum checksum = new CRC32();
             byte[] serializedRow = rowMutation.getSerializedBuffer(MessagingService.version_);
             checksum.update(serializedRow.length);
-            logWriter.writeInt(serializedRow.length);
-            logWriter.writeLong(checksum.getValue());
+            logWriter.stream.writeInt(serializedRow.length);
+            logWriter.stream.writeLong(checksum.getValue());
             logWriter.write(serializedRow);
             checksum.update(serializedRow, 0, serializedRow.length);
-            logWriter.writeLong(checksum.getValue());
+            logWriter.stream.writeLong(checksum.getValue());
 
             return cLogCtx;
         }
         catch (IOException e)
         {
             if (currentPosition != -1)
-                logWriter.seek(currentPosition);
+                logWriter.truncate(currentPosition);
             throw e;
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Fri Jul 22 14:56:25 2011
@@ -42,8 +42,8 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.sstable.*;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
@@ -470,9 +470,9 @@ public class CompactionManager implement
         // we'll also loop through the index at the same time, using the position from the index to recover if the
         // row header (key or data size) is corrupt. (This means our position in the index file will be one row
         // "ahead" of the data file.)
-        final BufferedRandomAccessFile dataFile = BufferedRandomAccessFile.getUncachingReader(sstable.getFilename());
+        final RandomAccessReader dataFile = RandomAccessReader.open(new File(sstable.getFilename()), true);
         String indexFilename = sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
-        BufferedRandomAccessFile indexFile = BufferedRandomAccessFile.getUncachingReader(indexFilename);
+        RandomAccessReader indexFile = RandomAccessReader.open(new File(indexFilename), true);
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -559,7 +559,7 @@ public class CompactionManager implement
                     {
                         throwIfFatal(th);
                         logger.warn("Non-fatal error reading row (stacktrace follows)", th);
-                        writer.reset();
+                        writer.resetAndTruncate();
 
                         if (currentIndexKey != null
                             && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
@@ -589,7 +589,7 @@ public class CompactionManager implement
                                     throw new IOError(th2);
 
                                 logger.warn("Retry failed too.  Skipping to next row (retry's stacktrace follows)", th2);
-                                writer.reset();
+                                writer.resetAndTruncate();
                                 dataFile.seek(nextRowPositionFromIndex);
                                 badRows++;
                             }
@@ -1119,9 +1119,9 @@ public class CompactionManager implement
 
     private static class ScrubInfo implements CompactionInfo.Holder
     {
-        private final BufferedRandomAccessFile dataFile;
+        private final RandomAccessReader dataFile;
         private final SSTableReader sstable;
-        public ScrubInfo(BufferedRandomAccessFile dataFile, SSTableReader sstable)
+        public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
         {
             this.dataFile = dataFile;
             this.sstable = sstable;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java Fri Jul 22 14:56:25 2011
@@ -24,19 +24,18 @@ package org.apache.cassandra.io.sstable;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.Iterator;
 
 import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CloseableIterator;
 
 public class KeyIterator extends AbstractIterator<DecoratedKey> implements CloseableIterator<DecoratedKey>
 {
-    private final BufferedRandomAccessFile in;
+    private final RandomAccessReader in;
     private final Descriptor desc;
 
     public KeyIterator(Descriptor desc)
@@ -44,10 +43,7 @@ public class KeyIterator extends Abstrac
         this.desc = desc;
         try
         {
-            in = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)),
-                                              "r",
-                                              BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE,
-                                              true);
+            in = RandomAccessReader.open(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), true);
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Fri Jul 22 14:56:25 2011
@@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -168,7 +168,7 @@ public abstract class SSTable
     }
 
     /** @return An estimate of the number of keys contained in the given data file. */
-    static long estimateRowsFromData(Descriptor desc, BufferedRandomAccessFile dfile) throws IOException
+    static long estimateRowsFromData(Descriptor desc, RandomAccessReader dfile) throws IOException
     {
         // collect sizes for the first 1000 keys, or first 100 megabytes of data
         final int SAMPLES_CAP = 1000, BYTES_CAP = (int)Math.min(100000000, dfile.length());
@@ -187,7 +187,7 @@ public abstract class SSTable
     }
 
     /** @return An estimate of the number of keys contained in the given index file. */
-    static long estimateRowsFromIndex(BufferedRandomAccessFile ifile) throws IOException
+    static long estimateRowsFromIndex(RandomAccessReader ifile) throws IOException
     {
         // collect sizes for the first 10000 keys, or first 10 megabytes of data
         final int SAMPLES_CAP = 10000, BYTES_CAP = (int)Math.min(10000000, ifile.length());

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Fri Jul 22 14:56:25 2011
@@ -36,7 +36,7 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.BytesReadTracker;
 
 public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
@@ -70,7 +70,7 @@ public class SSTableIdentityIterator imp
      * @param dataSize length of row data
      * @throws IOException
      */
-    public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize)
+    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize)
     throws IOException
     {
         this(sstable, file, key, dataStart, dataSize, false);
@@ -86,7 +86,7 @@ public class SSTableIdentityIterator imp
      * @param checkData if true, do its best to deserialize and check the coherence of row data
      * @throws IOException
      */
-    public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean checkData)
+    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize, boolean checkData)
     throws IOException
     {
         this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, false);
@@ -114,9 +114,9 @@ public class SSTableIdentityIterator imp
 
         try
         {
-            if (input instanceof BufferedRandomAccessFile)
+            if (input instanceof RandomAccessReader)
             {
-                BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+                RandomAccessReader file = (RandomAccessReader) input;
                 file.seek(this.dataStart);
                 if (checkData)
                 {
@@ -150,9 +150,9 @@ public class SSTableIdentityIterator imp
             ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, inputWithTracker);
             columnCount = inputWithTracker.readInt();
 
-            if (input instanceof BufferedRandomAccessFile)
+            if (input instanceof RandomAccessReader)
             {
-                BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+                RandomAccessReader file = (RandomAccessReader) input;
                 columnPosition = file.getFilePointer();
             }
         }
@@ -174,9 +174,9 @@ public class SSTableIdentityIterator imp
 
     public boolean hasNext()
     {
-        if (input instanceof BufferedRandomAccessFile)
+        if (input instanceof RandomAccessReader)
         {
-            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            RandomAccessReader file = (RandomAccessReader) input;
             return file.getFilePointer() < finishedAt;
         }
         else
@@ -217,9 +217,9 @@ public class SSTableIdentityIterator imp
     public String getPath()
     {
         // if input is from file, then return that path, otherwise it's from streaming
-        if (input instanceof BufferedRandomAccessFile)
+        if (input instanceof RandomAccessReader)
         {
-            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            RandomAccessReader file = (RandomAccessReader) input;
             return file.getPath();
         }
         else
@@ -231,9 +231,9 @@ public class SSTableIdentityIterator imp
     public void echoData(DataOutput out) throws IOException
     {
         // only effective when input is from file
-        if (input instanceof BufferedRandomAccessFile)
+        if (input instanceof RandomAccessReader)
         {
-            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            RandomAccessReader file = (RandomAccessReader) input;
             file.seek(dataStart);
             while (file.getFilePointer() < finishedAt)
             {
@@ -249,9 +249,9 @@ public class SSTableIdentityIterator imp
     public ColumnFamily getColumnFamilyWithColumns() throws IOException
     {
         ColumnFamily cf = columnFamily.cloneMeShallow();
-        if (input instanceof BufferedRandomAccessFile)
+        if (input instanceof RandomAccessReader)
         {
-            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            RandomAccessReader file = (RandomAccessReader) input;
             file.seek(columnPosition - 4); // seek to before column count int
             ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, false, fromRemote);
         }
@@ -282,9 +282,9 @@ public class SSTableIdentityIterator imp
     public void reset()
     {
         // only effective when input is from file
-        if (input instanceof BufferedRandomAccessFile)
+        if (input instanceof RandomAccessReader)
         {
-            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            RandomAccessReader file = (RandomAccessReader) input;
             try
             {
                 file.seek(columnPosition);

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Jul 22 14:56:25 2011
@@ -39,10 +39,7 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
 
@@ -255,10 +252,7 @@ public class SSTableReader extends SSTab
         SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 
         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
-                                                                      "r",
-                                                                      BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE,
-                                                                      true);
+        RandomAccessReader input = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
         try
         {
             if (keyCache != null && keyCache.getCapacity() - keyCache.size() < keysToLoadInCache.size())

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Fri Jul 22 14:56:25 2011
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CloseableIterator;
 
@@ -39,7 +39,7 @@ public class SSTableScanner implements C
 {
     private static Logger logger = LoggerFactory.getLogger(SSTableScanner.class);
 
-    protected final BufferedRandomAccessFile file;
+    protected final RandomAccessReader file;
     public final SSTableReader sstable;
     private IColumnIterator row;
     protected boolean exhausted = false;
@@ -53,7 +53,7 @@ public class SSTableScanner implements C
     {
         try
         {
-            this.file = new BufferedRandomAccessFile(new File(sstable.getFilename()), "r", bufferSize, skipCache);
+            this.file = RandomAccessReader.open(new File(sstable.getFilename()), skipCache);
         }
         catch (IOException e)
         {
@@ -70,7 +70,7 @@ public class SSTableScanner implements C
     {
         try
         {
-            this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", bufferSize);
+            this.file = RandomAccessReader.open(new File(sstable.getFilename()), bufferSize);
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Jul 22 14:56:25 2011
@@ -36,10 +36,9 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.utils.BloomFilter;
@@ -52,7 +51,7 @@ public class SSTableWriter extends SSTab
 
     private IndexWriter iwriter;
     private SegmentedFile.Builder dbuilder;
-    private final BufferedRandomAccessFile dataFile;
+    private final SequentialWriter dataFile;
     private DecoratedKey lastWrittenKey;
     private FileMark dataMark;
     private SSTableMetadata.Collector sstableMetadataCollector;
@@ -78,7 +77,7 @@ public class SSTableWriter extends SSTab
               partitioner);
         iwriter = new IndexWriter(descriptor, partitioner, keyCount);
         dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-        dataFile = new BufferedRandomAccessFile(new File(getFilename()), "rw", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true);
+        dataFile = SequentialWriter.open(new File(getFilename()), true);
         this.sstableMetadataCollector = sstableMetadataCollector;
     }
     
@@ -88,12 +87,12 @@ public class SSTableWriter extends SSTab
         iwriter.mark();
     }
 
-    public void reset()
+    public void resetAndTruncate()
     {
         try
         {
-            dataFile.reset(dataMark);
-            iwriter.reset();
+            dataFile.resetAndTruncate(dataMark);
+            iwriter.resetAndTruncate();
         }
         catch (IOException e)
         {
@@ -130,8 +129,8 @@ public class SSTableWriter extends SSTab
     public long append(AbstractCompactedRow row) throws IOException
     {
         long currentPosition = beforeAppend(row.key);
-        ByteBufferUtil.writeWithShortLength(row.key.key, dataFile);
-        row.write(dataFile);
+        ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
+        row.write(dataFile.stream);
         // max timestamp is not collected here, because we want to avoid deserializing an EchoedRow
         // instead, it is collected when calling ColumnFamilyStore.createCompactionWriter
         sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
@@ -143,16 +142,16 @@ public class SSTableWriter extends SSTab
     public void append(DecoratedKey decoratedKey, ColumnFamily cf) throws IOException
     {
         long startPosition = beforeAppend(decoratedKey);
-        ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile);
+        ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
 
         // serialize index and bloom filter into in-memory structure
         ColumnIndexer.RowHeader header = ColumnIndexer.serialize(cf);
 
         // write out row size
-        dataFile.writeLong(header.serializedSize() + cf.serializedSizeForSSTable());
+        dataFile.stream.writeLong(header.serializedSize() + cf.serializedSizeForSSTable());
 
         // write out row header and data
-        int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, header, dataFile);
+        int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, header, dataFile.stream);
         afterAppend(decoratedKey, startPosition);
 
         // track max column timestamp
@@ -164,10 +163,10 @@ public class SSTableWriter extends SSTab
     public void append(DecoratedKey decoratedKey, ByteBuffer value) throws IOException
     {
         long currentPosition = beforeAppend(decoratedKey);
-        ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile);
+        ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
         assert value.remaining() > 0;
-        dataFile.writeLong(value.remaining());
-        ByteBufferUtil.write(value, dataFile);
+        dataFile.stream.writeLong(value.remaining());
+        ByteBufferUtil.write(value, dataFile.stream);
         afterAppend(decoratedKey, currentPosition);
     }
 
@@ -233,11 +232,8 @@ public class SSTableWriter extends SSTab
 
     private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata) throws IOException
     {
-        BufferedRandomAccessFile out = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_STATS)),
-                                                                     "rw",
-                                                                     BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE,
-                                                                     true);
-        SSTableMetadata.serializer.serialize(sstableMetadata, out);
+        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), true);
+        SSTableMetadata.serializer.serialize(sstableMetadata, out.stream);
         out.close();
     }
 
@@ -382,7 +378,7 @@ public class SSTableWriter extends SSTab
     static class RowIndexer implements Closeable
     {
         protected final Descriptor desc;
-        public final BufferedRandomAccessFile dfile;
+        public final RandomAccessReader dfile;
         private final OperationType type;
 
         protected IndexWriter iwriter;
@@ -391,10 +387,10 @@ public class SSTableWriter extends SSTab
 
         RowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
         {
-            this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type);
+            this(desc, RandomAccessReader.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true), cfs, type);
         }
 
-        protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, ColumnFamilyStore cfs, OperationType type) throws IOException
+        protected RowIndexer(Descriptor desc, RandomAccessReader dfile, ColumnFamilyStore cfs, OperationType type) throws IOException
         {
             this.desc = desc;
             this.dfile = dfile;
@@ -549,12 +545,12 @@ public class SSTableWriter extends SSTab
      */
     static class CommutativeRowIndexer extends RowIndexer
     {
-        protected BufferedRandomAccessFile writerDfile;
+        protected SequentialWriter writerDfile;
 
         CommutativeRowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
         {
-            super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type);
-            writerDfile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true);
+            super(desc, RandomAccessReader.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true), cfs, type);
+            writerDfile = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true);
         }
 
         @Override
@@ -583,8 +579,8 @@ public class SSTableWriter extends SSTab
                 // update index writer
                 iwriter.afterAppend(key, writerDfile.getFilePointer());
                 // write key and row
-                ByteBufferUtil.writeWithShortLength(key.key, writerDfile);
-                row.write(writerDfile);
+                ByteBufferUtil.writeWithShortLength(key.key, writerDfile.stream);
+                row.write(writerDfile.stream);
 
                 rows++;
             }
@@ -593,7 +589,7 @@ public class SSTableWriter extends SSTab
             if (writerDfile.getFilePointer() != dfile.getFilePointer())
             {
                 // truncate file to new, reduced length
-                writerDfile.setLength(writerDfile.getFilePointer());
+                writerDfile.truncate(writerDfile.getFilePointer());
             }
             writerDfile.sync();
 
@@ -613,7 +609,7 @@ public class SSTableWriter extends SSTab
      */
     static class IndexWriter implements Closeable
     {
-        private final BufferedRandomAccessFile indexFile;
+        private final SequentialWriter indexFile;
         public final Descriptor desc;
         public final IPartitioner partitioner;
         public final SegmentedFile.Builder builder;
@@ -625,7 +621,7 @@ public class SSTableWriter extends SSTab
         {
             this.desc = desc;
             this.partitioner = part;
-            indexFile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "rw", 8 * 1024 * 1024, true);
+            indexFile = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), 8 * 1024 * 1024, true);
             builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
             summary = new IndexSummary(keyCount);
             bf = BloomFilter.getFilter(keyCount, 15);
@@ -635,8 +631,8 @@ public class SSTableWriter extends SSTab
         {
             bf.add(key.key);
             long indexPosition = indexFile.getFilePointer();
-            ByteBufferUtil.writeWithShortLength(key.key, indexFile);
-            indexFile.writeLong(dataPosition);
+            ByteBufferUtil.writeWithShortLength(key.key, indexFile.stream);
+            indexFile.stream.writeLong(dataPosition);
             if (logger.isTraceEnabled())
                 logger.trace("wrote index of " + key + " at " + indexPosition);
 
@@ -671,12 +667,12 @@ public class SSTableWriter extends SSTab
             mark = indexFile.mark();
         }
 
-        public void reset() throws IOException
+        public void resetAndTruncate() throws IOException
         {
             // we can't un-set the bloom filter addition, but extra keys in there are harmless.
             // we can't reset dbuilder either, but that is the last thing called in afterappend so
             // we assume that if that worked then we won't be trying to reset.
-            indexFile.reset(mark);
+            indexFile.resetAndTruncate(mark);
         }
 
         public String toString()

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java Fri Jul 22 14:56:25 2011
@@ -59,7 +59,7 @@ public class BufferedSegmentedFile exten
     {
         try
         {
-            BufferedRandomAccessFile file = new BufferedRandomAccessFile(path, "r", bufferSize);
+            RandomAccessReader file = RandomAccessReader.open(new File(path), bufferSize);
             file.seek(position);
             return file;
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java Fri Jul 22 14:56:25 2011
@@ -88,7 +88,7 @@ public class MmappedSegmentedFile extend
         try
         {
             // FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row
-            BufferedRandomAccessFile file = new BufferedRandomAccessFile(path, "r", bufferSize);
+            RandomAccessReader file = RandomAccessReader.open(new File(path), bufferSize);
             file.seek(position);
             return file;
         }

Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/RandomAccessReader.java?rev=1149628&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/RandomAccessReader.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/RandomAccessReader.java Fri Jul 22 14:56:25 2011
@@ -0,0 +1,334 @@
+package org.apache.cassandra.io.util;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+
+import org.apache.cassandra.utils.CLibrary;
+
+public class RandomAccessReader extends RandomAccessFile implements FileDataInput
+{
+    public static final long MAX_BYTES_IN_PAGE_CACHE = (long) Math.pow(2, 27); // 128mb
+
+    // default buffer size, 64Kb
+    public static final int DEFAULT_BUFFER_SIZE = 65536;
+
+    // absolute filesystem path to the file
+    private final String filePath;
+
+    // buffer which will cache file blocks
+    protected byte[] buffer;
+
+    // `current` as current position in file
+    // `bufferOffset` is the offset of the beginning of the buffer
+    // `markedPointer` folds the offset of the last file mark
+    protected long bufferOffset, current = 0, markedPointer;
+    // `validBufferBytes` is the number of bytes in the buffer that are actually valid;
+    //  this will be LESS than buffer capacity if buffer is not full!
+    protected int validBufferBytes = 0;
+
+    // channel liked with the file, used to retrieve data and force updates.
+    private final FileChannel channel;
+
+    private final boolean skipIOCache;
+
+    // file descriptor
+    private final int fd;
+
+    // used if skip I/O cache was enabled
+    private long bytesSinceCacheFlush = 0;
+
+    private final long fileLength;
+
+    public RandomAccessReader(File file, int bufferSize, boolean skipIOCache) throws IOException
+    {
+        super(file, "r");
+
+        channel = super.getChannel();
+        filePath = file.getAbsolutePath();
+
+        // allocating required size of the buffer
+        if (bufferSize <= 0)
+            throw new IllegalArgumentException("bufferSize must be positive");
+        buffer = new byte[bufferSize];
+
+        this.skipIOCache = skipIOCache;
+        fd = CLibrary.getfd(getFD());
+
+        // we can cache file length in read-only mode
+        fileLength = channel.size();
+        validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations
+    }
+
+    public static RandomAccessReader open(File file, boolean skipIOCache) throws IOException
+    {
+        return open(file, DEFAULT_BUFFER_SIZE, skipIOCache);
+    }
+
+    public static RandomAccessReader open(File file) throws IOException
+    {
+        return open(file, DEFAULT_BUFFER_SIZE, false);
+    }
+
+    public static RandomAccessReader open(File file, int bufferSize) throws IOException
+    {
+        return open(file, bufferSize, false);
+    }
+
+    public static RandomAccessReader open(File file, int bufferSize, boolean skipIOCache) throws IOException
+    {
+        return new RandomAccessReader(file, bufferSize, skipIOCache);
+    }
+
+    // convert open into open
+    public static RandomAccessReader open(SequentialWriter writer) throws IOException
+    {
+        return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Read data from file starting from current currentOffset to populate buffer.
+     * @throws IOException on any I/O error.
+     */
+    protected void reBuffer() throws IOException
+    {
+        resetBuffer();
+
+        if (bufferOffset >= channel.size())
+            return;
+
+        channel.position(bufferOffset); // setting channel position
+
+        int read = 0;
+
+        while (read < buffer.length)
+        {
+            int n = super.read(buffer, read, buffer.length - read);
+            if (n < 0)
+                break;
+            read += n;
+        }
+
+        validBufferBytes = read;
+
+        bytesSinceCacheFlush += read;
+
+        if (skipIOCache && bytesSinceCacheFlush >= MAX_BYTES_IN_PAGE_CACHE)
+        {
+            // with random I/O we can't control what we are skipping so
+            // it will be more appropriate to just skip a whole file after
+            // we reach threshold
+            CLibrary.trySkipCache(this.fd, 0, 0);
+            bytesSinceCacheFlush = 0;
+        }
+    }
+
+    @Override
+    public long getFilePointer()
+    {
+        return current;
+    }
+
+    public String getPath()
+    {
+        return filePath;
+    }
+
+    public void reset() throws IOException
+    {
+        seek(markedPointer);
+    }
+
+    public long bytesPastMark()
+    {
+        long bytes = current - markedPointer;
+        assert bytes >= 0;
+        return bytes;
+    }
+
+    public FileMark mark()
+    {
+        markedPointer = current;
+        return new BufferedRandomAccessFileMark(markedPointer);
+    }
+
+    public void reset(FileMark mark) throws IOException
+    {
+        assert mark instanceof BufferedRandomAccessFileMark;
+        seek(((BufferedRandomAccessFileMark) mark).pointer);
+    }
+
+    public long bytesPastMark(FileMark mark)
+    {
+        assert mark instanceof BufferedRandomAccessFileMark;
+        long bytes = current - ((BufferedRandomAccessFileMark) mark).pointer;
+        assert bytes >= 0;
+        return bytes;
+    }
+
+    /**
+     * @return true if there is no more data to read
+     * @throws IOException on any I/O error.
+     */
+    public boolean isEOF() throws IOException
+    {
+        return getFilePointer() == length();
+    }
+
+    public long bytesRemaining() throws IOException
+    {
+        return length() - getFilePointer();
+    }
+
+    protected int bufferCursor()
+    {
+        return (int) (current - bufferOffset);
+    }
+
+    protected void resetBuffer()
+    {
+        bufferOffset = current;
+        validBufferBytes = 0;
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        buffer = null;
+
+        if (skipIOCache && bytesSinceCacheFlush > 0)
+            CLibrary.trySkipCache(fd, 0, 0);
+
+        super.close();
+    }
+
+    @Override
+    public String toString()
+    {
+        return getClass().getSimpleName() + "(" + "filePath='" + filePath + "'" + ", skipIOCache=" + skipIOCache + ")";
+    }
+
+    /**
+     * Class to hold a mark to the position of the file
+     */
+    protected static class BufferedRandomAccessFileMark implements FileMark
+    {
+        long pointer;
+
+        public BufferedRandomAccessFileMark(long pointer)
+        {
+            this.pointer = pointer;
+        }
+    }
+
+    @Override
+    public void seek(long newPosition) throws IOException
+    {
+        if (newPosition < 0)
+            throw new IllegalArgumentException("new position should not be negative");
+
+        if (newPosition > length()) // it is save to call length() in read-only mode
+            throw new EOFException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode",
+                                                 newPosition, getPath(), length()));
+
+        current = newPosition;
+
+        if (newPosition > (bufferOffset + validBufferBytes) || newPosition < bufferOffset)
+            reBuffer();
+    }
+
+    @Override
+    // -1 will be returned if there is nothing to read; higher-level methods like readInt
+    // or readFully (from RandomAccessFile) will throw EOFException but this should not
+    public int read() throws IOException
+    {
+        if (buffer == null)
+            throw new ClosedChannelException();
+
+        if (isEOF())
+            return -1; // required by RandomAccessFile
+
+        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+            reBuffer();
+
+        assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
+
+        return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff;
+    }
+
+    @Override
+    public int read(byte[] buffer) throws IOException
+    {
+        return read(buffer, 0, buffer.length);
+    }
+
+    @Override
+    // -1 will be returned if there is nothing to read; higher-level methods like readInt
+    // or readFully (from RandomAccessFile) will throw EOFException but this should not
+    public int read(byte[] buff, int offset, int length) throws IOException
+    {
+        if (buffer == null)
+            throw new ClosedChannelException();
+
+        if (length == 0)
+            return 0;
+
+        if (isEOF())
+            return -1;
+
+        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+            reBuffer();
+
+        assert current >= bufferOffset && current < bufferOffset + validBufferBytes
+                : String.format("File (%s), current offset %d, buffer offset %d, buffer limit %d",
+                                getPath(),
+                                current,
+                                bufferOffset,
+                                validBufferBytes);
+
+        int toCopy = Math.min(length, validBufferBytes - bufferCursor());
+
+        System.arraycopy(buffer, bufferCursor(), buff, offset, toCopy);
+        current += toCopy;
+
+        return toCopy;
+    }
+
+    public ByteBuffer readBytes(int length) throws IOException
+    {
+        assert length >= 0 : "buffer length should not be negative: " + length;
+
+        byte[] buff = new byte[length];
+        readFully(buff); // reading data buffer
+
+        return ByteBuffer.wrap(buff);
+    }
+
+    @Override
+    public long length() throws IOException
+    {
+        return fileLength;
+    }
+
+    @Override
+    public void write(int value) throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void write(byte[] buffer) throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void write(byte[] buffer, int offset, int length) throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/SequentialWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/SequentialWriter.java?rev=1149628&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/SequentialWriter.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/SequentialWriter.java Fri Jul 22 14:56:25 2011
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.*;
+import java.nio.channels.ClosedChannelException;
+
+import org.apache.cassandra.utils.CLibrary;
+
+public class SequentialWriter extends OutputStream
+{
+    // isDirty - true if this.buffer contains any un-synced bytes
+    private boolean isDirty = false, syncNeeded = false;
+
+    // absolute path to the given file
+    private final String filePath;
+
+    // so we can use the write(int) path w/o tons of new byte[] allocations
+    private final byte[] singleByteBuffer = new byte[1];
+
+    private byte[] buffer;
+    private final boolean skipIOCache;
+    private final int fd;
+
+    private long current = 0, bufferOffset;
+    private int validBufferBytes;
+
+    private final RandomAccessFile out;
+
+    // used if skip I/O cache was enabled
+    private long ioCacheStartOffset = 0, bytesSinceCacheFlush = 0;
+
+    public final DataOutputStream stream;
+
+    public SequentialWriter(File file, int bufferSize, boolean skipIOCache) throws IOException
+    {
+        out = new RandomAccessFile(file, "rw");
+
+        filePath = file.getAbsolutePath();
+
+        buffer = new byte[bufferSize];
+        this.skipIOCache = skipIOCache;
+        fd = CLibrary.getfd(out.getFD());
+        stream = new DataOutputStream(this);
+    }
+
+    public static SequentialWriter open(File file) throws IOException
+    {
+        return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, false);
+    }
+
+    public static SequentialWriter open(File file, int bufferSize) throws IOException
+    {
+        return open(file, bufferSize, false);
+    }
+
+    public static SequentialWriter open(File file, boolean skipIOCache) throws IOException
+    {
+        return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache);
+    }
+
+    public static SequentialWriter open(File file, int bufferSize, boolean skipIOCache) throws IOException
+    {
+        return new SequentialWriter(file, bufferSize, skipIOCache);
+    }
+
+    public void write(int value) throws IOException
+    {
+        singleByteBuffer[0] = (byte) value;
+        write(singleByteBuffer, 0, 1);
+    }
+
+    public void write(byte[] buffer) throws IOException
+    {
+        write(buffer, 0, buffer.length);
+    }
+
+    public void write(byte[] data, int offset, int length) throws IOException
+    {
+        if (buffer == null)
+            throw new ClosedChannelException();
+
+        while (length > 0)
+        {
+            int n = writeAtMost(data, offset, length);
+            offset += n;
+            length -= n;
+            isDirty = true;
+            syncNeeded = true;
+        }
+    }
+
+    /*
+     * Write at most "length" bytes from "b" starting at position "offset", and
+     * return the number of bytes written. caller is responsible for setting
+     * isDirty.
+     */
+    private int writeAtMost(byte[] data, int offset, int length) throws IOException
+    {
+        if (current >= bufferOffset + buffer.length)
+            reBuffer();
+
+        assert current < bufferOffset + buffer.length
+                : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
+
+
+        int toCopy = Math.min(length, buffer.length - bufferCursor());
+
+        // copy bytes from external buffer
+        System.arraycopy(data, offset, buffer, bufferCursor(), toCopy);
+
+        assert current <= bufferOffset + buffer.length
+                : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
+
+        validBufferBytes = Math.max(validBufferBytes, bufferCursor() + toCopy);
+        current += toCopy;
+
+        return toCopy;
+    }
+
+    /**
+     * Synchronize file contents with disk.
+     * @throws java.io.IOException on any I/O error.
+     */
+    public void sync() throws IOException
+    {
+        if (syncNeeded)
+        {
+            flush();
+            out.getFD().sync();
+
+            syncNeeded = false;
+        }
+    }
+
+    /**
+     * If buffer is dirty, flush it's contents to the operating system. Does not imply fsync().
+     *
+     * Currently, for implementation reasons, this also invalidates the buffer.
+     *
+     * @throws java.io.IOException on any I/O error.
+     */
+    @Override
+    public void flush() throws IOException
+    {
+        if (isDirty)
+        {
+            out.write(buffer, 0, validBufferBytes);
+
+            if (skipIOCache)
+            {
+                // we don't know when the data reaches disk since we aren't
+                // calling flush
+                // so we continue to clear pages we don't need from the first
+                // offset we see
+                // periodically we update this starting offset
+                bytesSinceCacheFlush += validBufferBytes;
+
+                if (bytesSinceCacheFlush >= RandomAccessReader.MAX_BYTES_IN_PAGE_CACHE)
+                {
+                    CLibrary.trySkipCache(this.fd, ioCacheStartOffset, 0);
+                    ioCacheStartOffset = bufferOffset;
+                    bytesSinceCacheFlush = 0;
+                }
+            }
+
+            // Remember that we wrote, so we don't write it again on next flush().
+            resetBuffer();
+
+            isDirty = false;
+        }
+    }
+
+    public long getFilePointer()
+    {
+        return current;
+    }
+
+    public long length() throws IOException
+    {
+        return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);
+    }
+
+    public String getPath()
+    {
+        return filePath;
+    }
+
+
+    private void reBuffer() throws IOException
+    {
+        flush();
+        resetBuffer();
+    }
+
+    private void resetBuffer()
+    {
+        bufferOffset = current;
+        validBufferBytes = 0;
+    }
+
+    private int bufferCursor()
+    {
+        return (int) (current - bufferOffset);
+    }
+
+    public FileMark mark()
+    {
+        return new BufferedFileWriterMark(current);
+    }
+
+    public void resetAndTruncate(FileMark mark) throws IOException
+    {
+        assert mark instanceof BufferedFileWriterMark;
+
+        // synchronize current buffer with disk
+        // because we don't want any data loss
+        sync();
+
+        // setting marker as a current offset
+        current = ((BufferedFileWriterMark) mark).pointer;
+
+        // truncate file to given position
+        truncate(current);
+
+        // reset channel position
+        out.seek(current);
+
+        resetBuffer();
+    }
+
+    public void truncate(long toSize) throws IOException
+    {
+        out.getChannel().truncate(toSize);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        sync();
+
+        buffer = null;
+
+        if (skipIOCache && bytesSinceCacheFlush > 0)
+            CLibrary.trySkipCache(fd, 0, 0);
+
+        out.close(); // this will also close channel for us
+    }
+
+    /**
+     * Class to hold a mark to the position of the file
+     */
+    protected static class BufferedFileWriterMark implements FileMark
+    {
+        long pointer;
+
+        public BufferedFileWriterMark(long pointer)
+        {
+            this.pointer = pointer;
+        }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Fri Jul 22 14:56:25 2011
@@ -28,7 +28,7 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.StorageService;
 
 import org.apache.commons.cli.*;
@@ -235,7 +235,7 @@ public class SSTableExport
     public static void export(String ssTableFile, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException
     {
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
-        SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+        SSTableScanner scanner = reader.getDirectScanner(RandomAccessReader.DEFAULT_BUFFER_SIZE);
 
         IPartitioner<?> partitioner = StorageService.getPartitioner();
 
@@ -292,7 +292,7 @@ public class SSTableExport
 
 
         SSTableIdentityIterator row;
-        SSTableScanner scanner = reader.getDirectScanner(BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE);
+        SSTableScanner scanner = reader.getDirectScanner(RandomAccessReader.DEFAULT_BUFFER_SIZE);
 
         outs.println("{");
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java Fri Jul 22 14:56:25 2011
@@ -79,7 +79,7 @@ public final class CLibrary
     public static native int fcntl(int fd, int command, long flags) throws LastErrorException;
 
     // fadvice
-    public static native int posix_fadvise(int fd, int offset, int len, int flag) throws LastErrorException;
+    public static native int posix_fadvise(int fd, long offset, int len, int flag) throws LastErrorException;
         
     private static int errno(RuntimeException e)
     {
@@ -189,7 +189,7 @@ public final class CLibrary
         }
     }
 
-    public static void trySkipCache(int fd, int offset, int len)
+    public static void trySkipCache(int fd, long offset, int len)
     {
         if (fd < 0)
             return;
@@ -200,10 +200,6 @@ public final class CLibrary
             {
                 posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED);
             }
-            else if (System.getProperty("os.name").toLowerCase().contains("mac"))
-            {
-                tryFcntl(fd, F_NOCACHE, 1);
-            }
         }
         catch (UnsatisfiedLinkError e)
         {

Modified: cassandra/trunk/test/long/org/apache/cassandra/db/LongTableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/LongTableTest.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/test/long/org/apache/cassandra/db/LongTableTest.java (original)
+++ cassandra/trunk/test/long/org/apache/cassandra/db/LongTableTest.java Fri Jul 22 14:56:25 2011
@@ -18,31 +18,15 @@
 
 package org.apache.cassandra.db;
 
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.*;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-import org.apache.commons.lang.StringUtils;
 import org.junit.Test;
 
-import static junit.framework.Assert.*;
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.utils.WrappedRunnable;
 import static org.apache.cassandra.Util.column;
-import static org.apache.cassandra.Util.getBytes;
+
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Fri Jul 22 14:56:25 2011
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import java.io.File;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.text.DecimalFormat;
@@ -35,6 +36,7 @@ import static junit.framework.Assert.*;
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.WrappedRunnable;
 import static org.apache.cassandra.Util.column;
 import static org.apache.cassandra.Util.getBytes;
@@ -43,7 +45,6 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 
@@ -405,7 +406,7 @@ public class TableTest extends CleanupHe
         // verify that we do indeed have multiple index entries
         SSTableReader sstable = cfStore.getSSTables().iterator().next();
         long position = sstable.getPosition(key, SSTableReader.Operator.EQ);
-        BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.getFilename(), "r");
+        RandomAccessReader file = RandomAccessReader.open(new File(sstable.getFilename()));
         file.seek(position);
         assert ByteBufferUtil.readWithShortLength(file).equals(key.key);
         SSTableReader.readRowSize(file, sstable.descriptor);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java Fri Jul 22 14:56:25 2011
@@ -19,6 +19,7 @@
 
 package org.apache.cassandra.io.sstable;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -26,7 +27,7 @@ import java.util.*;
 import org.junit.Test;
 
 import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class SSTableTest extends CleanupHelper
@@ -50,7 +51,7 @@ public class SSTableTest extends Cleanup
 
     private void verifySingle(SSTableReader sstable, ByteBuffer bytes, ByteBuffer key) throws IOException
     {
-        BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.getFilename(), "r");
+        RandomAccessReader file = RandomAccessReader.open(new File(sstable.getFilename()));
         file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ));
         assert key.equals(ByteBufferUtil.readWithShortLength(file));
         int size = (int)SSTableReader.readRowSize(file, sstable.descriptor);
@@ -87,7 +88,7 @@ public class SSTableTest extends Cleanup
     {
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>(map.keySet());
         Collections.shuffle(keys);
-        BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.getFilename(), "r");
+        RandomAccessReader file = RandomAccessReader.open(new File(sstable.getFilename()));
         for (ByteBuffer key : keys)
         {
             file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ));

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java?rev=1149628&r1=1149627&r2=1149628&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java Fri Jul 22 14:56:25 2011
@@ -20,31 +20,23 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import static org.junit.Assert.*;
-
+import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.CounterColumnType;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.streaming.OperationType;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.NodeId;
 import org.junit.Test;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -206,8 +198,8 @@ public class SSTableWriterCommutativeTes
         SSTableReader cleaned = SSTableUtils.prepare().ks(keyspace).cf(cfname).generation(0).write(cleanedEntries);
 
         // verify
-        BufferedRandomAccessFile origFile    = new BufferedRandomAccessFile(orig.descriptor.filenameFor(SSTable.COMPONENT_DATA), "r", 8 * 1024 * 1024);
-        BufferedRandomAccessFile cleanedFile = new BufferedRandomAccessFile(cleaned.descriptor.filenameFor(SSTable.COMPONENT_DATA), "r", 8 * 1024 * 1024);
+        RandomAccessReader origFile    = RandomAccessReader.open(new File(orig.descriptor.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024);
+        RandomAccessReader cleanedFile = RandomAccessReader.open(new File(cleaned.descriptor.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024);
 
         while(origFile.getFilePointer() < origFile.length() && cleanedFile.getFilePointer() < cleanedFile.length())
         {



Mime
View raw message