cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [10/10] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Date Tue, 24 Apr 2018 07:01:46 GMT
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d387f5e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d387f5e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d387f5e

Branch: refs/heads/trunk
Commit: 1d387f5e7f688150c09b7eb14a036d153017ec02
Parents: 1a2eb5e 684120d
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Tue Apr 24 08:56:34 2018 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Tue Apr 24 08:56:34 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  80 +++++++-------
 .../db/compaction/AntiCompactionTest.java       | 105 ++++++++++++++++++-
 .../org/apache/cassandra/schema/MockSchema.java |  16 ++-
 4 files changed, 161 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d387f5e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6976c7f,5450322..784fa2b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -261,10 -30,12 +261,11 @@@ Merged from 3.0
   * Fully utilise specified compaction threads (CASSANDRA-14210)
   * Pre-create deletion log records to finish compactions quicker (CASSANDRA-12763)
  Merged from 2.2:
+  * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)
   * Fix JSON queries with IN restrictions and ORDER BY clause (CASSANDRA-14286)
 - * Backport circleci yaml (CASSANDRA-14240)
 + * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
  Merged from 2.1:
   * Check checksum before decompressing data (CASSANDRA-14284)
 - * CVE-2017-5929 Security vulnerability in Logback warning in NEWS.txt (CASSANDRA-14183)
  
  
  3.11.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d387f5e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5672dfe,f0a4de5..831d8ca
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -648,63 -645,67 +648,30 @@@ public class CompactionManager implemen
                                        Refs<SSTableReader> validatedForRepair,
                                        LifecycleTransaction txn,
                                        long repairedAt,
 +                                      UUID pendingRepair,
                                        UUID parentRepairSession) throws InterruptedException,
IOException
      {
 -        logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} sstables",
parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(),
cfs.getLiveSSTables());
 -        logger.trace("[repair #{}] Starting anticompaction for ranges {}", parentRepairSession,
ranges);
 -        Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
 -        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
 -        // we should only notify that repair status changed if it actually did:
 -        Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
 -        Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
 -        for (SSTableReader sstable : sstables)
 -            wasRepairedBefore.put(sstable, sstable.isRepaired());
 -
 -        Set<SSTableReader> nonAnticompacting = new HashSet<>();
 -
 -        Iterator<SSTableReader> sstableIterator = sstables.iterator();
          try
          {
 -            List<Range<Token>> normalizedRanges = Range.normalize(ranges);
 -
 -            while (sstableIterator.hasNext())
 -            {
 -                SSTableReader sstable = sstableIterator.next();
 +            ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentRepairSession);
 +            Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact for previews");
  
 -                Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(),
sstable.last.getToken());
 +            logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(parentRepairSession),
cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size());
 +            logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(parentRepairSession),
ranges);
 +            Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
  
-             Set<SSTableReader> nonAnticompacting = new HashSet<>();
- 
 -                boolean shouldAnticompact = false;
 +            Iterator<SSTableReader> sstableIterator = sstables.iterator();
 +            List<Range<Token>> normalizedRanges = Range.normalize(ranges);
-             Set<SSTableReader> fullyContainedSSTables = new HashSet<>();
- 
-             while (sstableIterator.hasNext())
-             {
-                 SSTableReader sstable = sstableIterator.next();
  
-                 Range<Token> sstableRange = new Range<>(sstable.first.getToken(),
sstable.last.getToken());
 -                for (Range<Token> r : normalizedRanges)
 -                {
 -                    if (r.contains(sstableBounds.left) && r.contains(sstableBounds.right))
 -                    {
 -                        logger.info("[repair #{}] SSTable {} fully contained in range {},
mutating repairedAt instead of anticompacting", parentRepairSession, sstable, r);
 -                        sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor,
repairedAt);
 -                        sstable.reloadSSTableMetadata();
 -                        mutatedRepairStatuses.add(sstable);
 -                        if (!wasRepairedBefore.get(sstable))
 -                            mutatedRepairStatusToNotify.add(sstable);
 -                        sstableIterator.remove();
 -                        shouldAnticompact = true;
 -                        break;
 -                    }
 -                    else if (r.intersects(sstableBounds))
 -                    {
 -                        logger.info("[repair #{}] SSTable {} ({}) will be anticompacted
on range {}", parentRepairSession, sstable, sstableBounds, r);
 -                        shouldAnticompact = true;
 -                    }
 -                }
++            Set<SSTableReader> fullyContainedSSTables = findSSTablesToAnticompact(sstableIterator,
normalizedRanges, parentRepairSession);
  
-                 boolean shouldAnticompact = false;
- 
-                 for (Range<Token> r : normalizedRanges)
-                 {
-                     if (r.contains(sstableRange))
-                     {
-                         logger.info("{} SSTable {} fully contained in range {}, mutating
repairedAt instead of anticompacting", PreviewKind.NONE.logPrefix(parentRepairSession), sstable,
r);
-                         fullyContainedSSTables.add(sstable);
-                         sstableIterator.remove();
-                         shouldAnticompact = true;
-                         break;
-                     }
-                     else if (sstableRange.intersects(r))
-                     {
-                         logger.info("{} SSTable {} ({}) will be anticompacted on range {}",
PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableRange, r);
-                         shouldAnticompact = true;
-                     }
-                 }
- 
--                if (!shouldAnticompact)
--                {
-                     logger.info("{} SSTable {} ({}) does not intersect repaired ranges {},
not touching repairedAt.", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableRange,
normalizedRanges);
 -                    logger.info("[repair #{}] SSTable {} ({}) does not intersect repaired
ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableBounds, normalizedRanges);
--                    nonAnticompacting.add(sstable);
--                    sstableIterator.remove();
--                }
--            }
 -            cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatusToNotify);
 -            txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses));
 -            validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
 +            cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(fullyContainedSSTables));
 +            cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, repairedAt,
pendingRepair);
-             txn.cancel(Sets.union(nonAnticompacting, fullyContainedSSTables));
-             validatedForRepair.release(Sets.union(nonAnticompacting, fullyContainedSSTables));
++            txn.cancel(fullyContainedSSTables);
++            validatedForRepair.release(fullyContainedSSTables);
              assert txn.originals().equals(sstables);
              if (!sstables.isEmpty())
 -                doAntiCompaction(cfs, ranges, txn, repairedAt);
 +                doAntiCompaction(cfs, ranges, txn, repairedAt, pendingRepair);
              txn.finish();
          }
          finally
@@@ -713,9 -714,9 +680,50 @@@
              txn.close();
          }
  
 -        logger.info("[repair #{}] Completed anticompaction successfully", parentRepairSession);
 +        logger.info("{} Completed anticompaction successfully", PreviewKind.NONE.logPrefix(parentRepairSession));
 +    }
 +
++    @VisibleForTesting
++    static Set<SSTableReader> findSSTablesToAnticompact(Iterator<SSTableReader>
sstableIterator, List<Range<Token>> normalizedRanges, UUID parentRepairSession)
++    {
++        Set<SSTableReader> fullyContainedSSTables = new HashSet<>();
++        while (sstableIterator.hasNext())
++        {
++            SSTableReader sstable = sstableIterator.next();
++
++            Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(),
sstable.last.getToken());
++
++            boolean shouldAnticompact = false;
++
++            for (Range<Token> r : normalizedRanges)
++            {
++                // ranges are normalized - no wrap around - if first and last are contained we know that all tokens are contained in the range
++                if (r.contains(sstable.first.getToken()) && r.contains(sstable.last.getToken()))
++                {
++                    logger.info("{} SSTable {} fully contained in range {}, mutating repairedAt
instead of anticompacting", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, r);
++                    fullyContainedSSTables.add(sstable);
++                    sstableIterator.remove();
++                    shouldAnticompact = true;
++                    break;
++                }
++                else if (r.intersects(sstableBounds))
++                {
++                    logger.info("{} SSTable {} ({}) will be anticompacted on range {}",
PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableBounds, r);
++                    shouldAnticompact = true;
++                }
++            }
++
++            if (!shouldAnticompact)
++            {
++                // this should never happen - in PendingAntiCompaction#getSSTables we select all sstables that intersect the repaired ranges, that can't have changed here
++                String message = String.format("%s SSTable %s (%s) does not intersect repaired
ranges %s, this sstable should not have been included.", PreviewKind.NONE.logPrefix(parentRepairSession),
sstable, sstableBounds, normalizedRanges);
++                logger.error(message);
++                throw new IllegalStateException(message);
++            }
++        }
++        return fullyContainedSSTables;
+     }
+ 
      public void performMaximal(final ColumnFamilyStore cfStore, boolean splitOutput)
      {
          FBUtilities.waitOnFutures(submitMaximal(cfStore, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()),
splitOutput));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d387f5e/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 6e7e184,841a22e..bda05af
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -19,24 -19,20 +19,30 @@@ package org.apache.cassandra.db.compact
  
  import java.io.File;
  import java.io.IOException;
++import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collection;
++import java.util.Collections;
++import java.util.Iterator;
  import java.util.List;
  import java.util.Set;
  import java.util.UUID;
  
  import com.google.common.collect.ImmutableSet;
  import com.google.common.collect.Iterables;
 -import com.google.common.util.concurrent.RateLimiter;
 +import com.google.common.collect.Lists;
++import com.google.common.collect.Sets;
 +import org.junit.Assert;
  import org.junit.BeforeClass;
  import org.junit.After;
 +import org.junit.Ignore;
  import org.junit.Test;
  
 -import org.apache.cassandra.config.CFMetaData;
++import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.schema.MockSchema;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.db.rows.EncodingStats;
@@@ -310,10 -274,9 +316,11 @@@ public class AntiCompactionTes
          ColumnFamilyStore store = prepareColumnFamilyStore();
          Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
          assertEquals(store.getLiveSSTables().size(), sstables.size());
-         Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()),
new BytesToken("9999".getBytes()));
++        // the sstables start at "0".getBytes() = 48, we need to include that first token, with "/".getBytes() = 47
+         Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()),
new BytesToken("9999".getBytes()));
          List<Range<Token>> ranges = Arrays.asList(range);
 -        UUID parentRepairSession = UUID.randomUUID();
 +        UUID parentRepairSession = pendingRepair == null ? UUID.randomUUID() : pendingRepair;
 +        registerParentRepairSession(parentRepairSession, ranges, repairedAt, pendingRepair);
  
          try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
               Refs<SSTableReader> refs = Refs.ref(sstables))
@@@ -351,22 -302,21 +358,29 @@@
          {
              generateSStable(store,Integer.toString(table));
          }
++        int refCountBefore = Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount();
          Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
          assertEquals(store.getLiveSSTables().size(), sstables.size());
  
          Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()),
new BytesToken("-10".getBytes()));
          List<Range<Token>> ranges = Arrays.asList(range);
          UUID parentRepairSession = UUID.randomUUID();
 -
 +        registerParentRepairSession(parentRepairSession, ranges, UNREPAIRED_SSTABLE, null);
- 
++        boolean gotException = false;
          try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
               Refs<SSTableReader> refs = Refs.ref(sstables))
          {
 -            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1,
parentRepairSession);
 +            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1,
NO_PENDING_REPAIR, parentRepairSession);
 +        }
++        catch (IllegalStateException e)
++        {
++            gotException = true;
+         }
  
++        assertTrue(gotException);
          assertThat(store.getLiveSSTables().size(), is(10));
          assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(false));
++        assertEquals(refCountBefore, Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount());
      }
  
      private ColumnFamilyStore prepareColumnFamilyStore()
@@@ -399,42 -349,5 +413,129 @@@
          return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE,
(s) -> !s.isRepaired()));
      }
  
 +    /**
 +     * If the parent repair session is missing, we should still clean up
 +     */
 +    @Test
 +    public void missingParentRepairSession() throws Exception
 +    {
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 +        store.disableAutoCompaction();
 +
 +        for (int table = 0; table < 10; table++)
 +        {
 +            generateSStable(store,Integer.toString(table));
 +        }
 +        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
 +        assertEquals(10, sstables.size());
 +
 +        Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()),
new BytesToken("-10".getBytes()));
 +        List<Range<Token>> ranges = Arrays.asList(range);
 +
 +        UUID missingRepairSession = UUIDGen.getTimeUUID();
 +        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
 +             Refs<SSTableReader> refs = Refs.ref(sstables))
 +        {
 +            Assert.assertFalse(refs.isEmpty());
 +            try
 +            {
 +                CompactionManager.instance.performAnticompaction(store, ranges, refs, txn,
1, missingRepairSession, missingRepairSession);
 +                Assert.fail("expected RuntimeException");
 +            }
 +            catch (RuntimeException e)
 +            {
 +                // expected
 +            }
 +            Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state());
 +            Assert.assertTrue(refs.isEmpty());
 +        }
 +    }
++
++    @Test
++    public void testSSTablesToInclude()
++    {
++        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
++        List<SSTableReader> sstables = new ArrayList<>();
++        sstables.add(MockSchema.sstable(1, 10, 100, cfs));
++        sstables.add(MockSchema.sstable(2, 100, 200, cfs));
++
++        Range<Token> r = new Range<>(t(10), t(100)); // should include sstable 1 and 2 above, but none is fully contained (Range is (x, y])
+ 
++        Iterator<SSTableReader> sstableIterator = sstables.iterator();
++        Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator,
Collections.singletonList(r), UUID.randomUUID());
++        assertTrue(fullyContainedSSTables.isEmpty());
++        assertEquals(2, sstables.size());
++    }
++
++    @Test
++    public void testSSTablesToInclude2()
++    {
++        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
++        List<SSTableReader> sstables = new ArrayList<>();
++        SSTableReader sstable1 = MockSchema.sstable(1, 10, 100, cfs);
++        SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs);
++        sstables.add(sstable1);
++        sstables.add(sstable2);
++
++        Range<Token> r = new Range<>(t(9), t(100)); // sstable 1 is fully contained
++
++        Iterator<SSTableReader> sstableIterator = sstables.iterator();
++        Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator,
Collections.singletonList(r), UUID.randomUUID());
++        assertEquals(Collections.singleton(sstable1), fullyContainedSSTables);
++        assertEquals(Collections.singletonList(sstable2), sstables);
++    }
++
++    @Test(expected = IllegalStateException.class)
++    public void testSSTablesToNotInclude()
++    {
++        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
++        List<SSTableReader> sstables = new ArrayList<>();
++        SSTableReader sstable1 = MockSchema.sstable(1, 0, 5, cfs);
++        sstables.add(sstable1);
++
++        Range<Token> r = new Range<>(t(9), t(100)); // sstable is not intersecting and should not be included
++
++        Iterator<SSTableReader> sstableIterator = sstables.iterator();
++        CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r),
UUID.randomUUID());
++    }
++
++    @Test(expected = IllegalStateException.class)
++    public void testSSTablesToNotInclude2()
++    {
++        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
++        List<SSTableReader> sstables = new ArrayList<>();
++        SSTableReader sstable1 = MockSchema.sstable(1, 10, 10, cfs);
++        SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs);
++        sstables.add(sstable1);
++        sstables.add(sstable2);
++
++        Range<Token> r = new Range<>(t(10), t(11)); // no sstable included, throw
++
++        Iterator<SSTableReader> sstableIterator = sstables.iterator();
++        CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r),
UUID.randomUUID());
++    }
++
++    @Test
++    public void testSSTablesToInclude4()
++    {
++        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
++        List<SSTableReader> sstables = new ArrayList<>();
++        SSTableReader sstable1 = MockSchema.sstable(1, 10, 100, cfs);
++        SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs);
++        sstables.add(sstable1);
++        sstables.add(sstable2);
++
++        Range<Token> r = new Range<>(t(9), t(200)); // sstable 2 is fully contained - last token is equal
++
++        Iterator<SSTableReader> sstableIterator = sstables.iterator();
++        Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator,
Collections.singletonList(r), UUID.randomUUID());
++        assertEquals(Sets.newHashSet(sstable1, sstable2), fullyContainedSSTables);
++        assertTrue(sstables.isEmpty());
++    }
++
++    private Token t(long t)
++    {
++        return new Murmur3Partitioner.LongToken(t);
++    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d387f5e/test/unit/org/apache/cassandra/schema/MockSchema.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/schema/MockSchema.java
index 99fff32,0000000..05de7ac
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@@ -1,187 -1,0 +1,197 @@@
 +/*
 +* Licensed to the Apache Software Foundation (ASF) under one
 +* or more contributor license agreements.  See the NOTICE file
 +* distributed with this work for additional information
 +* regarding copyright ownership.  The ASF licenses this file
 +* to you under the Apache License, Version 2.0 (the
 +* "License"); you may not use this file except in compliance
 +* with the License.  You may obtain a copy of the License at
 +*
 +*    http://www.apache.org/licenses/LICENSE-2.0
 +*
 +* Unless required by applicable law or agreed to in writing,
 +* software distributed under the License is distributed on an
 +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +* KIND, either express or implied.  See the License for the
 +* specific language governing permissions and limitations
 +* under the License.
 +*/
 +package org.apache.cassandra.schema;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.io.RandomAccessFile;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import com.google.common.collect.ImmutableSet;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.IndexSummary;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.FileHandle;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.Memory;
 +import org.apache.cassandra.schema.CachingParams;
 +import org.apache.cassandra.schema.KeyspaceMetadata;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.TableMetadataRef;
 +import org.apache.cassandra.utils.AlwaysPresentFilter;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +public class MockSchema
 +{
 +    static
 +    {
 +        Memory offsets = Memory.allocate(4);
 +        offsets.setInt(0, 0);
 +        indexSummary = new IndexSummary(Murmur3Partitioner.instance, offsets, 0, Memory.allocate(4),
0, 0, 0, 1);
 +    }
 +    private static final AtomicInteger id = new AtomicInteger();
 +    public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks",
KeyspaceParams.simpleTransient(1)));
 +
 +    public static final IndexSummary indexSummary;
 +    private static final FileHandle RANDOM_ACCESS_READER_FACTORY = new FileHandle.Builder(temp("mocksegmentedfile").getAbsolutePath()).complete();
 +
 +    public static Memtable memtable(ColumnFamilyStore cfs)
 +    {
 +        return new Memtable(cfs.metadata());
 +    }
 +
 +    public static SSTableReader sstable(int generation, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, false, cfs);
 +    }
 +
++    public static SSTableReader sstable(int generation, long first, long last, ColumnFamilyStore
cfs)
++    {
++        return sstable(generation, 0, false, first, last, cfs);
++    }
++
 +    public static SSTableReader sstable(int generation, boolean keepRef, ColumnFamilyStore
cfs)
 +    {
 +        return sstable(generation, 0, keepRef, cfs);
 +    }
 +
 +    public static SSTableReader sstable(int generation, int size, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, size, false, cfs);
 +    }
- 
 +    public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore
cfs)
 +    {
++        return sstable(generation, size, keepRef, generation, generation, cfs);
++    }
++
++    public static SSTableReader sstable(int generation, int size, boolean keepRef, long
firstToken, long lastToken, ColumnFamilyStore cfs)
++    {
 +        Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
 +                                               cfs.keyspace.getName(),
 +                                               cfs.getTableName(),
 +                                               generation, SSTableFormat.Type.BIG);
 +        Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX,
Component.FILTER, Component.TOC);
 +        for (Component component : components)
 +        {
 +            File file = new File(descriptor.filenameFor(component));
 +            try
 +            {
 +                file.createNewFile();
 +            }
 +            catch (IOException e)
 +            {
 +            }
 +        }
 +        if (size > 0)
 +        {
 +            try
 +            {
 +                File file = new File(descriptor.filenameFor(Component.DATA));
 +                try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
 +                {
 +                    raf.setLength(size);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +        }
 +        SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList());
 +        StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator)
 +                                                 .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(),
0.01f, -1, null, header)
 +                                                 .get(MetadataType.STATS);
 +        SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
 +                                                          RANDOM_ACCESS_READER_FACTORY.sharedCopy(),
RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(),
 +                                                          new AlwaysPresentFilter(), 1L,
metadata, SSTableReader.OpenReason.NORMAL, header);
-         reader.first = reader.last = readerBounds(generation);
++        reader.first = readerBounds(firstToken);
++        reader.last = readerBounds(lastToken);
 +        if (!keepRef)
 +            reader.selfRef().release();
 +        return reader;
 +    }
 +
 +    public static ColumnFamilyStore newCFS()
 +    {
 +        return newCFS(ks.getName());
 +    }
 +
 +    public static ColumnFamilyStore newCFS(String ksname)
 +    {
 +        String cfname = "mockcf" + (id.incrementAndGet());
 +        TableMetadata metadata = newTableMetadata(ksname, cfname);
 +        return new ColumnFamilyStore(ks, cfname, 0, new TableMetadataRef(metadata), new
Directories(metadata), false, false, false);
 +    }
 +
 +    public static TableMetadata newTableMetadata(String ksname, String cfname)
 +    {
 +        return TableMetadata.builder(ksname, cfname)
 +                            .partitioner(Murmur3Partitioner.instance)
 +                            .addPartitionKeyColumn("key", UTF8Type.instance)
 +                            .addClusteringColumn("col", UTF8Type.instance)
 +                            .addRegularColumn("value", UTF8Type.instance)
 +                            .caching(CachingParams.CACHE_NOTHING)
 +                            .build();
 +    }
 +
-     public static BufferDecoratedKey readerBounds(int generation)
++    public static BufferDecoratedKey readerBounds(long generation)
 +    {
 +        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(generation), ByteBufferUtil.EMPTY_BYTE_BUFFER);
 +    }
 +
 +    private static File temp(String id)
 +    {
 +        try
 +        {
 +            File file = File.createTempFile(id, "tmp");
 +            file.deleteOnExit();
 +            return file;
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    public static void cleanup()
 +    {
 +        // clean up data directory which are stored as data directory/keyspace/data files
 +        for (String dirName : DatabaseDescriptor.getAllDataFileLocations())
 +        {
 +            File dir = new File(dirName);
 +            if (!dir.exists())
 +                continue;
 +            String[] children = dir.list();
 +            for (String child : children)
 +                FileUtils.deleteRecursive(new File(dir, child));
 +        }
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message