cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Date Tue, 02 Dec 2014 09:51:29 GMT
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06f626ac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06f626ac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06f626ac

Branch: refs/heads/trunk
Commit: 06f626acd27b051222616c0c91f7dd8d556b8d45
Parents: 25314c2 d15c918
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Tue Dec 2 10:49:59 2014 +0100
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Tue Dec 2 10:50:18 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 66 +++++++------
 .../db/compaction/AntiCompactionTest.java       | 99 ++++++++++++++------
 3 files changed, 108 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06f626ac/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 22cc598,7df396d..141c3a8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,7 +1,43 @@@
 +3.0
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063, 7813)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208)
 +
 +
  2.1.3
+  * Release sstable references after anticompaction (CASSANDRA-8386)
   * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 - * Fix high size calculations for prepared statements (CASSANDRA-8231)
   * Centralize shared executors (CASSANDRA-8055)
   * Fix filtering for CONTAINS (KEY) relations on frozen collection
     clustering columns when the query is restricted to a single

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06f626ac/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a9a4773,d85ffd7..ed875b8
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -407,40 -409,48 +409,48 @@@ public class CompactionManager implemen
          Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
          Set<SSTableReader> nonAnticompacting = new HashSet<>();
          Iterator<SSTableReader> sstableIterator = sstables.iterator();
-         while (sstableIterator.hasNext())
+         try
          {
-             SSTableReader sstable = sstableIterator.next();
-             for (Range<Token> r : Range.normalize(ranges))
+             while (sstableIterator.hasNext())
              {
-                 Range<Token> sstableRange = new Range<>(sstable.first.getToken(),
sstable.last.getToken());
-                 if (r.contains(sstableRange))
-                 {
-                     logger.info("SSTable {} fully contained in range {}, mutating repairedAt
instead of anticompacting", sstable, r);
-                     sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor,
repairedAt);
-                     sstable.reloadSSTableMetadata();
-                     mutatedRepairStatuses.add(sstable);
-                     sstableIterator.remove();
-                     break;
-                 }
-                 else if (!sstableRange.intersects(r))
+                 SSTableReader sstable = sstableIterator.next();
+                 for (Range<Token> r : Range.normalize(ranges))
                  {
-                     logger.info("SSTable {} ({}) does not intersect repaired range {}, not
touching repairedAt.", sstable, sstableRange, r);
-                     nonAnticompacting.add(sstable);
-                     sstableIterator.remove();
-                     break;
-                 }
-                 else
-                 {
-                     logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable,
sstableRange, r);
 -                    Range<Token> sstableRange = new Range<>(sstable.first.getToken(),
sstable.last.getToken(), sstable.partitioner);
++                    Range<Token> sstableRange = new Range<>(sstable.first.getToken(),
sstable.last.getToken());
+                     if (r.contains(sstableRange))
+                     {
+                         logger.info("SSTable {} fully contained in range {}, mutating repairedAt
instead of anticompacting", sstable, r);
+                         sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor,
repairedAt);
+                         sstable.reloadSSTableMetadata();
+                         mutatedRepairStatuses.add(sstable);
+                         sstableIterator.remove();
+                         break;
+                     }
+                     else if (!sstableRange.intersects(r))
+                     {
+                         logger.info("SSTable {} ({}) does not intersect repaired range {},
not touching repairedAt.", sstable, sstableRange, r);
+                         nonAnticompacting.add(sstable);
+                         sstableIterator.remove();
+                         break;
+                     }
+                     else
+                     {
+                         logger.info("SSTable {} ({}) will be anticompacted on range {}",
sstable, sstableRange, r);
+                     }
                  }
              }
+             cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
+             cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+             SSTableReader.releaseReferences(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+             if (!sstables.isEmpty())
+                 doAntiCompaction(cfs, ranges, sstables, repairedAt);
          }
-         cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
-         cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
-         if (!sstables.isEmpty())
-             doAntiCompaction(cfs, ranges, sstables, repairedAt);
-         SSTableReader.releaseReferences(sstables);
-         cfs.getDataTracker().unmarkCompacting(sstables);
+         finally
+         {
+             SSTableReader.releaseReferences(sstables);
+             cfs.getDataTracker().unmarkCompacting(sstables);
+         }
+ 
          logger.info(String.format("Completed anticompaction successfully"));
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06f626ac/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 87e4315,090839e..2396acb
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -56,49 -50,17 +56,34 @@@ import org.apache.cassandra.utils.ByteB
  
  import com.google.common.collect.Iterables;
  
 -public class AntiCompactionTest extends SchemaLoader
 +public class AntiCompactionTest
  {
 -    private static final String KEYSPACE1 = "Keyspace1";
 +    private static final String KEYSPACE1 = "AntiCompactionTest";
      private static final String CF = "Standard1";
  
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                SimpleStrategy.class,
 +                KSMetaData.optsWithRF(1),
 +                SchemaLoader.standardCFMD(KEYSPACE1, CF));
 +    }
 +
 +    @After
 +    public void truncateCF()
 +    {
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 +        store.truncateBlocking();
 +    }
 +
      @Test
--    public void antiCompactOne() throws InterruptedException, ExecutionException, IOException
++    public void antiCompactOne() throws Exception
      {
-         Keyspace keyspace = Keyspace.open(KEYSPACE1);
-         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
-         store.disableAutoCompaction();
-         long timestamp = System.currentTimeMillis();
-         for (int i = 0; i < 10; i++)
-         {
-             DecoratedKey key = Util.dk(Integer.toString(i));
-             Mutation rm = new Mutation(KEYSPACE1, key.getKey());
-             for (int j = 0; j < 10; j++)
-                 rm.add(CF, Util.cellname(Integer.toString(j)),
-                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                        timestamp,
-                        0);
-             rm.applyUnsafe();
-         }
-         store.forceBlockingFlush();
+         ColumnFamilyStore store = prepareColumnFamilyStore();
          Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
          assertEquals(store.getSSTables().size(), sstables.size());
          Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()),
new BytesToken("4".getBytes()));
@@@ -113,19 -75,21 +98,21 @@@
          int nonRepairedKeys = 0;
          for (SSTableReader sstable : store.getSSTables())
          {
-             ICompactionScanner scanner = sstable.getScanner();
-             while (scanner.hasNext())
 -            try (SSTableScanner scanner = sstable.getScanner())
++            try (ICompactionScanner scanner = sstable.getScanner())
              {
-                 SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
-                 if (sstable.isRepaired())
-                 {
-                     assertTrue(range.contains(row.getKey().getToken()));
-                     repairedKeys++;
-                 }
-                 else
+                 while (scanner.hasNext())
                  {
-                     assertFalse(range.contains(row.getKey().getToken()));
-                     nonRepairedKeys++;
+                     SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+                     if (sstable.isRepaired())
+                     {
+                         assertTrue(range.contains(row.getKey().getToken()));
+                         repairedKeys++;
+                     }
+                     else
+                     {
+                         assertFalse(range.contains(row.getKey().getToken()));
+                         nonRepairedKeys++;
+                     }
                  }
              }
          }
@@@ -163,7 -131,12 +155,7 @@@
          File dir = cfs.directories.getDirectoryForNewSSTables();
          String filename = cfs.getTempSSTablePath(dir);
  
-         SSTableWriter writer = SSTableWriter.create(filename,0,0);
 -        SSTableWriter writer = new SSTableWriter(filename,
 -                0,
 -                0,
 -                cfs.metadata,
 -                StorageService.getPartitioner(),
 -                new MetadataCollector(cfs.metadata.comparator));
++        SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
  
          for (int i = 0; i < count * 5; i++)
              writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)),
cf);
@@@ -215,60 -153,60 +207,107 @@@
          List<Range<Token>> ranges = Arrays.asList(range);
  
          SSTableReader.acquireReferences(sstables);
 -        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
 -
 -        assertThat(store.getSSTables().size(), is(1));
 -        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
 -        assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
 -        assertThat(store.getDataTracker().getCompacting().size(), is(0));
 +        long repairedAt = 1000;
 +        CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
 +        /*
 +        Anticompaction will be anti-compacting 10 SSTables but will be doing this two at
a time
 +        so there will be no net change in the number of sstables
 +         */
 +        assertEquals(10, store.getSSTables().size());
 +        int repairedKeys = 0;
 +        int nonRepairedKeys = 0;
 +        for (SSTableReader sstable : store.getSSTables())
 +        {
 +            ICompactionScanner scanner = sstable.getScanner();
 +            while (scanner.hasNext())
 +            {
 +                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
 +                if (sstable.isRepaired())
 +                {
 +                    assertTrue(range.contains(row.getKey().getToken()));
 +                    assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt);
 +                    repairedKeys++;
 +                }
 +                else
 +                {
 +                    assertFalse(range.contains(row.getKey().getToken()));
 +                    assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
 +                    nonRepairedKeys++;
 +                }
 +            }
 +        }
 +        assertEquals(repairedKeys, 40);
 +        assertEquals(nonRepairedKeys, 60);
      }
 -
      @Test
+     public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException,
IOException
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
+         Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+         assertEquals(store.getSSTables().size(), sstables.size());
+         Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()),
new BytesToken("9999".getBytes()));
+         List<Range<Token>> ranges = Arrays.asList(range);
+ 
+         SSTableReader.acquireReferences(sstables);
+         CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
+ 
+         assertThat(store.getSSTables().size(), is(1));
+         assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
+         assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+         assertThat(store.getDataTracker().getCompacting().size(), is(0));
+     }
+ 
+ 
++    @Test
 +    public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException,
ExecutionException, IOException
 +    {
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 +        store.disableAutoCompaction();
 +
 +        for (int table = 0; table < 10; table++)
 +        {
 +            generateSStable(store,Integer.toString(table));
 +        }
 +        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
 +        assertEquals(store.getSSTables().size(), sstables.size());
 +        
 +        Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()),
new BytesToken("-1".getBytes()));
 +        List<Range<Token>> ranges = Arrays.asList(range);
 +
 +        SSTableReader.acquireReferences(sstables);
 +        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0);
 +
 +        assertThat(store.getSSTables().size(), is(10));
 +        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
 +    }
 +
+     private ColumnFamilyStore prepareColumnFamilyStore()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE1);
+         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 -        store.truncateBlocking();
+         store.disableAutoCompaction();
+         long timestamp = System.currentTimeMillis();
+         for (int i = 0; i < 10; i++)
+         {
+             DecoratedKey key = Util.dk(Integer.toString(i));
+             Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+             for (int j = 0; j < 10; j++)
+                 rm.add("Standard1", Util.cellname(Integer.toString(j)),
+                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                        timestamp,
+                        0);
+             rm.apply();
+         }
+         store.forceBlockingFlush();
+         return store;
+     }
 -    
++
+     @After
 -    public void truncateCF()
++    public void truncateCfs()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE1);
+         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+         store.truncateBlocking();
+     }
  }


Mime
View raw message