cassandra-commits mailing list archives

From bened...@apache.org
Subject [4/6] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Date Mon, 06 Jul 2015 11:19:43 GMT
Merge branch 'cassandra-2.1' into cassandra-2.2

Conflicts:
	src/java/org/apache/cassandra/db/DataTracker.java
	src/java/org/apache/cassandra/db/compaction/CompactionTask.java
	src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
	src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
	test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f430c53
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f430c53
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f430c53

Branch: refs/heads/trunk
Commit: 1f430c5392ec2bee5d13fb56f6de8260f35e72cc
Parents: b7ae07e ec320e8
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Mon Jul 6 12:14:07 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Mon Jul 6 12:14:07 2015 +0100

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionTask.java |  5 +----
 .../apache/cassandra/db/lifecycle/Helpers.java  | 10 ++++++++++
 .../db/lifecycle/LifecycleTransaction.java      |  8 +++++++-
 .../apache/cassandra/db/lifecycle/Tracker.java  | 16 ++++++++++++----
 .../cassandra/io/sstable/SSTableRewriter.java   |  9 +++++++--
 .../io/sstable/SSTableRewriterTest.java         | 20 ++++++++------------
 6 files changed, 45 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f430c53/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 6aa59e4,c6e3d2f..ea64fb2
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -220,17 -255,26 +220,14 @@@ public class CompactionTask extends Abs
  
              double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
              long totalSourceRows = 0;
 -            long[] counts = ci.getMergedRowCounts();
 -            StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
 -            Map<Integer, Long> mergedRows = new HashMap<>();
 -            for (int i = 0; i < counts.length; i++)
 -            {
 -                long count = counts[i];
 -                if (count == 0)
 -                    continue;
 -
 -                int rows = i + 1;
 -                totalSourceRows += rows * count;
 -                mergeSummary.append(String.format("%d:%d, ", rows, count));
 -                mergedRows.put(rows, count);
 -            }
 -
 -            SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
 -            logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 -                                      oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
 +            String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
 +            logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 +                                      taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
              logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
 -            logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
 +            logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
 +
 +            if (offline)
-             {
-                 for (SSTableReader reader : newSStables)
-                     reader.selfRef().release();
-             }
++                Refs.release(Refs.selfRefs(newSStables));
          }
      }
  

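For context, the hunk above replaces a hand-rolled loop that released each new reader's self-reference with the collection-level Refs.release(Refs.selfRefs(newSStables)), which keeps releasing even when an individual release throws. A minimal, self-contained sketch of that idiom follows; RefsDemo and its nested types are illustrative stand-ins, not the actual org.apache.cassandra.utils.concurrent API:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative stand-ins only; not the real Ref/Refs classes.
    public class RefsDemo
    {
        static final class Ref
        {
            private final AtomicInteger count = new AtomicInteger(1);

            void release()
            {
                if (count.decrementAndGet() == 0)
                    System.out.println("last reference released; resource reclaimed");
            }
        }

        interface SelfRefCounted
        {
            Ref selfRef();
        }

        static final class Reader implements SelfRefCounted
        {
            private final Ref self = new Ref();
            public Ref selfRef() { return self; }
        }

        // collect the self-ref of every item
        static List<Ref> selfRefs(Iterable<? extends SelfRefCounted> items)
        {
            List<Ref> refs = new ArrayList<>();
            for (SelfRefCounted item : items)
                refs.add(item.selfRef());
            return refs;
        }

        // release every ref even if one throws, so a single failure cannot
        // leak the remaining references; secondary failures are suppressed
        static void release(List<Ref> refs)
        {
            RuntimeException fail = null;
            for (Ref ref : refs)
            {
                try
                {
                    ref.release();
                }
                catch (RuntimeException t)
                {
                    if (fail == null) fail = t;
                    else fail.addSuppressed(t);
                }
            }
            if (fail != null)
                throw fail;
        }

        public static void main(String[] args)
        {
            release(selfRefs(Arrays.asList(new Reader(), new Reader())));
        }
    }

The accumulate-and-suppress shape here mirrors the Throwable-accumulate style used throughout the lifecycle code in the rest of this commit.
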
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f430c53/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/Helpers.java
index e14c339,0000000..536e13c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@@ -1,222 -1,0 +1,232 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.util.*;
 +
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.*;
 +
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +
 +import static com.google.common.base.Predicates.*;
 +import static com.google.common.collect.Iterables.any;
 +import static com.google.common.collect.Iterables.concat;
 +import static com.google.common.collect.Iterables.filter;
 +import static com.google.common.collect.Iterables.getFirst;
 +import static org.apache.cassandra.utils.Throwables.merge;
 +
 +class Helpers
 +{
 +    /**
 +     * update the contents of a set with the provided sets, ensuring that the items to remove are
 +     * really present, and that the items to add are not (unless we're also removing them)
 +     * @return a new set with the contents of the provided one modified
 +     */
 +    static <T> Set<T> replace(Set<T> original, Set<T> remove, Iterable<T> add)
 +    {
 +        return ImmutableSet.copyOf(replace(identityMap(original), remove, add).keySet());
 +    }
 +
 +    /**
 +     * update the contents of an "identity map" with the provided sets, ensuring that the items to remove are
 +     * really present, and that the items to add are not (unless we're also removing them)
 +     * @return a new identity map with the contents of the provided one modified
 +     */
 +    static <T> Map<T, T> replace(Map<T, T> original, Set<T> remove, Iterable<T> add)
 +    {
 +        // ensure the ones being removed are the exact same ones present
 +        for (T reader : remove)
 +            assert original.get(reader) == reader;
 +
 +        // ensure we don't already contain any we're adding, that we aren't also removing
 +        assert !any(add, and(not(in(remove)), in(original.keySet()))) : String.format("original:%s remove:%s add:%s", original.keySet(), remove, add);
 +
 +        Map<T, T> result =
 +            identityMap(concat(add, filter(original.keySet(), not(in(remove)))));
 +
 +        assert result.size() == original.size() - remove.size() + Iterables.size(add) :
 +        String.format("Expecting new size of %d, got %d while replacing %s by %s in %s",
 +                      original.size() - remove.size() + Iterables.size(add), result.size(), remove, add, original.keySet());
 +        return result;
 +    }
 +
 +    /**
 +     * A convenience method for encapsulating this action over multiple SSTableReader with exception-safety
 +     * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise
 +     */
 +    static Throwable setReplaced(Iterable<SSTableReader> readers, Throwable accumulate)
 +    {
 +        for (SSTableReader reader : readers)
 +        {
 +            try
 +            {
 +                reader.setReplaced();
 +            }
 +            catch (Throwable t)
 +            {
 +                accumulate = merge(accumulate, t);
 +            }
 +        }
 +        return accumulate;
 +    }
 +
 +    /**
++     * A convenience method for setting up the key cache over multiple
++     * SSTableReader instances
++     */
++    static void setupKeycache(Iterable<SSTableReader> readers)
++    {
++        for (SSTableReader reader : readers)
++            reader.setupKeyCache();
++    }
++
++    /**
 +     * assert that none of these readers have been replaced
 +     */
 +    static void checkNotReplaced(Iterable<SSTableReader> readers)
 +    {
 +        for (SSTableReader reader : readers)
 +            assert !reader.isReplaced();
 +    }
 +
 +    /**
 +     * A convenience method for encapsulating this action over multiple SSTableReader with exception-safety
 +     * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise
 +     */
 +    static Throwable markObsolete(Tracker tracker, Iterable<SSTableReader> readers, Throwable accumulate)
 +    {
 +        for (SSTableReader reader : readers)
 +        {
 +            try
 +            {
 +                boolean firstToCompact = reader.markObsolete(tracker);
 +                assert firstToCompact : reader + " was already marked compacted";
 +            }
 +            catch (Throwable t)
 +            {
 +                accumulate = merge(accumulate, t);
 +            }
 +        }
 +        return accumulate;
 +    }
 +
 +    /**
 +     * @return the identity function, as a Map, with domain of the provided values
 +     */
 +    static <T> Map<T, T> identityMap(Iterable<T> values)
 +    {
 +        ImmutableMap.Builder<T, T> builder = ImmutableMap.<T, T>builder();
 +        for (T t : values)
 +            builder.put(t, t);
 +        return builder.build();
 +    }
 +
 +    /**
 +     * @return an Iterable of the union of the sets, with duplicates being represented by their first encountered instance
 +     * (as defined by the order of set provision)
 +     */
 +    static <T> Iterable<T> concatUniq(Set<T>... sets)
 +    {
 +        List<Predicate<T>> notIn = new ArrayList<>(sets.length);
 +        for (Set<T> set : sets)
 +            notIn.add(not(in(set)));
 +        List<Iterable<T>> results = new ArrayList<>(sets.length);
 +        for (int i = 0 ; i < sets.length ; i++)
 +            results.add(filter(sets[i], and(notIn.subList(0, i))));
 +        return concat(results);
 +    }
 +
 +    /**
 +     * @return a Predicate yielding true for an item present in NONE of the provided sets
 +     */
 +    static <T> Predicate<T> notIn(Set<T>... sets)
 +    {
 +        return not(orIn(sets));
 +    }
 +
 +    /**
 +     * @return a Predicate yielding true for an item present in ANY of the provided sets
 +     */
 +    static <T> Predicate<T> orIn(Collection<T>... sets)
 +    {
 +        Predicate<T>[] orIn = new Predicate[sets.length];
 +        for (int i = 0 ; i < orIn.length ; i++)
 +            orIn[i] = in(sets[i]);
 +        return or(orIn);
 +    }
 +
 +    /**
 +     * filter out (i.e. remove) matching elements
 +     * @return filter, filtered to only those elements that *are not* present in *any* of the provided sets (are present in none)
 +     */
 +    static <T> Iterable<T> filterOut(Iterable<T> filter, Set<T>... inNone)
 +    {
 +        return filter(filter, notIn(inNone));
 +    }
 +
 +    /**
 +     * filter in (i.e. retain)
 +     *
 +     * @return filter, filtered to only those elements that *are* present in *any* of the provided sets
 +     */
 +    static <T> Iterable<T> filterIn(Iterable<T> filter, Set<T>... inAny)
 +    {
 +        return filter(filter, orIn(inAny));
 +    }
 +
 +    static Set<SSTableReader> emptySet()
 +    {
 +        return Collections.emptySet();
 +    }
 +
 +    static <T> T select(T t, Collection<T> col)
 +    {
 +        if (col instanceof Set && !col.contains(t))
 +            return null;
 +        return getFirst(filter(col, equalTo(t)), null);
 +    }
 +
 +    static <T> T selectFirst(T t, Collection<T> ... sets)
 +    {
 +        for (Collection<T> set : sets)
 +        {
 +            T select = select(t, set);
 +            if (select != null)
 +                return select;
 +        }
 +        return null;
 +    }
 +
 +    static <T> Predicate<T> idIn(Set<T> set)
 +    {
 +        return idIn(identityMap(set));
 +    }
 +
 +    static <T> Predicate<T> idIn(final Map<T, T> identityMap)
 +    {
 +        return new Predicate<T>()
 +        {
 +            public boolean apply(T t)
 +            {
 +                return identityMap.get(t) == t;
 +            }
 +        };
 +    }
 +
 +}

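A note on the replace helpers above: they enforce a strict exchange, in that everything removed must currently be present and everything added must be absent unless it is also being removed. A minimal equality-based sketch of that contract (the real helper operates on identity maps of SSTableReader; ReplaceDemo and its String keys are illustrative):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class ReplaceDemo
    {
        // same contract as Helpers.replace, using plain equality sets for
        // brevity; the real helper works on identity maps of readers
        static <T> Set<T> replace(Set<T> original, Set<T> remove, Set<T> add)
        {
            assert original.containsAll(remove) : "items to remove must be present";
            for (T t : add)
                assert remove.contains(t) || !original.contains(t) : "items to add must not already be present";

            Set<T> result = new HashSet<>(original);
            result.removeAll(remove);
            result.addAll(add);
            assert result.size() == original.size() - remove.size() + add.size();
            return result;
        }

        public static void main(String[] args)
        {
            Set<String> live = new HashSet<>(Arrays.asList("sstable-1", "sstable-2"));
            Set<String> next = replace(live,
                                       Collections.singleton("sstable-1"),  // compacted away
                                       Collections.singleton("sstable-3")); // newly written
            System.out.println(next); // [sstable-2, sstable-3] (order not guaranteed)
        }
    }

Run with java -ea so the assertions fire, matching the assert-heavy style of the lifecycle code itself.
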
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f430c53/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index f3091d5,0000000..e14e2a1
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@@ -1,512 -1,0 +1,518 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.util.*;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Function;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.*;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
 +import org.apache.cassandra.utils.concurrent.Transactional;
 +
 +import static com.google.common.base.Functions.compose;
 +import static com.google.common.base.Predicates.*;
 +import static com.google.common.collect.ImmutableSet.copyOf;
 +import static com.google.common.collect.Iterables.*;
 +import static java.util.Collections.singleton;
 +import static org.apache.cassandra.db.lifecycle.Helpers.*;
 +import static org.apache.cassandra.db.lifecycle.View.updateCompacting;
 +import static org.apache.cassandra.db.lifecycle.View.updateLiveSet;
 +import static org.apache.cassandra.utils.Throwables.maybeFail;
 +import static org.apache.cassandra.utils.concurrent.Refs.release;
 +import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;
 +
 +public class LifecycleTransaction extends Transactional.AbstractTransactional
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(LifecycleTransaction.class);
 +
 +    /**
 +     * a class that represents accumulated modifications to the Tracker.
 +     * has two instances, one containing modifications that are "staged" (i.e. invisible)
 +     * and one containing those "logged" that have been made visible through a call to checkpoint()
 +     */
 +    private static class State
 +    {
 +        // readers that are either brand new, update a previous new reader, or update one of the original readers
 +        final Set<SSTableReader> update = new HashSet<>();
 +        // disjoint from update, represents a subset of originals that is no longer needed
 +        final Set<SSTableReader> obsolete = new HashSet<>();
 +
 +        void log(State staged)
 +        {
 +            update.removeAll(staged.obsolete);
 +            update.removeAll(staged.update);
 +            update.addAll(staged.update);
 +            obsolete.addAll(staged.obsolete);
 +        }
 +
 +        boolean contains(SSTableReader reader)
 +        {
 +            return update.contains(reader) || obsolete.contains(reader);
 +        }
 +
 +        boolean isEmpty()
 +        {
 +            return update.isEmpty() && obsolete.isEmpty();
 +        }
 +
 +        void clear()
 +        {
 +            update.clear();
 +            obsolete.clear();
 +        }
 +    }
 +
 +    public final Tracker tracker;
 +    private final OperationType operationType;
 +    // the original readers this transaction was opened over, and that it guards
 +    // (no other transactions may operate over these readers concurrently)
 +    private final Set<SSTableReader> originals = new HashSet<>();
 +    // the set of readers we've marked as compacting (only updated on creation and in checkpoint())
 +    private final Set<SSTableReader> marked = new HashSet<>();
 +    // the identity set of readers we've ever encountered; used to ensure we don't accidentally revisit the
 +    // same version of a reader. potentially a dangerous property if there are reference counting bugs
 +    // as they won't be caught until the transaction's lifespan is over.
 +    private final Set<UniqueIdentifier> identities = Collections.newSetFromMap(new IdentityHashMap<UniqueIdentifier, Boolean>());
 +
 +    // changes that have been made visible
 +    private final State logged = new State();
 +    // changes that are pending
 +    private final State staged = new State();
 +
 +    /**
 +     * construct a Transaction for use in an offline operation
 +     */
 +    public static LifecycleTransaction offline(OperationType operationType, SSTableReader reader)
 +    {
 +        return offline(operationType, singleton(reader));
 +    }
 +
 +    /**
 +     * construct a Transaction for use in an offline operation
 +     */
 +    public static LifecycleTransaction offline(OperationType operationType, Iterable<SSTableReader> readers)
 +    {
 +        // if offline, for simplicity we just use a dummy tracker
 +        Tracker dummy = new Tracker(null, false);
 +        dummy.addInitialSSTables(readers);
 +        dummy.apply(updateCompacting(emptySet(), readers));
 +        return new LifecycleTransaction(dummy, operationType, readers);
 +    }
 +
 +    LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
 +    {
 +        this.tracker = tracker;
 +        this.operationType = operationType;
 +        for (SSTableReader reader : readers)
 +        {
 +            originals.add(reader);
 +            marked.add(reader);
 +            identities.add(reader.instanceId);
 +        }
 +    }
 +
 +    public void doPrepare()
 +    {
 +        // note for future: in anticompaction two different operations use the same Transaction, and both prepareToCommit()
 +        // separately: the second prepareToCommit is ignored as a "redundant" transition. since it is only a checkpoint
 +        // (and these happen anyway) this is fine but if more logic gets inserted here than is performed in a checkpoint,
 +        // it may break this use case, and care is needed
 +        checkpoint();
 +    }
 +
 +    /**
 +     * point of no return: commit all changes, but leave all readers marked as compacting
 +     */
 +    public Throwable doCommit(Throwable accumulate)
 +    {
 +        assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit";
 +
 +        logger.debug("Committing update:{}, obsolete:{}", staged.update, staged.obsolete);
 +
 +        // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
 +        // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
 +        // and notification status for the obsolete and new files
 +        accumulate = markObsolete(tracker, logged.obsolete, accumulate);
 +        accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
 +        accumulate = release(selfRefs(logged.obsolete), accumulate);
 +        accumulate = tracker.notifySSTablesChanged(originals, logged.update, operationType, accumulate);
 +        return accumulate;
 +    }
 +
 +    /**
 +     * undo all of the changes made by this transaction, resetting the state to its original form
 +     */
 +    public Throwable doAbort(Throwable accumulate)
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete);
 +
 +        if (logged.isEmpty() && staged.isEmpty())
 +            return accumulate;
 +
 +        // mark obsolete all readers that are not versions of those present in the original set
 +        Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
 +        logger.debug("Obsoleting {}", obsolete);
 +        // we don't pass the tracker in for the obsoletion, since these readers have never been notified externally
 +        // nor had their size accounting affected
 +        accumulate = markObsolete(null, obsolete, accumulate);
 +
 +        // replace all updated readers with a version restored to its original state
 +        accumulate = tracker.apply(updateLiveSet(logged.update, restoreUpdatedOriginals()), accumulate);
 +        // setReplaced immediately preceding versions that have not been obsoleted
 +        accumulate = setReplaced(logged.update, accumulate);
 +        // we have replaced all of logged.update and never made visible staged.update,
 +        // and the files we have logged as obsolete we clone fresh versions of, so they are no longer needed either
 +        // any _staged_ obsoletes should either be in staged.update already, and dealt with there,
 +        // or is still in its original form (so left as is); in either case no extra action is needed
 +        accumulate = release(selfRefs(concat(staged.update, logged.update, logged.obsolete)), accumulate);
 +        logged.clear();
 +        staged.clear();
 +        return accumulate;
 +    }
 +
 +    @Override
 +    protected Throwable doPostCleanup(Throwable accumulate)
 +    {
 +        return unmarkCompacting(marked, accumulate);
 +    }
 +
++    public boolean isOffline()
++    {
++        return tracker.isDummy();
++    }
++
 +    public void permitRedundantTransitions()
 +    {
 +        super.permitRedundantTransitions();
 +    }
 +
 +    /**
 +     * call when a consistent batch of changes is ready to be made atomically visible
 +     * these will be exposed in the Tracker atomically, or an exception will be thrown; in this case
 +     * the transaction should be rolled back
 +     */
 +    public void checkpoint()
 +    {
 +        maybeFail(checkpoint(null));
 +    }
 +    private Throwable checkpoint(Throwable accumulate)
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Checkpointing update:{}, obsolete:{}", staged.update, staged.obsolete);
 +
 +        if (staged.isEmpty())
 +            return accumulate;
 +
 +        Set<SSTableReader> toUpdate = toUpdate();
 +        Set<SSTableReader> fresh = copyOf(fresh());
 +
 +        // check the current versions of the readers we're replacing haven't somehow been replaced by someone else
 +        checkNotReplaced(filterIn(toUpdate, staged.update));
 +
 +        // ensure any new readers are in the compacting set, since we aren't done with them yet
 +        // and don't want anyone else messing with them
 +        // apply atomically along with updating the live set of readers
 +        tracker.apply(compose(updateCompacting(emptySet(), fresh),
 +                              updateLiveSet(toUpdate, staged.update)));
 +
 +        // log the staged changes and our newly marked readers
 +        marked.addAll(fresh);
 +        logged.log(staged);
 +
 +        // setup our tracker, and mark our prior versions replaced, also releasing our references to them
 +        // we do not replace/release obsoleted readers, since we may need to restore them on rollback
 +        accumulate = setReplaced(filterOut(toUpdate, staged.obsolete), accumulate);
 +        accumulate = release(selfRefs(filterOut(toUpdate, staged.obsolete)), accumulate);
 +
 +        staged.clear();
 +        return accumulate;
 +    }
 +
 +    /**
 +     * update a reader: if !original, this is a reader that is being introduced by this transaction;
 +     * otherwise it must be in the originals() set, i.e. a reader guarded by this transaction
 +     */
 +    public void update(SSTableReader reader, boolean original)
 +    {
 +        assert !staged.update.contains(reader) : "each reader may only be updated once per checkpoint: " + reader;
 +        assert !identities.contains(reader.instanceId) : "each reader instance may only be provided as an update once: " + reader;
 +        // check it isn't obsolete, and that it matches the original flag
 +        assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not update a reader that has been obsoleted";
 +        assert original == originals.contains(reader) : String.format("the 'original' indicator was incorrect (%s provided): %s", original, reader);
 +        staged.update.add(reader);
 +        identities.add(reader.instanceId);
-         reader.setupKeyCache();
++        if (!isOffline())
++            reader.setupKeyCache();
 +    }
 +
 +    /**
 +     * mark this reader as for obsoletion. this does not actually obsolete the reader until commit() is called,
 +     * but on checkpoint() the reader will be removed from the live set
 +     */
 +    public void obsolete(SSTableReader reader)
 +    {
 +        logger.debug("Staging for obsolescence {}", reader);
 +        // check this is: a reader guarded by the transaction, an instance we have already worked with
 +        // and that we haven't already obsoleted it, nor do we have other changes staged for it
 +        assert identities.contains(reader.instanceId) : "only reader instances that have previously been provided may be obsoleted: " + reader;
 +        assert originals.contains(reader) : "only readers in the 'original' set may be obsoleted: " + reader + " vs " + originals;
 +        assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not obsolete a reader that has already been obsoleted: " + reader;
 +        assert !staged.update.contains(reader) : "may not obsolete a reader that has a staged update (must checkpoint first): " + reader;
 +        assert current(reader) == reader : "may only obsolete the latest version of the reader: " + reader;
 +        staged.obsolete.add(reader);
 +    }
 +
 +    /**
 +     * obsolete every file in the original transaction
 +     */
 +    public void obsoleteOriginals()
 +    {
 +        logger.debug("Staging for obsolescence {}", originals);
 +        // if we're obsoleting, we should have no staged updates for the original files
 +        assert Iterables.isEmpty(filterIn(staged.update, originals)) : staged.update;
 +
 +        // stage obsoletes for any currently visible versions of any original readers
 +        Iterables.addAll(staged.obsolete, filterIn(current(), originals));
 +    }
 +
 +    /**
 +     * return the readers we're replacing in checkpoint(), i.e. the currently visible version of those in staged
 +     */
 +    private Set<SSTableReader> toUpdate()
 +    {
 +        return copyOf(filterIn(current(), staged.obsolete, staged.update));
 +    }
 +
 +    /**
 +     * new readers that haven't appeared previously (either in the original set or the logged updates)
 +     */
 +    private Iterable<SSTableReader> fresh()
 +    {
 +        return filterOut(staged.update,
 +                         originals, logged.update);
 +    }
 +
 +    /**
 +     * returns the currently visible readers managed by this transaction
 +     */
 +    public Iterable<SSTableReader> current()
 +    {
 +        // i.e., those that are updates that have been logged (made visible),
 +        // and any original readers that have neither been obsoleted nor updated
 +        return concat(logged.update, filterOut(originals, logged.update, logged.obsolete));
 +    }
 +
 +    /**
 +     * update the current replacement of any original reader back to its original start
 +     */
 +    private List<SSTableReader> restoreUpdatedOriginals()
 +    {
 +        Iterable<SSTableReader> torestore = filterIn(originals, logged.update, logged.obsolete);
 +        return ImmutableList.copyOf(transform(torestore,
 +                                              new Function<SSTableReader, SSTableReader>()
 +                                              {
 +                                                  public SSTableReader apply(SSTableReader reader)
 +                                                  {
 +                                                      return current(reader).cloneWithNewStart(reader.first, null);
 +                                                  }
 +                                              }));
 +    }
 +
 +    /**
 +     * the set of readers guarded by this transaction _in their original instance/state_
 +     * call current(SSTableReader) on any reader in this set to get the latest instance
 +     */
 +    public Set<SSTableReader> originals()
 +    {
 +        return Collections.unmodifiableSet(originals);
 +    }
 +
 +    /**
 +     * indicates if the reader has been marked for obsoletion
 +     */
 +    public boolean isObsolete(SSTableReader reader)
 +    {
 +        return logged.obsolete.contains(reader) || staged.obsolete.contains(reader);
 +    }
 +
 +    /**
 +     * return the current version of the provided reader, whether or not it is visible or staged;
 +     * i.e. returns the first version present by testing staged, logged and originals in order.
 +     */
 +    public SSTableReader current(SSTableReader reader)
 +    {
 +        Set<SSTableReader> container;
 +        if (staged.contains(reader))
 +            container = staged.update.contains(reader) ? staged.update : staged.obsolete;
 +        else if (logged.contains(reader))
 +            container = logged.update.contains(reader) ? logged.update : logged.obsolete;
 +        else if (originals.contains(reader))
 +            container = originals;
 +        else throw new AssertionError();
 +        return select(reader, container);
 +    }
 +
 +    /**
 +     * remove the reader from the set we're modifying
 +     */
 +    public void cancel(SSTableReader cancel)
 +    {
 +        logger.debug("Cancelling {} from transaction", cancel);
 +        assert originals.contains(cancel) : "may only cancel a reader in the 'original' set: " + cancel + " vs " + originals;
 +        assert !(staged.contains(cancel) || logged.contains(cancel)) : "may only cancel a reader that has not been updated or obsoleted in this transaction: " + cancel;
 +        originals.remove(cancel);
 +        marked.remove(cancel);
 +        maybeFail(unmarkCompacting(singleton(cancel), null));
 +    }
 +
 +    /**
 +     * remove the readers from the set we're modifying
 +     */
 +    public void cancel(Iterable<SSTableReader> cancels)
 +    {
 +        for (SSTableReader cancel : cancels)
 +            cancel(cancel);
 +    }
 +
 +    /**
 +     * remove the provided readers from this Transaction, and return a new Transaction to manage them
 +     * only permitted to be called if the current Transaction has never been used
 +     */
 +    public LifecycleTransaction split(Collection<SSTableReader> readers)
 +    {
 +        logger.debug("Splitting {} into new transaction", readers);
 +        checkUnused();
 +        for (SSTableReader reader : readers)
 +            assert identities.contains(reader.instanceId) : "may only split the same reader instance the transaction was opened with: " + reader;
 +
 +        for (SSTableReader reader : readers)
 +        {
 +            identities.remove(reader.instanceId);
 +            originals.remove(reader);
 +            marked.remove(reader);
 +        }
 +        return new LifecycleTransaction(tracker, operationType, readers);
 +    }
 +
 +    /**
 +     * check this transaction has never been used
 +     */
 +    private void checkUnused()
 +    {
 +        assert logged.isEmpty();
 +        assert staged.isEmpty();
 +        assert identities.size() == originals.size();
 +        assert originals.size() == marked.size();
 +    }
 +
 +    private Throwable unmarkCompacting(Set<SSTableReader> unmark, Throwable accumulate)
 +    {
 +        accumulate = tracker.apply(updateCompacting(unmark, emptySet()), accumulate);
 +        // when the CFS is invalidated, it will call unreferenceSSTables().  However, unreferenceSSTables only deals
 +        // with sstables that aren't currently being compacted.  If there are ongoing compactions that finish or are
 +        // interrupted after the CFS is invalidated, those sstables need to be unreferenced as well, so we do that here.
 +        accumulate = tracker.dropSSTablesIfInvalid(accumulate);
 +        return accumulate;
 +    }
 +
 +    // convenience method for callers that know only one sstable is involved in the transaction
 +    public SSTableReader onlyOne()
 +    {
 +        assert originals.size() == 1;
 +        return getFirst(originals, null);
 +    }
 +
 +    // a class representing the current state of the reader within this transaction, encoding the actions both logged
 +    // and pending, and the reader instances that are visible now, and will be after the next checkpoint (with null
 +    // indicating either obsolescence, or that the reader does not occur in the transaction; which is defined
 +    // by the corresponding Action)
 +    @VisibleForTesting
 +    public static class ReaderState
 +    {
 +        public static enum Action
 +        {
 +            UPDATED, OBSOLETED, NONE;
 +            public static Action get(boolean updated, boolean obsoleted)
 +            {
 +                assert !(updated && obsoleted);
 +                return updated ? UPDATED : obsoleted ? OBSOLETED : NONE;
 +            }
 +        }
 +
 +        final Action staged;
 +        final Action logged;
 +        final SSTableReader nextVisible;
 +        final SSTableReader currentlyVisible;
 +        final boolean original;
 +
 +        public ReaderState(Action logged, Action staged, SSTableReader currentlyVisible, SSTableReader nextVisible, boolean original)
 +        {
 +            this.staged = staged;
 +            this.logged = logged;
 +            this.currentlyVisible = currentlyVisible;
 +            this.nextVisible = nextVisible;
 +            this.original = original;
 +        }
 +
 +        public boolean equals(Object that)
 +        {
 +            return that instanceof ReaderState && equals((ReaderState) that);
 +        }
 +
 +        public boolean equals(ReaderState that)
 +        {
 +            return this.staged == that.staged && this.logged == that.logged && this.original == that.original
 +                && this.currentlyVisible == that.currentlyVisible && this.nextVisible == that.nextVisible;
 +        }
 +
 +        public String toString()
 +        {
 +            return String.format("[logged=%s staged=%s original=%s]", logged, staged, original);
 +        }
 +
 +        public static SSTableReader visible(SSTableReader reader, Predicate<SSTableReader> obsolete, Collection<SSTableReader> ... selectFrom)
 +        {
 +            return obsolete.apply(reader) ? null : selectFirst(reader, selectFrom);
 +        }
 +    }
 +
 +    @VisibleForTesting
 +    public ReaderState state(SSTableReader reader)
 +    {
 +        SSTableReader currentlyVisible = ReaderState.visible(reader, in(logged.obsolete), logged.update, originals);
 +        SSTableReader nextVisible = ReaderState.visible(reader, orIn(staged.obsolete, logged.obsolete), staged.update, logged.update, originals);
 +        return new ReaderState(ReaderState.Action.get(logged.update.contains(reader), logged.obsolete.contains(reader)),
 +                               ReaderState.Action.get(staged.update.contains(reader), staged.obsolete.contains(reader)),
 +                               currentlyVisible, nextVisible, originals.contains(reader)
 +        );
 +    }
 +
 +    public String toString()
 +    {
 +        return originals.toString();
 +    }
 +}

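One subtlety in the transaction above is State.log(): before merging, it strips anything the staged set obsoletes or re-updates, so a reader can move from updated to obsoleted across checkpoints without lingering in both sets. A minimal runnable model of just that method, with String keys standing in for SSTableReader instances:

    import java.util.HashSet;
    import java.util.Set;

    // Minimal model of LifecycleTransaction.State: "staged" changes stay
    // invisible until log(staged) folds them into the visible "logged" state.
    public class StateDemo
    {
        static final class State
        {
            final Set<String> update = new HashSet<>();
            final Set<String> obsolete = new HashSet<>();

            void log(State staged)
            {
                update.removeAll(staged.obsolete); // an updated reader later obsoleted drops out
                update.removeAll(staged.update);   // a newer version supersedes an older logged one
                update.addAll(staged.update);
                obsolete.addAll(staged.obsolete);
            }
        }

        public static void main(String[] args)
        {
            State logged = new State();

            State staged = new State();
            staged.update.add("reader-A");   // first checkpoint: A is updated
            logged.log(staged);

            staged = new State();
            staged.obsolete.add("reader-A"); // second checkpoint: A is obsoleted
            logged.log(staged);

            System.out.println(logged.update);   // []
            System.out.println(logged.obsolete); // [reader-A]
        }
    }
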
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f430c53/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index a32dd87,0000000..f1c4685
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@@ -1,459 -1,0 +1,467 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.io.File;
 +import java.util.*;
 +import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Function;
 +import com.google.common.base.Predicate;
 +import com.google.common.base.Predicates;
 +import com.google.common.collect.*;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.notifications.*;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +
 +import static com.google.common.base.Predicates.and;
 +import static com.google.common.collect.ImmutableSet.copyOf;
 +import static com.google.common.collect.Iterables.filter;
 +import static java.util.Collections.singleton;
 +import static org.apache.cassandra.db.lifecycle.Helpers.*;
 +import static org.apache.cassandra.db.lifecycle.View.permitCompacting;
 +import static org.apache.cassandra.db.lifecycle.View.updateCompacting;
 +import static org.apache.cassandra.db.lifecycle.View.updateLiveSet;
 +import static org.apache.cassandra.utils.Throwables.maybeFail;
 +import static org.apache.cassandra.utils.Throwables.merge;
 +import static org.apache.cassandra.utils.concurrent.Refs.release;
 +import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;
 +
 +public class Tracker
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(Tracker.class);
 +
 +    public final Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<>();
 +    public final ColumnFamilyStore cfstore;
 +    final AtomicReference<View> view;
 +    public final boolean loadsstables;
 +
 +    public Tracker(ColumnFamilyStore cfstore, boolean loadsstables)
 +    {
 +        this.cfstore = cfstore;
 +        this.view = new AtomicReference<>();
 +        this.loadsstables = loadsstables;
 +        this.reset();
 +    }
 +
 +    public LifecycleTransaction tryModify(SSTableReader sstable, OperationType operationType)
 +    {
 +        return tryModify(singleton(sstable), operationType);
 +    }
 +
 +    /**
 +     * @return a Transaction over the provided sstables if we are able to mark them as compacting before anyone else
 +     */
 +    public LifecycleTransaction tryModify(Iterable<SSTableReader> sstables, OperationType operationType)
 +    {
 +        if (Iterables.isEmpty(sstables))
 +            return new LifecycleTransaction(this, operationType, sstables);
 +        if (null == apply(permitCompacting(sstables), updateCompacting(emptySet(), sstables)))
 +            return null;
 +        return new LifecycleTransaction(this, operationType, sstables);
 +    }
 +
 +
 +    // METHODS FOR ATOMICALLY MODIFYING THE VIEW
 +
 +    Pair<View, View> apply(Function<View, View> function)
 +    {
 +        return apply(Predicates.<View>alwaysTrue(), function);
 +    }
 +
 +    Throwable apply(Function<View, View> function, Throwable accumulate)
 +    {
 +        try
 +        {
 +            apply(function);
 +        }
 +        catch (Throwable t)
 +        {
 +            accumulate = merge(accumulate, t);
 +        }
 +        return accumulate;
 +    }
 +
 +    /**
 +     * atomically tests permit against the view and applies function to it, if permit yields true, returning the original;
 +     * otherwise the method aborts, returning null
 +     */
 +    Pair<View, View> apply(Predicate<View> permit, Function<View, View> function)
 +    {
 +        while (true)
 +        {
 +            View cur = view.get();
 +            if (!permit.apply(cur))
 +                return null;
 +            View updated = function.apply(cur);
 +            if (view.compareAndSet(cur, updated))
 +                return Pair.create(cur, updated);
 +        }
 +    }
 +
 +    Throwable updateSizeTracking(Iterable<SSTableReader> oldSSTables, Iterable<SSTableReader> newSSTables, Throwable accumulate)
 +    {
-         if (cfstore == null)
++        if (isDummy())
 +            return accumulate;
 +
 +        long add = 0;
 +        for (SSTableReader sstable : newSSTables)
 +        {
 +            if (logger.isDebugEnabled())
 +                logger.debug("adding {} to list of files tracked for {}.{}", sstable.descriptor, cfstore.keyspace.getName(), cfstore.name);
 +            try
 +            {
 +                add += sstable.bytesOnDisk();
 +            }
 +            catch (Throwable t)
 +            {
 +                accumulate = merge(accumulate, t);
 +            }
 +        }
 +        long subtract = 0;
 +        for (SSTableReader sstable : oldSSTables)
 +        {
 +            if (logger.isDebugEnabled())
 +                logger.debug("removing {} from list of files tracked for {}.{}", sstable.descriptor, cfstore.keyspace.getName(), cfstore.name);
 +            try
 +            {
 +                subtract += sstable.bytesOnDisk();
 +            }
 +            catch (Throwable t)
 +            {
 +                accumulate = merge(accumulate, t);
 +            }
 +        }
 +        StorageMetrics.load.inc(add - subtract);
 +        cfstore.metric.liveDiskSpaceUsed.inc(add - subtract);
 +        // we don't subtract from total until the sstable is deleted
 +        cfstore.metric.totalDiskSpaceUsed.inc(add);
 +        return accumulate;
 +    }
 +
 +    // SETUP / CLEANUP
 +
 +    public void addInitialSSTables(Iterable<SSTableReader> sstables)
 +    {
++        if (!isDummy())
++            setupKeycache(sstables);
 +        apply(updateLiveSet(emptySet(), sstables));
 +        maybeFail(updateSizeTracking(emptySet(), sstables, null));
 +        // no notifications or backup necessary
 +    }
 +
 +    public void addSSTables(Iterable<SSTableReader> sstables)
 +    {
 +        addInitialSSTables(sstables);
 +        for (SSTableReader sstable : sstables)
 +        {
 +            maybeIncrementallyBackup(sstable);
 +            notifyAdded(sstable);
 +        }
 +    }
 +
 +    /** (Re)initializes the tracker, purging all references. */
 +    @VisibleForTesting
 +    public void reset()
 +    {
 +        view.set(new View(
-                          cfstore != null ? ImmutableList.of(new Memtable(cfstore)) : Collections.<Memtable>emptyList(),
++                         !isDummy() ? ImmutableList.of(new Memtable(cfstore)) : Collections.<Memtable>emptyList(),
 +                         ImmutableList.<Memtable>of(),
 +                         Collections.<SSTableReader, SSTableReader>emptyMap(),
 +                         Collections.<SSTableReader>emptySet(),
 +                         SSTableIntervalTree.empty()));
 +    }
 +
 +    public Throwable dropSSTablesIfInvalid(Throwable accumulate)
 +    {
-         if (cfstore != null && !cfstore.isValid())
++        if (!isDummy() && !cfstore.isValid())
 +            accumulate = dropSSTables(accumulate);
 +        return accumulate;
 +    }
 +
 +    public void dropSSTables()
 +    {
 +        maybeFail(dropSSTables(null));
 +    }
 +
 +    public Throwable dropSSTables(Throwable accumulate)
 +    {
 +        return dropSSTables(Predicates.<SSTableReader>alwaysTrue(), OperationType.UNKNOWN, accumulate);
 +    }
 +
 +    /**
 +     * removes all sstables that are not busy compacting.
 +     */
 +    public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
 +    {
 +        Pair<View, View> result = apply(new Function<View, View>()
 +        {
 +            public View apply(View view)
 +            {
 +                Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
 +                return updateLiveSet(toremove, emptySet()).apply(view);
 +            }
 +        });
 +
 +        Set<SSTableReader> removed = Sets.difference(result.left.sstables, result.right.sstables);
 +        assert Iterables.all(removed, remove);
 +
 +        if (!removed.isEmpty())
 +        {
 +            // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
 +            accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), operationType, accumulate);
 +            accumulate = updateSizeTracking(removed, emptySet(), accumulate);
 +            accumulate = markObsolete(this, removed, accumulate);
 +            accumulate = release(selfRefs(removed), accumulate);
 +        }
 +        return accumulate;
 +    }
 +
 +    /**
 +     * Removes every SSTable in the directory from the Tracker's view.
 +     * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
 +     */
 +    public void removeUnreadableSSTables(final File directory)
 +    {
 +        maybeFail(dropSSTables(new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader reader)
 +            {
 +                return reader.descriptor.directory.equals(directory);
 +            }
 +        }, OperationType.UNKNOWN, null));
 +    }
 +
 +
 +
 +    // FLUSHING
 +
 +    /**
 +     * get the Memtable that the ordered writeOp should be directed to
 +     */
 +    public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
 +    {
 +        // since any new memtables appended to the list after we fetch it will be for operations started
 +        // after us, we can safely assume that we will always find the memtable that 'accepts' us;
 +        // if the barrier for any memtable is set whilst we are reading the list, it must accept us.
 +
 +        // there may be multiple memtables in the list that would 'accept' us, however we only ever choose
 +        // the oldest such memtable, as accepts() only prevents us falling behind (i.e. ensures we don't
 +        // assign operations to a memtable that was retired/queued before we started)
 +        for (Memtable memtable : view.get().liveMemtables)
 +        {
 +            if (memtable.accepts(opGroup, replayPosition))
 +                return memtable;
 +        }
 +        throw new AssertionError(view.get().liveMemtables.toString());
 +    }
 +
 +    /**
 +     * Switch the current memtable. This atomically appends a new memtable to the end of the list of active memtables,
 +     * returning the previously last memtable. It leaves the previous Memtable in the list of live memtables until
 +     * discarding(memtable) is called. These two methods must be synchronized/paired, i.e. m = switchMemtable
 +     * must be followed by discarding(m), they cannot be interleaved.
 +     *
 +     * @return the previously active memtable
 +     */
 +    public Memtable switchMemtable(boolean truncating)
 +    {
 +        Memtable newMemtable = new Memtable(cfstore);
 +        Pair<View, View> result = apply(View.switchMemtable(newMemtable));
 +        if (truncating)
 +            notifyRenewed(newMemtable);
 +
 +        return result.left.getCurrentMemtable();
 +    }
 +
 +    public void markFlushing(Memtable memtable)
 +    {
 +        apply(View.markFlushing(memtable));
 +    }
 +
 +    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
 +    {
++        assert !isDummy();
 +        if (sstable == null)
 +        {
 +            // sstable may be null if we flushed batchlog and nothing needed to be retained
 +            // if it's null, we don't care what state the cfstore is in, we just replace it and continue
 +            apply(View.replaceFlushed(memtable, null));
 +            return;
 +        }
 +
 +        sstable.setupKeyCache();
 +        // back up before creating a new Snapshot (which makes the new one eligible for compaction)
 +        maybeIncrementallyBackup(sstable);
 +
 +        apply(View.replaceFlushed(memtable, sstable));
 +
 +        Throwable fail;
 +        fail = updateSizeTracking(emptySet(), singleton(sstable), null);
 +        // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
 +        fail = notifyAdded(sstable, fail);
 +
-         if (cfstore != null && !cfstore.isValid())
++        if (!isDummy() && !cfstore.isValid())
 +            dropSSTables();
 +
 +        maybeFail(fail);
 +    }
 +
 +
 +
 +    // MISCELLANEOUS public utility calls
 +
 +    public Set<SSTableReader> getSSTables()
 +    {
 +        return view.get().sstables;
 +    }
 +
 +    public Set<SSTableReader> getCompacting()
 +    {
 +        return view.get().compacting;
 +    }
 +
 +    public Set<SSTableReader> getUncompacting()
 +    {
 +        return view.get().nonCompactingSStables();
 +    }
 +
 +    public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
 +    {
 +        return view.get().getUncompacting(candidates);
 +    }
 +
 +    public void maybeIncrementallyBackup(final SSTableReader sstable)
 +    {
 +        if (!DatabaseDescriptor.isIncrementalBackupsEnabled())
 +            return;
 +
 +        File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
 +        sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
 +    }
 +
 +
 +    // NOTIFICATION
 +
 +    Throwable notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType, Throwable accumulate)
 +    {
 +        INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
 +        for (INotificationConsumer subscriber : subscribers)
 +        {
 +            try
 +            {
 +                subscriber.handleNotification(notification, this);
 +            }
 +            catch (Throwable t)
 +            {
 +                accumulate = merge(accumulate, t);
 +            }
 +        }
 +        return accumulate;
 +    }
 +
 +    Throwable notifyAdded(SSTableReader added, Throwable accumulate)
 +    {
 +        INotification notification = new SSTableAddedNotification(added);
 +        for (INotificationConsumer subscriber : subscribers)
 +        {
 +            try
 +            {
 +                subscriber.handleNotification(notification, this);
 +            }
 +            catch (Throwable t)
 +            {
 +                accumulate = merge(accumulate, t);
 +            }
 +        }
 +        return accumulate;
 +    }
 +
 +    public void notifyAdded(SSTableReader added)
 +    {
 +        maybeFail(notifyAdded(added, null));
 +    }
 +
 +    public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
 +    {
 +        INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged);
 +        for (INotificationConsumer subscriber : subscribers)
 +            subscriber.handleNotification(notification, this);
 +    }
 +
 +    public void notifyDeleting(SSTableReader deleting)
 +    {
 +        INotification notification = new SSTableDeletingNotification(deleting);
 +        for (INotificationConsumer subscriber : subscribers)
 +            subscriber.handleNotification(notification, this);
 +    }
 +
 +    public void notifyRenewed(Memtable renewed)
 +    {
 +        INotification notification = new MemtableRenewedNotification(renewed);
 +        for (INotificationConsumer subscriber : subscribers)
 +            subscriber.handleNotification(notification, this);
 +    }
 +
 +    public void notifyTruncated(long truncatedAt)
 +    {
 +        INotification notification = new TruncationNotification(truncatedAt);
 +        for (INotificationConsumer subscriber : subscribers)
 +            subscriber.handleNotification(notification, this);
 +    }
 +
++    public boolean isDummy()
++    {
++        return cfstore == null;
++    }
++
 +    public void subscribe(INotificationConsumer consumer)
 +    {
 +        subscribers.add(consumer);
 +    }
 +
 +    public void unsubscribe(INotificationConsumer consumer)
 +    {
 +        subscribers.remove(consumer);
 +    }
 +
 +    private static Set<SSTableReader> emptySet()
 +    {
 +        return Collections.emptySet();
 +    }
 +
 +    public View getView()
 +    {
 +        return view.get();
 +    }
 +}

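All of the Tracker's view mutations above funnel through apply(permit, function), a lock-free compare-and-set retry loop over an immutable View. A condensed sketch of that pattern, using java.util.function in place of the Guava Predicate/Function the 2.2 code uses (View here is an illustrative stand-in):

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Function;
    import java.util.function.Predicate;

    // Minimal model of Tracker.apply(permit, function): retry a functional
    // update of an immutable view until the compare-and-set wins, or abort
    // if the permit predicate rejects the current view.
    public class CasApplyDemo
    {
        static final class View
        {
            final int sstableCount;
            View(int sstableCount) { this.sstableCount = sstableCount; }
            public String toString() { return "View[sstables=" + sstableCount + "]"; }
        }

        static final AtomicReference<View> view = new AtomicReference<>(new View(0));

        static View apply(Predicate<View> permit, Function<View, View> function)
        {
            while (true)
            {
                View cur = view.get();
                if (!permit.test(cur))
                    return null;              // abort: precondition no longer holds
                View updated = function.apply(cur);
                if (view.compareAndSet(cur, updated))
                    return cur;               // success: the view we replaced
            }
        }

        public static void main(String[] args)
        {
            apply(v -> true, v -> new View(v.sstableCount + 1));
            System.out.println(view.get()); // View[sstables=1]
        }
    }

Because View is never mutated in place, a losing racer simply re-reads and retries, which is why the permit predicate is re-evaluated on every iteration.
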
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f430c53/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index b4b6c72,82492a8..dc4fe75
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@@ -20,6 -20,8 +20,7 @@@ package org.apache.cassandra.io.sstable
  import java.util.*;
  
  import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.base.Throwables;
 -import com.google.common.collect.ImmutableList;
++import com.google.common.util.concurrent.Runnables;
  
  import org.apache.cassandra.cache.InstrumentingCache;
  import org.apache.cassandra.cache.KeyCacheKey;
@@@ -214,16 -285,19 +215,17 @@@ public class SSTableRewriter extends Tr
      {
          if (isOffline)
              return;
 -        List<SSTableReader> toReplace = new ArrayList<>();
 -        List<SSTableReader> replaceWith = new ArrayList<>();
 +        if (preemptiveOpenInterval == Long.MAX_VALUE)
 +            return;
 +
          final List<DecoratedKey> invalidateKeys = new ArrayList<>();
 -        if (!reset)
 -        {
 -            newReader.setupKeyCache();
 -            invalidateKeys.addAll(cachedKeys.keySet());
 -            for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
 -                newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
 -        }
 +        invalidateKeys.addAll(cachedKeys.keySet());
++        newReader.setupKeyCache();
 +        for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
 +            newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
  
          cachedKeys = new HashMap<>();
 -        for (SSTableReader sstable : ImmutableList.copyOf(rewriting))
 +        for (SSTableReader sstable : transaction.originals())
          {
              // we call getCurrentReplacement() to support multiple rewriters operating over the same source readers at once.
              // note: only one such writer should be written to at any moment

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f430c53/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index dbf95c1,5ebfef7..105ecf6
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -239,36 -179,30 +234,36 @@@ public class SSTableRewriterTest extend
          Keyspace keyspace = Keyspace.open(KEYSPACE);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
          truncate(cfs);
-         assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+ 
          ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
 -        for (int i = 0; i < 1000; i++)
 -            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
 +        for (int i = 0; i < 100; i++)
 +            cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
          File dir = cfs.directories.getDirectoryForNewSSTables();
 -        SSTableWriter writer = getWriter(cfs, dir);
 -        try
 +
 +        try (SSTableWriter writer = getWriter(cfs, dir))
          {
 -            for (int i = 0; i < 1000; i++)
 +            for (int i = 0; i < 10000; i++)
                  writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
 -            SSTableReader s = writer.openEarly(1000);
 +            SSTableReader s = writer.setMaxDataAge(1000).openEarly();
 +            assert s != null;
              assertFileCounts(dir.list(), 2, 2);
 -            for (int i = 1000; i < 2000; i++)
 +            for (int i = 10000; i < 20000; i++)
                  writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
 -            SSTableReader s2 = writer.openEarly(1000);
 +            SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
              assertTrue(s.last.compareTo(s2.last) < 0);
              assertFileCounts(dir.list(), 2, 2);
 -            s.markObsolete(cfs.getDataTracker());
 +            s.markObsolete(cfs.getTracker());
              s.selfRef().release();
              s2.selfRef().release();
 -            Thread.sleep(1000);
 -            assertFileCounts(dir.list(), 0, 2);
 +            // These checks don't work on Windows because the writer still has
 +            // the channel open until .abort() is called (via the builder)
 +            if (!FBUtilities.isWindows())
 +            {
 +                SSTableDeletingTask.waitForDeletions();
 +                assertFileCounts(dir.list(), 0, 2);
 +            }
              writer.abort();
 -            Thread.sleep(1000);
 +            SSTableDeletingTask.waitForDeletions();
              int datafiles = assertFileCounts(dir.list(), 0, 0);
              assertEquals(datafiles, 0);
              validateCFS(cfs);
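
Throughout the test changes, fixed Thread.sleep(1000) calls give way to SSTableDeletingTask.waitForDeletions(), which blocks until the queued deletions have actually run, so the file-count assertions stop being timing-dependent. One common way to build such a barrier, sketched here under the assumption that a single-threaded executor owns the deletion work; this is illustrative, not the actual SSTableDeletingTask implementation:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Illustrative waitForDeletions-style barrier: submit a no-op to the
    // executor that performs deletions and wait for it, guaranteeing that
    // everything queued before the call has completed.
    public class DeletionBarrierDemo
    {
        private static final ExecutorService DELETIONS = Executors.newSingleThreadExecutor();

        static void scheduleDelete(Runnable deletion)
        {
            DELETIONS.submit(deletion);
        }

        // Blocks until all previously scheduled deletions have run.
        static void waitForDeletions()
        {
            try
            {
                DELETIONS.submit(() -> {}).get();
            }
            catch (InterruptedException | ExecutionException e)
            {
                throw new RuntimeException(e);
            }
        }

        public static void main(String[] args)
        {
            scheduleDelete(() -> System.out.println("deleting sstable files"));
            waitForDeletions(); // deterministic: the deletion has run by now
            System.out.println("all deletions done");
            DELETIONS.shutdown();
        }
    }
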
@@@ -555,14 -502,17 +550,12 @@@
                  }
              }
  
 -            List<SSTableReader> sstables = rewriter.finish();
 -            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
 -            Thread.sleep(1000);
 -            assertFileCounts(s.descriptor.directory.list(), 0, 0);
 -            truncate(cfs);
 -        }
 -        catch (Throwable t)
 -        {
 -            rewriter.abort();
 -            throw t;
 +            sstables = rewriter.finish();
          }
 +
 +        SSTableDeletingTask.waitForDeletions();
 +        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-         cfs.truncateBlocking();
-         SSTableDeletingTask.waitForDeletions();
 +        validateCFS(cfs);
      }
  
      @Test
@@@ -615,22 -566,18 +608,23 @@@
          truncate(cfs);
          cfs.disableAutoCompaction();
          SSTableReader s = writeFile(cfs, 1000);
 -        cfs.getDataTracker().markCompacting(Arrays.asList(s), true, false);
 -        SSTableSplitter splitter = new SSTableSplitter(cfs, s, 10);
 -        splitter.split();
 -        Thread.sleep(1000);
 -        assertFileCounts(s.descriptor.directory.list(), 0, 0);
 -        s.selfRef().release();
 -        for (File f : s.descriptor.directory.listFiles())
 +        try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.UNKNOWN, s))
          {
 -            // we need to clear out the data dir, otherwise tests running after this breaks
 -            f.delete();
 +            SSTableSplitter splitter = new SSTableSplitter(cfs, txn, 10);
 +            splitter.split();
 +
 +            assertFileCounts(s.descriptor.directory.list(), 0, 0);
 +
 +            s.selfRef().release();
 +            SSTableDeletingTask.waitForDeletions();
 +
 +            for (File f : s.descriptor.directory.listFiles())
 +            {
 +                // we need to clear out the data dir, otherwise tests running after this break
 +                FileUtils.deleteRecursive(f);
 +            }
          }
+         truncate(cfs);
      }
  
      @Test
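
The split test above now scopes the work inside a try-with-resources LifecycleTransaction obtained from LifecycleTransaction.offline(...), so the transaction is closed, and rolled back if never committed, even when an assertion fails mid-test. The general shape of that idiom, with a hypothetical Txn class standing in for LifecycleTransaction:

    // Illustrative shape of the try-with-resources lifecycle used above;
    // Txn is a hypothetical stand-in for LifecycleTransaction.
    public class TxnShapeDemo
    {
        static class Txn implements AutoCloseable
        {
            private boolean committed;

            void commit() { committed = true; }

            @Override
            public void close()
            {
                // On an exception path commit() was never reached: roll back.
                if (!committed)
                    System.out.println("rolling back");
            }
        }

        public static void main(String[] args)
        {
            try (Txn txn = new Txn())
            {
                // ... perform the split / rewrite against txn ...
                txn.commit();
            } // close() runs whether or not the body threw
        }
    }
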
@@@ -764,11 -707,20 +758,12 @@@
                  keyCount++;
                  validateKeys(keyspace);
              }
 -            try
 -            {
 -                cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finish(), OperationType.COMPACTION);
 -                cfs.getDataTracker().unmarkCompacting(compacting);
 -            }
 -            catch (Throwable t)
 -            {
 -                rewriter.abort();
 -            }
 +            rewriter.finish();
          }
          validateKeys(keyspace);
 -        Thread.sleep(1000);
 +        SSTableDeletingTask.waitForDeletions();
          validateCFS(cfs);
+         truncate(cfs);
      }
  
      @Test
@@@ -866,46 -819,29 +861,48 @@@
          }
      }
  
 -    private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
 +    public static void truncate(ColumnFamilyStore cfs)
      {
 -        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
 -        for (int i = 0; i < count / 100; i++)
 -            cf.addColumn(Util.cellname(i), random(0, 1000), 1);
 -        File dir = cfs.directories.getDirectoryForNewSSTables();
 -        String filename = cfs.getTempSSTablePath(dir);
 +        cfs.truncateBlocking();
++        SSTableDeletingTask.waitForDeletions();
 +        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
 +        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
 +        assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
++        validateCFS(cfs);
 +    }
 +
 +    public static SSTableReader writeFile(ColumnFamilyStore cfs, int count)
 +    {
 +        return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100, 1000), null);
 +    }
  
 -        SSTableWriter writer = new SSTableWriter(filename,
 -                0,
 -                0,
 -                cfs.metadata,
 -                StorageService.getPartitioner(),
 -                new MetadataCollector(cfs.metadata.comparator));
 +    public static Set<SSTableReader> writeFiles(ColumnFamilyStore cfs, int fileCount, int partitionCount, int cellCount, int cellSize)
 +    {
 +        int i = 0;
 +        Set<SSTableReader> result = new LinkedHashSet<>();
 +        for (int f = 0 ; f < fileCount ; f++)
 +        {
 +            File dir = cfs.directories.getDirectoryForNewSSTables();
 +            String filename = cfs.getTempSSTablePath(dir);
  
 -        for (int i = 0; i < count * 5; i++)
 -            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
 -        return writer.closeAndOpenReader();
 +            SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
 +            int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
 +            for ( ; i < end ; i++)
 +            {
 +                ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
 +                for (int j = 0; j < cellCount ; j++)
 +                    cf.addColumn(Util.cellname(j), random(0, cellSize), 1);
 +                writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
 +            }
 +            result.add(writer.finish(true));
 +        }
 +        return result;
      }
  
 -    private void validateCFS(ColumnFamilyStore cfs)
 +    public static void validateCFS(ColumnFamilyStore cfs)
      {
          Set<Integer> liveDescriptors = new HashSet<>();
 +        long spaceUsed = 0;
          for (SSTableReader sstable : cfs.getSSTables())
          {
              assertFalse(sstable.isMarkedCompacted());
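
The writeFiles helper added above spreads partitionCount partitions across fileCount sstables with the boundary expression end = ((f + 1) * partitionCount) / fileCount, letting the last file absorb any rounding remainder so no partition is lost. A small standalone check of that arithmetic; the fileCount/partitionCount values are illustrative:

    // Standalone check of the boundary arithmetic used by writeFiles to
    // spread partitions across files.
    public class SplitBoundsDemo
    {
        public static void main(String[] args)
        {
            int fileCount = 3, partitionCount = 10;
            int start = 0;
            for (int f = 0; f < fileCount; f++)
            {
                // The last file takes everything left, so integer rounding
                // never drops a partition.
                int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
                System.out.printf("file %d gets partitions [%d, %d)%n", f, start, end);
                start = end;
            }
            // Prints: [0, 3), [3, 6), [6, 10)
        }
    }
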
@@@ -924,13 -859,19 +921,12 @@@
                  }
              }
          }
 -        assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
 -        assertTrue("" + cfs.getTotalDiskSpaceUsed(), cfs.getTotalDiskSpaceUsed() >= 0);
 -    }
 -
 -    private void truncate(ColumnFamilyStore cfs)
 -    {
 -        cfs.truncateBlocking();
 -        SSTableDeletingTask.waitForDeletions();
 -        validateCFS(cfs);
 -        assertTrue(cfs.getTotalDiskSpaceUsed() == 0);
 +        assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
 +        assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
 +        assertTrue(cfs.getTracker().getCompacting().isEmpty());
      }
  
- 
 -    private int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
 +    public static int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
      {
          int tmplinkcount = 0;
          int tmpcount = 0;

