Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8252C1752A for ; Thu, 23 Oct 2014 15:11:49 +0000 (UTC) Received: (qmail 17622 invoked by uid 500); 23 Oct 2014 15:11:49 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 17468 invoked by uid 500); 23 Oct 2014 15:11:49 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 17175 invoked by uid 99); 23 Oct 2014 15:11:48 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Oct 2014 15:11:48 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9F5FB16791; Thu, 23 Oct 2014 15:11:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jake@apache.org To: commits@cassandra.apache.org Date: Thu, 23 Oct 2014 15:11:49 -0000 Message-Id: <8785f9e02c724c6380985f830062f230@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/8] Extend Descriptor to include a format value and refactor reader/writer apis http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java new file mode 100644 index 0000000..07d867d --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.big; + +import java.io.IOException; + +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.OnDiskAtom; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileDataInput; + +/** + * A Cell Iterator over SSTable + */ +class SSTableSliceIterator implements OnDiskAtomIterator +{ + private final OnDiskAtomIterator reader; + private final DecoratedKey key; + + public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ColumnSlice[] slices, boolean reversed) + { + this.key = key; + RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); + this.reader = indexEntry == null ? null : createReader(sstable, indexEntry, null, slices, reversed); + } + + /** + * An iterator for a slice within an SSTable + * @param sstable Keyspace for the CFS we are reading from + * @param file Optional parameter that input is read from. If null is passed, this class creates an appropriate one automatically. + * If this class creates, it will close the underlying file when #close() is called. + * If a caller passes a non-null argument, this class will NOT close the underlying file when the iterator is closed (i.e. the caller is responsible for closing the file) + * In all cases the caller should explicitly #close() this iterator. + * @param key The key the requested slice resides under + * @param slices the column slices + * @param reversed Results are returned in reverse order iff reversed is true. + * @param indexEntry position of the row + */ + public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry) + { + this.key = key; + reader = createReader(sstable, indexEntry, file, slices, reversed); + } + + private static OnDiskAtomIterator createReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput file, ColumnSlice[] slices, boolean reversed) + { + return slices.length == 1 && slices[0].start.isEmpty() && !reversed + ? new SimpleSliceReader(sstable, indexEntry, file, slices[0].finish) + : new IndexedSliceReader(sstable, indexEntry, file, slices, reversed); + } + + public DecoratedKey getKey() + { + return key; + } + + public ColumnFamily getColumnFamily() + { + return reader == null ? null : reader.getColumnFamily(); + } + + public boolean hasNext() + { + return reader != null && reader.hasNext(); + } + + public OnDiskAtom next() + { + return reader.next(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + public void close() throws IOException + { + if (reader != null) + reader.close(); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java new file mode 100644 index 0000000..9fec303 --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.big; + +import java.io.IOException; +import java.util.Iterator; + +import com.google.common.collect.AbstractIterator; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.composites.CellNameType; +import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ByteBufferUtil; + +class SimpleSliceReader extends AbstractIterator implements OnDiskAtomIterator +{ + private static final Logger logger = LoggerFactory.getLogger(SimpleSliceReader.class); + + private final FileDataInput file; + private final boolean needsClosing; + private final Composite finishColumn; + private final CellNameType comparator; + private final ColumnFamily emptyColumnFamily; + private final Iterator atomIterator; + + SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, Composite finishColumn) + { + Tracing.trace("Seeking to partition beginning in data file"); + this.finishColumn = finishColumn; + this.comparator = sstable.metadata.comparator; + try + { + if (input == null) + { + this.file = sstable.getFileDataInput(indexEntry.position); + this.needsClosing = true; + } + else + { + this.file = input; + input.seek(indexEntry.position); + this.needsClosing = false; + } + + // Skip key and data size + ByteBufferUtil.skipShortLength(file); + + emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata); + emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file)); + atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, sstable.descriptor.version); + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } + } + + protected OnDiskAtom computeNext() + { + if (!atomIterator.hasNext()) + return endOfData(); + + OnDiskAtom column = atomIterator.next(); + if (!finishColumn.isEmpty() && comparator.compare(column.name(), finishColumn) > 0) + return endOfData(); + + return column; + } + + public ColumnFamily getColumnFamily() + { + return emptyColumnFamily; + } + + public void close() throws IOException + { + if (needsClosing) + file.close(); + } + + public DecoratedKey getKey() + { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java index f801dac..dd879c4 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java @@ -27,6 +27,7 @@ import com.clearspring.analytics.stream.cardinality.ICardinality; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; @@ -91,7 +92,7 @@ public class CompactionMetadata extends MetadataComponent ByteBufferUtil.writeWithLength(component.cardinalityEstimator.getBytes(), out); } - public CompactionMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException + public CompactionMetadata deserialize(Version version, DataInput in) throws IOException { int nbAncestors = in.readInt(); Set ancestors = new HashSet<>(nbAncestors); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java index 49ae378..018d4a0 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputPlus; /** @@ -55,5 +56,5 @@ public interface IMetadataComponentSerializer * @return Deserialized component * @throws IOException */ - T deserialize(Descriptor.Version version, DataInput in) throws IOException; + T deserialize(Version version, DataInput in) throws IOException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 7ba2895..152614d 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -37,7 +37,7 @@ import org.apache.cassandra.io.sstable.ColumnNameHelper; import org.apache.cassandra.io.sstable.ColumnStats; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.EstimatedHistogram; import org.apache.cassandra.utils.MurmurHash; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index a557b88..a501518 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -235,7 +236,7 @@ public class StatsMetadata extends MetadataComponent out.writeBoolean(component.hasLegacyCounterShards); } - public StatsMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException + public StatsMetadata deserialize(Version version, DataInput in) throws IOException { EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in); EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in); @@ -247,7 +248,7 @@ public class StatsMetadata extends MetadataComponent StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in); int sstableLevel = in.readInt(); long repairedAt = 0; - if (version.hasRepairedAt) + if (version.hasRepairedAt()) repairedAt = in.readLong(); int colCount = in.readInt(); @@ -261,7 +262,7 @@ public class StatsMetadata extends MetadataComponent maxColumnNames.add(ByteBufferUtil.readWithShortLength(in)); boolean hasLegacyCounterShards = true; - if (version.tracksLegacyCounterShards) + if (version.tracksLegacyCounterShards()) hasLegacyCounterShards = in.readBoolean(); return new StatsMetadata(rowSizes, http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java index e00c55c..aed6820 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputPlus; /** @@ -82,7 +83,7 @@ public class ValidationMetadata extends MetadataComponent out.writeDouble(component.bloomFilterFPChance); } - public ValidationMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException + public ValidationMetadata deserialize(Version version, DataInput in) throws IOException { return new ValidationMetadata(in.readUTF(), in.readDouble()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/AbstractDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java index 2815260..588540d 100644 --- a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java +++ b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java @@ -21,9 +21,9 @@ import java.io.*; public abstract class AbstractDataInput extends InputStream implements DataInput { - protected abstract void seek(long position) throws IOException; - protected abstract long getPosition(); - protected abstract long getPositionLimit(); + public abstract void seek(long position) throws IOException; + public abstract long getPosition(); + public abstract long getPositionLimit(); public int skipBytes(int n) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java index 797b964..4140c95 100644 --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java @@ -51,7 +51,7 @@ public class DataIntegrityMetadata public ChecksumValidator(Descriptor descriptor) throws IOException { this.descriptor = descriptor; - checksum = descriptor.version.hasAllAdlerChecksums ? new Adler32() : new PureJavaCrc32(); + checksum = descriptor.version.hasAllAdlerChecksums() ? new Adler32() : new PureJavaCrc32(); reader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))); chunkSize = reader.readInt(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/FileDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileDataInput.java b/src/java/org/apache/cassandra/io/util/FileDataInput.java index d94075c..55809ad 100644 --- a/src/java/org/apache/cassandra/io/util/FileDataInput.java +++ b/src/java/org/apache/cassandra/io/util/FileDataInput.java @@ -20,6 +20,7 @@ package org.apache.cassandra.io.util; import java.io.Closeable; import java.io.DataInput; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; public interface FileDataInput extends DataInput, Closeable http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index e590918..3567465 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -17,13 +17,7 @@ */ package org.apache.cassandra.io.util; -import java.io.Closeable; -import java.io.DataInput; -import java.io.EOFException; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; +import java.io.*; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.file.AtomicMoveNotSupportedException; @@ -442,4 +436,25 @@ public class FileUtils } return length; } + + + public static void copyTo(DataInput in, OutputStream out, int length) throws IOException + { + byte[] buffer = new byte[64 * 1024]; + int copiedBytes = 0; + + while (copiedBytes + buffer.length < length) + { + in.readFully(buffer); + out.write(buffer); + copiedBytes += buffer.length; + } + + if (copiedBytes < length) + { + int left = length - copiedBytes; + in.readFully(buffer, 0, left); + out.write(buffer, 0, left); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java index 0479256..574a7fb 100644 --- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java +++ b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java @@ -65,12 +65,12 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn return segmentOffset + position; } - protected long getPosition() + public long getPosition() { return segmentOffset + position; } - protected long getPositionLimit() + public long getPositionLimit() { return segmentOffset + buffer.capacity(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/MemoryInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java index 73ccc1b..45261e0 100644 --- a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java +++ b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java @@ -41,17 +41,17 @@ public class MemoryInputStream extends AbstractDataInput implements DataInput position += count; } - protected void seek(long pos) + public void seek(long pos) { position = (int) pos; } - protected long getPosition() + public long getPosition() { return position; } - protected long getPositionLimit() + public long getPositionLimit() { return mem.size(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java index 8ab432e..4e6f856 100644 --- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.utils.EstimatedHistogram; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index de3c125..72d8d6e 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -74,7 +74,8 @@ public final class MessagingService implements MessagingServiceMBean public static final int VERSION_12 = 6; public static final int VERSION_20 = 7; public static final int VERSION_21 = 8; - public static final int current_version = VERSION_21; + public static final int VERSION_30 = 9; + public static final int current_version = VERSION_30; public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC"; public static final byte[] ONE_BYTE = new byte[1]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java index e4aff96..15230ea 100644 --- a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java +++ b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.notifications; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; public class SSTableAddedNotification implements INotification { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/notifications/SSTableDeletingNotification.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableDeletingNotification.java b/src/java/org/apache/cassandra/notifications/SSTableDeletingNotification.java index 8b0f597..dcaa3b5 100644 --- a/src/java/org/apache/cassandra/notifications/SSTableDeletingNotification.java +++ b/src/java/org/apache/cassandra/notifications/SSTableDeletingNotification.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.notifications; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; /** * Fired right before removing an SSTable. http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java index c36583c..7ca574b 100644 --- a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java +++ b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java @@ -19,9 +19,8 @@ package org.apache.cassandra.notifications; import java.util.Collection; -import org.apache.cassandra.io.sstable.SSTableReader; - import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.io.sstable.format.SSTableReader; public class SSTableListChangedNotification implements INotification { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java index a473a43..d1398bc 100644 --- a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java +++ b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java @@ -20,7 +20,7 @@ package org.apache.cassandra.notifications; import java.util.Collection; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; public class SSTableRepairStatusChanged implements INotification { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 04a27af..2ad8dc2 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -24,6 +24,7 @@ import java.util.UUID; import java.util.concurrent.Future; import com.google.common.base.Predicate; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +36,6 @@ import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 670aa0b..fa354e6 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -29,6 +29,8 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; + +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +43,6 @@ import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index 1b93c2c..2ffd954 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -33,6 +33,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.util.concurrent.Futures; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,6 @@ import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -472,7 +472,8 @@ public class CacheService implements CacheServiceMBean out.writeInt(desc.generation); out.writeBoolean(true); CFMetaData cfm = Schema.instance.getCFMetaData(key.desc.ksname, key.desc.cfname); - cfm.comparator.rowIndexEntrySerializer().serialize(entry, out); + + key.desc.getFormat().getIndexSerializer(cfm).serialize(entry, out); } public Future> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException @@ -492,7 +493,7 @@ public class CacheService implements CacheServiceMBean RowIndexEntry.Serializer.skipPromotedIndex(input); return null; } - RowIndexEntry entry = reader.metadata.comparator.rowIndexEntrySerializer().deserialize(input, reader.descriptor.version); + RowIndexEntry entry = reader.descriptor.getFormat().getIndexSerializer(reader.metadata).deserialize(input, reader.descriptor.version); return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamLockfile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamLockfile.java b/src/java/org/apache/cassandra/streaming/StreamLockfile.java index d00842a..a0cf5fc 100644 --- a/src/java/org/apache/cassandra/streaming/StreamLockfile.java +++ b/src/java/org/apache/cassandra/streaming/StreamLockfile.java @@ -28,12 +28,12 @@ import java.util.List; import java.util.UUID; import com.google.common.base.Charsets; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.io.util.FileUtils; /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index 34cbf02..8c4efcd 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -17,16 +17,18 @@ */ package org.apache.cassandra.streaming; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.Collection; import java.util.UUID; import com.google.common.base.Throwables; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.FileUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,15 +39,14 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.messages.FileMessageHeader; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.BytesReadTracker; import org.apache.cassandra.utils.Pair; + /** * StreamReader reads from stream and writes to SSTable. */ @@ -56,8 +57,9 @@ public class StreamReader protected final long estimatedKeys; protected final Collection> sections; protected final StreamSession session; - protected final Descriptor.Version inputVersion; + protected final Version inputVersion; protected final long repairedAt; + protected final SSTableFormat.Type format; protected final int sstableLevel; protected Descriptor desc; @@ -68,8 +70,9 @@ public class StreamReader this.cfId = header.cfId; this.estimatedKeys = header.estimatedKeys; this.sections = header.sections; - this.inputVersion = new Descriptor.Version(header.version); + this.inputVersion = header.format.info.getVersion(header.version); this.repairedAt = header.repairedAt; + this.format = header.format; this.sstableLevel = header.sstableLevel; } @@ -91,7 +94,8 @@ public class StreamReader } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - SSTableWriter writer = createWriter(cfs, totalSize, repairedAt); + SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format); + DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel))); BytesReadTracker in = new BytesReadTracker(dis); try @@ -99,12 +103,12 @@ public class StreamReader while (in.getBytesRead() < totalSize) { writeRow(writer, in, cfs); + // TODO move this to BytesReadTracker session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); } return writer; - } - catch (Throwable e) + } catch (Throwable e) { writer.abort(); drain(dis, in.getBytesRead()); @@ -115,13 +119,14 @@ public class StreamReader } } - protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt) throws IOException + protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException { Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(); if (localDir == null) throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); - desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir))); - return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys, repairedAt, sstableLevel); + desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format)); + + return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel); } protected void drain(InputStream dis, long bytesRead) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index f26e439..a6db58c 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -28,8 +28,10 @@ import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 48b88c4..8bb34a6 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -25,6 +25,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.collect.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,6 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.metrics.StreamingMetrics; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.messages.*; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 18058c1..a3dd10f 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -23,7 +23,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.streaming.messages.OutgoingFileMessage; import org.apache.cassandra.utils.Pair; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java index 43bc26a..93903a7 100644 --- a/src/java/org/apache/cassandra/streaming/StreamWriter.java +++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java @@ -27,7 +27,7 @@ import java.util.Collection; import com.ning.compress.lzf.LZFOutputStream; import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataIntegrityMetadata; import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator; import org.apache.cassandra.io.util.FileUtils; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index fb2599f..0595e0c 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -18,12 +18,15 @@ package org.apache.cassandra.streaming.compress; import java.io.DataInputStream; + import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import com.google.common.base.Throwables; +import org.apache.cassandra.io.sstable.format.SSTableWriter; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +34,6 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.StreamReader; import org.apache.cassandra.streaming.StreamSession; @@ -72,21 +74,25 @@ public class CompressedStreamReader extends StreamReader } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - SSTableWriter writer = createWriter(cfs, totalSize, repairedAt); + SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format); - CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums); + CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums()); BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis)); try { for (Pair section : sections) { - long length = section.right - section.left; + assert in.getBytesRead() < totalSize; + int sectionLength = (int) (section.right - section.left); + // skip to beginning of section inside chunk cis.position(section.left); in.reset(0); - while (in.getBytesRead() < length) + + while (in.getBytesRead() < sectionLength) { writeRow(writer, in, cfs); + // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java index 001c927..786ff23 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java @@ -25,7 +25,7 @@ import java.util.Collection; import java.util.List; import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.streaming.ProgressInfo; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index 5e378bc..5266e45 100644 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.compress.CompressionInfo; @@ -43,6 +44,9 @@ public class FileMessageHeader public final int sequenceNumber; /** SSTable version */ public final String version; + + /** SSTable format **/ + public final SSTableFormat.Type format; public final long estimatedKeys; public final List> sections; public final CompressionInfo compressionInfo; @@ -52,6 +56,7 @@ public class FileMessageHeader public FileMessageHeader(UUID cfId, int sequenceNumber, String version, + SSTableFormat.Type format, long estimatedKeys, List> sections, CompressionInfo compressionInfo, @@ -61,6 +66,7 @@ public class FileMessageHeader this.cfId = cfId; this.sequenceNumber = sequenceNumber; this.version = version; + this.format = format; this.estimatedKeys = estimatedKeys; this.sections = sections; this.compressionInfo = compressionInfo; @@ -95,6 +101,7 @@ public class FileMessageHeader sb.append("cfId: ").append(cfId); sb.append(", #").append(sequenceNumber); sb.append(", version: ").append(version); + sb.append(", format: ").append(format); sb.append(", estimated keys: ").append(estimatedKeys); sb.append(", transfer size: ").append(size()); sb.append(", compressed?: ").append(compressionInfo != null); @@ -128,8 +135,15 @@ public class FileMessageHeader UUIDSerializer.serializer.serialize(header.cfId, out, version); out.writeInt(header.sequenceNumber); out.writeUTF(header.version); - out.writeLong(header.estimatedKeys); + //We can't stream to a node that doesn't understand a new sstable format + if (version < StreamMessage.VERSION_30 && header.format != SSTableFormat.Type.LEGACY && header.format != SSTableFormat.Type.BIG) + throw new UnsupportedOperationException("Can't stream non-legacy sstables to nodes < 3.0"); + + if (version >= StreamMessage.VERSION_30) + out.writeUTF(header.format.name); + + out.writeLong(header.estimatedKeys); out.writeInt(header.sections.size()); for (Pair section : header.sections) { @@ -146,6 +160,11 @@ public class FileMessageHeader UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); int sequenceNumber = in.readInt(); String sstableVersion = in.readUTF(); + + SSTableFormat.Type format = SSTableFormat.Type.LEGACY; + if (version >= StreamMessage.VERSION_30) + format = SSTableFormat.Type.validate(in.readUTF()); + long estimatedKeys = in.readLong(); int count = in.readInt(); List> sections = new ArrayList<>(count); @@ -154,7 +173,7 @@ public class FileMessageHeader CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version); long repairedAt = in.readLong(); int sstableLevel = in.readInt(); - return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel); + return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel); } public long serializedSize(FileMessageHeader header, int version) @@ -162,6 +181,10 @@ public class FileMessageHeader long size = UUIDSerializer.serializer.serializedSize(header.cfId, version); size += TypeSizes.NATIVE.sizeof(header.sequenceNumber); size += TypeSizes.NATIVE.sizeof(header.version); + + if (version >= StreamMessage.VERSION_30) + size += TypeSizes.NATIVE.sizeof(header.format.name); + size += TypeSizes.NATIVE.sizeof(header.estimatedKeys); size += TypeSizes.NATIVE.sizeof(header.sections.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java index 8569b88..237fb70 100644 --- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; -import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.util.DataOutputStreamAndChannel; import org.apache.cassandra.streaming.StreamReader; import org.apache.cassandra.streaming.StreamSession; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index 13af987..ed4c4ce 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -22,7 +22,7 @@ import java.nio.channels.ReadableByteChannel; import java.util.List; import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputStreamAndChannel; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamWriter; @@ -74,6 +74,7 @@ public class OutgoingFileMessage extends StreamMessage this.header = new FileMessageHeader(sstable.metadata.cfId, sequenceNumber, sstable.descriptor.version.toString(), + sstable.descriptor.formatType, estimatedKeys, sections, compressionInfo, http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java index 372fdd3..20490db 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java @@ -32,7 +32,9 @@ import org.apache.cassandra.streaming.StreamSession; public abstract class StreamMessage { /** Streaming protocol version */ - public static final int CURRENT_VERSION = 3; + public static final int VERSION_20 = 2; + public static final int VERSION_30 = 3; + public static final int CURRENT_VERSION = VERSION_30; public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/SSTableExport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java index 1febbe8..1ab56ce 100644 --- a/src/java/org/apache/cassandra/tools/SSTableExport.java +++ b/src/java/org/apache/cassandra/tools/SSTableExport.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.io.PrintStream; import java.util.*; +import org.apache.cassandra.db.compaction.ICompactionScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.cli.*; import org.apache.cassandra.config.CFMetaData; @@ -115,7 +117,7 @@ public class SSTableExport } /** - * Serialize a given cell to a List of Objects that jsonMapper knows how to turn into strings. Format is + * Serialize a given cell to a List of Objects that jsonMapper knows how to turn into strings. Type is * * human_readable_name, value, timestamp, [flag, [options]] * @@ -318,10 +320,10 @@ public class SSTableExport Set excludeSet = new HashSet(); if (excludes != null) - excludeSet = new HashSet(Arrays.asList(excludes)); + excludeSet = new HashSet<>(Arrays.asList(excludes)); SSTableIdentityIterator row; - SSTableScanner scanner = reader.getScanner(); + ICompactionScanner scanner = reader.getScanner(); try { outs.println("["); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/SSTableImport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java index 05b9dcb..fc4470e 100644 --- a/src/java/org/apache/cassandra/tools/SSTableImport.java +++ b/src/java/org/apache/cassandra/tools/SSTableImport.java @@ -26,6 +26,8 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Option; @@ -42,7 +44,6 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -302,7 +303,7 @@ public class SSTableImport Object[] data = parser.readValueAs(new TypeReference(){}); keyCountToImport = (keyCountToImport == null) ? data.length : keyCountToImport; - SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport); + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE, 0); System.out.printf("Importing %s keys...%n", keyCountToImport); @@ -375,7 +376,7 @@ public class SSTableImport System.out.printf("Importing %s keys...%n", keyCountToImport); parser = getParser(jsonFile); // renewing parser - SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport); + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE); int lineNumber = 1; DecoratedKey prevStoredKey = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java index b37d3b4..1f596ad 100644 --- a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java +++ b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java @@ -77,7 +77,7 @@ public class SSTableRepairedAtSetter for (String fname: fileNames) { Descriptor descriptor = Descriptor.fromFilename(fname); - if (descriptor.version.hasRepairedAt) + if (descriptor.version.hasRepairedAt()) { if (setIsRepaired) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java index 4dffa74..2c92e60 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@ -21,6 +21,7 @@ package org.apache.cassandra.tools; import java.io.File; import java.util.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.cli.*; import org.apache.cassandra.config.DatabaseDescriptor; @@ -66,7 +67,7 @@ public class StandaloneScrubber OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true); - List sstables = new ArrayList(); + List sstables = new ArrayList<>(); // Scrub sstables for (Map.Entry> entry : lister.list().entrySet()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/StandaloneSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java index e078c3b..8c7e704 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java +++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java @@ -21,6 +21,7 @@ package org.apache.cassandra.tools; import java.io.File; import java.util.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.cli.*; import org.apache.cassandra.config.DatabaseDescriptor; @@ -109,7 +110,7 @@ public class StandaloneSplitter String snapshotName = "pre-split-" + System.currentTimeMillis(); - List sstables = new ArrayList(); + List sstables = new ArrayList<>(); for (Map.Entry> fn : parsedFilenames.entrySet()) { try http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java index 1a8a89f..9aa02f2 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java +++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java @@ -19,6 +19,7 @@ package org.apache.cassandra.tools; import java.util.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.cli.*; import org.apache.cassandra.config.DatabaseDescriptor; @@ -61,7 +62,7 @@ public class StandaloneUpgrader else lister.includeBackups(false); - Collection readers = new ArrayList(); + Collection readers = new ArrayList<>(); // Upgrade sstables for (Map.Entry> entry : lister.list().entrySet()) @@ -73,7 +74,7 @@ public class StandaloneUpgrader try { SSTableReader sstable = SSTableReader.openNoValidation(entry.getKey(), components, cfs.metadata); - if (sstable.descriptor.version.equals(Descriptor.Version.CURRENT)) + if (sstable.descriptor.version.equals(DatabaseDescriptor.getSSTableFormat().info.getLatestVersion())) continue; readers.add(sstable); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java index 6385e5c..bee8ab0 100644 --- a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java +++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java @@ -47,17 +47,17 @@ public class EncodedDataInputStream extends AbstractDataInput implements DataInp return input.readByte() & 0xFF; } - protected void seek(long position) + public void seek(long position) { throw new UnsupportedOperationException(); } - protected long getPosition() + public long getPosition() { throw new UnsupportedOperationException(); } - protected long getPositionLimit() + public long getPositionLimit() { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java index 7bc8ef5..28ec975 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@ -32,7 +33,6 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.Util; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.db.*; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableUtils; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.utils.ByteBufferUtil; @@ -92,7 +92,7 @@ public class LongCompactionsTest Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); - ArrayList sstables = new ArrayList(); + ArrayList sstables = new ArrayList<>(); for (int k = 0; k < sstableCount; k++) { SortedMap rows = new TreeMap(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java index 159b641..0530d84 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.junit.BeforeClass; import org.junit.Test; @@ -29,7 +30,6 @@ import org.apache.cassandra.Util; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.utils.FBUtilities; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index fd53170..2d5e6fc 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -49,7 +49,7 @@ import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java index 3da4555..c1869b9 100644 --- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java +++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.cache; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -29,7 +30,6 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.utils.ByteBufferUtil; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/CleanupTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java index 213c5b8..21de96f 100644 --- a/test/unit/org/apache/cassandra/db/CleanupTest.java +++ b/test/unit/org/apache/cassandra/db/CleanupTest.java @@ -39,7 +39,7 @@ import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.dht.BytesToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.StorageService; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 386be01..be64a6f 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -43,6 +43,9 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.junit.BeforeClass; @@ -74,12 +77,6 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.IncludingExcludingBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableDeletingTask; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableSimpleWriter; -import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.SimpleStrategy; @@ -1780,12 +1777,12 @@ public class ColumnFamilyStoreTest { MetadataCollector collector = new MetadataCollector(cfmeta.comparator); collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable - return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName), - 0, - ActiveRepairService.UNREPAIRED_SSTABLE, - metadata, - StorageService.getPartitioner(), - collector); + return SSTableWriter.create(Descriptor.fromFilename(makeFilename(directory, metadata.ksName, metadata.cfName, DatabaseDescriptor.getSSTableFormat())), + 0L, + ActiveRepairService.UNREPAIRED_SSTABLE, + metadata, + DatabaseDescriptor.getPartitioner(), + collector); } }; writer.newRow(key); @@ -1837,12 +1834,12 @@ public class ColumnFamilyStoreTest for (int ancestor : ancestors) collector.addAncestor(ancestor); String file = new Descriptor(directory, ks, cf, 3, Descriptor.Type.TEMP).filenameFor(Component.DATA); - return new SSTableWriter(file, - 0, - ActiveRepairService.UNREPAIRED_SSTABLE, - metadata, - StorageService.getPartitioner(), - collector); + return SSTableWriter.create(Descriptor.fromFilename(file), + 0L, + ActiveRepairService.UNREPAIRED_SSTABLE, + metadata, + StorageService.getPartitioner(), + collector); } }; writer.newRow(key); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index ad3a6bc..629b414 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.nio.file.Files; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -25,6 +24,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -37,7 +37,6 @@ import org.apache.cassandra.db.composites.*; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; @@ -179,7 +178,7 @@ public class KeyCacheTest for (SSTableReader reader : readers) reader.releaseReference(); - Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);; + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); while (StorageService.tasks.getActiveCount() + StorageService.tasks.getQueue().size() > 0); // after releasing the reference this should drop to 2 http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/KeyspaceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyspaceTest.java b/test/unit/org/apache/cassandra/db/KeyspaceTest.java index 27a4e20..01038c4 100644 --- a/test/unit/org/apache/cassandra/db/KeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/KeyspaceTest.java @@ -26,6 +26,7 @@ import java.util.*; import java.io.IOException; import com.google.common.collect.Iterables; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.lang3.StringUtils; import org.junit.BeforeClass; import org.junit.Test; @@ -49,7 +50,6 @@ import static org.apache.cassandra.Util.column; import static org.apache.cassandra.Util.expiringColumn; import static org.apache.cassandra.Util.cellname; import org.apache.cassandra.Util; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.ByteBufferUtil; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java index 2a6b952..d2f63cc 100644 --- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java +++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.junit.BeforeClass; import org.junit.Test; @@ -52,12 +53,10 @@ import org.apache.cassandra.db.index.SecondaryIndexSearcher; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.MemtableAllocator; import static org.apache.cassandra.Util.dk; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java index 3282b0a..e880d95 100644 --- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java +++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java @@ -28,6 +28,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.composites.CellNames; import org.apache.cassandra.db.composites.SimpleDenseCellNameType; import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.utils.ByteBufferUtil; @@ -39,10 +40,10 @@ public class RowIndexEntryTest extends SchemaLoader @Test public void testSerializedSize() throws IOException { - final RowIndexEntry simple = new RowIndexEntry(123); + final RowIndexEntry simple = new RowIndexEntry<>(123); DataOutputBuffer buffer = new DataOutputBuffer(); - RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(new SimpleDenseCellNameType(UTF8Type.instance)); + RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(new IndexHelper.IndexInfo.Serializer(new SimpleDenseCellNameType(UTF8Type.instance))); serializer.serialize(simple, buffer); @@ -70,7 +71,7 @@ public class RowIndexEntryTest extends SchemaLoader }}.build(); - RowIndexEntry withIndex = RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex); + RowIndexEntry withIndex = RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex); serializer.serialize(withIndex, buffer); Assert.assertEquals(buffer.getLength(), serializer.serializedSize(withIndex)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index 24a6787..814b498 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -37,6 +37,8 @@ import org.apache.cassandra.db.marshal.CounterColumnType; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.utils.UUIDGen; import org.apache.commons.lang3.StringUtils; @@ -56,7 +58,6 @@ import org.apache.cassandra.db.compaction.Scrubber; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.Util.cellname; @@ -230,7 +231,7 @@ public class ScrubTest assert root != null; File rootDir = new File(root); assert rootDir.isDirectory(); - Descriptor desc = new Descriptor(new Descriptor.Version("jb"), rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL); + Descriptor desc = new Descriptor("jb", rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY); CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index cd5dc7f..e6e5d55 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -29,6 +29,14 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutionException; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.SimpleStrategy; +import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Test; + import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.KSMetaData; @@ -41,9 +49,6 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableScanner; -import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -110,7 +115,7 @@ public class AntiCompactionTest int nonRepairedKeys = 0; for (SSTableReader sstable : store.getSSTables()) { - SSTableScanner scanner = sstable.getScanner(); + ICompactionScanner scanner = sstable.getScanner(); while (scanner.hasNext()) { SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); @@ -187,7 +192,7 @@ public class AntiCompactionTest int nonRepairedKeys = 0; for (SSTableReader sstable : store.getSSTables()) { - SSTableScanner scanner = sstable.getScanner(); + ICompactionScanner scanner = sstable.getScanner(); while (scanner.hasNext()) { SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java index 85f5330..1116c9e 100644 --- a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.Set; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.junit.BeforeClass; import org.junit.Test; @@ -34,7 +35,6 @@ import org.apache.cassandra.Util; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.utils.ByteBufferUtil; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java index 00923b1..5f87ff3 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java @@ -28,6 +28,7 @@ import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.SimpleStrategy; import org.junit.BeforeClass; import org.junit.Test; @@ -35,7 +36,6 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.Util; import static org.junit.Assert.assertEquals;