cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [2/3] cassandra git commit: SequentialWriter extends BDOSP
Date Mon, 17 Aug 2015 08:32:21 GMT
SequentialWriter extends BDOSP

patch by ariel; reviewed by benedict for CASSANDRA-9500


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29687a8b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29687a8b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29687a8b

Branch: refs/heads/trunk
Commit: 29687a8bb93a1df637e0ef32d3784e338afeecd9
Parents: 136e700
Author: Ariel Weisberg <ariel@weisberg.ws>
Authored: Wed Jul 22 18:34:55 2015 -0400
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Mon Aug 17 09:25:58 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  10 +-
 .../io/compress/CompressedSequentialWriter.java |  33 +++-
 .../io/sstable/format/big/BigTableWriter.java   |   6 +-
 .../io/util/BufferedDataOutputStreamPlus.java   | 104 ++++++----
 .../io/util/ChecksummedSequentialWriter.java    |   3 +-
 .../cassandra/io/util/SequentialWriter.java     | 196 +++++--------------
 .../org/apache/cassandra/utils/SyncUtil.java    |  22 ++-
 .../compaction/CompactionAwareWriterTest.java   |   9 +-
 .../CompressedSequentialWriterTest.java         |   2 +-
 .../io/util/BufferedDataOutputStreamTest.java   |  62 +++++-
 .../io/util/BufferedRandomAccessFileTest.java   |   6 +-
 .../compression/CompressedInputStreamTest.java  |   2 +-
 13 files changed, 240 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3686a3d..b4b568c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,7 @@
  * Use byte to serialize MT hash length (CASSANDRA-9792)
  * Replace usage of Adler32 with CRC32 (CASSANDRA-8684)
  * Fix migration to new format from 2.1 SSTable (CASSANDRA-10006)
+ * SequentialWriter should extend BufferedDataOutputStreamPlus (CASSANDRA-9500)
 Merged from 2.2:
  * Fix histogram overflow exception (CASSANDRA-9973)
  * Route gossip messages over dedicated socket (CASSANDRA-9237)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 52fc48f..9eef23e 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -94,10 +94,10 @@ public class ColumnIndex
 
         private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
         {
-            ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer.stream);
-            DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer.stream);
+            ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
+            DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
             if (header.hasStatic())
-                UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer.stream, version);
+                UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer, version);
         }
 
         public ColumnIndex build() throws IOException
@@ -135,7 +135,7 @@ public class ColumnIndex
                 startPosition = currentPosition();
             }
 
-            UnfilteredSerializer.serializer.serialize(unfiltered, header, writer.stream, version);
+            UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, version);
             lastClustering = unfiltered.clustering();
             ++written;
 
@@ -153,7 +153,7 @@ public class ColumnIndex
 
         private ColumnIndex close() throws IOException
         {
-            UnfilteredSerializer.serializer.writeEndOfPartition(writer.stream);
+            UnfilteredSerializer.serializer.writeEndOfPartition(writer);
 
             // It's possible we add no rows, just a top level deletion
             if (written == 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index a4afa3f..8e1ebff 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.io.compress;
 
+import static org.apache.cassandra.utils.Throwables.merge;
+
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
@@ -31,6 +33,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.schema.CompressionParams;
 
@@ -81,7 +84,7 @@ public class CompressedSequentialWriter extends SequentialWriter
     {
         try
         {
-            return channel.position();
+            return fchannel.position();
         }
         catch (IOException e)
         {
@@ -130,9 +133,6 @@ public class CompressedSequentialWriter extends SequentialWriter
             compressed.rewind();
             crcMetadata.appendDirect(compressed, true);
             lastFlushOffset += compressedLength + 4;
-
-            // adjust our bufferOffset to account for the new uncompressed data we've now written out
-            resetBuffer();
         }
         catch (IOException e)
         {
@@ -155,6 +155,8 @@ public class CompressedSequentialWriter extends SequentialWriter
     @Override
     public FileMark mark()
     {
+        if (!buffer.hasRemaining())
+            doFlush();
         return new CompressedFileWriterMark(chunkOffset, current(), buffer.position(), chunkCount + 1);
     }
 
@@ -189,8 +191,8 @@ public class CompressedSequentialWriter extends SequentialWriter
         {
             compressed.clear();
             compressed.limit(chunkSize);
-            channel.position(chunkOffset);
-            channel.read(compressed);
+            fchannel.position(chunkOffset);
+            fchannel.read(compressed);
 
             try
             {
@@ -209,7 +211,7 @@ public class CompressedSequentialWriter extends SequentialWriter
             checksum.update(compressed);
 
             crcCheckBuffer.clear();
-            channel.read(crcCheckBuffer);
+            fchannel.read(crcCheckBuffer);
             crcCheckBuffer.flip();
             if (crcCheckBuffer.getInt() != (int) checksum.getValue())
                 throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
@@ -229,7 +231,6 @@ public class CompressedSequentialWriter extends SequentialWriter
 
         // Mark as dirty so we can guarantee the newly buffered bytes won't be lost on a rebuffer
         buffer.position(realMark.validBufferBytes);
-        isDirty = true;
 
         bufferOffset = truncateTarget - buffer.position();
         chunkCount = realMark.nextChunkIndex - 1;
@@ -248,7 +249,7 @@ public class CompressedSequentialWriter extends SequentialWriter
         {
             try
             {
-                channel.position(chunkOffset);
+                fchannel.position(chunkOffset);
             }
             catch (IOException e)
             {
@@ -281,6 +282,20 @@ public class CompressedSequentialWriter extends SequentialWriter
             sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
             metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit();
         }
+
+        @Override
+        protected Throwable doPreCleanup(Throwable accumulate)
+        {
+            accumulate = super.doPreCleanup(accumulate);
+            if (compressed != null)
+            {
+                try { FileUtils.clean(compressed); }
+                catch (Throwable t) { accumulate = merge(accumulate, t); }
+                compressed = null;
+            }
+
+            return accumulate;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 2b60479..77bf3d6 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -342,7 +342,7 @@ public class BigTableWriter extends SSTableWriter
         File file = new File(desc.filenameFor(Component.STATS));
         try (SequentialWriter out = SequentialWriter.open(file))
         {
-            desc.getMetadataSerializer().serialize(components, out.stream);
+            desc.getMetadataSerializer().serialize(components, out);
             out.setDescriptor(desc).finish();
         }
         catch (IOException e)
@@ -407,8 +407,8 @@ public class BigTableWriter extends SSTableWriter
             long indexStart = indexFile.getFilePointer();
             try
             {
-                ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
-                rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
+                ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile);
+                rowIndexEntrySerializer.serialize(indexEntry, indexFile);
             }
             catch (IOException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index b6f3231..9434219 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -27,6 +27,8 @@ import java.nio.channels.WritableByteChannel;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 
+import net.nicoulaj.compilecommand.annotations.DontInline;
+
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 import org.apache.cassandra.utils.vint.VIntCoding;
@@ -41,7 +43,16 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
 {
     private static final int DEFAULT_BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "nio_data_output_stream_plus_buffer_size", 1024 * 32);
 
-    ByteBuffer buffer;
+    protected ByteBuffer buffer;
+
+    //Allow derived classes to specify writing to the channel
+    //directly shouldn't happen because they intercept via doFlush for things
+    //like compression or checksumming
+    //Another hack for this value is that it also indicates that flushing early
+    //should not occur, flushes aligned with buffer size are desired
+    //Unless... it's the last flush. Compression and checksum formats
+    //expect block (same as buffer size) alignment for everything except the last block
+    protected boolean strictFlushing = false;
 
     public BufferedDataOutputStreamPlus(RandomAccessFile ras)
     {
@@ -142,40 +153,53 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
         else
         {
             assert toWrite.isDirect();
+            MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
+
             if (toWrite.remaining() > buffer.remaining())
             {
-                doFlush();
-                MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
-                if (toWrite.remaining() > buffer.remaining())
+                if (strictFlushing)
                 {
-                    while (hollowBuffer.hasRemaining())
-                        channel.write(hollowBuffer);
+                    writeExcessSlow();
                 }
                 else
                 {
-                    buffer.put(hollowBuffer);
+                    doFlush();
+                    while (hollowBuffer.remaining() > buffer.capacity())
+                        channel.write(hollowBuffer);
                 }
             }
-            else
-            {
-                MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
-                buffer.put(hollowBuffer);
-            }
+
+            buffer.put(hollowBuffer);
         }
     }
 
+    // writes anything we can't fit into the buffer
+    @DontInline
+    private void writeExcessSlow() throws IOException
+    {
+        int originalLimit = hollowBuffer.limit();
+        while (originalLimit - hollowBuffer.position() > buffer.remaining())
+        {
+            hollowBuffer.limit(hollowBuffer.position() + buffer.remaining());
+            buffer.put(hollowBuffer);
+            doFlush();
+        }
+        hollowBuffer.limit(originalLimit);
+    }
 
     @Override
     public void write(int b) throws IOException
     {
-        ensureRemaining(1);
+        if (!buffer.hasRemaining())
+            doFlush();
         buffer.put((byte) (b & 0xFF));
     }
 
     @Override
     public void writeBoolean(boolean v) throws IOException
     {
-        ensureRemaining(1);
+        if (!buffer.hasRemaining())
+            doFlush();
         buffer.put(v ? (byte)1 : (byte)0);
     }
 
@@ -188,29 +212,34 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
     @Override
     public void writeShort(int v) throws IOException
     {
-        ensureRemaining(2);
-        buffer.putShort((short) v);
+        writeChar(v);
     }
 
     @Override
     public void writeChar(int v) throws IOException
     {
-        ensureRemaining(2);
-        buffer.putChar((char) v);
+        if (buffer.remaining() < 2)
+            writeSlow(v, 2);
+        else
+            buffer.putChar((char) v);
     }
 
     @Override
     public void writeInt(int v) throws IOException
     {
-        ensureRemaining(4);
-        buffer.putInt(v);
+        if (buffer.remaining() < 4)
+            writeSlow(v, 4);
+        else
+            buffer.putInt(v);
     }
 
     @Override
     public void writeLong(long v) throws IOException
     {
-        ensureRemaining(8);
-        buffer.putLong(v);
+        if (buffer.remaining() < 8)
+            writeSlow(v, 8);
+        else
+            buffer.putLong(v);
     }
 
     @Override
@@ -225,27 +254,33 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
         int size = VIntCoding.computeUnsignedVIntSize(value);
         if (size == 1)
         {
-            ensureRemaining(1);
-            buffer.put((byte) value);
+            write((int) value);
             return;
         }
 
-        ensureRemaining(size);
-        buffer.put(VIntCoding.encodeVInt(value, size), 0, size);
+        write(VIntCoding.encodeVInt(value, size), 0, size);
     }
 
     @Override
     public void writeFloat(float v) throws IOException
     {
-        ensureRemaining(4);
-        buffer.putFloat(v);
+        writeInt(Float.floatToRawIntBits(v));
     }
 
     @Override
     public void writeDouble(double v) throws IOException
     {
-        ensureRemaining(8);
-        buffer.putDouble(v);
+        writeLong(Double.doubleToRawLongBits(v));
+    }
+
+    @DontInline
+    private void writeSlow(long bytes, int count) throws IOException
+    {
+        int origCount = count;
+        if (ByteOrder.BIG_ENDIAN == buffer.order())
+            while (count > 0) writeByte((int) (bytes >>> (8 * --count)));
+        else
+            while (count > 0) writeByte((int) (bytes >>> (8 * (origCount - count--))));
     }
 
     @Override
@@ -275,6 +310,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
             write(buffer);
     }
 
+    @DontInline
     protected void doFlush() throws IOException
     {
         buffer.flip();
@@ -300,15 +336,11 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
         buffer = null;
     }
 
-    protected void ensureRemaining(int minimum) throws IOException
-    {
-        if (buffer.remaining() < minimum)
-            doFlush();
-    }
-
     @Override
     public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
     {
+        if (strictFlushing)
+            throw new UnsupportedOperationException();
         //Don't allow writes to the underlying channel while data is buffered
         flush();
         return f.apply(channel);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index d5e6be9..8203a37 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -31,10 +31,11 @@ public class ChecksummedSequentialWriter extends SequentialWriter
     {
         super(file, bufferSize, BufferType.ON_HEAP);
         crcWriter = new SequentialWriter(crcPath, 8 * 1024, BufferType.ON_HEAP);
-        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
+        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter);
         crcMetadata.writeChunkSize(buffer.capacity());
     }
 
+    @Override
     protected void flushData()
     {
         super.flushData();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index ddabe89..905a5c6 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -18,10 +18,7 @@
 package org.apache.cassandra.io.util;
 
 import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
-import java.nio.channels.WritableByteChannel;
 import java.nio.file.StandardOpenOption;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,35 +29,27 @@ import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
 import static org.apache.cassandra.utils.Throwables.merge;
+
 import org.apache.cassandra.utils.SyncUtil;
 
 /**
  * Adds buffering, mark, and fsyncing to OutputStream.  We always fsync on close; we may also
  * fsync incrementally if Config.trickle_fsync is enabled.
  */
-public class SequentialWriter extends OutputStream implements WritableByteChannel, Transactional
+public class SequentialWriter extends BufferedDataOutputStreamPlus implements Transactional
 {
     private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
 
-    // isDirty - true if this.buffer contains any un-synced bytes
-    protected boolean isDirty = false, syncNeeded = false;
-
     // absolute path to the given file
     private final String filePath;
 
-    protected ByteBuffer buffer;
-    private int directoryFD;
-    // directory should be synced only after first file sync, in other words, only once per file
-    private boolean directorySynced = false;
-
     // Offset for start of buffer relative to underlying file
     protected long bufferOffset;
 
-    protected final FileChannel channel;
+    protected final FileChannel fchannel;
 
     // whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read
     // latency spikes
@@ -68,7 +57,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     private int trickleFsyncByteInterval;
     private int bytesSinceTrickleFsync = 0;
 
-    public final DataOutputPlus stream;
     protected long lastFlushOffset;
 
     protected Runnable runPostFlush;
@@ -84,13 +72,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         @Override
         protected Throwable doPreCleanup(Throwable accumulate)
         {
-            if (directoryFD >= 0)
-            {
-                try { CLibrary.tryCloseFD(directoryFD); }
-                catch (Throwable t) { accumulate = merge(accumulate, t); }
-                directoryFD = -1;
-            }
-
             // close is idempotent
             try { channel.close(); }
             catch (Throwable t) { accumulate = merge(accumulate, t); }
@@ -127,30 +108,45 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         }
     }
 
-    public SequentialWriter(File file, int bufferSize, BufferType bufferType)
-    {
+    // TODO: we should specify as a parameter if we permit an existing file or not
+    private static FileChannel openChannel(File file) {
         try
         {
             if (file.exists())
-                channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
+            {
+                return FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
+            }
             else
-                channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
+            {
+                FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+                try
+                {
+                    SyncUtil.trySyncDir(file.getParentFile());
+                }
+                catch (Throwable t)
+                {
+                    try { channel.close(); }
+                    catch (Throwable t2) { t.addSuppressed(t2); }
+                }
+                return channel;
+            }
         }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
+    }
 
-        filePath = file.getAbsolutePath();
+    public SequentialWriter(File file, int bufferSize, BufferType bufferType)
+    {
+        super(openChannel(file), bufferType.allocate(bufferSize));
+        strictFlushing = true;
+        fchannel = (FileChannel)channel;
 
-        // Allow children to allocate buffer as direct (snappy compression) if necessary
-        buffer = bufferType.allocate(bufferSize);
+        filePath = file.getAbsolutePath();
 
         this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
         this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
-
-        directoryFD = CLibrary.tryOpenDirectory(file.getParent());
-        stream = new WrappedDataOutputStreamPlus(this, this);
     }
 
     /**
@@ -174,73 +170,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
     }
 
-    public void write(int value) throws ClosedChannelException
-    {
-        if (buffer == null)
-            throw new ClosedChannelException();
-
-        if (!buffer.hasRemaining())
-        {
-            reBuffer();
-        }
-
-        buffer.put((byte) value);
-
-        isDirty = true;
-        syncNeeded = true;
-    }
-
-    public void write(byte[] buffer) throws IOException
-    {
-        write(buffer, 0, buffer.length);
-    }
-
-    public void write(byte[] data, int offset, int length) throws IOException
-    {
-        if (buffer == null)
-            throw new ClosedChannelException();
-
-        int position = offset;
-        int remaining = length;
-        while (remaining > 0)
-        {
-            if (!buffer.hasRemaining())
-                reBuffer();
-
-            int toCopy = Math.min(remaining, buffer.remaining());
-            buffer.put(data, position, toCopy);
-
-            remaining -= toCopy;
-            position += toCopy;
-
-            isDirty = true;
-            syncNeeded = true;
-        }
-    }
-
-    public int write(ByteBuffer src) throws IOException
-    {
-        if (buffer == null)
-            throw new ClosedChannelException();
-
-        int length = src.remaining();
-        int finalLimit = src.limit();
-        while (src.hasRemaining())
-        {
-            if (!buffer.hasRemaining())
-                reBuffer();
-
-            if (buffer.remaining() < src.remaining())
-                src.limit(src.position() + buffer.remaining());
-            buffer.put(src);
-            src.limit(finalLimit);
-
-            isDirty = true;
-            syncNeeded = true;
-        }
-        return length;
-    }
-
     /**
      * Synchronize file contents with disk.
      */
@@ -253,7 +182,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     {
         try
         {
-            SyncUtil.force(channel, false);
+            SyncUtil.force(fchannel, false);
         }
         catch (IOException e)
         {
@@ -261,55 +190,34 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         }
     }
 
+    /*
+     * This is only safe to call before truncation or close for CompressedSequentialWriter
+     * Otherwise it will leave a non-uniform size compressed block in the middle of the file
+     * and the compressed format can't handle that.
+     */
     protected void syncInternal()
     {
-        if (syncNeeded)
-        {
-            flushInternal();
-            syncDataOnlyInternal();
-
-            if (!directorySynced)
-            {
-                SyncUtil.trySync(directoryFD);
-                directorySynced = true;
-            }
-
-            syncNeeded = false;
-        }
+        doFlush();
+        syncDataOnlyInternal();
     }
 
-    /**
-     * If buffer is dirty, flush it's contents to the operating system. Does not imply fsync().
-     *
-     * Currently, for implementation reasons, this also invalidates the buffer.
-     */
     @Override
-    public void flush()
+    protected void doFlush()
     {
-        flushInternal();
-    }
+        flushData();
 
-    protected void flushInternal()
-    {
-        if (isDirty)
+        if (trickleFsync)
         {
-            flushData();
-
-            if (trickleFsync)
+            bytesSinceTrickleFsync += buffer.position();
+            if (bytesSinceTrickleFsync >= trickleFsyncByteInterval)
             {
-                bytesSinceTrickleFsync += buffer.position();
-                if (bytesSinceTrickleFsync >= trickleFsyncByteInterval)
-                {
-                    syncDataOnlyInternal();
-                    bytesSinceTrickleFsync = 0;
-                }
+                syncDataOnlyInternal();
+                bytesSinceTrickleFsync = 0;
             }
-
-            // Remember that we wrote, so we don't write it again on next flush().
-            resetBuffer();
-
-            isDirty = false;
         }
+
+        // Remember that we wrote, so we don't write it again on next flush().
+        resetBuffer();
     }
 
     public void setPostFlushListener(Runnable runPostFlush)
@@ -361,7 +269,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     {
         try
         {
-            return Math.max(current(), channel.size());
+            return Math.max(current(), fchannel.size());
         }
         catch (IOException e)
         {
@@ -374,12 +282,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         return filePath;
     }
 
-    protected void reBuffer()
-    {
-        flushInternal();
-        resetBuffer();
-    }
-
     protected void resetBuffer()
     {
         bufferOffset = current();
@@ -423,7 +325,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
         try
         {
-            channel.position(truncateTarget);
+            fchannel.position(truncateTarget);
         }
         catch (IOException e)
         {
@@ -442,7 +344,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     {
         try
         {
-            channel.truncate(toSize);
+            fchannel.truncate(toSize);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/utils/SyncUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SyncUtil.java b/src/java/org/apache/cassandra/utils/SyncUtil.java
index 0e83ba2..b217e29 100644
--- a/src/java/org/apache/cassandra/utils/SyncUtil.java
+++ b/src/java/org/apache/cassandra/utils/SyncUtil.java
@@ -1,10 +1,6 @@
 package org.apache.cassandra.utils;
 
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.SyncFailedException;
+import java.io.*;
 import java.lang.reflect.Field;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.ClosedChannelException;
@@ -162,4 +158,20 @@ public class SyncUtil
         else
             CLibrary.trySync(fd);
     }
+
+    public static void trySyncDir(File dir)
+    {
+        if (SKIP_SYNC)
+            return;
+
+        int directoryFD = CLibrary.tryOpenDirectory(dir.getPath());
+        try
+        {
+            trySync(directoryFD);
+        }
+        finally
+        {
+            CLibrary.tryCloseFD(directoryFD);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
index 6a57327..1b94a6b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
@@ -32,10 +32,7 @@ import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
 import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
 import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -183,14 +180,14 @@ public class CompactionAwareWriterTest extends CQLTester
                     rowsWritten++;
             }
         }
-        Collection<SSTableReader> newSSTables = writer.finish();
+        writer.finish();
         return rowsWritten;
     }
 
     private void populate(int count) throws Throwable
     {
-        byte [] payload = new byte[1000];
-        new Random().nextBytes(payload);
+        byte [] payload = new byte[5000];
+        new Random(42).nextBytes(payload);
         ByteBuffer b = ByteBuffer.wrap(payload);
 
         for (int i = 0; i < count; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 28af0ae..ca15722 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -94,7 +94,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
             byte[] rawPost = new byte[bytesToTest];
             try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", compressionParameters, sstableMetadataCollector);)
             {
-                Random r = new Random();
+                Random r = new Random(42);
 
                 // Test both write with byte[] and ByteBuffer
                 r.nextBytes(dataPre);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
index acef1ec..469bccb 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -7,6 +7,7 @@ import java.io.UTFDataFormatException;
 import java.lang.reflect.Field;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
 import java.util.Arrays;
 import java.util.Random;
@@ -161,12 +162,14 @@ public class BufferedDataOutputStreamTest
     private ByteArrayOutputStream canonical;
     private DataOutputStreamPlus dosp;
 
-    void setUp()
+    void setUp() throws Exception
     {
 
         generated = new ByteArrayOutputStream();
         canonical = new ByteArrayOutputStream();
         dosp = new WrappedDataOutputStreamPlus(canonical);
+        if (ndosp != null)
+            ndosp.close();
         ndosp = new BufferedDataOutputStreamPlus(adapter, 4096);
     }
 
@@ -535,4 +538,61 @@ public class BufferedDataOutputStreamTest
         for (long v : testValues)
             assertEquals(v, bbdi.readUnsignedVInt());
     }
+
+    @Test
+    public void testWriteSlowByteOrder() throws Exception
+    {
+        try (DataOutputBuffer dob = new DataOutputBuffer(4))
+        {
+            dob.order(ByteOrder.LITTLE_ENDIAN);
+            dob.writeLong(42);
+            assertEquals(42, ByteBuffer.wrap(dob.toByteArray()).order(ByteOrder.LITTLE_ENDIAN).getLong());
+        }
+    }
+
+    @Test
+    public void testWriteExcessSlow() throws Exception
+    {
+        try (DataOutputBuffer dob = new DataOutputBuffer(4))
+        {
+            dob.strictFlushing = true;
+            ByteBuffer buf = ByteBuffer.allocateDirect(8);
+            buf.putLong(0, 42);
+            dob.write(buf);
+            assertEquals(42, ByteBuffer.wrap(dob.toByteArray()).getLong());
+        }
+    }
+
+    @Test
+    public void testApplyToChannel() throws Exception
+    {
+        setUp();
+        Object obj = new Object();
+        Object retval = ndosp.applyToChannel( channel -> {
+            ByteBuffer buf = ByteBuffer.allocate(8);
+            buf.putLong(0, 42);
+            try
+            {
+                channel.write(buf);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+            return obj;
+        });
+        assertEquals(obj, retval);
+        assertEquals(42, ByteBuffer.wrap(generated.toByteArray()).getLong());
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testApplyToChannelThrowsForMisaligned() throws Exception
+    {
+        setUp();
+        ndosp.strictFlushing = true;
+        ndosp.applyToChannel( channel -> {
+            return null;
+        });
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index 3c9aa0e..e051c00 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -467,6 +467,10 @@ public class BufferedRandomAccessFileTest
             }
         }, AssertionError.class);
 
+        //Used to throw ClosedChannelException, but now that it extends BDOSP it just NPEs on the buffer
+        //Writing to a BufferedOutputStream that is closed generates no error
+        //Going to allow the NPE to throw to catch as a bug any use after close. Notably it won't throw NPE for a
+        //write of a 0 length, but that is kind of a corner case
         expectException(new Callable<Object>()
         {
             public Object call() throws IOException
@@ -474,7 +478,7 @@ public class BufferedRandomAccessFileTest
                 w.write(generateByteArray(1));
                 return null;
             }
-        }, ClosedChannelException.class);
+        }, NullPointerException.class);
 
         try (RandomAccessReader copy = RandomAccessReader.open(new File(r.getPath())))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index e3014c3..fea6d2b 100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -71,7 +71,7 @@ public class CompressedInputStreamTest
         for (long l = 0L; l < 1000; l++)
         {
             index.put(l, writer.getFilePointer());
-            writer.stream.writeLong(l);
+            writer.writeLong(l);
         }
         writer.finish();
 


Mime
View raw message