cassandra-commits mailing list archives

From bened...@apache.org
Subject [2/3] cassandra git commit: Fix CASSANDRA-8750's treatment of compressed files
Date Wed, 04 Mar 2015 16:32:15 GMT
Fix CASSANDRA-8750's treatment of compressed files

patch by benedict; reviewed by marcus for CASSANDRA-8750


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a1e2978f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a1e2978f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a1e2978f

Branch: refs/heads/trunk
Commit: a1e2978f9f6d2e9a318f44a5b6c625659b86efe8
Parents: ec7fba4
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Wed Mar 4 16:27:30 2015 +0000
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Wed Mar 4 16:27:30 2015 +0000

----------------------------------------------------------------------
 .../compress/CompressedRandomAccessReader.java  |   4 +
 .../io/compress/CompressionMetadata.java        |  38 +++--
 .../cassandra/io/util/RandomAccessReader.java   |   2 +
 .../io/sstable/SSTableRewriterTest.java         | 149 ++++++++++++-------
 4 files changed, 129 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1e2978f/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index e29ad33..184db9c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -137,6 +137,10 @@ public class CompressedRandomAccessReader extends RandomAccessReader
 
         // buffer offset is always aligned
         bufferOffset = current & ~(buffer.length - 1);
+        // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
+        // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
+        if (bufferOffset + validBufferBytes > length())
+            validBufferBytes = (int)(length() - bufferOffset);
     }
 
     private int checksum(CompressionMetadata.Chunk chunk) throws IOException
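
The hunk above is the core of the fix: length() may return an imposed value shorter than the file's true uncompressed length, and that boundary can now fall inside a compressed chunk. A minimal standalone sketch of the clamping logic, with illustrative names (a sketch, not the Cassandra implementation itself):

    // Sketch: after decompressing a chunk that begins at bufferOffset,
    // expose only the bytes that fall within the imposed length.
    class ClampSketch
    {
        long bufferOffset;        // file position of the buffer's first byte
        int validBufferBytes;     // decompressed bytes available in the buffer
        final long imposedLength; // may be shorter than the true data length

        ClampSketch(long imposedLength) { this.imposedLength = imposedLength; }

        void clampToImposedLength()
        {
            if (bufferOffset + validBufferBytes > imposedLength)
                validBufferBytes = (int) (imposedLength - bufferOffset);
        }
    }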

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1e2978f/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 59c5da5..9ac2f89 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -57,6 +57,9 @@ import org.apache.cassandra.utils.Pair;
  */
 public class CompressionMetadata
 {
+    // dataLength can represent either the true length of the file
+    // or some shorter value, in the case we want to impose a shorter limit on readers
+    // (when early opening, we want to ensure readers cannot read past fully written sections)
     public final long dataLength;
     public final long compressedFileLength;
     public final boolean hasPostCompressionAdlerChecksums;
@@ -331,33 +334,39 @@ public class CompressionMetadata
 
         public CompressionMetadata open(long dataLength, long compressedLength, OpenType type)
         {
-            SafeMemory offsets = this.offsets;
+            SafeMemory offsets;
             int count = this.count;
             switch (type)
             {
                 case FINAL: case SHARED_FINAL:
-                    // maybe resize the data
                     if (this.offsets.size() != count * 8L)
                     {
-                        offsets = this.offsets.copy(count * 8L);
-                        // release our reference to the original shared data;
-                        // we don't do this if not resizing since we must pass out existing reference onto our caller
+                        // finalize the size of memory used if it won't now change;
+                        // unnecessary if already correct size
+                        SafeMemory tmp = this.offsets.copy(count * 8L);
                         this.offsets.free();
+                        this.offsets = tmp;
                     }
-                    // null out our reference to the original shared data to catch accidental reuse
-                    // note that since noone is writing to this Writer while we open it, null:ing out this.offsets is safe
-                    this.offsets = null;
+
                     if (type == OpenType.SHARED_FINAL)
-                        // we will use the data again, so stash our resized data back, and take an extra reference to it
-                        this.offsets = offsets.sharedCopy();
+                    {
+                        offsets = this.offsets.sharedCopy();
+                    }
+                    else
+                    {
+                        offsets = this.offsets;
+                        // null out our reference to the original shared data to catch accidental reuse
+                        // note that since noone is writing to this Writer while we open it, null:ing out this.offsets is safe
+                        this.offsets = null;
+                    }
                     break;
 
                 case SHARED:
-
+                    offsets = this.offsets.sharedCopy();
                     // we should only be opened on a compression data boundary; truncate our size to this boundary
-                    assert dataLength % parameters.chunkLength() == 0;
                     count = (int) (dataLength / parameters.chunkLength());
+                    if (dataLength % parameters.chunkLength() != 0)
+                        count++;
                     // grab our actual compressed length from the next offset from our the position we're opened to
                     if (count < this.count)
                         compressedLength = offsets.getLong(count * 8L);
@@ -409,7 +418,8 @@ public class CompressionMetadata
 
         public void abort()
         {
-            offsets.close();
+            if (offsets != null)
+                offsets.close();
         }
     }
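
The SHARED case carries the key behavioural change: the old code asserted that dataLength fell on a chunk boundary, whereas an early-open boundary may now sit inside a chunk, so the chunk count is rounded up instead. The arithmetic in isolation (a sketch with assumed names, not Cassandra's API):

    // Sketch: offsets needed to cover dataLength bytes, counting a
    // trailing partial chunk as a full entry.
    static int chunkCount(long dataLength, long chunkLength)
    {
        int count = (int) (dataLength / chunkLength);
        if (dataLength % chunkLength != 0)
            count++; // boundary falls mid-chunk; keep that chunk's offset
        return count;
    }

For example, chunkCount(100, 64) is 2: one full chunk plus the partial chunk the reader may enter, relying on the CompressedRandomAccessReader clamp above to stop it at the imposed length.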
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1e2978f/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index df68ca3..95877a2 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -49,6 +49,8 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInput
     // channel liked with the file, used to retrieve data and force updates.
     protected final FileChannel channel;
 
+    // this can be overridden at construction to a value shorter than the true length of the file;
+    // if so, it acts as an imposed limit on reads, rather than a convenience property
     private final long fileLength;
 
     protected final PoolingSegmentedFile owner;
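
A small sketch of what the overridden length means in practice (an illustrative class, not Cassandra's; the Math.min guard is this sketch's own choice):

    import java.io.IOException;
    import java.io.RandomAccessFile;

    // A reader whose advertised length may be imposed from outside;
    // reads treat length() as EOF even when the file is longer.
    class LimitedLengthReader
    {
        private final long fileLength;

        LimitedLengthReader(RandomAccessFile file, long overriddenLength) throws IOException
        {
            // never advertise more than the file actually holds
            this.fileLength = Math.min(overriddenLength, file.length());
        }

        long length() { return fileLength; }
    }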

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1e2978f/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 6c96905..76b89e5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
 
 import javax.annotation.Nullable;
 
@@ -187,26 +188,33 @@ public class SSTableRewriterTest extends SchemaLoader
             cf.addColumn(Util.column(String.valueOf(i), "a", 1));
         File dir = cfs.directories.getDirectoryForNewSSTables();
         SSTableWriter writer = getWriter(cfs, dir);
-
-        for (int i = 0; i < 500; i++)
-            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
-        SSTableReader s = writer.openEarly(1000);
-        assertFileCounts(dir.list(), 2, 3);
-        for (int i = 500; i < 1000; i++)
-            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
-        SSTableReader s2 = writer.openEarly(1000);
-        assertTrue(s != s2);
-        assertFileCounts(dir.list(), 2, 3);
-        s.markObsolete();
-        s.selfRef().release();
-        s2.selfRef().release();
-        Thread.sleep(1000);
-        assertFileCounts(dir.list(), 0, 3);
-        writer.abort();
-        Thread.sleep(1000);
-        int datafiles = assertFileCounts(dir.list(), 0, 0);
-        assertEquals(datafiles, 0);
-        validateCFS(cfs);
+        try
+        {
+            for (int i = 0; i < 1000; i++)
+                writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
+            SSTableReader s = writer.openEarly(1000);
+            assertFileCounts(dir.list(), 2, 2);
+            for (int i = 1000; i < 2000; i++)
+                writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
+            SSTableReader s2 = writer.openEarly(1000);
+            assertTrue(s.last.compareTo(s2.last) < 0);
+            assertFileCounts(dir.list(), 2, 2);
+            s.markObsolete();
+            s.selfRef().release();
+            s2.selfRef().release();
+            Thread.sleep(1000);
+            assertFileCounts(dir.list(), 0, 2);
+            writer.abort();
+            Thread.sleep(1000);
+            int datafiles = assertFileCounts(dir.list(), 0, 0);
+            assertEquals(datafiles, 0);
+            validateCFS(cfs);
+        }
+        catch (Throwable t)
+        {
+            writer.abort();
+            throw t;
+        }
     }
 
     @Test
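
Each rewritten test now wraps its body in the same guard: catch Throwable, so failed assertions (which throw Errors) are included, abort the writer to drop its references and temp files, then rethrow. The shape of the pattern, with the test body elided:

    SSTableWriter writer = getWriter(cfs, dir);
    try
    {
        // ... appends, openEarly() calls, assertions ...
    }
    catch (Throwable t)
    {
        writer.abort(); // clean up partial files before propagating the failure
        throw t;
    }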
@@ -287,17 +295,23 @@ public class SSTableRewriterTest extends SchemaLoader
                     assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                 }
             }
+
+            List<SSTableReader> sstables = rewriter.finish();
+            assertEquals(files, sstables.size());
+            assertEquals(files, cfs.getSSTables().size());
+            assertEquals(1, cfs.getDataTracker().getView().shadowed.size());
+            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+            assertEquals(files, cfs.getSSTables().size());
+            assertEquals(0, cfs.getDataTracker().getView().shadowed.size());
+            Thread.sleep(1000);
+            assertFileCounts(s.descriptor.directory.list(), 0, 0);
+            validateCFS(cfs);
+        }
+        catch (Throwable t)
+        {
+            rewriter.abort();
+            throw t;
         }
-        List<SSTableReader> sstables = rewriter.finish();
-        assertEquals(files, sstables.size());
-        assertEquals(files, cfs.getSSTables().size());
-        assertEquals(1, cfs.getDataTracker().getView().shadowed.size());
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
-        assertEquals(files, cfs.getSSTables().size());
-        assertEquals(0, cfs.getDataTracker().getView().shadowed.size());
-        Thread.sleep(1000);
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        validateCFS(cfs);
     }
 
 
@@ -402,6 +416,11 @@ public class SSTableRewriterTest extends SchemaLoader
         {
             test.run(scanner, controller, s, cfs, rewriter);
         }
+        catch (Throwable t)
+        {
+            rewriter.abort();
+            throw t;
+        }
 
         Thread.sleep(1000);
         assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
@@ -434,7 +453,7 @@ public class SSTableRewriterTest extends SchemaLoader
             while(scanner.hasNext())
             {
                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                if (rewriter.currentWriter().getFilePointer() > 25000000)
                 {
                     rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
                     files++;
@@ -448,11 +467,17 @@ public class SSTableRewriterTest extends SchemaLoader
                     break;
                 }
             }
+
+            Thread.sleep(1000);
+            assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
+            assertFileCounts(s.descriptor.directory.list(), 0, 0);
+            validateCFS(cfs);
+        }
+        catch (Throwable t)
+        {
+            rewriter.abort();
+            throw t;
         }
-        Thread.sleep(1000);
-        assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        validateCFS(cfs);
     }
 
     @Test
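
One behavioural tweak in the hunk above: the switch threshold now uses getFilePointer() rather than getOnDiskFilePointer(). Assuming the usual SequentialWriter semantics (an assumption of this note, not stated in the patch), the logical pointer counts uncompressed bytes appended while the on-disk pointer reflects compressed bytes flushed, so thresholding on the logical pointer behaves the same for compressed and uncompressed tables:

    // Assumed semantics, for illustration only:
    long logical = rewriter.currentWriter().getFilePointer();       // uncompressed bytes appended
    long onDisk  = rewriter.currentWriter().getOnDiskFilePointer(); // compressed bytes on disk
    // thresholding on `logical` gives the same switch cadence whether or
    // not the table is compressed; `onDisk` lags until chunks are flushed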
@@ -484,14 +509,20 @@ public class SSTableRewriterTest extends SchemaLoader
                     assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                 }
             }
+
+            List<SSTableReader> sstables = rewriter.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+            Thread.sleep(1000);
+            assertFileCounts(s.descriptor.directory.list(), 0, 0);
+            cfs.truncateBlocking();
+            Thread.sleep(1000); // make sure the deletion tasks have run etc
+            validateCFS(cfs);
+        }
+        catch (Throwable t)
+        {
+            rewriter.abort();
+            throw t;
         }
-        List<SSTableReader> sstables = rewriter.finish();
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
-        Thread.sleep(1000);
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        cfs.truncateBlocking();
-        Thread.sleep(1000); // make sure the deletion tasks have run etc
-        validateCFS(cfs);
     }
 
     @Test
@@ -523,14 +554,20 @@ public class SSTableRewriterTest extends SchemaLoader
                     files++;
                 }
             }
+
+            List<SSTableReader> sstables = rewriter.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+            assertEquals(files, sstables.size());
+            assertEquals(files, cfs.getSSTables().size());
+            Thread.sleep(1000);
+            assertFileCounts(s.descriptor.directory.list(), 0, 0);
+            validateCFS(cfs);
+        }
+        catch (Throwable t)
+        {
+            rewriter.abort();
+            throw t;
         }
-        List<SSTableReader> sstables = rewriter.finish();
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
-        assertEquals(files, sstables.size());
-        assertEquals(files, cfs.getSSTables().size());
-        Thread.sleep(1000);
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        validateCFS(cfs);
     }
     @Test
     public void testSSTableSplit() throws InterruptedException
@@ -710,7 +747,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         for (int i = 0; i < count / 100; i++)
-            cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+            cf.addColumn(Util.cellname(i), random(0, 1000), 1);
         File dir = cfs.directories.getDirectoryForNewSSTables();
         String filename = cfs.getTempSSTablePath(dir);
 
@@ -757,6 +794,8 @@ public class SSTableRewriterTest extends SchemaLoader
         int datacount = 0;
         for (String f : files)
         {
+            if (f.endsWith("-CRC.db"))
+                continue;
             if (f.contains("-tmplink-"))
                 tmplinkcount++;
             else if (f.contains("-tmp-"))
@@ -779,4 +818,14 @@ public class SSTableRewriterTest extends SchemaLoader
                                  StorageService.getPartitioner(),
                                  new MetadataCollector(cfs.metadata.comparator));
     }
+
+    private ByteBuffer random(int i, int size)
+    {
+        byte[] bytes = new byte[size + 4];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        ByteBuffer r = ByteBuffer.wrap(bytes);
+        r.putInt(0, i);
+        return r;
+    }
+
 }
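
The new random(i, size) helper yields a buffer of size + 4 bytes: four bytes encoding i, followed by size random bytes, so values generated for different indices can never collide while still exercising arbitrary byte content. For instance:

    ByteBuffer a = random(1, 10); // 14 bytes: int 1, then 10 random bytes
    ByteBuffer b = random(2, 10);
    assert a.getInt(0) == 1 && b.getInt(0) == 2; // prefixes always differ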

