cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xe...@apache.org
Subject [1/2] cassandra git commit: Avoid index segment stitching in RAM which lead to OOM on big SSTable files
Date Sun, 27 Mar 2016 22:21:41 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.5 f6c5d7298 -> 5c4d5c731


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
index 4663692..39a0fbc 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
@@ -20,14 +20,18 @@ package org.apache.cassandra.index.sasi.disk;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.rows.BTreeRow;
 import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.Row;
@@ -36,6 +40,7 @@ import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -158,4 +163,89 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
 
         FileUtils.closeQuietly(index);
     }
+
+    @Test
+    public void testSparse() throws Exception
+    {
+        final String columnName = "timestamp";
+
+        ColumnFamilyStore cfs = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
+        ColumnDefinition column = cfs.metadata.getColumnDefinition(UTF8Type.instance.decompose(columnName));
+
+        SASIIndex sasi = (SASIIndex) cfs.indexManager.getIndexByName(columnName);
+
+        File directory = cfs.getDirectories().getDirectoryForNewSSTables();
+        Descriptor descriptor = Descriptor.fromFilename(cfs.getSSTablePath(directory));
+        PerSSTableIndexWriter indexWriter = (PerSSTableIndexWriter) sasi.getFlushObserver(descriptor,
OperationType.FLUSH);
+
+        final long now = System.currentTimeMillis();
+
+        indexWriter.begin();
+        indexWriter.indexes.put(column, indexWriter.newIndex(sasi.getIndex()));
+
+        populateSegment(cfs.metadata, indexWriter.getIndex(column), new HashMap<Long,
Set<Integer>>()
+        {{
+            put(now,     new HashSet<>(Arrays.asList(0, 1)));
+            put(now + 1, new HashSet<>(Arrays.asList(2, 3)));
+            put(now + 2, new HashSet<>(Arrays.asList(4, 5, 6, 7, 8, 9)));
+        }});
+
+        Callable<OnDiskIndex> segmentBuilder = indexWriter.getIndex(column).scheduleSegmentFlush(false);
+
+        Assert.assertNull(segmentBuilder.call());
+
+        PerSSTableIndexWriter.Index index = indexWriter.getIndex(column);
+        Random random = ThreadLocalRandom.current();
+
+        Set<String> segments = new HashSet<>();
+        // now let's test multiple correct segments which yield an incorrect final segment
+        for (int i = 0; i < 3; i++)
+        {
+            populateSegment(cfs.metadata, index, new HashMap<Long, Set<Integer>>()
+            {{
+                put(now,     new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(),
random.nextInt())));
+                put(now + 1, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(),
random.nextInt())));
+                put(now + 2, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(),
random.nextInt())));
+            }});
+
+            try
+            {
+                // flush each of the new segments, they should all succeed
+                OnDiskIndex segment = index.scheduleSegmentFlush(false).call();
+                index.segments.add(Futures.immediateFuture(segment));
+                segments.add(segment.getIndexPath());
+            }
+            catch (Exception | FSError e)
+            {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+
+        // make sure that all of the segments are present on the filesystem
+        for (String segment : segments)
+            Assert.assertTrue(new File(segment).exists());
+
+        indexWriter.complete();
+
+        // make sure that individual segments have been cleaned up
+        for (String segment : segments)
+            Assert.assertFalse(new File(segment).exists());
+
+        // and combined index doesn't exist either
+        Assert.assertFalse(new File(index.outputFile).exists());
+    }
+
+    private static void populateSegment(CFMetaData metadata, PerSSTableIndexWriter.Index
index, Map<Long, Set<Integer>> data)
+    {
+        for (Map.Entry<Long, Set<Integer>> value : data.entrySet())
+        {
+            ByteBuffer term = LongType.instance.decompose(value.getKey());
+            for (Integer keyPos : value.getValue())
+            {
+                ByteBuffer key = ByteBufferUtil.bytes(String.format("key%06d", keyPos));
+                index.add(term, metadata.partitioner.decorateKey(key), ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE
- 1));
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
index d09b8d4..189e9c6 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.BufferDecoratedKey;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
+import org.apache.cassandra.index.sasi.utils.CombinedTerm;
 import org.apache.cassandra.index.sasi.utils.CombinedValue;
 import org.apache.cassandra.index.sasi.utils.MappedBuffer;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
@@ -35,7 +36,6 @@ import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.MurmurHash;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SequentialWriter;
 
@@ -84,82 +84,21 @@ public class TokenTreeTest
     final static SortedMap<Long, LongSet> tokens = bigTokensMap;
 
     @Test
-    public void buildAndIterate() throws Exception
+    public void testSerializedSizeDynamic() throws Exception
     {
-        final TokenTreeBuilder builder = new TokenTreeBuilder(tokens).finish();
-        final Iterator<Pair<Long, LongSet>> tokenIterator = builder.iterator();
-        final Iterator<Map.Entry<Long, LongSet>> listIterator = tokens.entrySet().iterator();
-        while (tokenIterator.hasNext() && listIterator.hasNext())
-        {
-            Pair<Long, LongSet> tokenNext = tokenIterator.next();
-            Map.Entry<Long, LongSet> listNext = listIterator.next();
-
-            Assert.assertEquals(listNext.getKey(), tokenNext.left);
-            Assert.assertEquals(listNext.getValue(), tokenNext.right);
-        }
-
-        Assert.assertFalse("token iterator not finished", tokenIterator.hasNext());
-        Assert.assertFalse("list iterator not finished", listIterator.hasNext());
+        testSerializedSize(new DynamicTokenTreeBuilder(tokens));
     }
 
     @Test
-    public void buildWithMultipleMapsAndIterate() throws Exception
+    public void testSerializedSizeStatic() throws Exception
     {
-        final SortedMap<Long, LongSet> merged = new TreeMap<>();
-        final TokenTreeBuilder builder = new TokenTreeBuilder(simpleTokenMap).finish();
-        builder.add(collidingTokensMap);
-
-        merged.putAll(collidingTokensMap);
-        for (Map.Entry<Long, LongSet> entry : simpleTokenMap.entrySet())
-        {
-            if (merged.containsKey(entry.getKey()))
-            {
-                LongSet mergingOffsets  = entry.getValue();
-                LongSet existingOffsets = merged.get(entry.getKey());
-
-                if (mergingOffsets.equals(existingOffsets))
-                    continue;
-
-                Set<Long> mergeSet = new HashSet<>();
-                for (LongCursor merging : mergingOffsets)
-                    mergeSet.add(merging.value);
-
-                for (LongCursor existing : existingOffsets)
-                    mergeSet.add(existing.value);
-
-                LongSet mergedResults = new LongOpenHashSet();
-                for (Long result : mergeSet)
-                    mergedResults.add(result);
-
-                merged.put(entry.getKey(), mergedResults);
-            }
-            else
-            {
-                merged.put(entry.getKey(), entry.getValue());
-            }
-        }
-
-        final Iterator<Pair<Long, LongSet>> tokenIterator = builder.iterator();
-        final Iterator<Map.Entry<Long, LongSet>> listIterator = merged.entrySet().iterator();
-        while (tokenIterator.hasNext() && listIterator.hasNext())
-        {
-            Pair<Long, LongSet> tokenNext = tokenIterator.next();
-            Map.Entry<Long, LongSet> listNext = listIterator.next();
-
-            Assert.assertEquals(listNext.getKey(), tokenNext.left);
-            Assert.assertEquals(listNext.getValue(), tokenNext.right);
-        }
-
-        Assert.assertFalse("token iterator not finished", tokenIterator.hasNext());
-        Assert.assertFalse("list iterator not finished", listIterator.hasNext());
-
+        testSerializedSize(new StaticTokenTreeBuilder(new FakeCombinedTerm(tokens)));
     }
 
-    @Test
-    public void testSerializedSize() throws Exception
-    {
-        final TokenTreeBuilder builder = new TokenTreeBuilder(tokens).finish();
 
+    public void testSerializedSize(final TokenTreeBuilder builder) throws Exception
+    {
+        builder.finish();
         final File treeFile = File.createTempFile("token-tree-size-test", "tt");
         treeFile.deleteOnExit();
 
@@ -171,13 +110,26 @@ public class TokenTreeTest
 
         final RandomAccessReader reader = RandomAccessReader.open(treeFile);
         Assert.assertEquals((int) reader.bytesRemaining(), builder.serializedSize());
+        reader.close();
     }
 
     @Test
-    public void buildSerializeAndIterate() throws Exception
+    public void buildSerializeAndIterateDynamic() throws Exception
     {
-        final TokenTreeBuilder builder = new TokenTreeBuilder(simpleTokenMap).finish();
+        buildSerializeAndIterate(new DynamicTokenTreeBuilder(simpleTokenMap), simpleTokenMap);
+    }
 
+    @Test
+    public void buildSerializeAndIterateStatic() throws Exception
+    {
+        buildSerializeAndIterate(new StaticTokenTreeBuilder(new FakeCombinedTerm(tokens)),
tokens);
+    }
+
+
+    public void buildSerializeAndIterate(TokenTreeBuilder builder, SortedMap<Long, LongSet>
tokenMap) throws Exception
+    {
+
+        builder.finish();
         final File treeFile = File.createTempFile("token-tree-iterate-test1", "tt");
         treeFile.deleteOnExit();
 
@@ -191,7 +143,7 @@ public class TokenTreeTest
         final TokenTree tokenTree = new TokenTree(new MappedBuffer(reader));
 
         final Iterator<Token> tokenIterator = tokenTree.iterator(KEY_CONVERTER);
-        final Iterator<Map.Entry<Long, LongSet>> listIterator = simpleTokenMap.entrySet().iterator();
+        final Iterator<Map.Entry<Long, LongSet>> listIterator = tokenMap.entrySet().iterator();
         while (tokenIterator.hasNext() && listIterator.hasNext())
         {
             Token treeNext = tokenIterator.next();
@@ -208,19 +160,30 @@ public class TokenTreeTest
     }
 
     @Test
-    public void buildSerializeAndGet() throws Exception
+    public void buildSerializeAndGetDynamic() throws Exception
+    {
+        buildSerializeAndGet(false);
+    }
+
+    @Test
+    public void buildSerializeAndGetStatic() throws Exception
+    {
+        buildSerializeAndGet(true);
+    }
+
+    public void buildSerializeAndGet(boolean isStatic) throws Exception
     {
         final long tokMin = 0;
         final long tokMax = 1000;
 
-        final TokenTree tokenTree = generateTree(tokMin, tokMax);
+        final TokenTree tokenTree = generateTree(tokMin, tokMax, isStatic);
 
         for (long i = 0; i <= tokMax; i++)
         {
             TokenTree.OnDiskToken result = tokenTree.get(i, KEY_CONVERTER);
             Assert.assertNotNull("failed to find object for token " + i, result);
 
-            Set<Long> found = result.getOffsets();
+            LongSet found = result.getOffsets();
             Assert.assertEquals(1, found.size());
             Assert.assertEquals(i, found.toArray()[0]);
         }
@@ -229,10 +192,20 @@ public class TokenTreeTest
     }
 
     @Test
-    public void buildSerializeIterateAndSkip() throws Exception
+    public void buildSerializeIterateAndSkipDynamic() throws Exception
+    {
+        buildSerializeIterateAndSkip(new DynamicTokenTreeBuilder(tokens), tokens);
+    }
+
+    @Test
+    public void buildSerializeIterateAndSkipStatic() throws Exception
     {
-        final TokenTreeBuilder builder = new TokenTreeBuilder(tokens).finish();
+        buildSerializeIterateAndSkip(new StaticTokenTreeBuilder(new FakeCombinedTerm(tokens)),
tokens);
+    }
 
+    public void buildSerializeIterateAndSkip(TokenTreeBuilder builder, SortedMap<Long,
LongSet> tokens) throws Exception
+    {
+        builder.finish();
         final File treeFile = File.createTempFile("token-tree-iterate-test2", "tt");
         treeFile.deleteOnExit();
 
@@ -278,10 +251,20 @@ public class TokenTreeTest
     }
 
     @Test
-    public void skipPastEnd() throws Exception
+    public void skipPastEndDynamic() throws Exception
     {
-        final TokenTreeBuilder builder = new TokenTreeBuilder(simpleTokenMap).finish();
+        skipPastEnd(new DynamicTokenTreeBuilder(simpleTokenMap), simpleTokenMap);
+    }
 
+    @Test
+    public void skipPastEndStatic() throws Exception
+    {
+        skipPastEnd(new StaticTokenTreeBuilder(new FakeCombinedTerm(simpleTokenMap)), simpleTokenMap);
+    }
+
+    public void skipPastEnd(TokenTreeBuilder builder, SortedMap<Long, LongSet> tokens)
throws Exception
+    {
+        builder.finish();
         final File treeFile = File.createTempFile("token-tree-skip-past-test", "tt");
         treeFile.deleteOnExit();
 
@@ -294,17 +277,28 @@ public class TokenTreeTest
         final RandomAccessReader reader = RandomAccessReader.open(treeFile);
         final RangeIterator<Long, Token> tokenTree = new TokenTree(new MappedBuffer(reader)).iterator(KEY_CONVERTER);
 
-        tokenTree.skipTo(simpleTokenMap.lastKey() + 10);
+        tokenTree.skipTo(tokens.lastKey() + 10);
     }
 
     @Test
-    public void testTokenMerge() throws Exception
+    public void testTokenMergeDyanmic() throws Exception
+    {
+        testTokenMerge(false);
+    }
+
+    @Test
+    public void testTokenMergeStatic() throws Exception
+    {
+        testTokenMerge(true);
+    }
+
+    public void testTokenMerge(boolean isStatic) throws Exception
     {
         final long min = 0, max = 1000;
 
         // two different trees with the same offsets
-        TokenTree treeA = generateTree(min, max);
-        TokenTree treeB = generateTree(min, max);
+        TokenTree treeA = generateTree(min, max, isStatic);
+        TokenTree treeB = generateTree(min, max, isStatic);
 
         RangeIterator<Long, Token> a = treeA.iterator(new KeyConverter());
         RangeIterator<Long, Token> b = treeB.iterator(new KeyConverter());
@@ -400,6 +394,52 @@ public class TokenTreeTest
         }
     }
 
+    public static class FakeCombinedTerm extends CombinedTerm
+    {
+        private final SortedMap<Long, LongSet> tokens;
+
+        public FakeCombinedTerm(SortedMap<Long, LongSet> tokens)
+        {
+            super(null, null);
+            this.tokens = tokens;
+        }
+
+        public RangeIterator<Long, Token> getTokenIterator()
+        {
+            return new TokenMapIterator(tokens);
+        }
+    }
+
+    public static class TokenMapIterator extends RangeIterator<Long, Token>
+    {
+        public final Iterator<Map.Entry<Long, LongSet>> iterator;
+
+        public TokenMapIterator(SortedMap<Long, LongSet> tokens)
+        {
+            super(tokens.firstKey(), tokens.lastKey(), tokens.size());
+            iterator = tokens.entrySet().iterator();
+        }
+
+        public Token computeNext()
+        {
+            if (!iterator.hasNext())
+                return endOfData();
+
+            Map.Entry<Long, LongSet> entry = iterator.next();
+            return new TokenWithOffsets(entry.getKey(), entry.getValue());
+        }
+
+        public void close() throws IOException
+        {
+
+        }
+
+        public void performSkipTo(Long next)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
     public static class TokenWithOffsets extends Token
     {
         private final LongSet offsets;
@@ -411,6 +451,12 @@ public class TokenTreeTest
         }
 
         @Override
+        public LongSet getOffsets()
+        {
+            return offsets;
+        }
+
+        @Override
         public void merge(CombinedValue<Long> other)
         {}
 
@@ -498,7 +544,7 @@ public class TokenTreeTest
         return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(hashed), buf);
     }
 
-    private static TokenTree generateTree(final long minToken, final long maxToken) throws
IOException
+    private static TokenTree generateTree(final long minToken, final long maxToken, boolean
isStatic) throws IOException
     {
         final SortedMap<Long, LongSet> toks = new TreeMap<Long, LongSet>()
         {{
@@ -510,7 +556,8 @@ public class TokenTreeTest
                 }
         }};
 
-        final TokenTreeBuilder builder = new TokenTreeBuilder(toks).finish();
+        final TokenTreeBuilder builder = isStatic ? new StaticTokenTreeBuilder(new FakeCombinedTerm(toks))
: new DynamicTokenTreeBuilder(toks);
+        builder.finish();
         final File treeFile = File.createTempFile("token-tree-get-test", "tt");
         treeFile.deleteOnExit();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java b/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
index 96e7610..205d28f 100644
--- a/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
+++ b/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
@@ -23,6 +23,8 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.index.sasi.disk.Token;
 
@@ -80,6 +82,12 @@ public class LongIterator extends RangeIterator<Long, Token>
         }
 
         @Override
+        public LongSet getOffsets()
+        {
+            return new LongOpenHashSet(4);
+        }
+
+        @Override
         public Iterator<DecoratedKey> iterator()
         {
             return Collections.emptyIterator();


Mime
View raw message