cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [11/12] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.5
Date Thu, 17 Mar 2016 16:07:07 GMT
Merge branch 'cassandra-3.0' into cassandra-3.5


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/587773fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/587773fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/587773fa

Branch: refs/heads/cassandra-3.5
Commit: 587773fa478ff64aa46cf17760eb31d6f83fc46d
Parents: e3716ee e8651b6
Author: Yuki Morishita <yukim@apache.org>
Authored: Thu Mar 17 10:42:20 2016 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Thu Mar 17 10:42:20 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   2 +
 .../org/apache/cassandra/db/Directories.java    |  30 +
 .../cassandra/db/SerializationHeader.java       |   5 +
 .../org/apache/cassandra/db/Serializers.java    | 114 ++--
 .../columniterator/AbstractSSTableIterator.java |   4 +-
 .../EncryptedFileSegmentInputStream.java        |   4 +-
 .../cassandra/hints/ChecksummedDataInput.java   |   8 +-
 .../org/apache/cassandra/hints/HintMessage.java |   4 +-
 .../io/compress/CompressedSequentialWriter.java |   8 +-
 .../io/sstable/SSTableSimpleIterator.java       |  11 +-
 .../sstable/format/RangeAwareSSTableWriter.java |   8 +-
 .../io/sstable/format/SSTableReader.java        |   2 +-
 .../io/sstable/format/SSTableWriter.java        |   2 +-
 .../io/sstable/format/big/BigTableWriter.java   |   4 +-
 .../cassandra/io/util/BytesReadTracker.java     |  30 +
 .../apache/cassandra/io/util/DataPosition.java  |  21 +
 .../apache/cassandra/io/util/FileDataInput.java |   8 +-
 .../org/apache/cassandra/io/util/FileMark.java  |  20 -
 .../io/util/FileSegmentInputStream.java         |  12 +-
 .../cassandra/io/util/RandomAccessReader.java   |   8 +-
 .../cassandra/io/util/RewindableDataInput.java  |  30 +
 .../io/util/RewindableDataInputStreamPlus.java  | 569 +++++++++++++++++++
 .../cassandra/io/util/SequentialWriter.java     |   6 +-
 .../cassandra/io/util/TrackedDataInputPlus.java | 150 +++++
 .../cassandra/io/util/TrackedInputStream.java   |  76 +++
 .../cassandra/service/StorageService.java       |   1 +
 .../cassandra/streaming/StreamReader.java       |  85 ++-
 .../compress/CompressedStreamReader.java        |  18 +-
 .../streaming/messages/FileMessageHeader.java   |   4 +-
 .../apache/cassandra/tools/nodetool/Repair.java |   2 +-
 .../cassandra/utils/BytesReadTracker.java       | 153 -----
 .../cassandra/utils/CloseableIterator.java      |   1 -
 ...acy_jb_clust_compact-jb-1-CompressionInfo.db | Bin 0 -> 83 bytes
 ..._tables-legacy_jb_clust_compact-jb-1-Data.db | Bin 0 -> 5270 bytes
 ...ables-legacy_jb_clust_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
 ...tables-legacy_jb_clust_compact-jb-1-Index.db | Bin 0 -> 157685 bytes
 ...s-legacy_jb_clust_compact-jb-1-Statistics.db | Bin 0 -> 6791 bytes
 ...bles-legacy_jb_clust_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
 ..._tables-legacy_jb_clust_compact-jb-1-TOC.txt |   7 +
 ...lust_counter_compact-jb-1-CompressionInfo.db | Bin 0 -> 75 bytes
 ...legacy_jb_clust_counter_compact-jb-1-Data.db | Bin 0 -> 4228 bytes
 ...gacy_jb_clust_counter_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
 ...egacy_jb_clust_counter_compact-jb-1-Index.db | Bin 0 -> 157685 bytes
 ..._jb_clust_counter_compact-jb-1-Statistics.db | Bin 0 -> 6791 bytes
 ...acy_jb_clust_counter_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
 ...legacy_jb_clust_counter_compact-jb-1-TOC.txt |   7 +
 ...cy_jb_simple_compact-jb-1-CompressionInfo.db | Bin 0 -> 43 bytes
 ...tables-legacy_jb_simple_compact-jb-1-Data.db | Bin 0 -> 108 bytes
 ...bles-legacy_jb_simple_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
 ...ables-legacy_jb_simple_compact-jb-1-Index.db | Bin 0 -> 75 bytes
 ...-legacy_jb_simple_compact-jb-1-Statistics.db | Bin 0 -> 4395 bytes
 ...les-legacy_jb_simple_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
 ...tables-legacy_jb_simple_compact-jb-1-TOC.txt |   7 +
 ...mple_counter_compact-jb-1-CompressionInfo.db | Bin 0 -> 43 bytes
 ...egacy_jb_simple_counter_compact-jb-1-Data.db | Bin 0 -> 118 bytes
 ...acy_jb_simple_counter_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
 ...gacy_jb_simple_counter_compact-jb-1-Index.db | Bin 0 -> 75 bytes
 ...jb_simple_counter_compact-jb-1-Statistics.db | Bin 0 -> 4395 bytes
 ...cy_jb_simple_counter_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
 ...egacy_jb_simple_counter_compact-jb-1-TOC.txt |   7 +
 ...acy_ka_clust_compact-ka-1-CompressionInfo.db | Bin 0 -> 83 bytes
 ..._tables-legacy_ka_clust_compact-ka-1-Data.db | Bin 0 -> 5277 bytes
 ...les-legacy_ka_clust_compact-ka-1-Digest.sha1 |   1 +
 ...ables-legacy_ka_clust_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
 ...tables-legacy_ka_clust_compact-ka-1-Index.db | Bin 0 -> 157685 bytes
 ...s-legacy_ka_clust_compact-ka-1-Statistics.db | Bin 0 -> 6859 bytes
 ...bles-legacy_ka_clust_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
 ..._tables-legacy_ka_clust_compact-ka-1-TOC.txt |   8 +
 ...lust_counter_compact-ka-1-CompressionInfo.db | Bin 0 -> 75 bytes
 ...legacy_ka_clust_counter_compact-ka-1-Data.db | Bin 0 -> 4527 bytes
 ...cy_ka_clust_counter_compact-ka-1-Digest.sha1 |   1 +
 ...gacy_ka_clust_counter_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
 ...egacy_ka_clust_counter_compact-ka-1-Index.db | Bin 0 -> 157685 bytes
 ..._ka_clust_counter_compact-ka-1-Statistics.db | Bin 0 -> 6859 bytes
 ...acy_ka_clust_counter_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
 ...legacy_ka_clust_counter_compact-ka-1-TOC.txt |   8 +
 ...cy_ka_simple_compact-ka-1-CompressionInfo.db | Bin 0 -> 43 bytes
 ...tables-legacy_ka_simple_compact-ka-1-Data.db | Bin 0 -> 105 bytes
 ...es-legacy_ka_simple_compact-ka-1-Digest.sha1 |   1 +
 ...bles-legacy_ka_simple_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
 ...ables-legacy_ka_simple_compact-ka-1-Index.db | Bin 0 -> 75 bytes
 ...-legacy_ka_simple_compact-ka-1-Statistics.db | Bin 0 -> 4453 bytes
 ...les-legacy_ka_simple_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
 ...tables-legacy_ka_simple_compact-ka-1-TOC.txt |   8 +
 ...mple_counter_compact-ka-1-CompressionInfo.db | Bin 0 -> 43 bytes
 ...egacy_ka_simple_counter_compact-ka-1-Data.db | Bin 0 -> 124 bytes
 ...y_ka_simple_counter_compact-ka-1-Digest.sha1 |   1 +
 ...acy_ka_simple_counter_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
 ...gacy_ka_simple_counter_compact-ka-1-Index.db | Bin 0 -> 75 bytes
 ...ka_simple_counter_compact-ka-1-Statistics.db | Bin 0 -> 4453 bytes
 ...cy_ka_simple_counter_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
 ...egacy_ka_simple_counter_compact-ka-1-TOC.txt |   8 +
 .../la-1-big-CompressionInfo.db                 | Bin 0 -> 83 bytes
 .../legacy_la_clust_compact/la-1-big-Data.db    | Bin 0 -> 5286 bytes
 .../la-1-big-Digest.adler32                     |   1 +
 .../legacy_la_clust_compact/la-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_la_clust_compact/la-1-big-Index.db   | Bin 0 -> 157685 bytes
 .../la-1-big-Statistics.db                      | Bin 0 -> 6859 bytes
 .../legacy_la_clust_compact/la-1-big-Summary.db | Bin 0 -> 75 bytes
 .../legacy_la_clust_compact/la-1-big-TOC.txt    |   8 +
 .../la-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../la-1-big-Data.db                            | Bin 0 -> 4527 bytes
 .../la-1-big-Digest.adler32                     |   1 +
 .../la-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../la-1-big-Index.db                           | Bin 0 -> 157685 bytes
 .../la-1-big-Statistics.db                      | Bin 0 -> 6859 bytes
 .../la-1-big-Summary.db                         | Bin 0 -> 75 bytes
 .../la-1-big-TOC.txt                            |   8 +
 .../la-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_la_simple_compact/la-1-big-Data.db   | Bin 0 -> 106 bytes
 .../la-1-big-Digest.adler32                     |   1 +
 .../legacy_la_simple_compact/la-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_la_simple_compact/la-1-big-Index.db  | Bin 0 -> 75 bytes
 .../la-1-big-Statistics.db                      | Bin 0 -> 4453 bytes
 .../la-1-big-Summary.db                         | Bin 0 -> 75 bytes
 .../legacy_la_simple_compact/la-1-big-TOC.txt   |   8 +
 .../la-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../la-1-big-Data.db                            | Bin 0 -> 124 bytes
 .../la-1-big-Digest.adler32                     |   1 +
 .../la-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../la-1-big-Index.db                           | Bin 0 -> 75 bytes
 .../la-1-big-Statistics.db                      | Bin 0 -> 4453 bytes
 .../la-1-big-Summary.db                         | Bin 0 -> 75 bytes
 .../la-1-big-TOC.txt                            |   8 +
 .../ma-1-big-CompressionInfo.db                 | Bin 0 -> 83 bytes
 .../legacy_ma_clust_compact/ma-1-big-Data.db    | Bin 0 -> 5393 bytes
 .../ma-1-big-Digest.crc32                       |   1 +
 .../legacy_ma_clust_compact/ma-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_ma_clust_compact/ma-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../ma-1-big-Statistics.db                      | Bin 0 -> 7046 bytes
 .../legacy_ma_clust_compact/ma-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_ma_clust_compact/ma-1-big-TOC.txt    |   8 +
 .../ma-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../ma-1-big-Data.db                            | Bin 0 -> 4606 bytes
 .../ma-1-big-Digest.crc32                       |   1 +
 .../ma-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../ma-1-big-Index.db                           | Bin 0 -> 157553 bytes
 .../ma-1-big-Statistics.db                      | Bin 0 -> 7055 bytes
 .../ma-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../ma-1-big-TOC.txt                            |   8 +
 .../ma-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_ma_simple_compact/ma-1-big-Data.db   | Bin 0 -> 91 bytes
 .../ma-1-big-Digest.crc32                       |   1 +
 .../legacy_ma_simple_compact/ma-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_ma_simple_compact/ma-1-big-Index.db  | Bin 0 -> 26 bytes
 .../ma-1-big-Statistics.db                      | Bin 0 -> 4640 bytes
 .../ma-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_ma_simple_compact/ma-1-big-TOC.txt   |   8 +
 .../ma-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../ma-1-big-Data.db                            | Bin 0 -> 114 bytes
 .../ma-1-big-Digest.crc32                       |   1 +
 .../ma-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../ma-1-big-Index.db                           | Bin 0 -> 27 bytes
 .../ma-1-big-Statistics.db                      | Bin 0 -> 4649 bytes
 .../ma-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../ma-1-big-TOC.txt                            |   8 +
 .../cassandra/AbstractSerializationsTester.java |   1 -
 .../apache/cassandra/db/DirectoriesTest.java    |  98 ++--
 .../cassandra/gms/SerializationsTest.java       |   1 -
 .../CompressedRandomAccessReaderTest.java       |   6 +-
 .../CompressedSequentialWriterTest.java         |   4 +-
 .../cassandra/io/sstable/LegacySSTableTest.java | 369 ++++++------
 .../io/util/BufferedRandomAccessFileTest.java   |   4 +-
 .../io/util/RandomAccessReaderTest.java         |   2 +-
 .../util/RewindableDataInputStreamPlusTest.java | 539 ++++++++++++++++++
 .../cassandra/utils/BytesReadTrackerTest.java   | 104 +++-
 167 files changed, 2115 insertions(+), 550 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a01e511,51cfc16..53dd292
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,6 -1,6 +1,7 @@@
 -3.0.5
 +3.5
 +Merged from 3.0:
+  * Support streaming pre-3.0 sstables (CASSANDRA-10990)
 - * Add backpressure to compressed commit log (CASSANDRA-10971)
 + * Add backpressure to compressed or encrypted commit log (CASSANDRA-10971)
   * SSTableExport supports secondary index tables (CASSANDRA-11330)
   * Fix sstabledump to include missing info in debug output (CASSANDRA-11321)
   * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Serializers.java
index 17f1de0,348fda3..cef06a3
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@@ -46,62 -46,77 +46,77 @@@ public class Serializer
      // unecessary (since IndexInfo.Serializer won't depend on the metadata either).
      public ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(final Version version, final SerializationHeader header)
      {
-         if (!version.storeRows())
+         if (!version.storeRows() || header ==  null) //null header indicates streaming from pre-3.0 sstables
          {
-             return new ISerializer<ClusteringPrefix>()
+             return oldFormatSerializer(version);
+         }
+ 
+         return newFormatSerializer(version, header);
+     }
+ 
+     private ISerializer<ClusteringPrefix> oldFormatSerializer(final Version version)
+     {
+         return new ISerializer<ClusteringPrefix>()
+         {
+             SerializationHeader newHeader = SerializationHeader.makeWithoutStats(metadata);
+ 
+             public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
              {
-                 public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
-                 {
-                     // We should only use this for reading old sstable, never write new ones.
-                     throw new UnsupportedOperationException();
-                 }
+                 //we deserialize in the old format and serialize in the new format
+                 ClusteringPrefix.serializer.serialize(clustering, out,
+                                                       version.correspondingMessagingVersion(),
+                                                       newHeader.clusteringTypes());
+             }
+ 
+             public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
+             {
+                 // We're reading the old cellname/composite
+                 ByteBuffer bb = ByteBufferUtil.readWithShortLength(in);
+                 assert bb.hasRemaining(); // empty cellnames were invalid
+ 
+                 int clusteringSize = metadata.clusteringColumns().size();
+                 // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here.
+                 if (clusteringSize == 0)
+                     return Clustering.EMPTY;
+ 
+                 if (!metadata.isCompound())
 -                    return new Clustering(bb);
++                    return Clustering.make(bb);
  
-                 public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
+                 List<ByteBuffer> components = CompositeType.splitName(bb);
+                 byte eoc = CompositeType.lastEOC(bb);
+ 
+                 if (eoc == 0 || components.size() >= clusteringSize)
                  {
-                     // We're reading the old cellname/composite
-                     ByteBuffer bb = ByteBufferUtil.readWithShortLength(in);
-                     assert bb.hasRemaining(); // empty cellnames were invalid
- 
-                     int clusteringSize = metadata.clusteringColumns().size();
-                     // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here.
-                     if (clusteringSize == 0)
-                         return Clustering.EMPTY;
- 
-                     if (!metadata.isCompound())
-                         return Clustering.make(bb);
- 
-                     List<ByteBuffer> components = CompositeType.splitName(bb);
-                     byte eoc = CompositeType.lastEOC(bb);
- 
-                     if (eoc == 0 || components.size() >= clusteringSize)
-                     {
-                         // That's a clustering.
-                         if (components.size() > clusteringSize)
-                             components = components.subList(0, clusteringSize);
- 
-                         return Clustering.make(components.toArray(new ByteBuffer[clusteringSize]));
-                     }
-                     else
-                     {
-                         // It's a range tombstone bound. It is a start since that's the only part we've ever included
-                         // in the index entries.
-                         Slice.Bound.Kind boundKind = eoc > 0
-                                                    ? Slice.Bound.Kind.EXCL_START_BOUND
-                                                    : Slice.Bound.Kind.INCL_START_BOUND;
- 
-                         return Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
-                     }
-                 }
+                     // That's a clustering.
+                     if (components.size() > clusteringSize)
+                         components = components.subList(0, clusteringSize);
  
-                 public long serializedSize(ClusteringPrefix clustering)
 -                    return new Clustering(components.toArray(new ByteBuffer[clusteringSize]));
++                    return Clustering.make(components.toArray(new ByteBuffer[clusteringSize]));
+                 }
+                 else
                  {
-                     // We should only use this for reading old sstable, never write new ones.
-                     throw new UnsupportedOperationException();
+                     // It's a range tombstone bound. It is a start since that's the only part we've ever included
+                     // in the index entries.
+                     Slice.Bound.Kind boundKind = eoc > 0
+                                                  ? Slice.Bound.Kind.EXCL_START_BOUND
+                                                  : Slice.Bound.Kind.INCL_START_BOUND;
+ 
+                     return Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
                  }
-             };
-         }
+             }
  
-         return new ISerializer<ClusteringPrefix>()
+             public long serializedSize(ClusteringPrefix clustering)
+             {
+                 return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(),
+                                                                   newHeader.clusteringTypes());
+             }
+         };
+     }
+ 
+ 
+     private ISerializer<ClusteringPrefix> newFormatSerializer(final Version version, final SerializationHeader header)
+     {
+         return new ISerializer<ClusteringPrefix>() //Reading and writing from/to the new sstable format
          {
              public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
              {
@@@ -119,4 -134,5 +134,5 @@@
              }
          };
      }
- }
+ 
 -}
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index d55161b,0e2012e..7f2e3bb
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@@ -29,10 -30,10 +29,10 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.io.sstable.CorruptSSTableException;
  import org.apache.cassandra.io.sstable.IndexHelper;
  import org.apache.cassandra.io.util.FileDataInput;
- import org.apache.cassandra.io.util.FileMark;
+ import org.apache.cassandra.io.util.DataPosition;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
 -abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
 +abstract class AbstractSSTableIterator implements UnfilteredRowIterator
  {
      protected final SSTableReader sstable;
      protected final DecoratedKey key;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
index 6915196,0000000..56bb7d6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
@@@ -1,73 -1,0 +1,73 @@@
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.DataInput;
 +import java.nio.ByteBuffer;
 +
++import org.apache.cassandra.io.util.DataPosition;
 +import org.apache.cassandra.io.util.FileDataInput;
- import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.io.util.FileSegmentInputStream;
 +
 +/**
 + * Each segment of an encrypted file may contain many encrypted chunks, and each chunk needs to be individually decrypted
 + * to reconstruct the full segment.
 + */
 +public class EncryptedFileSegmentInputStream extends FileSegmentInputStream implements FileDataInput, DataInput
 +{
 +    private final long segmentOffset;
 +    private final int expectedLength;
 +    private final ChunkProvider chunkProvider;
 +
 +    /**
 +     * offset the decrypted chunks already processed in this segment.
 +     */
 +    private int totalChunkOffset;
 +
 +    public EncryptedFileSegmentInputStream(String filePath, long segmentOffset, int position, int expectedLength, ChunkProvider chunkProvider)
 +    {
 +        super(chunkProvider.nextChunk(), filePath, position);
 +        this.segmentOffset = segmentOffset;
 +        this.expectedLength = expectedLength;
 +        this.chunkProvider = chunkProvider;
 +    }
 +
 +    public interface ChunkProvider
 +    {
 +        /**
 +         * Get the next chunk from the backing provider, if any chunks remain.
 +         * @return Next chunk, else null if no more chunks remain.
 +         */
 +        ByteBuffer nextChunk();
 +    }
 +
 +    public long getFilePointer()
 +    {
 +        return segmentOffset + totalChunkOffset + buffer.position();
 +    }
 +
 +    public boolean isEOF()
 +    {
 +        return totalChunkOffset + buffer.position() >= expectedLength;
 +    }
 +
 +    public long bytesRemaining()
 +    {
 +        return expectedLength - (totalChunkOffset + buffer.position());
 +    }
 +
 +    public void seek(long position)
 +    {
 +        // implement this when we actually need it
 +        throw new UnsupportedOperationException();
 +    }
 +
-     public long bytesPastMark(FileMark mark)
++    public long bytesPastMark(DataPosition mark)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public void reBuffer()
 +    {
 +        totalChunkOffset += buffer.position();
 +        buffer = chunkProvider.nextChunk();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 674ed7f,0000000..9fcdfa4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@@ -1,205 -1,0 +1,205 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.UUID;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.SerializationHeader;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.service.StorageService;
 +
 +public class RangeAwareSSTableWriter implements SSTableMultiWriter
 +{
 +    private final List<PartitionPosition> boundaries;
 +    private final Directories.DataDirectory[] directories;
 +    private final int sstableLevel;
 +    private final long estimatedKeys;
 +    private final long repairedAt;
 +    private final SSTableFormat.Type format;
-     private final SerializationHeader.Component header;
++    private final SerializationHeader header;
 +    private final LifecycleTransaction txn;
 +    private int currentIndex = -1;
 +    public final ColumnFamilyStore cfs;
 +    private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>();
 +    private final List<SSTableReader> finishedReaders = new ArrayList<>();
 +    private SSTableMultiWriter currentWriter = null;
 +
-     public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader.Component header) throws IOException
++    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
 +    {
 +        directories = cfs.getDirectories().getWriteableLocations();
 +        this.sstableLevel = sstableLevel;
 +        this.cfs = cfs;
 +        this.estimatedKeys = estimatedKeys / directories.length;
 +        this.repairedAt = repairedAt;
 +        this.format = format;
 +        this.txn = txn;
 +        this.header = header;
 +        boundaries = StorageService.getDiskBoundaries(cfs, directories);
 +        if (boundaries == null)
 +        {
 +            Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
 +            if (localDir == null)
 +                throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
 +            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
-             currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn);
++            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
 +        }
 +    }
 +
 +    private void maybeSwitchWriter(DecoratedKey key)
 +    {
 +        if (boundaries == null)
 +            return;
 +
 +        boolean switched = false;
 +        while (currentIndex < 0 || key.compareTo(boundaries.get(currentIndex)) > 0)
 +        {
 +            switched = true;
 +            currentIndex++;
 +        }
 +
 +        if (switched)
 +        {
 +            if (currentWriter != null)
 +                finishedWriters.add(currentWriter);
 +
 +            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format);
-             currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn);
++            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
 +        }
 +    }
 +
 +    public boolean append(UnfilteredRowIterator partition)
 +    {
 +        maybeSwitchWriter(partition.partitionKey());
 +        return currentWriter.append(partition);
 +    }
 +
 +    @Override
 +    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        for (SSTableMultiWriter writer : finishedWriters)
 +        {
 +            if (writer.getFilePointer() > 0)
 +                finishedReaders.addAll(writer.finish(repairedAt, maxDataAge, openResult));
 +            else
 +                SSTableMultiWriter.abortOrDie(writer);
 +        }
 +        return finishedReaders;
 +    }
 +
 +    @Override
 +    public Collection<SSTableReader> finish(boolean openResult)
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        for (SSTableMultiWriter writer : finishedWriters)
 +        {
 +            if (writer.getFilePointer() > 0)
 +                finishedReaders.addAll(writer.finish(openResult));
 +            else
 +                SSTableMultiWriter.abortOrDie(writer);
 +        }
 +        return finishedReaders;
 +    }
 +
 +    @Override
 +    public Collection<SSTableReader> finished()
 +    {
 +        return finishedReaders;
 +    }
 +
 +    @Override
 +    public SSTableMultiWriter setOpenResult(boolean openResult)
 +    {
 +        finishedWriters.forEach((w) -> w.setOpenResult(openResult));
 +        currentWriter.setOpenResult(openResult);
 +        return this;
 +    }
 +
 +    public String getFilename()
 +    {
 +        return String.join("/", cfs.keyspace.getName(), cfs.getTableName());
 +    }
 +
 +    @Override
 +    public long getFilePointer()
 +    {
 +        return currentWriter.getFilePointer();
 +    }
 +
 +    @Override
 +    public UUID getCfId()
 +    {
 +        return currentWriter.getCfId();
 +    }
 +
 +    @Override
 +    public Throwable commit(Throwable accumulate)
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        for (SSTableMultiWriter writer : finishedWriters)
 +            accumulate = writer.commit(accumulate);
 +        return accumulate;
 +    }
 +
 +    @Override
 +    public Throwable abort(Throwable accumulate)
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        for (SSTableMultiWriter finishedWriter : finishedWriters)
 +            accumulate = finishedWriter.abort(accumulate);
 +
 +        return accumulate;
 +    }
 +
 +    @Override
 +    public void prepareToCommit()
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        finishedWriters.forEach(SSTableMultiWriter::prepareToCommit);
 +    }
 +
 +    @Override
 +    public void close()
 +    {
 +        if (currentWriter != null)
 +            finishedWriters.add(currentWriter);
 +        currentWriter = null;
 +        finishedWriters.forEach(SSTableMultiWriter::close);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index ab38ba9,5f35029..6aaf776
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@@ -83,9 -80,8 +83,9 @@@ public abstract class SSTableWriter ext
          this.keyCount = keyCount;
          this.repairedAt = repairedAt;
          this.metadataCollector = metadataCollector;
-         this.header = header;
+         this.header = header != null ? header : SerializationHeader.makeWithoutStats(metadata); //null header indicates streaming from pre-3.0 sstable
          this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header);
 +        this.observers = observers == null ? Collections.emptySet() : observers;
      }
  
      public static SSTableWriter create(Descriptor descriptor,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 7348027,f8db26b..7d7cf8a
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -35,15 -35,18 +35,18 @@@ import org.apache.cassandra.config.CFMe
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.rows.*;
 -import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
  import org.apache.cassandra.io.sstable.format.SSTableFormat;
  import org.apache.cassandra.io.sstable.format.Version;
+ import org.apache.cassandra.io.util.RewindableDataInputStreamPlus;
  import org.apache.cassandra.io.util.DataInputPlus;
+ import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.streaming.messages.FileMessageHeader;
  import org.apache.cassandra.utils.ByteBufferUtil;
- import org.apache.cassandra.utils.BytesReadTracker;
+ import org.apache.cassandra.io.util.TrackedInputStream;
+ import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
  
  
@@@ -124,7 -129,7 +127,7 @@@ public class StreamReade
          {
              if (deserializer != null)
                  logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
--                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
++                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName());
              if (writer != null)
              {
                  writer.abort(e);
@@@ -142,10 -157,9 +155,10 @@@
          Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
          if (localDir == null)
              throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
 -        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
  
-         RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), header);
 -        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId));
++        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), getHeader(cfs.metadata));
 +        StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
 +        return writer;
      }
  
      protected void drain(InputStream dis, long bytesRead) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 5a47787,9719587..318484f
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -24,9 -24,7 +24,8 @@@ import java.nio.channels.ReadableByteCh
  
  import com.google.common.base.Throwables;
  
- import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;


Mime
View raw message