cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [1/3] git commit: Fix streaming RangeTombstones at column index boundary; patch by slebresne reviewed by yukim for CASSANDRA-5418
Date Thu, 11 Apr 2013 16:12:55 GMT
Updated Branches:
  refs/heads/cassandra-1.2 83ed1cbda -> 0f1fb4340
  refs/heads/trunk 8b0e1868e -> 0aaf67a74


Fix streaming RangeTombstones at column index boundary; patch by slebresne reviewed by yukim
for CASSANDRA-5418


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f1fb434
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f1fb434
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f1fb434

Branch: refs/heads/cassandra-1.2
Commit: 0f1fb4340ca1f6360487c76909883bfedc63e4ce
Parents: 83ed1cb
Author: Yuki Morishita <yukim@apache.org>
Authored: Thu Apr 11 10:57:42 2013 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Thu Apr 11 10:57:42 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +-
 src/java/org/apache/cassandra/db/ColumnIndex.java  |   26 +++++++---
 .../apache/cassandra/io/sstable/SSTableWriter.java |    2 +-
 .../cassandra/streaming/StreamingTransferTest.java |   36 +++++++++++++++
 4 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f1fb434/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c26014..2124b15 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,7 +2,8 @@
  * Include fatal errors in trace events (CASSANDRA-5447)
  * Ensure that PerRowSecondaryIndex is notified of row-level deletes
    (CASSANDRA-5445)
-  * Allow empty blob literals in CQL3 (CASSANDRA-5452)
+ * Allow empty blob literals in CQL3 (CASSANDRA-5452)
+ * Fix streaming RangeTombstones at column index boundary (CASSANDRA-5418)
 Merged from 1.1:
  * Fix trying to load deleted row into row cache on startup (CASSANDRA-4463)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f1fb434/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index bd1c35a..bcd0eef 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -68,13 +68,22 @@ public class ColumnIndex
         public Builder(ColumnFamily cf,
                        ByteBuffer key,
                        int estimatedColumnCount,
-                       DataOutput output)
+                       DataOutput output,
+                       boolean fromStream)
         {
             this.indexOffset = rowHeaderSize(key, cf.deletionInfo());
             this.result = new ColumnIndex(estimatedColumnCount);
             this.output = output;
             this.atomSerializer = cf.getOnDiskSerializer();
-            this.tombstoneTracker = new RangeTombstone.Tracker(cf.getComparator());
+            this.tombstoneTracker = fromStream ? null : new RangeTombstone.Tracker(cf.getComparator());
+        }
+
+        public Builder(ColumnFamily cf,
+                       ByteBuffer key,
+                       int estimatedColumnCount,
+                       DataOutput output)
+        {
+            this(cf, key, estimatedColumnCount, output, false);
         }
 
         /**
@@ -99,7 +108,7 @@ public class ColumnIndex
 
         public int writtenAtomCount()
         {
-            return atomCount + tombstoneTracker.writtenAtom();
+            return tombstoneTracker == null ? atomCount : atomCount + tombstoneTracker.writtenAtom();
         }
 
         /**
@@ -153,11 +162,11 @@ public class ColumnIndex
             {
                 firstColumn = column;
                 startPosition = endPosition;
-                // TODO: have that use the firstColumn as min + make sure we
-                // optimize that on read
-                endPosition += tombstoneTracker.writeOpenedMarker(firstColumn, output, atomSerializer);
+                // TODO: have that use the firstColumn as min + make sure we optimize that
on read
+                if (tombstoneTracker != null)
+                    endPosition += tombstoneTracker.writeOpenedMarker(firstColumn, output,
atomSerializer);
                 blockSize = 0; // We don't count repeated tombstone marker in the block size,
to avoid a situation
-                               // where we wouldn't make any problem because a block is filled
by said marker
+                               // where we wouldn't make any progress because a block is
filled by said marker
             }
 
             long size = column.serializedSizeForSSTable();
@@ -177,7 +186,8 @@ public class ColumnIndex
                 atomSerializer.serializeForSSTable(column, output);
 
             // TODO: Should deal with removing unneeded tombstones
-            tombstoneTracker.update(column);
+            if (tombstoneTracker != null)
+                tombstoneTracker.update(column);
 
             lastColumn = column;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f1fb434/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index e05a34e..c64fd27 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -240,7 +240,7 @@ public class SSTableWriter extends SSTable
         ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory());
         cf.delete(deletionInfo);
 
-        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, columnCount,
dataFile.stream);
+        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, columnCount,
dataFile.stream, true);
         OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer();
         for (int i = 0; i < columnCount; i++)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f1fb434/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 502e5d7..2befe45 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -26,9 +26,11 @@ import static org.apache.cassandra.Util.column;
 import static org.apache.cassandra.Util.addMutation;
 
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.context.CounterContext;
@@ -126,6 +128,40 @@ public class StreamingTransferTest extends SchemaLoader
         session.await();
     }
 
+    /**
+     * Test to make sure RangeTombstones at column index boundary transferred correctly.
+     */
+    @Test
+    public void testTransferRangeTombstones() throws Exception
+    {
+        String ks = "Keyspace1";
+        String cfname = "StandardInteger1";
+        Table table = Table.open(ks);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfname);
+
+        String key = "key1";
+        RowMutation rm = new RowMutation(ks, ByteBufferUtil.bytes(key));
+        // add columns of size slightly less than column_index_size to force insert column
index
+        rm.add(new QueryPath(cfname, null, ByteBufferUtil.bytes(1)), ByteBuffer.wrap(new
byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
+        rm.add(new QueryPath(cfname, null, ByteBufferUtil.bytes(6)), ByteBuffer.wrap(new
byte[DatabaseDescriptor.getColumnIndexSize()]), 2);
+        ColumnFamily cf = rm.addOrGet(cfname);
+        // add RangeTombstones
+        cf.delete(new DeletionInfo(ByteBufferUtil.bytes(2), ByteBufferUtil.bytes(3), cf.getComparator(),
1, (int) (System.currentTimeMillis() / 1000)));
+        cf.delete(new DeletionInfo(ByteBufferUtil.bytes(5), ByteBufferUtil.bytes(7), cf.getComparator(),
1, (int) (System.currentTimeMillis() / 1000)));
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+        cfs.clearUnsafe();
+        transfer(table, sstable);
+
+        // confirm that a single SSTable was transferred and registered
+        assertEquals(1, cfs.getSSTables().size());
+
+        List<Row> rows = Util.getRangeSlice(cfs);
+        assertEquals(1, rows.size());
+    }
+
     @Test
     public void testTransferTable() throws Exception
     {


Mime
View raw message