cassandra-commits mailing list archives

From: marc...@apache.org
Subject: [06/10] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Date: Mon, 13 Jun 2016 13:28:57 GMT
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05f8a008
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05f8a008
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05f8a008

Branch: refs/heads/cassandra-2.2
Commit: 05f8a008f696d9624ec85176fa0e2a1ce06a1ad5
Parents: 593bbf5 72acbcd
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Mon Jun 13 14:34:01 2016 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Mon Jun 13 15:00:08 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  4 ++
 .../org/apache/cassandra/db/lifecycle/View.java |  5 ++
 .../cassandra/streaming/StreamSession.java      | 22 +++----
 .../io/sstable/SSTableRewriterTest.java         | 66 ++++++++++++++++++++
 5 files changed, 86 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d639d43,ebcc90c..491f72a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,31 -1,7 +1,32 @@@
 -2.1.15
 +2.2.7
 + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
 + * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
 + * Run CommitLog tests with different compression settings (CASSANDRA-9039)
 + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
 + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
 + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
 + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnecessary file existence check during anticompaction (CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches and
 +   report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
+  * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
   * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
 - * Updated cqlsh Python driver to fix DESCRIBE problem for legacy tables (CASSANDRA-11055)
   * cqlsh: apply current keyspace to source command (CASSANDRA-11152)
   * Backport CASSANDRA-11578 (CASSANDRA-11750)
   * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e24917c,559ba0b..d3a5028
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -1588,8 -1529,12 +1588,12 @@@ public class DatabaseDescripto
  
      public static int getSSTablePreempiveOpenIntervalInMB()
      {
 -        return conf.sstable_preemptive_open_interval_in_mb;
 +        return FBUtilities.isWindows() ? -1 : conf.sstable_preemptive_open_interval_in_mb;
      }
+     public static void setSSTablePreempiveOpenIntervalInMB(int mb)
+     {
+         conf.sstable_preemptive_open_interval_in_mb = mb;
+     }
  
      public static boolean getTrickleFsync()
      {

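The setter added above pairs with the existing getter so tests can shrink the preemptive-open interval and force incremental (early) opening while a rewrite is in flight; the Windows branch keeps the feature disabled there. A minimal usage sketch, following the save/restore pattern in the SSTableRewriterTest change further down (restoring via a saved local is an illustrative assumption; the test itself restores a hard-coded 50):

    int previous = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB();
    DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1); // make early opening kick in quickly
    try
    {
        // exercise compaction or streaming while sstables are being rewritten
    }
    finally
    {
        // restore the previous interval so later tests see the default behaviour
        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(previous);
    }
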
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index fba1627,0000000..e303801
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -1,281 -1,0 +1,286 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.util.*;
 +
 +import javax.annotation.Nullable;
 +
 +import com.google.common.base.Function;
 +import com.google.common.base.Functions;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.*;
 +
 +import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.utils.Interval;
 +
 +import static com.google.common.base.Predicates.equalTo;
 +import static com.google.common.base.Predicates.not;
 +import static com.google.common.collect.ImmutableList.copyOf;
 +import static com.google.common.collect.ImmutableList.of;
 +import static com.google.common.collect.Iterables.all;
 +import static com.google.common.collect.Iterables.concat;
 +import static com.google.common.collect.Iterables.filter;
 +import static java.util.Collections.singleton;
 +import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
 +import static org.apache.cassandra.db.lifecycle.Helpers.replace;
 +
 +/**
 + * An immutable structure holding the current memtable, the memtables pending
 + * flush, the sstables for a column family, and the sstables that are active
 + * in compaction (a subset of the sstables).
 + *
 + * Modifications to instances are all performed via a Function produced by the static methods in this class.
 + * These are composed as necessary and provided to the Tracker.apply() methods, which atomically reject or
 + * accept and apply the changes to the View.
 + *
 + */
 +public class View
 +{
 +    /**
 +     * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush
 +     * and the new replacement memtable, until all outstanding write operations on the old table complete.
 +     * The last item in the list is always the "current" memtable.
 +     */
 +    public final List<Memtable> liveMemtables;
 +    /**
 +     * contains all memtables that are no longer referenced for writing and are queued for / in the process of being
 +     * flushed. In chronologically ascending order.
 +     */
 +    public final List<Memtable> flushingMemtables;
 +    public final Set<SSTableReader> compacting;
 +    public final Set<SSTableReader> sstables;
 +    public final Set<SSTableReader> premature;
 +    // we use a Map here so that we can easily perform identity checks as well as equality checks.
 +    // When marking compacting, we now indicate if we expect the sstables to be present (by default we do),
 +    // and we then check that not only are they all present in the live set, but that the exact instance present is
 +    // the one we made our decision to compact against.
 +    public final Map<SSTableReader, SSTableReader> sstablesMap;
 +
 +    public final SSTableIntervalTree intervalTree;
 +
 +    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree)
 +    {
 +        assert liveMemtables != null;
 +        assert flushingMemtables != null;
 +        assert sstables != null;
 +        assert compacting != null;
 +        assert intervalTree != null;
 +
 +        this.liveMemtables = liveMemtables;
 +        this.flushingMemtables = flushingMemtables;
 +
 +        this.sstablesMap = sstables;
 +        this.sstables = sstablesMap.keySet();
 +        this.compacting = compacting;
 +        this.premature = premature;
 +        this.intervalTree = intervalTree;
 +    }
 +
 +    public Memtable getCurrentMemtable()
 +    {
 +        return liveMemtables.get(liveMemtables.size() - 1);
 +    }
 +
 +    /**
 +     * @return the active memtable and all the memtables that are pending flush.
 +     */
 +    public Iterable<Memtable> getAllMemtables()
 +    {
 +        return concat(flushingMemtables, liveMemtables);
 +    }
 +
 +    public Sets.SetView<SSTableReader> nonCompactingSStables()
 +    {
 +        return Sets.difference(sstables, compacting);
 +    }
 +
 +    public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
 +    {
 +        return filter(candidates, new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader sstable)
 +            {
 +                return !compacting.contains(sstable);
 +            }
 +        });
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
 +    }
 +
 +    /**
 +      * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively.
 +      * The interval formed by {@code left} and {@code right} shouldn't wrap.
 +      */
 +    public List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right)
 +    {
++        return sstablesInBounds(left, right, intervalTree);
++    }
++
++    public static List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right, SSTableIntervalTree intervalTree)
++    {
 +        assert !AbstractBounds.strictlyWrapsAround(left, right);
 +
 +        if (intervalTree.isEmpty())
 +            return Collections.emptyList();
 +
 +        RowPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
 +        return intervalTree.search(Interval.<RowPosition, SSTableReader>create(left, stopInTree));
 +    }
 +
 +    // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:
 +
 +    // return a function to un/mark the provided readers compacting in a view
 +    static Function<View, View> updateCompacting(final Set<SSTableReader> unmark, final Iterable<SSTableReader> mark)
 +    {
 +        if (unmark.isEmpty() && Iterables.isEmpty(mark))
 +            return Functions.identity();
 +        return new Function<View, View>()
 +        {
 +            public View apply(View view)
 +            {
 +                assert all(mark, Helpers.idIn(view.sstablesMap));
 +                return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
 +                                replace(view.compacting, unmark, mark),
 +                                view.premature, view.intervalTree);
 +            }
 +        };
 +    }
 +
 +    // construct a predicate to reject views that do not permit us to mark these readers compacting;
 +    // i.e. one of them is either already compacting, has been compacted, or has been replaced
 +    static Predicate<View> permitCompacting(final Iterable<SSTableReader> readers)
 +    {
 +        return new Predicate<View>()
 +        {
 +            public boolean apply(View view)
 +            {
 +                for (SSTableReader reader : readers)
 +                    if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted() || view.premature.contains(reader))
 +                        return false;
 +                return true;
 +            }
 +        };
 +    }
 +
 +    // construct a function to change the liveset in a Snapshot
 +    static Function<View, View> updateLiveSet(final Set<SSTableReader> remove, final Iterable<SSTableReader> add)
 +    {
 +        if (remove.isEmpty() && Iterables.isEmpty(add))
 +            return Functions.identity();
 +        return new Function<View, View>()
 +        {
 +            public View apply(View view)
 +            {
 +                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
 +                return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting, view.premature,
 +                                SSTableIntervalTree.build(sstableMap.keySet()));
 +            }
 +        };
 +    }
 +
 +    // called prior to initiating flush: add newMemtable to liveMemtables, making it the latest memtable
 +    static Function<View, View> switchMemtable(final Memtable newMemtable)
 +    {
 +        return new Function<View, View>()
 +        {
 +            public View apply(View view)
 +            {
 +                List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
 +                assert newLive.size() == view.liveMemtables.size() + 1;
 +                return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
 +            }
 +        };
 +    }
 +
 +    // called before flush: move toFlush from liveMemtables to flushingMemtables
 +    static Function<View, View> markFlushing(final Memtable toFlush)
 +    {
 +        return new Function<View, View>()
 +        {
 +            public View apply(View view)
 +            {
 +                List<Memtable> live = view.liveMemtables, flushing = view.flushingMemtables;
 +                List<Memtable> newLive = copyOf(filter(live, not(equalTo(toFlush))));
 +                List<Memtable> newFlushing = copyOf(concat(filter(flushing, lessThan(toFlush)),
 +                                                           of(toFlush),
 +                                                           filter(flushing, not(lessThan(toFlush)))));
 +                assert newLive.size() == live.size() - 1;
 +                assert newFlushing.size() == flushing.size() + 1;
 +                return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
 +            }
 +        };
 +    }
 +
 +    // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
 +    static Function<View, View> replaceFlushed(final Memtable memtable, final SSTableReader flushed)
 +    {
 +        return new Function<View, View>()
 +        {
 +            public View apply(View view)
 +            {
 +                List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
 +                assert flushingMemtables.size() == view.flushingMemtables.size() - 1;
 +
 +                if (flushed == null)
 +                    return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
 +                                    view.compacting, view.premature, view.intervalTree);
 +
 +                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
 +                Set<SSTableReader> compacting = replace(view.compacting, emptySet(), singleton(flushed));
 +                Set<SSTableReader> premature = replace(view.premature, emptySet(), singleton(flushed));
 +                return new View(view.liveMemtables, flushingMemtables, sstableMap, compacting, premature,
 +                                SSTableIntervalTree.build(sstableMap.keySet()));
 +            }
 +        };
 +    }
 +
 +    static Function<View, View> permitCompactionOfFlushed(final SSTableReader reader)
 +    {
 +        return new Function<View, View>()
 +        {
 +
 +            @Nullable
 +            public View apply(View view)
 +            {
 +                Set<SSTableReader> premature = ImmutableSet.copyOf(filter(view.premature, not(equalTo(reader))));
 +                Set<SSTableReader> compacting = ImmutableSet.copyOf(filter(view.compacting, not(equalTo(reader))));
 +                return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compacting, premature, view.intervalTree);
 +            }
 +        };
 +    }
 +
 +
 +    private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
 +    {
 +        return new Predicate<T>()
 +        {
 +            public boolean apply(T t)
 +            {
 +                return t.compareTo(lessThan) < 0;
 +            }
 +        };
 +    }
 +}

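The new static sstablesInBounds overload above is the hook the streaming fix relies on: callers can query an interval tree they built themselves instead of the View's own tree, which may contain early-opened readers. A sketch of the intended call pattern, assuming a view and a keyRange are in scope (both names are illustrative):

    // Build the tree over canonical sstables only, so early-opened instances
    // produced by incremental opening never appear in the search results.
    SSTableIntervalTree canonical =
        SSTableIntervalTree.build(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view));
    // Both bounds are inclusive, and the interval must not wrap (asserted in the method).
    List<SSTableReader> candidates =
        View.sstablesInBounds(keyRange.left, keyRange.right, canonical);
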
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index f14b94c,273631c..f4c900e
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -25,18 -25,22 +25,22 @@@ import java.util.*
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicBoolean;
  
 -import javax.annotation.Nullable;
 -
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Function;
  import com.google.common.collect.*;
 +
++import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DataTracker;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.RowPosition;
+ import org.apache.cassandra.dht.AbstractBounds;
+ import org.apache.cassandra.dht.Bounds;
 -import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.gms.*;
@@@ -279,7 -270,7 +283,7 @@@ public class StreamSession implements I
              flushSSTables(stores);
  
          List<Range<Token>> normalizedRanges = Range.normalize(ranges);
-         List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt);
 -        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE);
++        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, isIncremental);
          try
          {
              addTransferFiles(sections);
@@@ -314,33 -306,21 +319,26 @@@
          {
              for (ColumnFamilyStore cfStore : stores)
              {
 -                final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
 -                final IPartitioner partitioner = cfStore.partitioner;
 +                final List<Range<RowPosition>> keyRanges = new ArrayList<>(ranges.size());
                  for (Range<Token> range : ranges)
 -                    rowBoundsList.add(range.toRowBounds());
 -                refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>()
 +                    keyRanges.add(Range.makeRowRange(range));
 +                refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>()
                  {
 -                    public List<SSTableReader> apply(DataTracker.View view)
 +                    public List<SSTableReader> apply(View view)
                      {
-                         Map<SSTableReader, SSTableReader> permittedInstances = new HashMap<>();
-                         for (SSTableReader reader : ColumnFamilyStore.CANONICAL_SSTABLES.apply(view))
-                             permittedInstances.put(reader, reader);
- 
 -                        DataTracker.SSTableIntervalTree intervalTree = DataTracker.buildIntervalTree(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view));
++                        SSTableIntervalTree intervalTree = SSTableIntervalTree.build(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view));
                          Set<SSTableReader> sstables = Sets.newHashSet();
 -                        for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
 +                        for (Range<RowPosition> keyRange : keyRanges)
                          {
 -                            for (SSTableReader sstable : DataTracker.View.sstablesInBounds(rowBounds, intervalTree, partitioner))
 +                            // keyRange excludes its start, while sstablesInBounds is inclusive (of both start and end).
 +                            // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above),
 +                            // and that method uses Token.maxKeyBound() to create the range, which returns a "fake" key that
 +                            // sorts after all keys having the token. That "fake" key cannot be equal to any real key, so even
 +                            // including keyRange.left will still exclude any key having the token of the original token range, and so we're
 +                            // still actually selecting what we wanted.
-                             for (SSTableReader sstable : view.sstablesInBounds(keyRange.left, keyRange.right))
++                            for (SSTableReader sstable : View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree))
                              {
-                                 // sstableInBounds may contain early opened sstables
-                                 if (isIncremental && sstable.isRepaired())
-                                     continue;
-                                 sstable = permittedInstances.get(sstable);
-                                 if (sstable != null)
+                                 if (!isIncremental || !sstable.isRepaired())
                                      sstables.add(sstable);
                              }
                          }

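The net effect of this hunk: the old 2.2 code searched the view's own interval tree and then filtered every hit through a permittedInstances identity map, so an sstable whose canonical instance had been swapped out mid-selection by incremental opening could silently drop out of the stream plan. Building the tree from CANONICAL_SSTABLES up front makes the identity lookup unnecessary, and the per-range loop reduces to the repaired-sstable check. Condensed, the new selection reads (names as in the diff above):

    SSTableIntervalTree intervalTree =
        SSTableIntervalTree.build(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view));
    Set<SSTableReader> sstables = Sets.newHashSet();
    for (Range<RowPosition> keyRange : keyRanges)
        for (SSTableReader sstable : View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree))
            if (!isIncremental || !sstable.isRepaired()) // skip already-repaired data on incremental repairs
                sstables.add(sstable);
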
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05f8a008/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index c2cc6e3,1fb28f5..f50953a
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -21,15 -21,11 +21,18 @@@ import java.io.File
  import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.util.*;
+ import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
  
 +import com.google.common.collect.Iterables;
  import com.google.common.collect.Sets;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import com.google.common.util.concurrent.Uninterruptibles;
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
@@@ -41,25 -35,24 +44,29 @@@ import org.apache.cassandra.db.*
  import org.apache.cassandra.db.compaction.AbstractCompactedRow;
  import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
  import org.apache.cassandra.db.compaction.CompactionController;
 +import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.compaction.LazilyCompactedRow;
  import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.db.compaction.SSTableSplitter;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.metrics.StorageMetrics;
+ import org.apache.cassandra.notifications.INotification;
+ import org.apache.cassandra.notifications.INotificationConsumer;
+ import org.apache.cassandra.notifications.SSTableListChangedNotification;
  import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.streaming.StreamSession;
  import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
  
 -import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertFalse;
 -import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.*;
  
  public class SSTableRewriterTest extends SchemaLoader
  {
@@@ -807,10 -769,76 +814,69 @@@
                  }
              }
          }
 -        writer.abort();
 -        cfs.getDataTracker().unmarkCompacting(sstables);
 -        truncate(cfs);
 +        truncateCF();
 +        validateCFS(cfs);
      }
  
+     @Test
+     public void testSSTableSectionsForRanges() throws IOException, InterruptedException, ExecutionException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         truncate(cfs);
+ 
+         cfs.addSSTable(writeFile(cfs, 1000));
+ 
+         Collection<SSTableReader> allSSTables = cfs.getSSTables();
+         assertEquals(1, allSSTables.size());
+         final Token firstToken = allSSTables.iterator().next().first.getToken();
+         DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+ 
+         List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges(
+             Collections.singleton(new Range<Token>(firstToken, firstToken)),
+             Collections.singleton(cfs), 0L, false);
+         assertEquals(1, sectionsBeforeRewrite.size());
+         for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite)
+             section.ref.release();
+         final AtomicInteger checkCount = new AtomicInteger();
+         // needed since we get notified when compaction is done as well - we can't get sections for ranges for obsoleted sstables
 -        INotificationConsumer consumer = new INotificationConsumer()
++        final AtomicBoolean done = new AtomicBoolean(false);
++        final AtomicBoolean failed = new AtomicBoolean(false);
++        Runnable r = new Runnable()
++        {
++            public void run()
++            {
++                while (!done.get())
+                 {
 -                    public void handleNotification(INotification notification, Object sender)
 -                    {
 -                        if (notification instanceof SSTableListChangedNotification)
 -                        {
 -                            Collection<SSTableReader> added = ((SSTableListChangedNotification) notification).added;
 -                            Collection<SSTableReader> removed = ((SSTableListChangedNotification) notification).removed;
 -                            // note that we need to check if added.equals(removed) because once the compaction is done the old sstable will have
 -                            // selfRef().globalCount() == 0 and we cant get the SectionsForRanges then. During incremental opening we always add and remove the same
 -                            // sstable (note that the sstables are x.equal(y) but not x == y since the new one will be a new instance with a moved starting point
 -                            // In this case we must avoid trying to call getSSTableSectionsForRanges since we are in the notification
 -                            // method and trying to reference an sstable with globalcount == 0 puts it into a loop, and this blocks the tracker from removing the
 -                            // unreferenced sstable.
 -                            if (added.isEmpty() || !added.iterator().next().getColumnFamilyName().equals(cfs.getColumnFamilyName()) || !added.equals(removed))
 -                                return;
 -
 -                            // at no point must the rewrite process hide
 -                            // sections returned by getSSTableSectionsForRanges
 -                            Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
 -                            List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false);
 -                            assertEquals(1, sections.size());
 -                            for (StreamSession.SSTableStreamingSections section : sections)
 -                                section.ref.release();
 -                            checkCount.incrementAndGet();
 -                        }
 -                    }
 -                };
 -        cfs.getDataTracker().subscribe(consumer);
++                    Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
++                    List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false);
++                    if (sections.size() != 1)
++                        failed.set(true);
++                    for (StreamSession.SSTableStreamingSections section : sections)
++                        section.ref.release();
++                    checkCount.incrementAndGet();
++                    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
++                }
++            }
++        };
++        Thread t = new Thread(r);
+         try
+         {
++            t.start();
+             cfs.forceMajorCompaction();
+             // reset
+         }
+         finally
+         {
+             DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
 -            cfs.getDataTracker().unsubscribe(consumer);
++            done.set(true);
++            t.join(20);
+         }
++        assertFalse(failed.get());
+         assertTrue(checkCount.get() >= 2);
+         truncate(cfs);
+     }
+ 
      /**
       * emulates anticompaction - writing from one source sstable to two new sstables
       *


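The rewritten test drops the notification-consumer approach, which had to dodge the globalCount == 0 re-reference loop described in the deleted comment, and instead polls getSSTableSectionsForRanges from a second thread while forceMajorCompaction() rewrites the sstable. The generic shape of that race check, sketched with hypothetical invariantHolds() and runOperationUnderTest() stand-ins for the section count and the compaction:

    final AtomicBoolean done = new AtomicBoolean(false);
    final AtomicBoolean failed = new AtomicBoolean(false);
    Thread poller = new Thread(new Runnable()
    {
        public void run()
        {
            while (!done.get())
            {
                if (!invariantHolds()) // hypothetical; the test checks sections.size() == 1
                    failed.set(true);
                Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
            }
        }
    });
    poller.start();
    try
    {
        runOperationUnderTest(); // hypothetical; the test runs cfs.forceMajorCompaction()
    }
    finally
    {
        done.set(true); // stop the poller even if the operation under test throws
    }
    assertFalse(failed.get());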