cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [08/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Date Tue, 24 May 2016 05:54:39 GMT
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d6ffa4b7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d6ffa4b7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d6ffa4b7

Branch: refs/heads/cassandra-3.0
Commit: d6ffa4b7de9ec8b66239b2c75ff860ad6e3aa77a
Parents: f463fed 675591d
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Tue May 24 07:34:56 2016 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Tue May 24 07:34:56 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        |  2 +-
 .../cassandra/service/ActiveRepairService.java  | 59 ++++++++-------
 .../service/ActiveRepairServiceTest.java        | 75 +++++++++++++++++++-
 4 files changed, 110 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6ffa4b7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 795e823,fcd7c3c..af97cd1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
 -2.1.15
 +2.2.7
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches and
 +   report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
+  * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739)
   * Add message dropped tasks to nodetool netstats (CASSANDRA-11855)
   * Don't compute expensive MaxPurgeableTimestamp until we've verified there's an 
     expired tombstone (CASSANDRA-11834)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6ffa4b7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6ffa4b7/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 21cdeae,5297ce3..1ea5aaf
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -373,26 -378,13 +373,26 @@@ public class ActiveRepairServic
      {
          assert parentRepairSession != null;
          ParentRepairSession prs = getParentRepairSession(parentRepairSession);
 +        //A repair will be marked as not global if it is a subrange repair to avoid many
small anti-compactions
 +        //in addition to other scenarios such as repairs not involving all DCs or hosts
 +        if (!prs.isGlobal)
 +        {
 +            logger.info("Not a global repair, will not do anticompaction");
 +            removeParentRepairSession(parentRepairSession);
 +            return Futures.immediateFuture(Collections.emptyList());
 +        }
 +        assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction
on unknown ranges";
  
          List<ListenableFuture<?>> futures = new ArrayList<>();
 -        for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
 +        // if we don't have successful repair ranges, then just skip anticompaction
 +        if (!successfulRanges.isEmpty())
          {
 -            Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey());
 -            ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
 -            futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges,
sstables, prs.repairedAt));
 +            for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
 +            {
-                 Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey());
++                Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey());
 +                ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
 +                futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges,
sstables, prs.repairedAt));
 +            }
          }
  
          ListenableFuture<List<Object>> allAntiCompactionResults = Futures.successfulAsList(futures);
@@@ -432,14 -424,12 +432,14 @@@
  
      public static class ParentRepairSession
      {
 -        public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
 -        public final Collection<Range<Token>> ranges;
 +        private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
 +        private final Collection<Range<Token>> ranges;
-         private final Map<UUID, Set<SSTableReader>> sstableMap = new HashMap<>();
+         public final Map<UUID, Set<String>> sstableMap = new HashMap<>();
 -        public final long repairedAt;
 +        private final long repairedAt;
 +        public final boolean isIncremental;
 +        public final boolean isGlobal;
  
 -        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>>
ranges, long repairedAt)
 +        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>>
ranges, boolean isIncremental, boolean isGlobal, long repairedAt)
          {
              for (ColumnFamilyStore cfs : columnFamilyStores)
              {
@@@ -448,45 -438,48 +448,56 @@@
              }
              this.ranges = ranges;
              this.repairedAt = repairedAt;
 +            this.isGlobal = isGlobal;
 +            this.isIncremental = isIncremental;
          }
  
-         public void addSSTables(UUID cfId, Set<SSTableReader> sstables)
-         {
-             sstableMap.get(cfId).addAll(sstables);
-         }
- 
          @SuppressWarnings("resource")
-         public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId)
+         public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID
cfId)
          {
-             Set<SSTableReader> sstables = sstableMap.get(cfId);
-             Iterator<SSTableReader> sstableIterator = sstables.iterator();
              ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references
= ImmutableMap.builder();
-             while (sstableIterator.hasNext())
+             for (SSTableReader sstable : getActiveSSTables(cfId))
              {
-                 SSTableReader sstable = sstableIterator.next();
-                 if (!new File(sstable.descriptor.filenameFor(Component.DATA)).exists())
-                 {
-                     sstableIterator.remove();
-                 }
+                 Ref<SSTableReader> ref = sstable.tryRef();
+                 if (ref == null)
+                     sstableMap.get(cfId).remove(sstable.getFilename());
                  else
+                     references.put(sstable, ref);
+             }
+             return new Refs<>(references.build());
+         }
+ 
+         private Set<SSTableReader> getActiveSSTables(UUID cfId)
+         {
+             Set<String> repairedSSTables = sstableMap.get(cfId);
+             Set<SSTableReader> activeSSTables = new HashSet<>();
+             Set<String> activeSSTableNames = new HashSet<>();
+             for (SSTableReader sstable : columnFamilyStores.get(cfId).getSSTables())
+             {
+                 if (repairedSSTables.contains(sstable.getFilename()))
                  {
-                     Ref<SSTableReader> ref = sstable.tryRef();
-                     if (ref == null)
-                         sstableIterator.remove();
-                     else
-                         references.put(sstable, ref);
+                     activeSSTables.add(sstable);
+                     activeSSTableNames.add(sstable.getFilename());
                  }
              }
-             return new Refs<>(references.build());
+             sstableMap.put(cfId, activeSSTableNames);
+             return activeSSTables;
          }
+ 
+         public void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
+         {
+             for (SSTableReader sstable : sstables)
+             {
+                 sstableMap.get(cfId).add(sstable.getFilename());
+             }
+         }
+ 
 +        public long getRepairedAt()
 +        {
 +            if (isGlobal)
 +                return repairedAt;
 +            return ActiveRepairService.UNREPAIRED_SSTABLE;
 +        }
          @Override
          public String toString()
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6ffa4b7/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index dab45f9,419ea1a..b4066d7
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@@ -1,218 -1,107 +1,289 @@@
  /*
 - * Licensed to the Apache Software Foundation (ASF) under one
 - * or more contributor license agreements.  See the NOTICE file
 - * distributed with this work for additional information
 - * regarding copyright ownership.  The ASF licenses this file
 - * to you under the Apache License, Version 2.0 (the
 - * "License"); you may not use this file except in compliance
 - * with the License.  You may obtain a copy of the License at
 - *
 - *     http://www.apache.org/licenses/LICENSE-2.0
 - *
 - * Unless required by applicable law or agreed to in writing, software
 - * distributed under the License is distributed on an "AS IS" BASIS,
 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 - * See the License for the specific language governing permissions and
 - * limitations under the License.
 - */
 -
 +* Licensed to the Apache Software Foundation (ASF) under one
 +* or more contributor license agreements.  See the NOTICE file
 +* distributed with this work for additional information
 +* regarding copyright ownership.  The ASF licenses this file
 +* to you under the Apache License, Version 2.0 (the
 +* "License"); you may not use this file except in compliance
 +* with the License.  You may obtain a copy of the License at
 +*
 +*    http://www.apache.org/licenses/LICENSE-2.0
 +*
 +* Unless required by applicable law or agreed to in writing,
 +* software distributed under the License is distributed on an
 +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +* KIND, either express or implied.  See the License for the
 +* specific language governing permissions and limitations
 +* under the License.
 +*/
  package org.apache.cassandra.service;
  
 +import java.net.InetAddress;
 +import java.util.*;
  
 -import java.util.Collections;
 -import java.util.HashSet;
 -import java.util.Iterator;
 -import java.util.Set;
 -import java.util.UUID;
 -
++import com.google.common.base.Predicate;
  import com.google.common.collect.Sets;
 -
 +import org.junit.Before;
 +import org.junit.BeforeClass;
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
 -import org.apache.cassandra.io.sstable.SSTableReader;
++import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.locator.AbstractReplicationStrategy;
 +import org.apache.cassandra.locator.SimpleStrategy;
 +import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.Refs;
  
  import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
  
 -public class ActiveRepairServiceTest extends SchemaLoader
 +public class ActiveRepairServiceTest
  {
 +    public static final String KEYSPACE5 = "Keyspace5";
-     public static final String CF_STANDRAD1 = "Standard1";
++    public static final String CF_STANDARD1 = "Standard1";
 +    public static final String CF_COUNTER = "Counter1";
  
 -    private static final String KEYSPACE1 = "Keyspace1";
 -    private static final String CF = "Standard1";
 +    public String cfname;
 +    public ColumnFamilyStore store;
 +    public InetAddress LOCAL, REMOTE;
 +
 +    private boolean initialized;
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE5,
 +                                    SimpleStrategy.class,
 +                                    KSMetaData.optsWithRF(2),
 +                                    SchemaLoader.standardCFMD(KEYSPACE5, CF_COUNTER),
-                                     SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDRAD1));
++                                    SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDARD1));
 +    }
 +
 +    @Before
 +    public void prepare() throws Exception
 +    {
 +        if (!initialized)
 +        {
 +            SchemaLoader.startGossiper();
 +            initialized = true;
 +
 +            LOCAL = FBUtilities.getBroadcastAddress();
 +            // generate a fake endpoint for which we can spoof receiving/sending trees
 +            REMOTE = InetAddress.getByName("127.0.0.2");
 +        }
 +
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +        tmd.clearUnsafe();
 +        StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken()));
 +        tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE);
 +        assert tmd.isMember(REMOTE);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsPlusOne() throws Throwable
 +    {
 +        // generate rf+1 nodes, and ensure that all nodes are returned
 +        Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsTimesTwo() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +
 +        // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are
returned
 +        addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
 +        Set<InetAddress> expected = new HashSet<>();
 +        for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
 +        {
 +            expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
 +        }
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsPlusOneInLocalDC() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +
 +        // generate rf+1 nodes, and ensure that all nodes are returned
 +        Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        // remove remote endpoints
 +        TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
 +        HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
 +        expected = Sets.intersection(expected, localEndpoints);
 +
 +        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()),
null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsTimesTwoInLocalDC() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +
 +        // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are
returned
 +        addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
 +        Set<InetAddress> expected = new HashSet<>();
 +        for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
 +        {
 +            expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
 +        }
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        // remove remote endpoints
 +        TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
 +        HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
 +        expected = Sets.intersection(expected, localEndpoints);
 +
 +        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
 +            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()),
null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +
 +        // generate rf*2 nodes, and ensure that only neighbors specified by the hosts are
returned
 +        addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
 +        List<InetAddress> expected = new ArrayList<>();
 +        for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
 +        {
 +            expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
 +        }
 +
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
 +
 +       assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5,
 +                                                                      StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(),
 +                                                                      null, hosts).iterator().next());
 +    }
 +
 +    @Test(expected = IllegalArgumentException.class)
 +    public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable
 +    {
 +        addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        //Dont give local endpoint
 +        Collection<String> hosts = Arrays.asList("127.0.0.3");
 +        ActiveRepairService.getNeighbors(KEYSPACE5, StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(),
null, hosts);
 +    }
 +
 +    Set<InetAddress> addTokens(int max) throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +        Set<InetAddress> endpoints = new HashSet<>();
 +        for (int i = 1; i <= max; i++)
 +        {
 +            InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
 +            tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), endpoint);
 +            endpoints.add(endpoint);
 +        }
 +        return endpoints;
 +    }
+ 
+     @Test
+     public void testGetActiveRepairedSSTableRefs()
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
+         Set<SSTableReader> original = store.getUnrepairedSSTables();
+ 
+         UUID prsId = UUID.randomUUID();
 -        ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store),
null);
++        ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store),
null, true, false);
+         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
+ 
+         //add all sstables to parent repair session
+         prs.addSSTables(store.metadata.cfId, original);
+ 
+         //retrieve all sstable references from parent repair sessions
+         Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+         Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+         assertEquals(original, retrieved);
+         refs.release();
+ 
+         //remove 1 sstable from data data tracker
+         Set<SSTableReader> newLiveSet = new HashSet<>(original);
+         Iterator<SSTableReader> it = newLiveSet.iterator();
 -        SSTableReader removed = it.next();
++        final SSTableReader removed = it.next();
+         it.remove();
 -        store.getDataTracker().replaceWithNewInstances(Collections.singleton(removed), Collections.EMPTY_SET);
++        store.getTracker().dropSSTables(new Predicate<SSTableReader>()
++        {
++            public boolean apply(SSTableReader reader)
++            {
++                return removed.equals(reader);
++            }
++        }, OperationType.COMPACTION, null);
+ 
+         //retrieve sstable references from parent repair session again - removed sstable
must not be present
+         refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+         retrieved = Sets.newHashSet(refs.iterator());
+         assertEquals(newLiveSet, retrieved);
+         assertFalse(retrieved.contains(removed));
+         refs.release();
+     }
+ 
+     private ColumnFamilyStore prepareColumnFamilyStore()
+     {
 -        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 -        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 -        store.truncateBlocking();
++        Keyspace keyspace = Keyspace.open(KEYSPACE5);
++        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_STANDARD1);
+         store.disableAutoCompaction();
+         long timestamp = System.currentTimeMillis();
+         //create 10 sstables
+         for (int i = 0; i < 10; i++)
+         {
+             DecoratedKey key = Util.dk(Integer.toString(i));
 -            Mutation rm = new Mutation(KEYSPACE1, key.getKey());
++            Mutation rm = new Mutation(KEYSPACE5, key.getKey());
+             for (int j = 0; j < 10; j++)
+                 rm.add("Standard1", Util.cellname(Integer.toString(j)),
+                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                        timestamp,
+                        0);
+             rm.apply();
+             store.forceBlockingFlush();
+         }
+         return store;
+     }
  }


Mime
View raw message