From: yukim@apache.org
To: commits@cassandra.apache.org
Date: Thu, 17 Mar 2016 16:07:03 -0000
Message-Id: <4fe398e34b084a32adb70016bbfe5b7c@git.apache.org>
In-Reply-To: <715c91d3123c4ecdafa1dab8606d0e57@git.apache.org>
References: <715c91d3123c4ecdafa1dab8606d0e57@git.apache.org>
Subject: [07/12] cassandra git commit: Support streaming of older version sstables in 3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt b/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt
new file mode 100644
index 0000000..a29a600
--- /dev/null
+++ b/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt
@@ -0,0 +1,8 @@
+Index.db
+Statistics.db
+Digest.crc32
+Summary.db
+Data.db
+TOC.txt
+Filter.db
+CompressionInfo.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 501f4ae..3a1f348 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
-import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessagingService;
 
 import java.io.File;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 8732881..f864bbc 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -243,46 +243,76 @@ public class DirectoriesTest
         for (CFMetaData cfm : CFM)
         {
             Directories directories = new Directories(cfm);
-            Directories.SSTableLister lister;
-            Set<File> listed;
+            checkFiles(cfm, directories);
+        }
+    }
 
-            // List all but no snapshot, backup
-            lister = directories.sstableLister(Directories.OnTxnErr.THROW);
-            listed = new HashSet<>(lister.listFiles());
-            for (File f : files.get(cfm.cfName))
-            {
-                if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
-                    assert !listed.contains(f) : f + " should not be listed";
-                else
-                    assert listed.contains(f) : f + " is missing";
-            }
+    private void checkFiles(CFMetaData cfm, Directories directories)
+    {
+        Directories.SSTableLister lister;
+        Set<File> listed;
+
+        // List all but no snapshot, backup
+        lister = directories.sstableLister(Directories.OnTxnErr.THROW);
+        listed = new HashSet<>(lister.listFiles());
+        for (File f : files.get(cfm.cfName))
+        {
+            if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
+                assertFalse(f + " should not be listed", listed.contains(f));
+            else
+                assertTrue(f + " is missing", listed.contains(f));
+        }
 
-            // List all but including backup (but no snapshot)
-            lister = directories.sstableLister(Directories.OnTxnErr.THROW).includeBackups(true);
-            listed = new HashSet<>(lister.listFiles());
-            for (File f : files.get(cfm.cfName))
-            {
-                if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR))
-                    assert !listed.contains(f) : f + " should not be listed";
-                else
-                    assert listed.contains(f) : f + " is missing";
-            }
+        // List all but including backup (but no snapshot)
+        lister = directories.sstableLister(Directories.OnTxnErr.THROW).includeBackups(true);
+        listed = new HashSet<>(lister.listFiles());
+        for (File f : files.get(cfm.cfName))
+        {
+            if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR))
+                assertFalse(f + " should not be listed", listed.contains(f));
+            else
+                assertTrue(f + " is missing", listed.contains(f));
+        }
 
-            // Skip temporary and compacted
-            lister = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
-            listed = new HashSet<>(lister.listFiles());
-            for (File f : files.get(cfm.cfName))
-            {
-                if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
-                    assert !listed.contains(f) : f + " should not be listed";
-                else if (f.getName().contains("tmp-"))
-                    assert !listed.contains(f) : f + " should not be listed";
-                else
-                    assert listed.contains(f) : f + " is missing";
-            }
+        // Skip temporary and compacted
+        lister = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+        listed = new HashSet<>(lister.listFiles());
+        for (File f : files.get(cfm.cfName))
+        {
+            if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
+                assertFalse(f + " should not be listed", listed.contains(f));
+            else if (f.getName().contains("tmp-"))
+                assertFalse(f + " should not be listed", listed.contains(f));
+            else
+                assertTrue(f + " is missing", listed.contains(f));
         }
     }
 
+    @Test
+    public void testTemporaryFile() throws IOException
+    {
+        for (CFMetaData cfm : CFM)
+        {
+            Directories directories = new Directories(cfm);
+
+            File tempDir = directories.getTemporaryWriteableDirectoryAsFile(10);
+            tempDir.mkdir();
+            File tempFile = new File(tempDir, "tempFile");
+            tempFile.createNewFile();
+
+            assertTrue(tempDir.exists());
+            assertTrue(tempFile.exists());
+
+            //make sure temp dir/file will not affect existing sstable listing
+            checkFiles(cfm, directories);
+
+            directories.removeTemporaryDirectories();
+
+            //make sure temp dir/file deletion will not affect existing sstable listing
+            checkFiles(cfm, directories);
+
+            assertFalse(tempDir.exists());
+            assertFalse(tempFile.exists());
+        }
+    }
 
     @Test
     public void testDiskFailurePolicy_best_effort()
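
The testTemporaryFile change above pins down a new Directories contract: temporary write directories and their contents never show up in sstable listings, and removeTemporaryDirectories() deletes them wholesale. A minimal sketch of that contract, using only the calls visible in this diff (the numeric argument to getTemporaryWriteableDirectoryAsFile mirrors the test; its meaning is an assumption, and this is not an authoritative API reference):

    // Assumes a prepared schema and a CFMetaData instance `cfm`, as in the test fixture.
    Directories directories = new Directories(cfm);

    // Obtain a writeable temporary directory; files under it are invisible to listings.
    File tempDir = directories.getTemporaryWriteableDirectoryAsFile(10);
    tempDir.mkdir();
    new File(tempDir, "tempFile").createNewFile();

    // Listings skip the temp dir, with or without skipTemporary(true).
    Set<File> listed = new HashSet<>(directories.sstableLister(Directories.OnTxnErr.THROW)
                                                .skipTemporary(true)
                                                .listFiles());
    assert !listed.contains(new File(tempDir, "tempFile"));

    // Cleanup removes the temporary directory tree entirely.
    directories.removeTemporaryDirectories();
    assert !tempDir.exists();
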
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index 8b7ad1f..e50b461 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.gms;
 
 import org.apache.cassandra.AbstractSerializationsTester;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 9154d79..74b5c74 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
 import org.apache.cassandra.io.util.MmappedRegions;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SequentialWriter;
@@ -84,7 +84,7 @@ public class CompressedRandomAccessReaderTest
             for (int i = 0; i < 20; i++)
                 writer.write("x".getBytes());
 
-            FileMark mark = writer.mark();
+            DataPosition mark = writer.mark();
             // write enough garbage to create new chunks:
             for (int i = 0; i < 40; ++i)
                 writer.write("y".getBytes());
@@ -126,7 +126,7 @@ public class CompressedRandomAccessReaderTest
              : SequentialWriter.open(f))
         {
             writer.write("The quick ".getBytes());
-            FileMark mark = writer.mark();
+            DataPosition mark = writer.mark();
             writer.write("blue fox jumps over the lazy dog".getBytes());
 
             // write enough to be sure to change chunk
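
The compression-test hunks above only rename FileMark to DataPosition; the mark/rewind semantics are untouched. For reference, the pattern these tests exercise looks like the following sketch, built solely from calls that appear elsewhere in this patch (RandomAccessReader.open, seek, mark, reset, bytesPastMark):

    // Assumes `channel` is an open ChannelProxy for an existing file.
    try (RandomAccessReader r = RandomAccessReader.open(channel))
    {
        r.seek(10);
        DataPosition mark = r.mark();      // capture the current position
        r.seek(r.length());                // read ahead / wander off...
        r.reset(mark);                     // ...and jump back to the mark
        long past = r.bytesPastMark(mark); // 0 immediately after reset
    }
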
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 56c83da..9b09f0b 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SequentialWriterTest;
 import org.apache.cassandra.schema.CompressionParams;
@@ -104,7 +104,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
             dataPost.flip();
 
             writer.write(dataPre);
-            FileMark mark = writer.mark();
+            DataPosition mark = writer.mark();
 
             // Write enough garbage to transition chunk
             for (int i = 0; i < CompressionParams.DEFAULT_CHUNK_LENGTH; i++)
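
The LegacySSTableTest rewrite below is the heart of this patch's test coverage: it opens sstables written by older versions and streams them to the local node. The core of the new streamLegacyTable method (further down) condenses to the sketch below; the first two SSTableStreamingSections arguments are reconstructed and should be treated as an assumption, since the hunks only show the trailing ones:

    // Assumes a started test server and the getDescriptor helper defined below.
    SSTableReader sstable = SSTableReader.open(getDescriptor(legacyVersion, table));
    IPartitioner p = sstable.getPartitioner();
    List<Range<Token>> ranges = new ArrayList<>();
    ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));

    List<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
    details.add(new StreamSession.SSTableStreamingSections(sstable.ref(),                    // assumed
                                                           sstable.getPositionsForRanges(ranges), // assumed
                                                           sstable.estimatedKeysForRanges(ranges),
                                                           sstable.getSSTableMetadata().repairedAt));
    new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)
                                         .execute().get();
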
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 00727b8..4b9a769 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -21,14 +21,13 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
-import java.util.Set;
 
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -37,16 +36,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -55,7 +48,6 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
-import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamPlan;
@@ -63,8 +55,6 @@ import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-
 /**
  * Tests backwards compatibility for SSTables
  */
@@ -73,10 +63,7 @@ public class LegacySSTableTest
     private static final Logger logger = LoggerFactory.getLogger(LegacySSTableTest.class);
 
     public static final String LEGACY_SSTABLE_PROP = "legacy-sstable-root";
-    public static final String KSNAME = "Keyspace1";
-    public static final String CFNAME = "Standard1";
-    public static Set<String> TEST_DATA;
     public static File LEGACY_SSTABLE_ROOT;
 
     /**
@@ -104,58 +91,78 @@ public static void defineSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
-
-        CFMetaData metadata = CFMetaData.Builder.createDense(KSNAME, CFNAME, false, false)
-                                                .addPartitionKey("key", BytesType.instance)
-                                                .addClusteringColumn("column", BytesType.instance)
-                                                .addRegularColumn("value", BytesType.instance)
-                                                .build();
-
-        SchemaLoader.createKeyspace(KSNAME,
-                                    KeyspaceParams.simple(1),
-                                    metadata);
-        beforeClass();
-    }
-
-    public static void beforeClass()
-    {
+        StorageService.instance.initServer();
         Keyspace.setInitialized();
+        createKeyspace();
+        for (String legacyVersion : legacyVersions)
+        {
+            createTables(legacyVersion);
+        }
         String scp = System.getProperty(LEGACY_SSTABLE_PROP);
         assert scp != null;
         LEGACY_SSTABLE_ROOT = new File(scp).getAbsoluteFile();
         assert LEGACY_SSTABLE_ROOT.isDirectory();
+    }
 
-        TEST_DATA = new HashSet<String>();
-        for (int i = 100; i < 1000; ++i)
-            TEST_DATA.add(Integer.toString(i));
+    @After
+    public void tearDown()
+    {
+        for (String legacyVersion : legacyVersions)
+        {
+            truncateTables(legacyVersion);
+        }
     }
 
     /**
      * Get a descriptor for the legacy sstable at the given version.
      */
-    protected Descriptor getDescriptor(String ver)
+    protected Descriptor getDescriptor(String legacyVersion, String table)
     {
-        File directory = new File(LEGACY_SSTABLE_ROOT + File.separator + ver + File.separator + KSNAME);
-        return new Descriptor(ver, directory, KSNAME, CFNAME, 0, SSTableFormat.Type.LEGACY);
+        return new Descriptor(legacyVersion, getTableDir(legacyVersion, table), "legacy_tables", table, 1,
+                              BigFormat.instance.getVersion(legacyVersion).hasNewFileName()
+                              ? SSTableFormat.Type.BIG : SSTableFormat.Type.LEGACY);
     }
 
     @Test
-    public void testStreaming() throws Throwable
+    public void testLoadLegacyCqlTables() throws Exception
     {
-        StorageService.instance.initServer();
+        for (String legacyVersion : legacyVersions)
+        {
+            logger.info("Loading legacy version: {}", legacyVersion);
+            loadLegacyTables(legacyVersion);
+            CacheService.instance.invalidateKeyCache();
+            long startCount = CacheService.instance.keyCache.size();
+            verifyReads(legacyVersion);
+            verifyCache(legacyVersion, startCount);
+        }
+    }
+
+    @Test
+    public void testStreamLegacyCqlTables() throws Exception
+    {
+        for (String legacyVersion : legacyVersions)
+        {
+            streamLegacyTables(legacyVersion);
+            verifyReads(legacyVersion);
+        }
+    }
 
-        for (File version : LEGACY_SSTABLE_ROOT.listFiles())
+    private void streamLegacyTables(String legacyVersion) throws Exception
+    {
+        for (int compact = 0; compact <= 1; compact++)
         {
-            if (!new File(LEGACY_SSTABLE_ROOT + File.separator + version.getName() + File.separator + KSNAME).isDirectory())
-                continue;
-            if (Version.validate(version.getName()) && SSTableFormat.Type.LEGACY.info.getVersion(version.getName()).isCompatibleForStreaming())
-                testStreaming(version.getName());
+            logger.info("Streaming legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
+            streamLegacyTable("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact));
+            streamLegacyTable("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact));
+            streamLegacyTable("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact));
+            streamLegacyTable("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact));
         }
     }
 
-    private void testStreaming(String version) throws Exception
+    private void streamLegacyTable(String tablePattern, String legacyVersion, String compactNameSuffix) throws Exception
    {
-        SSTableReader sstable = SSTableReader.open(getDescriptor(version));
+        String table = String.format(tablePattern, legacyVersion, compactNameSuffix);
+        SSTableReader sstable = SSTableReader.open(getDescriptor(legacyVersion, table));
         IPartitioner p = sstable.getPartitioner();
         List<Range<Token>> ranges = new ArrayList<>();
         ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
@@ -166,94 +173,41 @@ public class LegacySSTableTest
                                                                sstable.estimatedKeysForRanges(ranges),
                                                                sstable.getSSTableMetadata().repairedAt));
         new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)
                                              .execute().get();
-
-        ColumnFamilyStore cfs = Keyspace.open(KSNAME).getColumnFamilyStore(CFNAME);
-        assert cfs.getLiveSSTables().size() == 1;
-        sstable = cfs.getLiveSSTables().iterator().next();
-        for (String keystring : TEST_DATA)
-        {
-            ByteBuffer key = bytes(keystring);
-
-            SliceableUnfilteredRowIterator iter = sstable.iterator(Util.dk(key), ColumnFilter.selectionBuilder().add(cfs.metadata.getColumnDefinition(bytes("name"))).build(), false, false);
-
-            // check not deleted (CASSANDRA-6527)
-            assert iter.partitionLevelDeletion().equals(DeletionTime.LIVE);
-            assert iter.next().clustering().get(0).equals(key);
-        }
-        sstable.selfRef().release();
-    }
-
-    @Test
-    public void testVersions() throws Throwable
-    {
-        boolean notSkipped = false;
-
-        for (File version : LEGACY_SSTABLE_ROOT.listFiles())
-        {
-            if (!new File(LEGACY_SSTABLE_ROOT + File.separator + version.getName() + File.separator + KSNAME).isDirectory())
-                continue;
-            if (Version.validate(version.getName()) && SSTableFormat.Type.LEGACY.info.getVersion(version.getName()).isCompatible())
-            {
-                notSkipped = true;
-                testVersion(version.getName());
-            }
-        }
-
-        assert notSkipped;
     }
 
-    public void testVersion(String version) throws Throwable
+    private static void loadLegacyTables(String legacyVersion) throws Exception
     {
-        try
-        {
-            ColumnFamilyStore cfs = Keyspace.open(KSNAME).getColumnFamilyStore(CFNAME);
-
-            SSTableReader reader = SSTableReader.open(getDescriptor(version));
-            for (String keystring : TEST_DATA)
-            {
-                ByteBuffer key = bytes(keystring);
-
-                SliceableUnfilteredRowIterator iter = reader.iterator(Util.dk(key), ColumnFilter.selection(cfs.metadata.partitionColumns()), false, false);
-
-                // check not deleted (CASSANDRA-6527)
-                assert iter.partitionLevelDeletion().equals(DeletionTime.LIVE);
-                assert iter.next().clustering().get(0).equals(key);
-            }
-
-            // TODO actually test some reads
-        }
-        catch (Throwable e)
+        for (int compact = 0; compact <= 1; compact++)
         {
-            System.err.println("Failed to read " + version);
-            throw e;
+            logger.info("Preparing legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
+            loadLegacyTable("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact));
+            loadLegacyTable("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact));
+            loadLegacyTable("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact));
+            loadLegacyTable("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact));
         }
     }
 
-    @Test
-    public void testLegacyCqlTables() throws Exception
+    private static void verifyCache(String legacyVersion, long startCount) throws InterruptedException, java.util.concurrent.ExecutionException
     {
-        createKeyspace();
-
-        loadLegacyTables();
+        //For https://issues.apache.org/jira/browse/CASSANDRA-10778
+        //Validate whether the key cache successfully saves in the presence of old keys as
+        //well as loads the correct number of keys
+        long endCount = CacheService.instance.keyCache.size();
+        Assert.assertTrue(endCount > startCount);
+        CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+        CacheService.instance.invalidateKeyCache();
+        Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
+        CacheService.instance.keyCache.loadSaved();
+        if (BigFormat.instance.getVersion(legacyVersion).storeRows())
+            Assert.assertEquals(endCount, CacheService.instance.keyCache.size());
+        else
+            Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
     }
 
-    private static void loadLegacyTables() throws Exception
+    private static void verifyReads(String legacyVersion)
     {
-        for (String legacyVersion : legacyVersions)
+        for (int compact = 0; compact <= 1; compact++)
         {
-            logger.info("Preparing legacy version {}", legacyVersion);
-
-            createTables(legacyVersion);
-
-            loadLegacyTable("legacy_%s_simple", legacyVersion);
-            loadLegacyTable("legacy_%s_simple_counter", legacyVersion);
-            loadLegacyTable("legacy_%s_clust", legacyVersion);
-            loadLegacyTable("legacy_%s_clust_counter", legacyVersion);
-
-            CacheService.instance.invalidateKeyCache();
-            long startCount = CacheService.instance.keyCache.size();
             for (int ck = 0; ck < 50; ck++)
             {
                 String ckValue = Integer.toString(ck) + longString;
@@ -265,58 +219,94 @@ public class LegacySSTableTest
                 UntypedResultSet rs;
                 if (ck == 0)
                 {
-                    rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple WHERE pk=?", legacyVersion), pkValue);
-                    Assert.assertNotNull(rs);
-                    Assert.assertEquals(1, rs.size());
-                    Assert.assertEquals("foo bar baz", rs.one().getString("val"));
-                    rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple_counter WHERE pk=?", legacyVersion), pkValue);
-                    Assert.assertNotNull(rs);
-                    Assert.assertEquals(1, rs.size());
-                    Assert.assertEquals(1L, rs.one().getLong("val"));
+                    readSimpleTable(legacyVersion, getCompactNameSuffix(compact), pkValue);
+                    readSimpleCounterTable(legacyVersion, getCompactNameSuffix(compact), pkValue);
                 }
 
-                rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust WHERE pk=? AND ck=?", legacyVersion), pkValue, ckValue);
-                assertLegacyClustRows(1, rs);
-
-                String ckValue2 = Integer.toString(ck < 10 ? 40 : ck - 1) + longString;
-                String ckValue3 = Integer.toString(ck > 39 ? 10 : ck + 1) + longString;
-                rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust WHERE pk=? AND ck IN (?, ?, ?)", legacyVersion), pkValue, ckValue, ckValue2, ckValue3);
-                assertLegacyClustRows(3, rs);
-
-                rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust_counter WHERE pk=? AND ck=?", legacyVersion), pkValue, ckValue);
-                Assert.assertNotNull(rs);
-                Assert.assertEquals(1, rs.size());
-                Assert.assertEquals(1L, rs.one().getLong("val"));
+                readClusteringTable(legacyVersion, getCompactNameSuffix(compact), ck, ckValue, pkValue);
+                readClusteringCounterTable(legacyVersion, getCompactNameSuffix(compact), ckValue, pkValue);
             }
         }
-
-        //For https://issues.apache.org/jira/browse/CASSANDRA-10778
-        //Validate whether the key cache successfully saves in the presence of old keys as
-        //well as loads the correct number of keys
-        long endCount = CacheService.instance.keyCache.size();
-        Assert.assertTrue(endCount > startCount);
-        CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
-        CacheService.instance.invalidateKeyCache();
-        Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
-        CacheService.instance.keyCache.loadSaved();
-        if (BigFormat.instance.getVersion(legacyVersion).storeRows())
-            Assert.assertEquals(endCount, CacheService.instance.keyCache.size());
-        else
-            Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
-    }
     }
 
-    private void createKeyspace()
+    private static void readClusteringCounterTable(String legacyVersion, String compactSuffix, String ckValue, String pkValue)
+    {
+        logger.debug("Read legacy_{}_clust_counter{}", legacyVersion, compactSuffix);
+        UntypedResultSet rs;
+        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust_counter%s WHERE pk=? AND ck=?", legacyVersion, compactSuffix), pkValue, ckValue);
+        Assert.assertNotNull(rs);
+        Assert.assertEquals(1, rs.size());
+        Assert.assertEquals(1L, rs.one().getLong("val"));
+    }
+
+    private static void readClusteringTable(String legacyVersion, String compactSuffix, int ck, String ckValue, String pkValue)
+    {
+        logger.debug("Read legacy_{}_clust{}", legacyVersion, compactSuffix);
+        UntypedResultSet rs;
+        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust%s WHERE pk=? AND ck=?", legacyVersion, compactSuffix), pkValue, ckValue);
+        assertLegacyClustRows(1, rs);
+
+        String ckValue2 = Integer.toString(ck < 10 ? 40 : ck - 1) + longString;
+        String ckValue3 = Integer.toString(ck > 39 ? 10 : ck + 1) + longString;
+        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust%s WHERE pk=? AND ck IN (?, ?, ?)", legacyVersion, compactSuffix), pkValue, ckValue, ckValue2, ckValue3);
+        assertLegacyClustRows(3, rs);
+    }
+
+    private static void readSimpleCounterTable(String legacyVersion, String compactSuffix, String pkValue)
+    {
+        logger.debug("Read legacy_{}_simple_counter{}", legacyVersion, compactSuffix);
+        UntypedResultSet rs;
+        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple_counter%s WHERE pk=?", legacyVersion, compactSuffix), pkValue);
+        Assert.assertNotNull(rs);
+        Assert.assertEquals(1, rs.size());
+        Assert.assertEquals(1L, rs.one().getLong("val"));
+    }
+
+    private static void readSimpleTable(String legacyVersion, String compactSuffix, String pkValue)
+    {
+        logger.debug("Read simple: legacy_{}_simple{}", legacyVersion, compactSuffix);
+        UntypedResultSet rs;
+        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple%s WHERE pk=?", legacyVersion, compactSuffix), pkValue);
+        Assert.assertNotNull(rs);
+        Assert.assertEquals(1, rs.size());
+        Assert.assertEquals("foo bar baz", rs.one().getString("val"));
+    }
+
+    private static void createKeyspace()
     {
         QueryProcessor.executeInternal("CREATE KEYSPACE legacy_tables WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
     }
 
     private static void createTables(String legacyVersion)
     {
-        QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple (pk text PRIMARY KEY, val text)", legacyVersion));
-        QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter (pk text PRIMARY KEY, val counter)", legacyVersion));
-        QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust (pk text, ck text, val text, PRIMARY KEY (pk, ck))", legacyVersion));
-        QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter (pk text, ck text, val counter, PRIMARY KEY (pk, ck))", legacyVersion));
+        for (int i = 0; i <= 1; i++)
+        {
+            String compactSuffix = getCompactNameSuffix(i);
+            String tableSuffix = i == 0 ? "" : " WITH COMPACT STORAGE";
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple%s (pk text PRIMARY KEY, val text)%s", legacyVersion, compactSuffix, tableSuffix));
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter%s (pk text PRIMARY KEY, val counter)%s", legacyVersion, compactSuffix, tableSuffix));
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust%s (pk text, ck text, val text, PRIMARY KEY (pk, ck))%s", legacyVersion, compactSuffix, tableSuffix));
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter%s (pk text, ck text, val counter, PRIMARY KEY (pk, ck))%s", legacyVersion, compactSuffix, tableSuffix));
+        }
+    }
+
+    private static String getCompactNameSuffix(int i)
+    {
+        return i == 0 ? "" : "_compact";
+    }
"" : "_compact"; + } + + private static void truncateTables(String legacyVersion) + { + for (int compact = 0; compact <= 1; compact++) + { + QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact))); + QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact))); + QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact))); + QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact))); + } + CacheService.instance.invalidateCounterCache(); + CacheService.instance.invalidateKeyCache(); } private static void assertLegacyClustRows(int count, UntypedResultSet rs) @@ -332,9 +322,9 @@ public class LegacySSTableTest } } - private static void loadLegacyTable(String tablePattern, String legacyVersion) throws IOException + private static void loadLegacyTable(String tablePattern, String legacyVersion, String compactSuffix) throws IOException { - String table = String.format(tablePattern, legacyVersion); + String table = String.format(tablePattern, legacyVersion, compactSuffix); logger.info("Loading legacy table {}", table); @@ -349,7 +339,7 @@ public class LegacySSTableTest } /** - * Generates sstables for 4 CQL tables (see {@link #createTables(String)}) in current + * Generates sstables for 8 CQL tables (see {@link #createTables(String)}) in current * sstable format (version) into {@code test/data/legacy-sstables/VERSION}, where * {@code VERSION} matches {@link Version#getVersion() BigFormat.latestVersion.getVersion()}. *

@@ -357,13 +347,10 @@ public class LegacySSTableTest * during development. I.e. remove the {@code @Ignore} annotation temporarily. *

*/ - @Test @Ignore + @Test public void testGenerateSstables() throws Throwable { - createKeyspace(); - createTables(BigFormat.latestVersion.getVersion()); - Random rand = new Random(); StringBuilder sb = new StringBuilder(); for (int i = 0; i < 128; i++) @@ -372,25 +359,28 @@ public class LegacySSTableTest } String randomString = sb.toString(); - for (int pk = 0; pk < 5; pk++) + for (int compact = 0; compact <= 1; compact++) { - String valPk = Integer.toString(pk); - QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_simple (pk, val) VALUES ('%s', '%s')", - BigFormat.latestVersion, valPk, "foo bar baz")); + for (int pk = 0; pk < 5; pk++) + { + String valPk = Integer.toString(pk); + QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_simple%s (pk, val) VALUES ('%s', '%s')", + BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, "foo bar baz")); - QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_simple_counter SET val = val + 1 WHERE pk = '%s'", - BigFormat.latestVersion, valPk)); + QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_simple_counter%s SET val = val + 1 WHERE pk = '%s'", + BigFormat.latestVersion, getCompactNameSuffix(compact), valPk)); - for (int ck = 0; ck < 50; ck++) - { - String valCk = Integer.toString(ck); + for (int ck = 0; ck < 50; ck++) + { + String valCk = Integer.toString(ck); - QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_clust (pk, ck, val) VALUES ('%s', '%s', '%s')", - BigFormat.latestVersion, valPk, valCk + longString, randomString)); + QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_clust%s (pk, ck, val) VALUES ('%s', '%s', '%s')", + BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, valCk + longString, randomString)); - QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_clust_counter SET val = val + 1 WHERE pk = '%s' AND ck='%s'", - BigFormat.latestVersion, valPk, valCk + longString)); + QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_clust_counter%s SET val = val + 1 WHERE pk = '%s' AND ck='%s'", + BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, valCk + longString)); + } } } @@ -398,10 +388,13 @@ public class LegacySSTableTest File ksDir = new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables", BigFormat.latestVersion)); ksDir.mkdirs(); - copySstablesFromTestData(String.format("legacy_%s_simple", BigFormat.latestVersion), ksDir); - copySstablesFromTestData(String.format("legacy_%s_simple_counter", BigFormat.latestVersion), ksDir); - copySstablesFromTestData(String.format("legacy_%s_clust", BigFormat.latestVersion), ksDir); - copySstablesFromTestData(String.format("legacy_%s_clust_counter", BigFormat.latestVersion), ksDir); + for (int compact = 0; compact <= 1; compact++) + { + copySstablesFromTestData(String.format("legacy_%s_simple%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir); + copySstablesFromTestData(String.format("legacy_%s_simple_counter%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir); + copySstablesFromTestData(String.format("legacy_%s_clust%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir); + copySstablesFromTestData(String.format("legacy_%s_clust_counter%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir); + } } private void copySstablesFromTestData(String table, File ksDir) throws IOException 
@@ -420,12 +413,17 @@ public class LegacySSTableTest private static void copySstablesToTestData(String legacyVersion, String table, File cfDir) throws IOException { - for (File file : new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table)).listFiles()) + for (File file : getTableDir(legacyVersion, table).listFiles()) { copyFile(cfDir, file); } } + private static File getTableDir(String legacyVersion, String table) + { + return new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table)); + } + private static void copyFile(File cfDir, File file) throws IOException { byte[] buf = new byte[65536]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java index c3a4539..360d262 100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java @@ -406,7 +406,7 @@ public class BufferedRandomAccessFileTest assert tmpFile.getPath().equals(r.getPath()); // Create a mark and move the rw there. - final FileMark mark = r.mark(); + final DataPosition mark = r.mark(); r.reset(mark); // Expect this call to succeed. @@ -457,7 +457,7 @@ public class BufferedRandomAccessFileTest RandomAccessReader file = RandomAccessReader.open(channel); file.seek(10); - FileMark mark = file.mark(); + DataPosition mark = file.mark(); file.seek(file.length()); assertTrue(file.isEOF()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java index f0d4383..e83c015 100644 --- a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java @@ -356,7 +356,7 @@ public class RandomAccessReaderTest assertFalse(reader.isEOF()); assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining()); - FileMark mark = reader.mark(); + DataPosition mark = reader.mark(); assertEquals(0, reader.bytesPastMark()); assertEquals(0, reader.bytesPastMark(mark)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java new file mode 100644 index 0000000..175ab53 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
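
To make the naming scheme in createTables above concrete: with getCompactNameSuffix resolving to "" or "_compact", each legacy version gets eight tables, a plain and a WITH COMPACT STORAGE variant of each of the four patterns. For example, for version ma the simple and clust patterns resolve to the following CQL (format strings resolved by hand; illustrative only):

    CREATE TABLE legacy_tables.legacy_ma_simple (pk text PRIMARY KEY, val text);
    CREATE TABLE legacy_tables.legacy_ma_simple_compact (pk text PRIMARY KEY, val text) WITH COMPACT STORAGE;
    CREATE TABLE legacy_tables.legacy_ma_clust (pk text, ck text, val text, PRIMARY KEY (pk, ck));
    CREATE TABLE legacy_tables.legacy_ma_clust_compact (pk text, ck text, val text, PRIMARY KEY (pk, ck)) WITH COMPACT STORAGE;

The two counter patterns follow the same plain/_compact pairing.
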
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index c3a4539..360d262 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -406,7 +406,7 @@ public class BufferedRandomAccessFileTest
             assert tmpFile.getPath().equals(r.getPath());
 
             // Create a mark and move the rw there.
-            final FileMark mark = r.mark();
+            final DataPosition mark = r.mark();
             r.reset(mark);
 
             // Expect this call to succeed.
@@ -457,7 +457,7 @@ public class BufferedRandomAccessFileTest
         RandomAccessReader file = RandomAccessReader.open(channel);
 
         file.seek(10);
-        FileMark mark = file.mark();
+        DataPosition mark = file.mark();
 
         file.seek(file.length());
         assertTrue(file.isEOF());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
index f0d4383..e83c015 100644
--- a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
@@ -356,7 +356,7 @@ public class RandomAccessReaderTest
         assertFalse(reader.isEOF());
         assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining());
 
-        FileMark mark = reader.mark();
+        DataPosition mark = reader.mark();
         assertEquals(0, reader.bytesPastMark());
         assertEquals(0, reader.bytesPastMark(mark));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
new file mode 100644
index 0000000..175ab53
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class RewindableDataInputStreamPlusTest
+{
+    private final int INITIAL_BUFFER_SIZE = 1;
+
+    private File file;
+
+    @Before
+    public void setup() throws Exception
+    {
+        this.file = new File(System.getProperty("java.io.tmpdir"), "subdir/test.buffer");
+    }
+
+    @Test
+    public void testMarkAndResetSimple() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            // boolean
+            out.writeBoolean(true);
+            // byte
+            out.writeByte(0x1);
+            // char
+            out.writeChar('a');
+            // short
+            out.writeShort(1);
+            // int
+            out.writeInt(1);
+            // long
+            out.writeLong(1L);
+            // float
+            out.writeFloat(1.0f);
+            // double
+            out.writeDouble(1.0d);
+            // String
+            out.writeUTF("abc");
+            testData = baos.toByteArray();
+        }
+
+        for (int memCapacity = 0; memCapacity <= 16; memCapacity++)
+        {
+            int diskCapacity = 16 - memCapacity;
+            try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                          INITIAL_BUFFER_SIZE, memCapacity, file,
+                                                                                          diskCapacity))
+            {
+                try {
+                    //should mark before resetting
+                    reader.reset(null);
+                    fail("Should have thrown IOException");
+                } catch (IOException e) {}
+
+                assertTrue(reader.readBoolean());
+
+                reader.mark();
+
+                try {
+                    //cannot mark already marked stream
+                    reader.mark();
+                    fail("Should have thrown IllegalStateException");
+                } catch (IllegalStateException e) {}
+
+                assertEquals(0x1, reader.readByte());
+                assertEquals('a', reader.readChar());
+                assertEquals(3, reader.bytesPastMark(null));
+                reader.reset(null);
+
+                try {
+                    //cannot mark when reading from cache
+                    reader.mark();
+                    fail("Should have thrown IllegalStateException");
+                } catch (IllegalStateException e) {}
+
+                //read again previous sequence
+                assertEquals(0x1, reader.readByte());
+                assertEquals('a', reader.readChar());
+                //finish reading again previous sequence
+                assertEquals(1, reader.readShort());
+
+                reader.mark();
+                assertEquals(1, reader.readInt());
+                assertEquals(1L, reader.readLong());
+                assertEquals(1.0f, reader.readFloat(), 0);
+                assertEquals(16, reader.bytesPastMark(null));
+                reader.reset(null);
+
+                //read again previous sequence
+                assertEquals(1, reader.readInt());
+                assertEquals(1L, reader.readLong());
+                assertEquals(1.0f, reader.readFloat(), 0);
+                //finish reading again previous sequence
+
+                //mark again
+                reader.mark();
+                assertEquals(1.0d, reader.readDouble(), 0);
+                assertEquals(8, reader.bytesPastMark(null));
+                reader.reset(null);
+
+                //read again previous sequence
+                assertEquals(1.0d, reader.readDouble(), 0);
+                //finish reading again previous sequence
+
+                //mark and reset
+                reader.mark();
+                reader.reset(null);
+
+                assertEquals("abc", reader.readUTF());
+
+                //check max file size
+                assertEquals(diskCapacity, file.length());
+            }
+            assertFalse(file.exists());
+        }
+    }
+
+    @Test
+    public void testVeryLargeCapacity() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            out.writeUTF("abc");
+            testData = baos.toByteArray();
+        }
+
+        try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                      INITIAL_BUFFER_SIZE, Integer.MAX_VALUE, file,
+                                                                                      Integer.MAX_VALUE))
+        {
+            reader.mark();
+            assertEquals("abc", reader.readUTF());
+            reader.reset();
+            assertEquals("abc", reader.readUTF());
+        }
+        assertFalse(file.exists());
+
+        baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            out.writeBoolean(true);
+            out.writeBoolean(true);
+            testData = baos.toByteArray();
+        }
+    }
+
+    @Test
+    public void testMarkAndResetBigBuffer() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            // boolean
+            out.writeBoolean(true);
+            // byte
+            out.writeByte(0x1);
+            // char
+            out.writeChar('a');
+            // short
+            out.writeShort(1);
+            // int
+            out.writeInt(1);
+            // long
+            out.writeLong(1L);
+            // float
+            out.writeFloat(1.0f);
+            // double
+            out.writeDouble(1.0d);
+            // String
+            out.writeUTF("abc");
+            testData = baos.toByteArray();
+
+            // 1 (boolean) + 1 (byte) + 2 (char) + 2 (short) + 4 (int) + 8 (long)
+            // + 4 (float) + 8 (double) + 5 bytes (utf string)
+        }
+
+        for (int memCapacity = 0; memCapacity <= 18; memCapacity++)
+        {
+            int diskCapacity = 18 - memCapacity;
+            try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                          INITIAL_BUFFER_SIZE, memCapacity, file,
+                                                                                          diskCapacity))
+            {
+                //read a big amount before resetting
+                reader.mark();
+                assertTrue(reader.readBoolean());
+                assertEquals(0x1, reader.readByte());
+                assertEquals('a', reader.readChar());
+                assertEquals(1, reader.readShort());
+                assertEquals(1, reader.readInt());
+                assertEquals(1L, reader.readLong());
+                reader.reset();
+
+                //read from buffer
+                assertTrue(reader.readBoolean());
+                assertEquals(0x1, reader.readByte());
+                assertEquals('a', reader.readChar());
+                assertEquals(1, reader.readShort());
+                assertEquals(1, reader.readInt());
+                assertEquals(1L, reader.readLong());
+
+                assertEquals(17, reader.available());
+
+                //mark again
+                reader.mark();
+                assertEquals(1.0f, reader.readFloat(), 0);
+                assertEquals(1.0d, reader.readDouble(), 0);
+                assertEquals("abc", reader.readUTF());
+                reader.reset();
+
+                assertEquals(17, reader.available());
+
+                assertEquals(1.0f, reader.readFloat(), 0);
+                assertEquals(1.0d, reader.readDouble(), 0);
+                assertEquals("abc", reader.readUTF());
+            }
+            assertFalse(file.exists());
+        }
+    }
+
+    @Test
+    public void testCircularSpillFile() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            // boolean
+            out.writeBoolean(true);
+            // byte
+            out.writeByte(0x1);
+            // char
+            out.writeChar('a');
+            // short
+            out.writeShort(1);
+            // int
+            out.writeInt(1);
+            // String
+            out.writeUTF("ab");
+            testData = baos.toByteArray();
+
+            // 1 (boolean) + 1 (byte) + 2 (char) + 2 (short) + 4 (int) + 4 bytes (utf string)
+        }
+
+        //read at most 4 bytes multiple times (and then check file size)
+        int MEM_SIZE = 0;
+        int DISK_SIZE = 4;
+        try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                      INITIAL_BUFFER_SIZE, MEM_SIZE, file,
+                                                                                      DISK_SIZE))
+        {
+            //read 2 bytes and reset
+            reader.mark();
+            assertTrue(reader.readBoolean());
+            assertEquals(0x1, reader.readByte());
+            assertEquals(2, reader.bytesPastMark(null));
+            reader.reset();
+
+            //read again previous sequence
+            assertTrue(reader.readBoolean());
+            assertEquals(0x1, reader.readByte());
+            //finish reading again previous sequence
+
+            //read 4 bytes and reset
+            reader.mark();
+            assertEquals('a', reader.readChar());
+            assertEquals(1, reader.readShort());
+            assertEquals(4, reader.bytesPastMark(null));
+            reader.reset();
+
+            //read again previous sequence
+            assertEquals('a', reader.readChar());
+            assertEquals(1, reader.readShort());
+            //finish reading again previous sequence
+
+            //read 4 bytes and reset
+            reader.mark();
+            assertEquals(1, reader.readInt());
+            assertEquals(4, reader.bytesPastMark(null));
+            reader.reset();
+
+            //read again previous sequence
+            assertEquals(1, reader.readInt());
+
+            //check max file size
+            assertEquals(DISK_SIZE, file.length());
+        }
+        assertFalse(file.exists());
+    }
+
+    @Test
+    public void testExhaustCapacity() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            // boolean
+            out.writeBoolean(true);
+            // byte
+            out.writeByte(0x1);
+            // char
+            out.writeChar('a');
+            // short
+            out.writeShort(1);
+            testData = baos.toByteArray();
+        }
+
+        //test capacity exhausted when reading more than 4 bytes
+        testCapacityExhausted(testData, 0, 2);
+        testCapacityExhausted(testData, 2, 0);
+        testCapacityExhausted(testData, 1, 1);
+    }
+
+    private void testCapacityExhausted(byte[] testData, int memSize, int diskSize) throws IOException
+    {
+        try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                      INITIAL_BUFFER_SIZE, memSize, file,
+                                                                                      diskSize))
+        {
+            //read 2 bytes and reset
+            reader.mark();
+            assertTrue(reader.readBoolean());
+            assertEquals(0x1, reader.readByte());
+            assertEquals(2, reader.bytesPastMark(null));
+            reader.reset();
+
+            //read again previous sequence
+            assertTrue(reader.readBoolean());
+            assertEquals(0x1, reader.readByte());
+            //finish reading again previous sequence
+
+            reader.mark();
+            //read 3 bytes - OK
+            assertEquals('a', reader.readChar());
+            //read 1 more byte - CAPACITY will exhaust when trying to reset :(
+            assertEquals(1, reader.readShort());
+
+            try
+            {
+                reader.reset();
+                fail("Should have thrown IOException");
+            }
+            catch (IOException e) {}
+
+            //check max file size
+            assertEquals(diskSize, file.length());
+        }
+        assertFalse(file.exists());
+    }
+
+    @Test
+    public void testMarkAndResetUnsignedRead() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            // byte
+            out.writeByte(0x1);
+            // short
+            out.writeShort(2);
+            testData = baos.toByteArray();
+        }
+
+        for (int memCapacity = 0; memCapacity <= 1; memCapacity++)
+        {
+            int diskCapacity = 1 - memCapacity;
+            try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                          INITIAL_BUFFER_SIZE, memCapacity, file,
+                                                                                          diskCapacity))
+            {
+                reader.mark();
+                assertEquals(1, reader.readUnsignedByte());
+                reader.reset();
+                assertEquals(1, reader.readUnsignedByte());
+
+                //will read first byte of short 2
+                reader.mark();
+                assertEquals(0, reader.readUnsignedByte());
+                reader.reset();
+
+                assertEquals(2, reader.readUnsignedShort());
+
+                reader.mark();
+                reader.reset();
+                assertEquals(0, reader.available());
+            }
+        }
+        assertFalse(file.exists());
+    }
+
+    @Test
+    public void testMarkAndResetSkipBytes() throws Exception
+    {
+        String testStr = "1234567890";
+        byte[] testData = testStr.getBytes();
+
+        for (int memCapacity = 0; memCapacity <= 7; memCapacity++)
+        {
+            int diskCapacity = 7 - memCapacity;
+            try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                          INITIAL_BUFFER_SIZE, memCapacity, file,
+                                                                                          diskCapacity))
+            {
+                reader.mark();
+                // read first 5 bytes and rewind
+                byte[] out = new byte[5];
+                reader.readFully(out, 0, 5);
+                assertEquals("12345", new String(out));
+
+                // then skip 2 bytes (67)
+                reader.skipBytes(2);
+
+                assertEquals(7, reader.bytesPastMark(null));
+                reader.reset();
+
+                //now read part of the previously skipped bytes
+                out = new byte[5];
+                reader.readFully(out);
+                assertEquals("12345", new String(out));
+
+                //skip 3 bytes (2 from cache, 1 from stream)
+                reader.skip(3);
+
+                // mark and read 2 more bytes
+                reader.mark();
+                out = new byte[2];
+                reader.readFully(out);
+                assertEquals("90", new String(out));
+                assertEquals(0, reader.available());
+                reader.reset();
+
+                //reset and read only the next byte "9" in the third position
+                reader.readFully(out, 1, 1);
+                assertEquals("99", new String(out));
+
+                //now we read the remainder via readline
+                assertEquals(1, reader.available());
+                assertEquals("0", reader.readLine());
+            }
+            assertFalse(file.exists());
+        }
+    }
+
+    @Test
+    public void testMarkAndResetReadFully() throws Exception
+    {
+        String testStr = "1234567890";
+        byte[] testData = testStr.getBytes();
+
+        for (int memCapacity = 0; memCapacity <= 5; memCapacity++)
+        {
+            int diskCapacity = 5 - memCapacity;
+            try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                          INITIAL_BUFFER_SIZE, memCapacity, file,
+                                                                                          diskCapacity))
+            {
+                reader.mark();
+                // read first 5 bytes and rewind
+                byte[] out = new byte[5];
+                reader.readFully(out, 0, 5);
+                assertEquals("12345", new String(out));
+                reader.reset();
+
+                // read half from cache, half from parent stream
+                out = new byte[7];
+                reader.readFully(out);
+                assertEquals("1234567", new String(out));
+
+                // mark and read 3 more bytes
+                reader.mark();
+                out = new byte[3];
+                reader.readFully(out);
+                assertEquals("890", new String(out));
+                assertEquals(0, reader.available());
+                reader.reset();
+
+                //reset and read only the next byte "8" in the third position
+                reader.readFully(out, 2, 1);
+                assertEquals("898", new String(out));
+
+                //now we read the remainder via readline
+                assertEquals(2, reader.available());
+                assertEquals("90", reader.readLine());
+            }
+            assertFalse(file.exists());
+        }
+    }
+}
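
The new test file above pins down the contract of RewindableDataInputStreamPlus: a single active mark, rewinds served from a bounded in-memory buffer that spills to a temporary file, and capacity enforced when resetting. A minimal usage sketch follows; the constructor shape is taken from the test above, the capacities are illustrative, and the wrapper class name is hypothetical:

    import java.io.ByteArrayInputStream;
    import java.io.File;
    import org.apache.cassandra.io.util.RewindableDataInputStreamPlus;

    public class RewindableExample   // hypothetical example class
    {
        public static void main(String[] args) throws Exception
        {
            byte[] data = { 0x1, 0x2, 0x3 };
            File spill = new File(System.getProperty("java.io.tmpdir"), "rewind.buffer");

            // args: source, initial buffer size, in-memory capacity, spill file, on-disk capacity
            try (RewindableDataInputStreamPlus in =
                     new RewindableDataInputStreamPlus(new ByteArrayInputStream(data), 1, 1, spill, 1))
            {
                in.mark();                   // only one mark may be active at a time
                byte first = in.readByte();  // buffered in memory
                byte second = in.readByte(); // overflows memory, spills to the file
                in.reset(null);              // rewind to the mark
                assert first == in.readByte() && second == in.readByte();
                // rewinding more than mem+disk capacity (2 bytes here) would throw
                // IOException on reset, as testCapacityExhausted above demonstrates
            }
            // the spill file is removed once the stream is closed
        }
    }
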
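The BytesReadTrackerTest refactor below reflects the split of the old BytesReadTracker class into TrackedInputStream (an InputStream) and TrackedDataInputPlus (a DataInputPlus), both reporting consumed bytes via getBytesRead(). A hedged sketch of the DataInputPlus flavor, using only calls visible in the diff (the example class name is hypothetical):

    import java.io.ByteArrayInputStream;
    import org.apache.cassandra.io.util.DataInputPlus;
    import org.apache.cassandra.io.util.TrackedDataInputPlus;

    public class TrackedExample   // hypothetical example class
    {
        public static void main(String[] args) throws Exception
        {
            byte[] data = "1234567890".getBytes();
            TrackedDataInputPlus tracker =
                new TrackedDataInputPlus(new DataInputPlus.DataInputStreamPlus(new ByteArrayInputStream(data)));

            byte[] head = new byte[5];
            tracker.readFully(head, 0, 5);       // consumes "12345"
            tracker.skipBytes(2);                // skipped bytes also count as read
            assert tracker.getBytesRead() == 7;

            byte[] tail = new byte[3];
            tracker.readFully(tail);             // consumes "890"
            assert tracker.getBytesRead() == 10;
        }
    }
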
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java b/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
index 221e55c..7693b45 100644
--- a/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
+++ b/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.cassandra.utils;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -25,8 +29,10 @@ import java.io.DataOutputStream;
 
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import org.apache.cassandra.io.util.BytesReadTracker;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.io.util.TrackedInputStream;
 
 public class BytesReadTrackerTest
 {
@@ -34,6 +40,33 @@ public class BytesReadTrackerTest
     @Test
     public void testBytesRead() throws Exception
     {
+        internalTestBytesRead(true);
+        internalTestBytesRead(false);
+    }
+
+    @Test
+    public void testUnsignedRead() throws Exception
+    {
+        internalTestUnsignedRead(true);
+        internalTestUnsignedRead(false);
+    }
+
+    @Test
+    public void testSkipBytesAndReadFully() throws Exception
+    {
+        internalTestSkipBytesAndReadFully(true);
+        internalTestSkipBytesAndReadFully(false);
+    }
+
+    @Test
+    public void testReadLine() throws Exception
+    {
+        internalTestReadLine(true);
+        internalTestReadLine(false);
+    }
+
+    public void internalTestBytesRead(boolean inputStream) throws Exception
+    {
         byte[] testData;
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -66,45 +99,46 @@ public class BytesReadTrackerTest
             out.close();
         }
 
-        DataInputStream in = new DataInputStream(new ByteArrayInputStream(testData));
-        BytesReadTracker tracker = new BytesReadTracker(in);
+        DataInputPlus.DataInputStreamPlus in = new DataInputPlus.DataInputStreamPlus(new ByteArrayInputStream(testData));
+        BytesReadTracker tracker = inputStream ? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+        DataInputPlus reader = inputStream ? new DataInputPlus.DataInputStreamPlus((TrackedInputStream) tracker) : (DataInputPlus) tracker;
 
         try
         {
             // boolean = 1byte
-            boolean bool = tracker.readBoolean();
+            boolean bool = reader.readBoolean();
             assertTrue(bool);
             assertEquals(1, tracker.getBytesRead());
 
             // byte = 1byte
-            byte b = tracker.readByte();
+            byte b = reader.readByte();
             assertEquals(b, 0x1);
             assertEquals(2, tracker.getBytesRead());
 
             // char = 2byte
-            char c = tracker.readChar();
+            char c = reader.readChar();
             assertEquals('a', c);
             assertEquals(4, tracker.getBytesRead());
 
             // short = 2bytes
-            short s = tracker.readShort();
+            short s = reader.readShort();
             assertEquals(1, s);
             assertEquals((short) 6, tracker.getBytesRead());
 
             // int = 4bytes
-            int i = tracker.readInt();
+            int i = reader.readInt();
             assertEquals(1, i);
             assertEquals(10, tracker.getBytesRead());
 
             // long = 8bytes
-            long l = tracker.readLong();
+            long l = reader.readLong();
             assertEquals(1L, l);
             assertEquals(18, tracker.getBytesRead());
 
             // float = 4bytes
-            float f = tracker.readFloat();
+            float f = reader.readFloat();
             assertEquals(1.0f, f, 0);
             assertEquals(22, tracker.getBytesRead());
 
             // double = 8bytes
-            double d = tracker.readDouble();
+            double d = reader.readDouble();
             assertEquals(1.0d, d, 0);
             assertEquals(30, tracker.getBytesRead());
 
             // String("abc") = 2(string size) + 3 = 5 bytes
-            String str = tracker.readUTF();
+            String str = reader.readUTF();
             assertEquals("abc", str);
             assertEquals(35, tracker.getBytesRead());
 
@@ -119,8 +153,7 @@ public class BytesReadTrackerTest
         assertEquals(0, tracker.getBytesRead());
     }
 
-    @Test
-    public void testUnsignedRead() throws Exception
+    public void internalTestUnsignedRead(boolean inputStream) throws Exception
     {
         byte[] testData;
 
@@ -139,17 +172,18 @@ public class BytesReadTrackerTest
             out.close();
         }
 
-        DataInputStream in = new DataInputStream(new ByteArrayInputStream(testData));
-        BytesReadTracker tracker = new BytesReadTracker(in);
+        DataInputPlus.DataInputStreamPlus in = new DataInputPlus.DataInputStreamPlus(new ByteArrayInputStream(testData));
+        BytesReadTracker tracker = inputStream ? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+        DataInputPlus reader = inputStream ? new DataInputPlus.DataInputStreamPlus((TrackedInputStream) tracker) : (DataInputPlus) tracker;
 
         try
         {
             // byte = 1byte
-            int b = tracker.readUnsignedByte();
+            int b = reader.readUnsignedByte();
             assertEquals(b, 1);
             assertEquals(1, tracker.getBytesRead());
 
             // short = 2bytes
-            int s = tracker.readUnsignedShort();
+            int s = reader.readUnsignedShort();
             assertEquals(1, s);
             assertEquals(3, tracker.getBytesRead());
 
@@ -161,30 +195,30 @@ public class BytesReadTrackerTest
         }
     }
 
-    @Test
-    public void testSkipBytesAndReadFully() throws Exception
+    public void internalTestSkipBytesAndReadFully(boolean inputStream) throws Exception
    {
         String testStr = "1234567890";
         byte[] testData = testStr.getBytes();
 
-        DataInputStream in = new DataInputStream(new ByteArrayInputStream(testData));
-        BytesReadTracker tracker = new BytesReadTracker(in);
+        DataInputPlus.DataInputStreamPlus in = new DataInputPlus.DataInputStreamPlus(new ByteArrayInputStream(testData));
+        BytesReadTracker tracker = inputStream ? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+        DataInputPlus reader = inputStream ? new DataInputPlus.DataInputStreamPlus((TrackedInputStream) tracker) : (DataInputPlus) tracker;
 
         try
         {
             // read first 5 bytes
             byte[] out = new byte[5];
-            tracker.readFully(out, 0, 5);
+            reader.readFully(out, 0, 5);
             assertEquals("12345", new String(out));
             assertEquals(5, tracker.getBytesRead());
 
             // then skip 2 bytes
-            tracker.skipBytes(2);
+            reader.skipBytes(2);
             assertEquals(7, tracker.getBytesRead());
 
             // and read the rest
             out = new byte[3];
-            tracker.readFully(out);
+            reader.readFully(out);
             assertEquals("890", new String(out));
             assertEquals(10, tracker.getBytesRead());
 
@@ -196,16 +230,24 @@ public class BytesReadTrackerTest
         }
     }
 
-    @Test(expected = UnsupportedOperationException.class)
-    public void testReadLine() throws Exception
+    public void internalTestReadLine(boolean inputStream) throws Exception
     {
         DataInputStream in = new DataInputStream(new ByteArrayInputStream("1".getBytes()));
-        BytesReadTracker tracker = new BytesReadTracker(in);
+        BytesReadTracker tracker = inputStream ? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+        DataInputPlus reader = inputStream ? new DataInputPlus.DataInputStreamPlus((TrackedInputStream) tracker) : (DataInputPlus) tracker;
 
         try
         {
-            // throws UnsupportedOperationException
-            tracker.readLine();
+            String line = reader.readLine();
+            if (inputStream)
+                assertEquals(line, "1");
+            else
+                fail("Should have thrown UnsupportedOperationException");
+        }
+        catch (UnsupportedOperationException e)
+        {
+            if (inputStream)
+                fail("Should have not thrown UnsupportedOperationException");
        }
         finally
         {