cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [05/20] cassandra git commit: Fix commit log replay after out-of-order flush completion.
Date Thu, 12 May 2016 13:23:10 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 30ed85b..ca7fe82 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.FileDataInput;
@@ -49,7 +50,7 @@ public class MetadataSerializer implements IMetadataSerializer
 {
     private static final Logger logger = LoggerFactory.getLogger(MetadataSerializer.class);
 
-    public void serialize(Map<MetadataType, MetadataComponent> components, DataOutputPlus
out) throws IOException
+    public void serialize(Map<MetadataType, MetadataComponent> components, Version
version, DataOutputPlus out) throws IOException
     {
         // sort components by type
         List<MetadataComponent> sortedComponents = Lists.newArrayList(components.values());
@@ -66,12 +67,12 @@ public class MetadataSerializer implements IMetadataSerializer
             out.writeInt(type.ordinal());
             // serialize position
             out.writeInt(lastPosition);
-            lastPosition += type.serializer.serializedSize(component);
+            lastPosition += type.serializer.serializedSize(component, version);
         }
         // serialize components
         for (MetadataComponent component : sortedComponents)
         {
-            component.getType().serializer.serialize(component, out);
+            component.getType().serializer.serialize(component, version, out);
         }
     }
 
@@ -153,7 +154,7 @@ public class MetadataSerializer implements IMetadataSerializer
 
         try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
         {
-            serialize(currentComponents, out);
+            serialize(currentComponents, descriptor.version, out);
             out.flush();
         }
        // we can't move a file on top of another file in Windows:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index f2eb1af..3d48e34 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -43,7 +43,8 @@ public class StatsMetadata extends MetadataComponent
 
     public final EstimatedHistogram estimatedRowSize;
     public final EstimatedHistogram estimatedColumnCount;
-    public final ReplayPosition replayPosition;
+    public final ReplayPosition commitLogLowerBound;
+    public final ReplayPosition commitLogUpperBound;
     public final long minTimestamp;
     public final long maxTimestamp;
     public final int maxLocalDeletionTime;
@@ -57,7 +58,8 @@ public class StatsMetadata extends MetadataComponent
 
     public StatsMetadata(EstimatedHistogram estimatedRowSize,
                          EstimatedHistogram estimatedColumnCount,
-                         ReplayPosition replayPosition,
+                         ReplayPosition commitLogLowerBound,
+                         ReplayPosition commitLogUpperBound,
                          long minTimestamp,
                          long maxTimestamp,
                          int maxLocalDeletionTime,
@@ -71,7 +73,8 @@ public class StatsMetadata extends MetadataComponent
     {
         this.estimatedRowSize = estimatedRowSize;
         this.estimatedColumnCount = estimatedColumnCount;
-        this.replayPosition = replayPosition;
+        this.commitLogLowerBound = commitLogLowerBound;
+        this.commitLogUpperBound = commitLogUpperBound;
         this.minTimestamp = minTimestamp;
         this.maxTimestamp = maxTimestamp;
         this.maxLocalDeletionTime = maxLocalDeletionTime;
@@ -117,7 +120,8 @@ public class StatsMetadata extends MetadataComponent
     {
         return new StatsMetadata(estimatedRowSize,
                                  estimatedColumnCount,
-                                 replayPosition,
+                                 commitLogLowerBound,
+                                 commitLogUpperBound,
                                  minTimestamp,
                                  maxTimestamp,
                                  maxLocalDeletionTime,
@@ -134,7 +138,8 @@ public class StatsMetadata extends MetadataComponent
     {
         return new StatsMetadata(estimatedRowSize,
                                  estimatedColumnCount,
-                                 replayPosition,
+                                 commitLogLowerBound,
+                                 commitLogUpperBound,
                                  minTimestamp,
                                  maxTimestamp,
                                  maxLocalDeletionTime,
@@ -157,7 +162,8 @@ public class StatsMetadata extends MetadataComponent
         return new EqualsBuilder()
                        .append(estimatedRowSize, that.estimatedRowSize)
                        .append(estimatedColumnCount, that.estimatedColumnCount)
-                       .append(replayPosition, that.replayPosition)
+                       .append(commitLogLowerBound, that.commitLogLowerBound)
+                       .append(commitLogUpperBound, that.commitLogUpperBound)
                        .append(minTimestamp, that.minTimestamp)
                        .append(maxTimestamp, that.maxTimestamp)
                        .append(maxLocalDeletionTime, that.maxLocalDeletionTime)
@@ -177,7 +183,8 @@ public class StatsMetadata extends MetadataComponent
         return new HashCodeBuilder()
                        .append(estimatedRowSize)
                        .append(estimatedColumnCount)
-                       .append(replayPosition)
+                       .append(commitLogLowerBound)
+                       .append(commitLogUpperBound)
                        .append(minTimestamp)
                        .append(maxTimestamp)
                        .append(maxLocalDeletionTime)
@@ -193,12 +200,12 @@ public class StatsMetadata extends MetadataComponent
 
     public static class StatsMetadataSerializer implements IMetadataComponentSerializer<StatsMetadata>
     {
-        public int serializedSize(StatsMetadata component) throws IOException
+        public int serializedSize(StatsMetadata component, Version version) throws IOException
         {
             int size = 0;
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize,
TypeSizes.NATIVE);
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount,
TypeSizes.NATIVE);
-            size += ReplayPosition.serializer.serializedSize(component.replayPosition, TypeSizes.NATIVE);
+            size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound,
TypeSizes.NATIVE);
            size += 8 + 8 + 4 + 8 + 8; // min/max timestamp(long), maxLocalDeletionTime(int),
compressionRatio(double), repairedAt (long)
             size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime,
TypeSizes.NATIVE);
             size += TypeSizes.NATIVE.sizeof(component.sstableLevel);
@@ -211,14 +218,16 @@ public class StatsMetadata extends MetadataComponent
             for (ByteBuffer columnName : component.maxColumnNames)
                 size += 2 + columnName.remaining(); // with short length
             size += TypeSizes.NATIVE.sizeof(component.hasLegacyCounterShards);
+            if (version.hasCommitLogLowerBound())
+                size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound,
TypeSizes.NATIVE);
             return size;
         }
 
-        public void serialize(StatsMetadata component, DataOutputPlus out) throws IOException
+        public void serialize(StatsMetadata component, Version version, DataOutputPlus out)
throws IOException
         {
             EstimatedHistogram.serializer.serialize(component.estimatedRowSize, out);
             EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
-            ReplayPosition.serializer.serialize(component.replayPosition, out);
+            ReplayPosition.serializer.serialize(component.commitLogUpperBound, out);
             out.writeLong(component.minTimestamp);
             out.writeLong(component.maxTimestamp);
             out.writeInt(component.maxLocalDeletionTime);
@@ -233,13 +242,16 @@ public class StatsMetadata extends MetadataComponent
             for (ByteBuffer columnName : component.maxColumnNames)
                 ByteBufferUtil.writeWithShortLength(columnName, out);
             out.writeBoolean(component.hasLegacyCounterShards);
+            if (version.hasCommitLogLowerBound())
+                ReplayPosition.serializer.serialize(component.commitLogLowerBound, out);
         }
 
         public StatsMetadata deserialize(Version version, DataInput in) throws IOException
         {
             EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
             EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
-            ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+            ReplayPosition commitLogLowerBound = ReplayPosition.NONE, commitLogUpperBound;
+            commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
             long minTimestamp = in.readLong();
             long maxTimestamp = in.readLong();
             int maxLocalDeletionTime = in.readInt();
@@ -264,9 +276,12 @@ public class StatsMetadata extends MetadataComponent
             if (version.tracksLegacyCounterShards())
                 hasLegacyCounterShards = in.readBoolean();
 
+            if (version.hasCommitLogLowerBound())
+                commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
             return new StatsMetadata(rowSizes,
                                      columnCounts,
-                                     replayPosition,
+                                     commitLogLowerBound,
+                                     commitLogUpperBound,
                                      minTimestamp,
                                      maxTimestamp,
                                      maxLocalDeletionTime,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
index 603732b..4ca078b 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
@@ -71,12 +71,12 @@ public class ValidationMetadata extends MetadataComponent
 
     public static class ValidationMetadataSerializer implements IMetadataComponentSerializer<ValidationMetadata>
     {
-        public int serializedSize(ValidationMetadata component) throws IOException
+        public int serializedSize(ValidationMetadata component, Version version) throws IOException
         {
             return TypeSizes.NATIVE.sizeof(component.partitioner) + 8;
         }
 
-        public void serialize(ValidationMetadata component, DataOutputPlus out) throws IOException
+        public void serialize(ValidationMetadata component, Version version, DataOutputPlus
out) throws IOException
         {
             out.writeUTF(component.partitioner);
             out.writeDouble(component.bloomFilterFPChance);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
deleted file mode 100644
index 1a15d6f..0000000
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.IOException;
-
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public abstract class DiskAwareRunnable extends WrappedRunnable
-{
-    protected Directories.DataDirectory getWriteDirectory(long writeSize)
-    {
-        Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize);
-        if (directory == null)
-            throw new FSWriteError(new IOException("Insufficient disk space to write " +
writeSize + " bytes"), "");
-
-        return directory;
-    }
-
-    /**
-     * Get sstable directories for the CF.
-     * @return Directories instance for the CF.
-     */
-    protected abstract Directories getDirectories();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index dd56b8b..6911ec6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -122,14 +122,11 @@ public class StreamReceiveTask extends StreamTask
                     for (SSTableWriter writer : task.sstables)
                         writer.abort();
                     task.sstables.clear();
-                    task.session.taskCompleted(task);
                     return;
                 }
                 ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
                 File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size()
* 256L);
-                if (lockfiledir == null)
-                    throw new IOError(new IOException("All disks full"));
                 StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
                 lockfile.create(task.sstables);
                 List<SSTableReader> readers = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 2665f40..8319014 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -70,7 +70,8 @@ public class SSTableMetadataViewer
                     out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int)
(System.currentTimeMillis() / 1000)));
                     out.printf("SSTable Level: %d%n", stats.sstableLevel);
                     out.printf("Repaired at: %d%n", stats.repairedAt);
-                    out.println(stats.replayPosition);
+                    out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound);
+                    out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound);
                     out.println("Estimated tombstone drop times:");
                     for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 1b4edee..f9b4156 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -452,7 +452,7 @@ public class CommitLogStressTest
         int cells = 0;
 
         @Override
-        void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final
CommitLogDescriptor desc)
+        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final
CommitLogDescriptor desc)
         {
             if (desc.id < discardedPos.segment)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index c377a21..0c46061 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -59,7 +59,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
     }
 
     @Override
-    void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor
desc)
+    void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor
desc)
     {
         FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
         Mutation mutation;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 93b88b4..adeb778 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.MockSchema;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.SSTableDeletingTask;
@@ -263,15 +264,15 @@ public class TrackerTest
         Tracker tracker = cfs.getTracker();
         tracker.subscribe(listener);
 
-        Memtable prev1 = tracker.switchMemtable(true);
+        Memtable prev1 = tracker.switchMemtable(true, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()),
cfs));
         OpOrder.Group write1 = cfs.keyspace.writeOrder.getCurrent();
         OpOrder.Barrier barrier1 = cfs.keyspace.writeOrder.newBarrier();
-        prev1.setDiscarding(barrier1, new AtomicReference<ReplayPosition>());
+        prev1.setDiscarding(barrier1, new AtomicReference<>(CommitLog.instance.getContext()));
         barrier1.issue();
-        Memtable prev2 = tracker.switchMemtable(false);
+        Memtable prev2 = tracker.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()),
cfs));
         OpOrder.Group write2 = cfs.keyspace.writeOrder.getCurrent();
         OpOrder.Barrier barrier2 = cfs.keyspace.writeOrder.newBarrier();
-        prev2.setDiscarding(barrier2, new AtomicReference<ReplayPosition>());
+        prev2.setDiscarding(barrier2, new AtomicReference<>(CommitLog.instance.getContext()));
         barrier2.issue();
         Memtable cur = tracker.getView().getCurrentMemtable();
         OpOrder.Group writecur = cfs.keyspace.writeOrder.getCurrent();
@@ -297,6 +298,9 @@ public class TrackerTest
         SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
         tracker.replaceFlushed(prev2, reader);
         Assert.assertEquals(1, tracker.getView().sstables.size());
+        Assert.assertEquals(1, tracker.getView().premature.size());
+        tracker.permitCompactionOfFlushed(reader);
+        Assert.assertEquals(0, tracker.getView().premature.size());
         Assert.assertEquals(1, listener.received.size());
         Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
         listener.received.clear();
@@ -307,16 +311,17 @@ public class TrackerTest
         tracker = cfs.getTracker();
         listener = new MockListener(false);
         tracker.subscribe(listener);
-        prev1 = tracker.switchMemtable(false);
+        prev1 = tracker.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()),
cfs));
         tracker.markFlushing(prev1);
         reader = MockSchema.sstable(0, 10, true, cfs);
         cfs.invalidate(false);
         tracker.replaceFlushed(prev1, reader);
+        tracker.permitCompactionOfFlushed(reader);
         Assert.assertEquals(0, tracker.getView().sstables.size());
         Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
         Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
-        Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
-        Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(1)).removed.size());
+        Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(0)).removed.size());
+        Assert.assertEquals(reader, (((SSTableDeletingNotification) listener.received.get(1)).deleting));
         DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 32a81e2..5706598 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -208,6 +208,6 @@ public class ViewTest
         for (int i = 0 ; i < sstableCount ; i++)
             sstables.add(MockSchema.sstable(i, cfs));
         return new View(ImmutableList.copyOf(memtables), Collections.<Memtable>emptyList(),
Helpers.identityMap(sstables),
-                        Collections.<SSTableReader>emptySet(), SSTableIntervalTree.build(sstables));
+                        Collections.<SSTableReader>emptySet(), Collections.<SSTableReader>emptySet(),
SSTableIntervalTree.build(sstables));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 931422b..3bef89e 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -113,7 +113,7 @@ public class CompressedRandomAccessReaderTest
         ChannelProxy channel = new ChannelProxy(f);
         try
         {
-            MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
             SequentialWriter writer = compressed
                 ? new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance),
sstableMetadataCollector)
                 : SequentialWriter.open(f);
@@ -166,7 +166,7 @@ public class CompressedRandomAccessReaderTest
         File metadata = new File(file.getPath() + ".meta");
         metadata.deleteOnExit();
 
-        MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
         try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(),
new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector))
         {
             writer.write(CONTENT.getBytes());
@@ -251,7 +251,7 @@ public class CompressedRandomAccessReaderTest
         File metadata = new File(file.getPath() + ".meta");
         metadata.deleteOnExit();
 
-        MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
         try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(),
new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector))
         {
             writer.write(CONTENT.getBytes());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 27b866d..43c44fd 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -85,7 +85,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
 
         try
         {
-            MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
             byte[] dataPre = new byte[bytesToTest];
             byte[] rawPost = new byte[bytesToTest];
             try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename
+ ".metadata", new CompressionParameters(compressor), sstableMetadataCollector);)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index eda4f17..19fa7c4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.sstable.metadata;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.EnumSet;
@@ -27,12 +28,16 @@ import java.util.Set;
 import com.google.common.collect.Sets;
 
 import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.RandomAccessReader;
@@ -45,18 +50,51 @@ public class MetadataSerializerTest
     @Test
     public void testSerialization() throws IOException
     {
+        Map<MetadataType, MetadataComponent> originalMetadata = constructMetadata();
+
+        MetadataSerializer serializer = new MetadataSerializer();
+        File statsFile = serialize(originalMetadata, serializer, BigFormat.latestVersion);
+
+        Descriptor desc = new Descriptor( statsFile.getParentFile(), "", "", 0, Descriptor.Type.FINAL);
+        try (RandomAccessReader in = RandomAccessReader.open(statsFile))
+        {
+            Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc,
in, EnumSet.allOf(MetadataType.class));
+
+            for (MetadataType type : MetadataType.values())
+            {
+                assertEquals(originalMetadata.get(type), deserialized.get(type));
+            }
+        }
+    }
+
+    public File serialize(Map<MetadataType, MetadataComponent> metadata, MetadataSerializer
serializer, Version version)
+            throws IOException, FileNotFoundException
+    {
+        // Serialize to tmp file
+        File statsFile = File.createTempFile(Component.STATS.name, null);
+        try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(statsFile)))
+        {
+            serializer.serialize(metadata, version, out);
+        }
+        return statsFile;
+    }
+
+    public Map<MetadataType, MetadataComponent> constructMetadata()
+    {
         EstimatedHistogram rowSizes = new EstimatedHistogram(new long[] { 1L, 2L },
                                                              new long[] { 3L, 4L, 5L });
         EstimatedHistogram columnCounts = new EstimatedHistogram(new long[] { 6L, 7L },
                                                                  new long[] { 8L, 9L, 10L
});
-        ReplayPosition rp = new ReplayPosition(11L, 12);
+        ReplayPosition start = new ReplayPosition(11L, 12);
+        ReplayPosition end = new ReplayPosition(15L, 9);
         long minTimestamp = 2162517136L;
         long maxTimestamp = 4162517136L;
 
         MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance))
                                                       .estimatedRowSize(rowSizes)
                                                       .estimatedColumnCount(columnCounts)
-                                                      .replayPosition(rp);
+                                                      .commitLogLowerBound(start)
+                                                      .commitLogUpperBound(end);
         collector.updateMinTimestamp(minTimestamp);
         collector.updateMaxTimestamp(maxTimestamp);
 
@@ -67,23 +105,35 @@ public class MetadataSerializerTest
         String partitioner = RandomPartitioner.class.getCanonicalName();
         double bfFpChance = 0.1;
         Map<MetadataType, MetadataComponent> originalMetadata = collector.finalizeMetadata(partitioner, bfFpChance, 0);
+        return originalMetadata;
+    }
+
+    @Test
+    public void testLaReadsLb() throws IOException
+    {
+        Map<MetadataType, MetadataComponent> originalMetadata = constructMetadata();
 
         MetadataSerializer serializer = new MetadataSerializer();
-        // Serialize to tmp file
-        File statsFile = File.createTempFile(Component.STATS.name, null);
-        try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(statsFile)))
-        {
-            serializer.serialize(originalMetadata, out);
-        }
+        // Write metadata in two minor formats.
+        File statsFileLb = serialize(originalMetadata, serializer, BigFormat.instance.getVersion("lb"));
+        File statsFileLa = serialize(originalMetadata, serializer, BigFormat.instance.getVersion("la"));
 
-        Descriptor desc = new Descriptor( statsFile.getParentFile(), "", "", 0, Descriptor.Type.FINAL);
-        try (RandomAccessReader in = RandomAccessReader.open(statsFile))
+        // Reading both as earlier version should yield identical results.
+        Descriptor desc = new Descriptor("la", statsFileLb.getParentFile(), "", "", 0, Descriptor.Type.FINAL, DatabaseDescriptor.getSSTableFormat());
+        try (RandomAccessReader inLb = RandomAccessReader.open(statsFileLb);
+             RandomAccessReader inLa = RandomAccessReader.open(statsFileLa))
         {
-            Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));
+            Map<MetadataType, MetadataComponent> deserializedLb = serializer.deserialize(desc, inLb, EnumSet.allOf(MetadataType.class));
+            Map<MetadataType, MetadataComponent> deserializedLa = serializer.deserialize(desc, inLa, EnumSet.allOf(MetadataType.class));
 
             for (MetadataType type : MetadataType.values())
             {
-                assertEquals(originalMetadata.get(type), deserialized.get(type));
+                assertEquals(deserializedLa.get(type), deserializedLb.get(type));
+                if (!originalMetadata.get(type).equals(deserializedLb.get(type)))
+                {
+                    // Currently only STATS can be different. Change if no longer the case
+                    assertEquals(MetadataType.STATS, type);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
index 8409a26..1f66fb7 100644
--- a/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
@@ -157,7 +157,7 @@ public class IntervalTreeTest
                 public String deserialize(DataInput in) throws IOException { return in.readUTF(); }
                 public long serializedSize(String v, TypeSizes s) { return v.length(); }
             },
-            (Constructor<Interval<Integer, String>>) (Object) Interval.class.getConstructor(Object.class, Object.class, Object.class)
+            (Constructor<Interval<Integer, String>>) (Constructor<?>) Interval.class.getConstructor(Object.class, Object.class, Object.class)
         );
 
         DataOutputBuffer out = new DataOutputBuffer();


Mime
View raw message