cassandra-commits mailing list archives

From ble...@apache.org
Subject [7/9] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0
Date Tue, 22 Sep 2015 20:13:18 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Keyspace.java
index 0676e45,92a0950..cf34e9a
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@@ -302,16 -264,14 +302,16 @@@ public class Keyspac
          createReplicationStrategy(metadata);
  
          this.metric = new KeyspaceMetrics(this);
 -        for (CFMetaData cfm : new ArrayList<>(metadata.cfMetaData().values()))
 +        this.viewManager = new ViewManager(this);
 +        for (CFMetaData cfm : metadata.tablesAndViews())
          {
-             logger.debug("Initializing {}.{}", getName(), cfm.cfName);
+             logger.trace("Initializing {}.{}", getName(), cfm.cfName);
              initCf(cfm.cfId, cfm.cfName, loadSSTables);
          }
 +        this.viewManager.reload();
      }
  
 -    private Keyspace(KSMetaData metadata)
 +    private Keyspace(KeyspaceMetadata metadata)
      {
          this.metadata = metadata;
          createReplicationStrategy(metadata);
@@@ -415,37 -379,6 +415,37 @@@
          if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
              throw new RuntimeException("Testing write failures");
  
 +        Lock lock = null;
 +        boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
 +
 +        if (requiresViewUpdate)
 +        {
 +            lock = ViewManager.acquireLockFor(mutation.key().getKey());
 +
 +            if (lock == null)
 +            {
 +                if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
 +                {
-                     logger.debug("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
++                    logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
 +                    Tracing.trace("Could not acquire MV lock");
 +                    throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
 +                }
 +                else
 +                {
 +                    // This view update can't happen right now, so rather than keep this thread busy
 +                    // we will re-apply ourselves to the queue and try again later
 +                    StageManager.getStage(Stage.MUTATION).execute(() -> {
 +                        if (writeCommitLog)
 +                            mutation.apply();
 +                        else
 +                            mutation.applyUnsafe();
 +                    });
 +
 +                    return;
 +                }
 +            }
 +        }
 +        int nowInSec = FBUtilities.nowInSeconds();
          try (OpOrder.Group opGroup = writeOrder.start())
          {
              // write the mutation to the commitlog and memtables
@@@ -502,22 -415,30 +502,22 @@@
  
      /**
       * @param key row to index
 -     * @param cfs ColumnFamily to index row in
 -     * @param idxNames columns to index, in comparator order
 +     * @param cfs ColumnFamily to index partition in
 +     * @param indexes the indexes to submit the row to
       */
 -    public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
 +    public static void indexPartition(DecoratedKey key, ColumnFamilyStore cfs, Set<Index> indexes)
      {
-         if (logger.isDebugEnabled())
-             logger.debug("Indexing partition {} ", cfs.metadata.getKeyValidator().getString(key.getKey()));
+         if (logger.isTraceEnabled())
 -            logger.trace("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.getKey()));
++            logger.trace("Indexing partition {} ", cfs.metadata.getKeyValidator().getString(key.getKey()));
  
 -        try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
 -        {
 -            Set<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
 +        SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata,
 +                                                                                      FBUtilities.nowInSeconds(),
 +                                                                                      key);
  
 -            Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.getKey(), DEFAULT_PAGE_SIZE);
 -            while (pager.hasNext())
 -            {
 -                ColumnFamily cf = pager.next();
 -                ColumnFamily cf2 = cf.cloneMeShallow();
 -                for (Cell cell : cf)
 -                {
 -                    if (cfs.indexManager.indexes(cell.name(), indexes))
 -                        cf2.addColumn(cell);
 -                }
 -                cfs.indexManager.indexRow(key.getKey(), cf2, opGroup);
 -            }
 +        try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start();
 +             UnfilteredRowIterator partition = cmd.queryMemtableAndDisk(cfs, opGroup))
 +        {
 +            cfs.indexManager.indexPartition(partition, opGroup, indexes, cmd.nowInSec());
          }
      }
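
The comments in the Keyspace.apply() hunk above describe the materialized-view write path: try to take a per-partition lock without blocking, fail with a write timeout if the mutation has already waited longer than the write RPC timeout, and otherwise re-submit the whole mutation to the MUTATION stage to retry later. A minimal, self-contained sketch of that acquire-or-requeue pattern follows; the single ReentrantLock, executor and timeout constant are hypothetical stand-ins for ViewManager's per-key locks, the MUTATION stage and DatabaseDescriptor.getWriteRpcTimeout().

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ViewLockRetrySketch
{
    // Hypothetical stand-ins for ViewManager's per-key locks, the MUTATION stage and the write timeout.
    private static final Lock PARTITION_LOCK = new ReentrantLock();
    private static final ExecutorService MUTATION_STAGE = Executors.newSingleThreadExecutor();
    private static final long WRITE_TIMEOUT_MS = 2000;

    // Apply a write that must also update a view, without blocking the calling thread on the lock.
    static void applyWithViewLock(Runnable write, long createdAtMillis) throws TimeoutException
    {
        if (!PARTITION_LOCK.tryLock())
        {
            if (System.currentTimeMillis() - createdAtMillis > WRITE_TIMEOUT_MS)
                throw new TimeoutException("could not acquire view lock within the write timeout");

            // Lock is busy but there is still time: re-queue the whole write and try again later.
            MUTATION_STAGE.execute(() -> {
                try { applyWithViewLock(write, createdAtMillis); }
                catch (TimeoutException e) { /* reported to the client as a write timeout */ }
            });
            return;
        }
        try { write.run(); }
        finally { PARTITION_LOCK.unlock(); }
    }
}

Re-queuing instead of blocking keeps mutation threads free while a competing base-table write holds the view lock.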
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 7af65d1,e96a71e..81decde
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -354,17 -355,18 +354,17 @@@ public class Memtable implements Compar
  
          protected Directories getDirectories()
          {
 -            return cfs.directories;
 +            return cfs.getDirectories();
          }
  
 -        private SSTableReader writeSortedContents(ReplayPosition context, File sstableDirectory)
 +        private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory)
          {
-             logger.info("Writing {}", Memtable.this.toString());
+             logger.debug("Writing {}", Memtable.this.toString());
  
 -            SSTableReader ssTable;
 -            // errors when creating the writer that may leave empty temp files.
 -            try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)))
 +            Collection<SSTableReader> ssTables;
 +            try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
              {
-                 boolean trackContention = logger.isDebugEnabled();
+                 boolean trackContention = logger.isTraceEnabled();
                  int heavilyContendedRowCount = 0;
                  // (we can't clear out the map as-we-go to free up memory,
                  //  since the memtable is being used for queries in the "pending flush" category)
@@@ -392,26 -394,26 +392,26 @@@
  
                  if (writer.getFilePointer() > 0)
                  {
-                     logger.info(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                               writer.getFilename(),
-                                               FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                               context));
+                     logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
 -                                              writer.getFilename(),
 -                                              FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()),
 -                                              context));
++                                               writer.getFilename(),
++                                               FBUtilities.prettyPrintMemory(writer.getFilePointer()),
++                                               context));
  
 -                    // temp sstables should contain non-repaired data.
 -                    ssTable = writer.finish(true);
 +                    // sstables should contain non-repaired data.
 +                    ssTables = writer.finish(true);
                  }
                  else
                  {
-                     logger.info("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
+                     logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
                                  writer.getFilename(), context);
                      writer.abort();
 -                    ssTable = null;
 +                    ssTables = null;
                  }
  
                  if (heavilyContendedRowCount > 0)
-                     logger.debug(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
 -                    logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, rows.size(), Memtable.this.toString()));
++                    logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
  
 -                return ssTable;
 +                return ssTables;
              }
          }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
index 6502ed3,ab934c6..3666b27
--- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@@ -39,9 -39,9 +39,9 @@@ public class MigrationRequestVerbHandle
  
      public void doVerb(MessageIn message, int id)
      {
-         logger.debug("Received migration request from {}.", message.from);
+         logger.trace("Received migration request from {}.", message.from);
          MessageOut<Collection<Mutation>> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
 -                                                                     LegacySchemaTables.convertSchemaToMutations(),
 +                                                                     SchemaKeyspace.convertSchemaToMutations(),
                                                                       MigrationManager.MigrationsSerializer.instance);
          MessagingService.instance().sendReply(response, id, message.from);
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 6b2585e,308edcd..5c47e1e
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -1320,36 -1122,6 +1320,36 @@@ public final class SystemKeyspac
          return result.one().getString("release_version");
      }
  
 +    /**
 +     * Check data directories for old files that can be removed when migrating from 2.1 or 2.2 to 3.0;
 +     * these checks can be removed in 4.0 (see CASSANDRA-7066).
 +     */
 +    public static void migrateDataDirs()
 +    {
 +        Iterable<String> dirs = Arrays.asList(DatabaseDescriptor.getAllDataFileLocations());
 +        for (String dataDir : dirs)
 +        {
-             logger.debug("Checking directory {} for old files", dataDir);
++            logger.trace("Checking directory {} for old files", dataDir);
 +            File dir = new File(dataDir);
 +            assert dir.exists() : dir + " should have been created by startup checks";
 +
 +            for (File ksdir : dir.listFiles((d, n) -> d.isDirectory()))
 +            {
 +                for (File cfdir : ksdir.listFiles((d, n) -> d.isDirectory()))
 +                {
 +                    if (Descriptor.isLegacyFile(cfdir))
 +                    {
 +                        FileUtils.deleteRecursive(cfdir);
 +                    }
 +                    else
 +                    {
 +                        FileUtils.delete(cfdir.listFiles((d, n) -> Descriptor.isLegacyFile(new File(d, n))));
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
      private static ByteBuffer rangeToBytes(Range<Token> range)
      {
          try (DataOutputBuffer out = new DataOutputBuffer())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 7049191,cb02a8c..2668bba
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -404,7 -388,11 +404,7 @@@ public class CommitLogReplaye
                  if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
                      break;
              }
-             logger.info("Finished reading {}", file);
 -        }
 -        finally
 -        {
 -            FileUtils.closeQuietly(reader);
+             logger.debug("Finished reading {}", file);
          }
      }
  
@@@ -562,8 -551,8 +562,8 @@@
              return;
          }
  
-         if (logger.isDebugEnabled())
-             logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
+         if (logger.isTraceEnabled())
 -            logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
++            logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
  
          Runnable runnable = new WrappedRunnable()
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 75d50e7,ea20a1f..77c0cbb
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -157,10 -165,10 +157,10 @@@ public class CompactionManager implemen
              return Collections.emptyList();
          }
  
-         logger.debug("Scheduling a background task check for {}.{} with {}",
+         logger.trace("Scheduling a background task check for {}.{} with {}",
                       cfs.keyspace.getName(),
                       cfs.name,
 -                     cfs.getCompactionStrategy().getName());
 +                     cfs.getCompactionStrategyManager().getName());
          List<Future<?>> futures = new ArrayList<>();
          // we must schedule it at least once, otherwise compaction will stop for a CF until next flush
          if (executor.isShutdown())
@@@ -210,11 -218,11 +210,11 @@@
                      return;
                  }
  
 -                AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
 -                AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs));
 +                CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
 +                AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds()));
                  if (task == null)
                  {
-                     logger.debug("No tasks available");
+                     logger.trace("No tasks available");
                      return;
                  }
                  task.execute(metrics);
@@@ -452,8 -460,8 +452,8 @@@
                                        LifecycleTransaction txn,
                                        long repairedAt) throws InterruptedException, IOException
      {
 -        logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
 +        logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getLiveSSTables());
-         logger.debug("Starting anticompaction for ranges {}", ranges);
+         logger.trace("Starting anticompaction for ranges {}", ranges);
          Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
          Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
          Set<SSTableReader> nonAnticompacting = new HashSet<>();
@@@ -781,10 -788,10 +781,10 @@@
  
          long totalkeysWritten = 0;
  
 -        long expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
 +        long expectedBloomFilterSize = Math.max(cfs.metadata.params.minIndexInterval,
                                                 SSTableReader.getApproximateKeyCount(txn.originals()));
-         if (logger.isDebugEnabled())
-             logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
+         if (logger.isTraceEnabled())
+             logger.trace("Expected bloom filter size : {}", expectedBloomFilterSize);
  
          logger.info("Cleaning up {}", sstable);
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 47c8de8,0000000..bd72c64
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -1,513 -1,0 +1,513 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.compaction;
 +
 +
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +
 +import com.google.common.collect.Iterables;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.SerializationHeader;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.notifications.*;
 +import org.apache.cassandra.schema.CompactionParams;
 +import org.apache.cassandra.service.ActiveRepairService;
 +
 +/**
 + * Manages the compaction strategies.
 + *
 + * Currently holds two instances of the configured compaction strategy - one for repaired data and one for
 + * unrepaired data - so that the two sets of sstables are kept completely separate.
 + */
 +public class CompactionStrategyManager implements INotificationConsumer
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
 +    private final ColumnFamilyStore cfs;
 +    private volatile AbstractCompactionStrategy repaired;
 +    private volatile AbstractCompactionStrategy unrepaired;
 +    private volatile boolean enabled = true;
 +    public boolean isActive = true;
 +    private volatile CompactionParams params;
 +    /*
 +        We keep a copy of the schema compaction parameters here to be able to decide if we
 +        should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
 +
 +        If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
 +        we will use the new compaction parameters.
 +     */
 +    private CompactionParams schemaCompactionParams;
 +
 +    public CompactionStrategyManager(ColumnFamilyStore cfs)
 +    {
 +        cfs.getTracker().subscribe(this);
-         logger.debug("{} subscribed to the data tracker.", this);
++        logger.trace("{} subscribed to the data tracker.", this);
 +        this.cfs = cfs;
 +        reload(cfs.metadata);
 +        params = cfs.metadata.params.compaction;
 +        enabled = params.isEnabled();
 +    }
 +
 +    /**
 +     * Return the next background task
 +     *
 +     * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
 +     *
 +     */
 +    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
 +    {
 +        if (!isEnabled())
 +            return null;
 +
 +        maybeReload(cfs.metadata);
 +
 +        if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
 +        {
 +            AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
 +            if (repairedTask != null)
 +                return repairedTask;
 +            return unrepaired.getNextBackgroundTask(gcBefore);
 +        }
 +        else
 +        {
 +            AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
 +            if (unrepairedTask != null)
 +                return unrepairedTask;
 +            return repaired.getNextBackgroundTask(gcBefore);
 +        }
 +    }
 +
 +    public boolean isEnabled()
 +    {
 +        return enabled && isActive;
 +    }
 +
 +    public synchronized void resume()
 +    {
 +        isActive = true;
 +    }
 +
 +    /**
 +     * Pause compaction while we cancel all ongoing compactions.
 +     *
 +     * Separate call from enable/disable so that the enabled state does not have to be saved externally.
 +     */
 +    public synchronized void pause()
 +    {
 +        isActive = false;
 +    }
 +
 +
 +    private void startup()
 +    {
 +        for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
 +        {
 +            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
 +                getCompactionStrategyFor(sstable).addSSTable(sstable);
 +        }
 +        repaired.startup();
 +        unrepaired.startup();
 +    }
 +
 +    /**
 +     * Return the compaction strategy for the given sstable.
 +     *
 +     * The choice depends on the sstable's repaired status.
 +     * @param sstable the sstable to find a strategy for
 +     * @return the repaired or unrepaired strategy
 +     */
 +    private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
 +    {
 +        if (sstable.isRepaired())
 +            return repaired;
 +        else
 +            return unrepaired;
 +    }
 +
 +    public void shutdown()
 +    {
 +        isActive = false;
 +        repaired.shutdown();
 +        unrepaired.shutdown();
 +    }
 +
 +    public synchronized void maybeReload(CFMetaData metadata)
 +    {
 +        // compare the old schema configuration to the new one, ignore any locally set changes.
 +        if (metadata.params.compaction.equals(schemaCompactionParams))
 +            return;
 +        reload(metadata);
 +    }
 +
 +    /**
 +     * Reload the compaction strategies
 +     *
 +     * Called after changing configuration and at startup.
 +     * @param metadata
 +     */
 +    public synchronized void reload(CFMetaData metadata)
 +    {
 +        boolean disabledWithJMX = !enabled && shouldBeEnabled();
 +        setStrategy(metadata.params.compaction);
 +        schemaCompactionParams = metadata.params.compaction;
 +
 +        if (disabledWithJMX || !shouldBeEnabled())
 +            disable();
 +        else
 +            enable();
 +        startup();
 +    }
 +
 +    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
 +    {
 +        cfs.getTracker().replaceFlushed(memtable, sstables);
 +        if (sstables != null && !sstables.isEmpty())
 +            CompactionManager.instance.submitBackground(cfs);
 +    }
 +
 +    public int getUnleveledSSTables()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
 +        {
 +            int count = 0;
 +            count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
 +            count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
 +            return count;
 +        }
 +        return 0;
 +    }
 +
 +    public synchronized int[] getSSTableCountPerLevel()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
 +        {
 +            int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
 +            int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
 +            res = sumArrays(res, repairedCountPerLevel);
 +            int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
 +            res = sumArrays(res, unrepairedCountPerLevel);
 +            return res;
 +        }
 +        return null;
 +    }
 +
 +    private static int[] sumArrays(int[] a, int[] b)
 +    {
 +        int[] res = new int[Math.max(a.length, b.length)];
 +        for (int i = 0; i < res.length; i++)
 +        {
 +            if (i < a.length && i < b.length)
 +                res[i] = a[i] + b[i];
 +            else if (i < a.length)
 +                res[i] = a[i];
 +            else
 +                res[i] = b[i];
 +        }
 +        return res;
 +    }
 +
 +    public boolean shouldDefragment()
 +    {
 +        assert repaired.getClass().equals(unrepaired.getClass());
 +        return repaired.shouldDefragment();
 +    }
 +
 +    public Directories getDirectories()
 +    {
 +        assert repaired.getClass().equals(unrepaired.getClass());
 +        return repaired.getDirectories();
 +    }
 +
 +    public synchronized void handleNotification(INotification notification, Object sender)
 +    {
 +        if (notification instanceof SSTableAddedNotification)
 +        {
 +            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
 +            for (SSTableReader sstable : flushedNotification.added)
 +            {
 +                if (sstable.isRepaired())
 +                    repaired.addSSTable(sstable);
 +                else
 +                    unrepaired.addSSTable(sstable);
 +            }
 +        }
 +        else if (notification instanceof SSTableListChangedNotification)
 +        {
 +            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
 +            Set<SSTableReader> repairedRemoved = new HashSet<>();
 +            Set<SSTableReader> repairedAdded = new HashSet<>();
 +            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
 +            Set<SSTableReader> unrepairedAdded = new HashSet<>();
 +
 +            for (SSTableReader sstable : listChangedNotification.removed)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedRemoved.add(sstable);
 +                else
 +                    unrepairedRemoved.add(sstable);
 +            }
 +            for (SSTableReader sstable : listChangedNotification.added)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedAdded.add(sstable);
 +                else
 +                    unrepairedAdded.add(sstable);
 +            }
 +            if (!repairedRemoved.isEmpty())
 +            {
 +                repaired.replaceSSTables(repairedRemoved, repairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : repairedAdded)
 +                    repaired.addSSTable(sstable);
 +            }
 +
 +            if (!unrepairedRemoved.isEmpty())
 +            {
 +                unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : unrepairedAdded)
 +                    unrepaired.addSSTable(sstable);
 +            }
 +        }
 +        else if (notification instanceof SSTableRepairStatusChanged)
 +        {
 +            for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
 +            {
 +                if (sstable.isRepaired())
 +                {
 +                    unrepaired.removeSSTable(sstable);
 +                    repaired.addSSTable(sstable);
 +                }
 +                else
 +                {
 +                    repaired.removeSSTable(sstable);
 +                    unrepaired.addSSTable(sstable);
 +                }
 +            }
 +        }
 +        else if (notification instanceof SSTableDeletingNotification)
 +        {
 +            SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
 +            if (sstable.isRepaired())
 +                repaired.removeSSTable(sstable);
 +            else
 +                unrepaired.removeSSTable(sstable);
 +        }
 +    }
 +
 +    public void enable()
 +    {
 +        if (repaired != null)
 +            repaired.enable();
 +        if (unrepaired != null)
 +            unrepaired.enable();
 +        // enable this last to make sure the strategies are ready to get calls.
 +        enabled = true;
 +    }
 +
 +    public void disable()
 +    {
 +        // disable this first to avoid asking disabled strategies for compaction tasks
 +        enabled = false;
 +        if (repaired != null)
 +            repaired.disable();
 +        if (unrepaired != null)
 +            unrepaired.disable();
 +    }
 +
 +    /**
 +     * Create ISSTableScanner from the given sstables
 +     *
 +     * Delegates the call to the compaction strategies to allow LCS to create a scanner
 +     * @param sstables
 +     * @param ranges
 +     * @return
 +     */
 +    @SuppressWarnings("resource")
 +    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
 +    {
 +        List<SSTableReader> repairedSSTables = new ArrayList<>();
 +        List<SSTableReader> unrepairedSSTables = new ArrayList<>();
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable.isRepaired())
 +                repairedSSTables.add(sstable);
 +            else
 +                unrepairedSSTables.add(sstable);
 +        }
 +
 +        Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
 +
 +        for (Range<Token> range : ranges)
 +        {
 +            AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
 +            AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
 +
 +            for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners))
 +            {
 +                if (!scanners.add(scanner))
 +                    scanner.close();
 +            }
 +        }
 +
 +        return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
 +    }
 +
 +    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
 +    {
 +        return getScanners(sstables, Collections.singleton(null));
 +    }
 +
 +    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
 +    {
 +        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
 +    }
 +
 +    public long getMaxSSTableBytes()
 +    {
 +        return unrepaired.getMaxSSTableBytes();
 +    }
 +
 +    public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
 +    {
 +        return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
 +    }
 +
 +    public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
 +    {
 +        // runWithCompactionsDisabled cancels active compactions and disables them; we are then able
 +        // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
 +        // sstables are marked, compactions are re-enabled.
 +        return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
 +        {
 +            @Override
 +            public Collection<AbstractCompactionTask> call() throws Exception
 +            {
 +                synchronized (CompactionStrategyManager.this)
 +                {
 +                    Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
 +                    Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
 +
 +                    if (repairedTasks == null && unrepairedTasks == null)
 +                        return null;
 +
 +                    if (repairedTasks == null)
 +                        return unrepairedTasks;
 +                    if (unrepairedTasks == null)
 +                        return repairedTasks;
 +
 +                    List<AbstractCompactionTask> tasks = new ArrayList<>();
 +                    tasks.addAll(repairedTasks);
 +                    tasks.addAll(unrepairedTasks);
 +                    return tasks;
 +                }
 +            }
 +        }, false, false);
 +    }
 +
 +    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
 +    {
 +        return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
 +    }
 +
 +    public int getEstimatedRemainingTasks()
 +    {
 +        int tasks = 0;
 +        tasks += repaired.getEstimatedRemainingTasks();
 +        tasks += unrepaired.getEstimatedRemainingTasks();
 +
 +        return tasks;
 +    }
 +
 +    public boolean shouldBeEnabled()
 +    {
 +        return params.isEnabled();
 +    }
 +
 +    public String getName()
 +    {
 +        return unrepaired.getName();
 +    }
 +
 +    public List<AbstractCompactionStrategy> getStrategies()
 +    {
 +        return Arrays.asList(repaired, unrepaired);
 +    }
 +
 +    public synchronized void setNewLocalCompactionStrategy(CompactionParams params)
 +    {
 +        logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
 +        setStrategy(params);
 +        if (shouldBeEnabled())
 +            enable();
 +        else
 +            disable();
 +        startup();
 +    }
 +
 +    private void setStrategy(CompactionParams params)
 +    {
 +        if (repaired != null)
 +            repaired.shutdown();
 +        if (unrepaired != null)
 +            unrepaired.shutdown();
 +        repaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
 +        unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
 +        this.params = params;
 +    }
 +
 +    public CompactionParams getCompactionParams()
 +    {
 +        return params;
 +    }
 +
 +    public boolean onlyPurgeRepairedTombstones()
 +    {
 +        return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
 +    }
 +
 +    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
 +    {
 +        if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
 +        {
 +            return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
 +        }
 +        else
 +        {
 +            return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
 +        }
 +    }
 +}
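
As the class javadoc above notes, CompactionStrategyManager keeps two strategy instances, one for repaired and one for unrepaired sstables, and getNextBackgroundTask() asks whichever has the most estimated remaining work first, falling back to the other. A condensed sketch of that routing, using a hypothetical Strategy interface in place of AbstractCompactionStrategy:

public class StrategyRoutingSketch
{
    // Hypothetical minimal stand-in for AbstractCompactionStrategy.
    interface Strategy
    {
        int estimatedRemainingTasks();
        Runnable nextBackgroundTask(); // may return null when there is nothing to do
    }

    private final Strategy repaired;
    private final Strategy unrepaired;

    StrategyRoutingSketch(Strategy repaired, Strategy unrepaired)
    {
        this.repaired = repaired;
        this.unrepaired = unrepaired;
    }

    // Ask the busier strategy first and fall back to the other, mirroring getNextBackgroundTask() above.
    Runnable nextBackgroundTask()
    {
        Strategy first = repaired.estimatedRemainingTasks() > unrepaired.estimatedRemainingTasks() ? repaired : unrepaired;
        Strategy second = (first == repaired) ? unrepaired : repaired;
        Runnable task = first.nextBackgroundTask();
        return task != null ? task : second.nextBackgroundTask();
    }
}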

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 1d96324,575c326..be81c80
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -142,11 -138,13 +142,11 @@@ public class CompactionTask extends Abs
              ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel()));
          }
          ssTableLoggerMsg.append("]");
 -        String taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString();
 -        logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg);
  
-         logger.info("Compacting ({}) {}", taskId, ssTableLoggerMsg);
 -        long start = System.nanoTime();
++        logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg);
  
 +        long start = System.nanoTime();
          long totalKeysWritten = 0;
 -
          long estimatedKeys = 0;
          try (CompactionController controller = getCompactionController(transaction.originals()))
          {
@@@ -213,11 -220,11 +213,11 @@@
  
              double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
              long totalSourceRows = 0;
 -            String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
 +            String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
-             logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
+             logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 -                                      taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
 +                                      taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
-             logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-             logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
+             logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+             logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
  
              if (offline)
                  Refs.release(Refs.selfRefs(newSStables));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 7fd5717,d90318f..b7bf83f
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@@ -338,9 -338,9 +338,9 @@@ public class LeveledManifes
                  {
                      int nextLevel = getNextLevel(candidates);
                      candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
-                     if (logger.isDebugEnabled())
-                         logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
+                     if (logger.isTraceEnabled())
+                         logger.trace("Compaction candidates for L{} are {}", i, toString(candidates));
 -                    return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategy().getMaxSSTableBytes());
 +                    return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategyManager().getMaxSSTableBytes());
                  }
                  else
                  {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 05f446c,b4125bb..f8a8240
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@@ -80,10 -79,10 +80,10 @@@ public class SizeTieredCompactionStrate
          int minThreshold = cfs.getMinimumCompactionThreshold();
          int maxThreshold = cfs.getMaximumCompactionThreshold();
  
 -        Iterable<SSTableReader> candidates = filterSuspectSSTables(Sets.intersection(cfs.getUncompactingSSTables(), sstables));
 +        Iterable<SSTableReader> candidates = filterSuspectSSTables(filter(cfs.getUncompactingSSTables(), sstables::contains));
  
          List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize);
-         logger.debug("Compaction buckets are {}", buckets);
+         logger.trace("Compaction buckets are {}", buckets);
          updateEstimatedCompactionsByTasks(buckets);
          List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
          if (!mostInteresting.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 07ca3d0,ed07df9..796391c
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@@ -82,10 -81,19 +82,10 @@@ public class SplittingSizeTieredCompact
              }
          }
          ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex);
 -        File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
          long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
          currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
 -        @SuppressWarnings("resource")
 -        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
 -                                                                            currentPartitionsToWrite,
 -                                                                            minRepairedAt,
 -                                                                            cfs.metadata,
 -                                                                            cfs.partitioner,
 -                                                                            new MetadataCollector(allSSTables, cfs.metadata.comparator, 0));
 -
 -        sstableWriter.switchWriter(writer);
 +        switchCompactionLocation(getWriteDirectory(currentBytesToWrite));
-         logger.debug("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
+         logger.trace("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
      }
  
      @Override
@@@ -100,20 -118,4 +100,20 @@@
          }
          return rie != null;
      }
 -}
 +
 +    public void switchCompactionLocation(Directories.DataDirectory location)
 +    {
 +        long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
 +        @SuppressWarnings("resource")
 +        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
 +                                                    currentPartitionsToWrite,
 +                                                    minRepairedAt,
 +                                                    cfs.metadata,
 +                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
 +                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
 +                                                    txn);
-         logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
++        logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
 +        sstableWriter.switchWriter(writer);
 +
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 59bbc7d,9b52269..83d0f82
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@@ -222,14 -149,9 +222,14 @@@ public class LifecycleTransaction exten
      public Throwable doCommit(Throwable accumulate)
      {
          assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit";
-         logger.debug("Committing update:{}, obsolete:{}", staged.update, staged.obsolete);
 -
+         logger.trace("Committing update:{}, obsolete:{}", staged.update, staged.obsolete);
  
 +        // accumulate must be null if we have been used correctly, so fail immediately if it is not
 +        maybeFail(accumulate);
 +
 +        // transaction log commit failure means we must abort; safe commit is not possible
 +        maybeFail(log.commit(null));
 +
          // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
          // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
          // and notification status for the obsolete and new files
@@@ -247,23 -167,18 +247,23 @@@
       */
      public Throwable doAbort(Throwable accumulate)
      {
-         if (logger.isDebugEnabled())
-             logger.debug("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete);
+         if (logger.isTraceEnabled())
+             logger.trace("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete);
  
 +        accumulate = abortObsoletion(obsoletions, accumulate);
 +
          if (logged.isEmpty() && staged.isEmpty())
 -            return accumulate;
 +            return log.abort(accumulate);
  
          // mark obsolete all readers that are not versions of those present in the original set
          Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
-         logger.debug("Obsoleting {}", obsolete);
+         logger.trace("Obsoleting {}", obsolete);
 -        // we don't pass the tracker in for the obsoletion, since these readers have never been notified externally
 -        // nor had their size accounting affected
 -        accumulate = markObsolete(null, obsolete, accumulate);
 +
 +        accumulate = prepareForObsoletion(obsolete, log, obsoletions = new ArrayList<>(), accumulate);
 +        // it's safe to abort even if committed, see maybeFail in doCommit() above, in this case it will just report
 +        // a failure to abort, which is useful information to have for debug
 +        accumulate = log.abort(accumulate);
 +        accumulate = markObsolete(obsoletions, accumulate);
  
          // replace all updated readers with a version restored to its original state
          accumulate = tracker.apply(updateLiveSet(logged.update, restoreUpdatedOriginals()), accumulate);
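
doCommit() and doAbort() above thread a Throwable accumulator through each step: before the point of no return a failure is surfaced immediately via maybeFail(), while afterwards failures are merged into the accumulator so the remaining cleanup still runs. A simplified stand-alone sketch of that accumulate-then-maybeFail idiom (Cassandra's own Throwables helpers are not reproduced here):

public class AccumulateSketch
{
    // Run one cleanup step, folding any failure into the accumulator instead of throwing.
    static Throwable perform(Throwable accumulate, Runnable step)
    {
        try
        {
            step.run();
        }
        catch (Throwable t)
        {
            if (accumulate == null)
                accumulate = t;
            else
                accumulate.addSuppressed(t);
        }
        return accumulate;
    }

    // Fail fast only while it is still safe to do so, i.e. before the point of no return.
    static void maybeFail(Throwable accumulate)
    {
        if (accumulate != null)
            throw new RuntimeException(accumulate);
    }
}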

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index da6d78d,0000000..78ea0f1
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@@ -1,419 -1,0 +1,419 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.Files;
 +import java.nio.file.NoSuchFileException;
 +import java.util.*;
 +import java.util.concurrent.ConcurrentLinkedQueue;
 +import java.util.concurrent.TimeUnit;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.util.concurrent.Runnables;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LogRecord.Type;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTable;
 +import org.apache.cassandra.io.sstable.SnapshotDeletingTask;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.Ref;
 +import org.apache.cassandra.utils.concurrent.RefCounted;
 +import org.apache.cassandra.utils.concurrent.Transactional;
 +
 +/**
 + * IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction,
 + * for correct behaviour its commit MUST occur before any others, since it may legitimately fail. This is consistent
 + * with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also
 + * *requires* that the prepareToCommit() phase only take actions that can be rolled back.
 + *
 + * IMPORTANT: The transaction must complete (commit or abort) before any temporary files are deleted, even though the
 + * txn log file itself will not be deleted until all tracked files are deleted. This is required by FileLister to ensure
 + * a consistent disk state. LifecycleTransaction ensures this requirement, so this class should really never be used
 + * outside of LT. @see FileLister.classifyFiles(TransactionData txn)
 + *
 + * A class that tracks sstable files involved in a transaction across sstables:
 + * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails.
 + *
 + * The transaction log file contains new and old sstables as follows:
 + *
 + * add:[sstable-2][CRC]
 + * remove:[sstable-1,max_update_time,num files][CRC]
 + *
 + * where sstable-2 is a new sstable to be retained if the transaction succeeds and sstable-1 is an old sstable to be
 + * removed. CRC is an incremental CRC of the file content up to this point. For old sstable files we also log the
 + * last update time of all files for the sstable descriptor and a checksum of vital properties such as update times
 + * and file sizes.
 + *
 + * Upon commit we add a final line to the log file:
 + *
 + * commit:[commit_time][CRC]
 + *
 + * When the transaction log is cleaned-up by the TransactionTidier, which happens only after any old sstables have been
 + * obsoleted, then any sstable files for old sstables are removed before deleting the transaction log if the transaction
 + * was committed, vice-versa if the transaction was aborted.
 + *
 + * On start-up we look for any transaction log files and repeat the cleanup process described above.
 + *
 + * See CASSANDRA-7066 for full details.
 + */
 +class LogTransaction extends Transactional.AbstractTransactional implements Transactional
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(LogTransaction.class);
 +
 +    /**
 +     * If the format of the lines in the transaction log is wrong or the checksum
 +     * does not match, then we throw this exception.
 +     */
 +    public static final class CorruptTransactionLogException extends RuntimeException
 +    {
 +        public final LogFile file;
 +
 +        public CorruptTransactionLogException(String message, LogFile file)
 +        {
 +            super(message);
 +            this.file = file;
 +        }
 +    }
 +
 +    private final Tracker tracker;
 +    private final LogFile data;
 +    private final Ref<LogTransaction> selfRef;
 +    // Deleting sstables is tricky because the mmapping might not have been finalized yet,
 +    // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
 +    // Additionally, we need to make sure to delete the data file first, so on restart the others
 +    // will be recognized as GCable.
 +    private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>();
 +
 +    LogTransaction(OperationType opType, CFMetaData metadata)
 +    {
 +        this(opType, metadata, null);
 +    }
 +
 +    LogTransaction(OperationType opType, CFMetaData metadata, Tracker tracker)
 +    {
 +        this(opType, new Directories(metadata), tracker);
 +    }
 +
 +    LogTransaction(OperationType opType, Directories directories, Tracker tracker)
 +    {
 +        this(opType, directories.getDirectoryForNewSSTables(), tracker);
 +    }
 +
 +    LogTransaction(OperationType opType, File folder, Tracker tracker)
 +    {
 +        this.tracker = tracker;
 +        int folderDescriptor = CLibrary.tryOpenDirectory(folder.getPath());
 +        this.data = new LogFile(opType, folder, folderDescriptor, UUIDGen.getTimeUUID());
 +        this.selfRef = new Ref<>(this, new TransactionTidier(data, folderDescriptor));
 +
-         if (logger.isDebugEnabled())
-             logger.debug("Created transaction logs with id {}", data.id);
++        if (logger.isTraceEnabled())
++            logger.trace("Created transaction logs with id {}", data.id);
 +    }
 +
 +    /**
 +     * Track a reader as new.
 +     **/
 +    void trackNew(SSTable table)
 +    {
 +        data.add(Type.ADD, table);
 +    }
 +
 +    /**
 +     * Stop tracking a reader as new.
 +     */
 +    void untrackNew(SSTable table)
 +    {
 +        data.remove(Type.ADD, table);
 +    }
 +
 +    /**
 +     * Schedule a reader for deletion as soon as it is fully unreferenced.
 +     */
 +    SSTableTidier obsoleted(SSTableReader reader)
 +    {
 +        if (data.contains(Type.ADD, reader))
 +        {
 +            if (data.contains(Type.REMOVE, reader))
 +                throw new IllegalArgumentException();
 +
 +            return new SSTableTidier(reader, true, this);
 +        }
 +
 +        data.add(Type.REMOVE, reader);
 +
 +        if (tracker != null)
 +            tracker.notifyDeleting(reader);
 +
 +        return new SSTableTidier(reader, false, this);
 +    }
 +
 +    OperationType getType()
 +    {
 +        return data.getType();
 +    }
 +
 +    UUID getId()
 +    {
 +        return data.getId();
 +    }
 +
 +    @VisibleForTesting
 +    String getDataFolder()
 +    {
 +        return data.folder.getPath();
 +    }
 +
 +    @VisibleForTesting
 +    LogFile getLogFile()
 +    {
 +        return data;
 +    }
 +
 +    static void delete(File file)
 +    {
 +        try
 +        {
-             if (logger.isDebugEnabled())
-                 logger.debug("Deleting {}", file);
++            if (logger.isTraceEnabled())
++                logger.trace("Deleting {}", file);
 +
 +            Files.delete(file.toPath());
 +        }
 +        catch (NoSuchFileException e)
 +        {
 +            logger.error("Unable to delete {} as it does not exist", file);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.error("Unable to delete {}", file, e);
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    /**
 +     * The transaction tidier.
 +     *
 +     * When the transaction reference is fully released we try to delete all the obsolete files
 +     * depending on the transaction result, as well as the transaction log file.
 +     */
 +    private static class TransactionTidier implements RefCounted.Tidy, Runnable
 +    {
 +        private final LogFile data;
 +        private final int folderDescriptor;
 +
 +        TransactionTidier(LogFile data, int folderDescriptor)
 +        {
 +            this.data = data;
 +            this.folderDescriptor = folderDescriptor;
 +        }
 +
 +        public void tidy() throws Exception
 +        {
 +            run();
 +        }
 +
 +        public String name()
 +        {
 +            return data.toString();
 +        }
 +
 +        public void run()
 +        {
-             if (logger.isDebugEnabled())
-                 logger.debug("Removing files for transaction {}", name());
++            if (logger.isTraceEnabled())
++                logger.trace("Removing files for transaction {}", name());
 +
 +            assert data.completed() : "Expected a completed transaction: " + data;
 +
 +            Throwable err = data.removeUnfinishedLeftovers(null);
 +
 +            if (err != null)
 +            {
 +                logger.info("Failed deleting files for transaction {}, we'll retry after GC and on server restart", name(), err);
 +                failedDeletions.add(this);
 +            }
 +            else
 +            {
-                 if (logger.isDebugEnabled())
-                     logger.debug("Closing file transaction {}", name());
++                if (logger.isTraceEnabled())
++                    logger.trace("Closing file transaction {}", name());
 +                CLibrary.tryCloseFD(folderDescriptor);
 +            }
 +        }
 +    }
 +
 +    static class Obsoletion
 +    {
 +        final SSTableReader reader;
 +        final SSTableTidier tidier;
 +
 +        Obsoletion(SSTableReader reader, SSTableTidier tidier)
 +        {
 +            this.reader = reader;
 +            this.tidier = tidier;
 +        }
 +    }
 +
 +    /**
 +     * The SSTableReader tidier. When a reader is fully released and no longer referenced
 + * by anyone, we run this. It keeps a reference to the parent transaction and releases
 +     * it when done, so that the final transaction cleanup can run when all obsolete readers
 +     * are released.
 +     */
 +    public static class SSTableTidier implements Runnable
 +    {
 +        // must not retain a reference to the SSTableReader, else leak detection cannot kick in
 +        private final Descriptor desc;
 +        private final long sizeOnDisk;
 +        private final Tracker tracker;
 +        private final boolean wasNew;
 +        private final Ref<LogTransaction> parentRef;
 +
 +        public SSTableTidier(SSTableReader referent, boolean wasNew, LogTransaction parent)
 +        {
 +            this.desc = referent.descriptor;
 +            this.sizeOnDisk = referent.bytesOnDisk();
 +            this.tracker = parent.tracker;
 +            this.wasNew = wasNew;
 +            this.parentRef = parent.selfRef.tryRef();
 +        }
 +
 +        public void run()
 +        {
 +            SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
 +
 +            try
 +            {
 +                // If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier
 +                File datafile = new File(desc.filenameFor(Component.DATA));
 +
 +                delete(datafile);
 +                // let the remainder be cleaned up by delete
 +                SSTable.delete(desc, SSTable.discoverComponentsFor(desc));
 +            }
 +            catch (Throwable t)
 +            {
 +                logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc, t);
 +                failedDeletions.add(this);
 +                return;
 +            }
 +
 +            if (tracker != null && tracker.cfstore != null && !wasNew)
 +                tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk);
 +
 +            // release the referent to the parent so that all the transaction files can be released
 +            parentRef.release();
 +        }
 +
 +        public void abort()
 +        {
 +            parentRef.release();
 +        }
 +    }
 +
 +
 +    static void rescheduleFailedDeletions()
 +    {
 +        Runnable task;
 +        while ( null != (task = failedDeletions.poll()))
 +            ScheduledExecutors.nonPeriodicTasks.submit(task);
 +
 +        // On Windows, snapshots cannot be deleted so long as a segment of the root element is memory-mapped in NTFS.
 +        SnapshotDeletingTask.rescheduleFailedTasks();
 +    }
 +
 +    static void waitForDeletions()
 +    {
 +        FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(Runnables.doNothing(), 0, TimeUnit.MILLISECONDS));
 +    }
 +
 +    @VisibleForTesting
 +    Throwable complete(Throwable accumulate)
 +    {
 +        try
 +        {
 +            accumulate = selfRef.ensureReleased(accumulate);
 +            return accumulate;
 +        }
 +        catch (Throwable t)
 +        {
 +            logger.error("Failed to complete file transaction {}", getId(), t);
 +            return Throwables.merge(accumulate, t);
 +        }
 +    }
 +
 +    protected Throwable doCommit(Throwable accumulate)
 +    {
 +        data.commit();
 +        return complete(accumulate);
 +    }
 +
 +    protected Throwable doAbort(Throwable accumulate)
 +    {
 +        data.abort();
 +        return complete(accumulate);
 +    }
 +
 +    protected void doPrepare() { }
 +
 +    /**
 +     * Called on startup to scan existing folders for any unfinished leftovers of
 +     * operations that were ongoing when the process exited. Also called by the standalone
 +     * sstableutil tool when the cleanup option is specified, @see StandaloneSSTableUtil.
 +     *
 +     */
 +    static void removeUnfinishedLeftovers(CFMetaData metadata)
 +    {
 +        for (File dir : new Directories(metadata).getCFDirectories())
 +        {
 +            int folderDescriptor = CLibrary.tryOpenDirectory(dir.getPath());
 +            try
 +            {
 +                File[] logs = dir.listFiles(LogFile::isLogFile);
 +
 +                for (File log : logs)
 +                {
 +                    LogFile data = LogFile.make(log, folderDescriptor);
 +                    data.readRecords();
 +                    if (data.verify())
 +                    {
 +                        Throwable failure = data.removeUnfinishedLeftovers(null);
 +                        if (failure != null)
 +                            logger.error("Failed to remove unfinished transaction leftovers for log {}", log, failure);
 +                    }
 +                    else
 +                    {
 +                        logger.error("Unexpected disk state: failed to read transaction log {}", log);
 +                    }
 +                }
 +            }
 +            finally
 +            {
 +                CLibrary.tryCloseFD(folderDescriptor);
 +            }
 +        }
 +    }
 +}
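
For readers tracing the txn log format described in the LogTransaction javadoc above, a minimal standalone sketch of the three record kinds (add, remove, commit) as text lines follows. TxnLogLineSketch is an invented name, and the plain CRC32 over the line prefix is only an approximation of the incremental CRC that LogFile actually maintains over the file content; treat it as a reading aid, not the real LogRecord serialization.

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical illustration of the txn log line layout described in LogTransaction's javadoc:
//   add:[sstable-2][CRC]
//   remove:[sstable-1,max_update_time,num files][CRC]
//   commit:[commit_time][CRC]
public class TxnLogLineSketch
{
    // Close a record with a CRC of its prefix (the real log keeps an incremental CRC
    // over the whole file content up to this point; this is a simplification).
    static String withCrc(String prefix)
    {
        CRC32 crc = new CRC32();
        crc.update(prefix.getBytes(StandardCharsets.UTF_8));
        return prefix + "[" + crc.getValue() + "]";
    }

    static String addRecord(String sstable)
    {
        return withCrc("add:[" + sstable + "]");
    }

    static String removeRecord(String sstable, long maxUpdateTime, int numFiles)
    {
        return withCrc("remove:[" + sstable + "," + maxUpdateTime + "," + numFiles + "]");
    }

    static String commitRecord(long commitTime)
    {
        return withCrc("commit:[" + commitTime + "]");
    }

    public static void main(String[] args)
    {
        System.out.println(addRecord("sstable-2"));
        System.out.println(removeRecord("sstable-1", 1442952798000L, 8));
        System.out.println(commitRecord(System.currentTimeMillis()));
    }
}

If the transaction aborts, it is the new ("add") sstable files that get removed instead, which is why the tidiers above track whether a reader wasNew.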

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/BootStrapper.java
index 31fda34,a6b1ad7..c0f0402
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@@ -178,34 -181,6 +178,34 @@@ public class BootStrapper extends Progr
          return getRandomTokens(metadata, numTokens);
      }
  
 +    private static Collection<Token> getSpecifiedTokens(final TokenMetadata metadata,
 +                                                        Collection<String> initialTokens)
 +    {
-         logger.debug("tokens manually specified as {}",  initialTokens);
++        logger.trace("tokens manually specified as {}",  initialTokens);
 +        List<Token> tokens = new ArrayList<>(initialTokens.size());
 +        for (String tokenString : initialTokens)
 +        {
 +            Token token = metadata.partitioner.getTokenFactory().fromString(tokenString);
 +            if (metadata.getEndpoint(token) != null)
 +                throw new ConfigurationException("Bootstrapping to existing token " + tokenString + " is not allowed (decommission/removenode the old node first).");
 +            tokens.add(token);
 +        }
 +        return tokens;
 +    }
 +
 +    static Collection<Token> allocateTokens(final TokenMetadata metadata,
 +                                            InetAddress address,
 +                                            String allocationKeyspace,
 +                                            int numTokens)
 +    {
 +        Keyspace ks = Keyspace.open(allocationKeyspace);
 +        if (ks == null)
 +            throw new ConfigurationException("Problem opening token allocation keyspace " + allocationKeyspace);
 +        AbstractReplicationStrategy rs = ks.getReplicationStrategy();
 +
 +        return TokenAllocation.allocateTokens(metadata, rs, address, numTokens);
 +    }
 +
      public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
      {
          Set<Token> tokens = new HashSet<>(numTokens);

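The getSpecifiedTokens() addition above amounts to: parse every configured initial_token string and refuse any token that an existing endpoint already owns. A rough sketch of that check, using plain strings and a Map in place of Token and TokenMetadata (both stand-ins, along with the SpecifiedTokensSketch name, are assumptions made only for illustration):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the manual initial_token validation: a token may only be
// used for bootstrap if no existing node in the ring already owns it.
public class SpecifiedTokensSketch
{
    static List<String> validate(Map<String, String> tokenToEndpoint, List<String> initialTokens)
    {
        List<String> tokens = new ArrayList<>(initialTokens.size());
        for (String tokenString : initialTokens)
        {
            if (tokenToEndpoint.containsKey(tokenString))
                throw new IllegalArgumentException("Bootstrapping to existing token " + tokenString
                                                   + " is not allowed (decommission/removenode the old node first).");
            tokens.add(tokenString);
        }
        return tokens;
    }

    public static void main(String[] args)
    {
        Map<String, String> ring = new HashMap<>();
        ring.put("-9223372036854775808", "10.0.0.1"); // token already owned by an existing node

        System.out.println(validate(ring, Arrays.asList("0", "4611686018427387904"))); // accepted
        // validate(ring, Arrays.asList("-9223372036854775808")) would throw
    }
}
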
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 3f74c33,36da92d..ec5167b
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@@ -98,238 -78,4 +98,238 @@@ public class CqlInputFormat extends org
          return new CqlRecordReader();
      }
  
 +    protected void validateConfiguration(Configuration conf)
 +    {
 +        if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null)
 +        {
 +            throw new UnsupportedOperationException("you must set the keyspace and table with setInputColumnFamily()");
 +        }
 +        if (ConfigHelper.getInputInitialAddress(conf) == null)
 +            throw new UnsupportedOperationException("You must set the initial input address to a Cassandra node with setInputInitialAddress");
 +        if (ConfigHelper.getInputPartitioner(conf) == null)
 +            throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
 +    }
 +
 +    public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(JobContext context) throws IOException
 +    {
 +        Configuration conf = HadoopCompat.getConfiguration(context);
 +
 +        validateConfiguration(conf);
 +
 +        keyspace = ConfigHelper.getInputKeyspace(conf);
 +        cfName = ConfigHelper.getInputColumnFamily(conf);
 +        partitioner = ConfigHelper.getInputPartitioner(conf);
-         logger.debug("partitioner is {}", partitioner);
++        logger.trace("partitioner is {}", partitioner);
 +
 +        // canonical ranges and nodes holding replicas
 +        Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace);
 +
 +        // canonical ranges, split into pieces, fetching the splits in parallel
 +        ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
 +        List<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
 +
 +        try
 +        {
 +            List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>();
 +            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
 +            Range<Token> jobRange = null;
 +            if (jobKeyRange != null)
 +            {
 +                if (jobKeyRange.start_key != null)
 +                {
 +                    if (!partitioner.preservesOrder())
 +                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with an order preserving partitioner");
 +                    if (jobKeyRange.start_token != null)
 +                        throw new IllegalArgumentException("only start_key supported");
 +                    if (jobKeyRange.end_token != null)
 +                        throw new IllegalArgumentException("only start_key supported");
 +                    jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
 +                                           partitioner.getToken(jobKeyRange.end_key));
 +                }
 +                else if (jobKeyRange.start_token != null)
 +                {
 +                    jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
 +                                           partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
 +                }
 +                else
 +                {
 +                    logger.warn("ignoring jobKeyRange specified without start_key or start_token");
 +                }
 +            }
 +
 +            session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
 +            Metadata metadata = session.getCluster().getMetadata();
 +
 +            for (TokenRange range : masterRangeNodes.keySet())
 +            {
 +                if (jobRange == null)
 +                {
 +                    // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
 +                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
 +                }
 +                else
 +                {
 +                    TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange);
 +                    if (range.intersects(jobTokenRange))
 +                    {
 +                        for (TokenRange intersection: range.intersectWith(jobTokenRange))
 +                        {
 +                            // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
 +                            splitfutures.add(executor.submit(new SplitCallable(intersection,  masterRangeNodes.get(range), conf)));
 +                        }
 +                    }
 +                }
 +            }
 +
 +            // wait until we have all the results back
 +            for (Future<List<org.apache.hadoop.mapreduce.InputSplit>> futureInputSplits : splitfutures)
 +            {
 +                try
 +                {
 +                    splits.addAll(futureInputSplits.get());
 +                }
 +                catch (Exception e)
 +                {
 +                    throw new IOException("Could not get input splits", e);
 +                }
 +            }
 +        }
 +        finally
 +        {
 +            executor.shutdownNow();
 +        }
 +
 +        assert splits.size() > 0;
 +        Collections.shuffle(splits, new Random(System.nanoTime()));
 +        return splits;
 +    }
 +
 +    private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
 +    {
 +        return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
 +                metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
 +    }
 +
 +    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
 +    {
 +        int splitSize = ConfigHelper.getInputSplitSize(conf);
 +        try
 +        {
 +            return describeSplits(keyspace, cfName, range, splitSize);
 +        }
 +        catch (Exception e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
 +    {
 +        try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
 +        {
 +            Map<TokenRange, Set<Host>> map = new HashMap<>();
 +            Metadata metadata = session.getCluster().getMetadata();
 +            for (TokenRange tokenRange : metadata.getTokenRanges())
 +                map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
 +            return map;
 +        }
 +    }
 +
 +    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
 +    {
 +        String query = String.format("SELECT mean_partition_size, partitions_count " +
 +                                     "FROM %s.%s " +
 +                                     "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
 +                                     SystemKeyspace.NAME,
 +                                     SystemKeyspace.SIZE_ESTIMATES);
 +
 +        ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
 +
 +        Row row = resultSet.one();
 +        // If we have no data on this split, return the full split i.e., do not sub-split
 +        // Assume smallest granularity of partition count available from CASSANDRA-7688
 +        if (row == null)
 +        {
 +            Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
 +            wrappedTokenRange.put(tokenRange, (long) 128);
 +            return wrappedTokenRange;
 +        }
 +
 +        long meanPartitionSize = row.getLong("mean_partition_size");
 +        long partitionCount = row.getLong("partitions_count");
 +
 +        int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
 +        List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
 +        Map<TokenRange, Long> rangesWithLength = new HashMap<>();
 +        for (TokenRange range : splitRanges)
 +            rangesWithLength.put(range, partitionCount/splitCount);
 +
 +        return rangesWithLength;
 +    }
 +
 +    // Old Hadoop API
 +    public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
 +    {
 +        TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
 +        List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac);
 +        InputSplit[] oldInputSplits = new InputSplit[newInputSplits.size()];
 +        for (int i = 0; i < newInputSplits.size(); i++)
 +            oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i);
 +        return oldInputSplits;
 +    }
 +
 +    /**
 +     * Gets a token range and splits it up according to the suggested
 +     * size into input splits that Hadoop can use.
 +     */
 +    class SplitCallable implements Callable<List<org.apache.hadoop.mapreduce.InputSplit>>
 +    {
 +
 +        private final TokenRange tokenRange;
 +        private final Set<Host> hosts;
 +        private final Configuration conf;
 +
 +        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
 +        {
 +            this.tokenRange = tr;
 +            this.hosts = hosts;
 +            this.conf = conf;
 +        }
 +
 +        public List<org.apache.hadoop.mapreduce.InputSplit> call() throws Exception
 +        {
 +            ArrayList<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
 +            Map<TokenRange, Long> subSplits;
 +            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
 +            // turn the sub-ranges into InputSplits
 +            String[] endpoints = new String[hosts.size()];
 +
 +            // hadoop needs hostname, not ip
 +            int endpointIndex = 0;
 +            for (Host endpoint : hosts)
 +                endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
 +
 +            boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner;
 +
 +            for (TokenRange subSplit : subSplits.keySet())
 +            {
 +                List<TokenRange> ranges = subSplit.unwrap();
 +                for (TokenRange subrange : ranges)
 +                {
 +                    ColumnFamilySplit split =
 +                            new ColumnFamilySplit(
 +                                    partitionerIsOpp ?
 +                                            subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
 +                                    partitionerIsOpp ?
 +                                            subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(),
 +                                    subSplits.get(subSplit),
 +                                    endpoints);
 +
-                     logger.debug("adding {}", split);
++                    logger.trace("adding {}", split);
 +                    splits.add(split);
 +                }
 +            }
 +            return splits;
 +        }
 +    }
  }
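
The sub-splitting performed by describeSplits() above is plain arithmetic over one system.size_estimates row: splitCount = (mean_partition_size * partitions_count) / splitSize, and each resulting sub-range then carries roughly partitions_count / splitCount partitions, with a single 128-partition split as the fallback when no estimate exists. Below is a minimal sketch of that arithmetic; the SplitArithmeticSketch name and the sample numbers are made up for illustration and are not values from this commit.

// Hypothetical sketch of the sub-split arithmetic in CqlInputFormat.describeSplits().
public class SplitArithmeticSketch
{
    // Number of sub-splits a token range should be divided into, given the
    // size_estimates statistics and the configured input split size (in bytes).
    static int splitCount(long meanPartitionSize, long partitionCount, int splitSize)
    {
        return (int) ((meanPartitionSize * partitionCount) / splitSize);
    }

    public static void main(String[] args)
    {
        long meanPartitionSize = 4096;        // bytes, sample value
        long partitionCount = 1_000_000;      // sample value
        int splitSize = 64 * 1024 * 1024;     // 64 MB target split size, sample value

        int count = splitCount(meanPartitionSize, partitionCount, splitSize);
        long partitionsPerSplit = partitionCount / count;
        System.out.println(count + " sub-splits, ~" + partitionsPerSplit + " partitions each");
    }
}

Note that splitCount can come out as zero for very small tables; the sample values above avoid that case.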

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hints/HintVerbHandler.java
index 458d01f,0000000..b2c7b6a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java
+++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
@@@ -1,89 -1,0 +1,89 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.cassandra.hints;
 +
 +import java.net.InetAddress;
 +import java.util.UUID;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.net.IVerbHandler;
 +import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.serializers.MarshalException;
 +import org.apache.cassandra.service.StorageService;
 +
 +/**
 + * Verb handler used both for hint dispatch and streaming.
 + *
 + * With the non-sstable format, we cannot just stream hint sstables on node decommission. So sometimes, at decommission
 + * time, we might have to stream hints to a non-owning host (say, if the owning host B is down during decommission of host A).
 + * In that case the handler just stores the received hint in its local hint store.
 + */
 +public final class HintVerbHandler implements IVerbHandler<HintMessage>
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(HintVerbHandler.class);
 +
 +    public void doVerb(MessageIn<HintMessage> message, int id)
 +    {
 +        UUID hostId = message.payload.hostId;
 +        Hint hint = message.payload.hint;
 +
 +        // If we see an unknown table id, it means the table, or one of the tables in the mutation, had been dropped.
 + * In that case there is nothing we can really do, or should do, other than log it and go on.
 +        // This will *not* happen due to a not-yet-seen table, because we don't transfer hints unless there
 +        // is schema agreement between the sender and the receiver.
 +        if (hint == null)
 +        {
-             logger.debug("Failed to decode and apply a hint for {} - table with id {} is unknown",
++            logger.trace("Failed to decode and apply a hint for {} - table with id {} is unknown",
 +                         hostId,
 +                         message.payload.unknownTableID);
 +            reply(id, message.from);
 +            return;
 +        }
 +
 +        // We must perform validation before applying the hint, and there is no other place to do it other than here.
 +        try
 +        {
 +            hint.mutation.getPartitionUpdates().forEach(PartitionUpdate::validate);
 +        }
 +        catch (MarshalException e)
 +        {
 +            logger.warn("Failed to validate a hint for {} - skipped", hostId, e);
 +            reply(id, message.from);
 +            return;
 +        }
 +
 +        // Apply the hint if this node is the destination, store for future dispatch if this node isn't (must have gotten
 +        // it from a decommissioned node that had streamed it before going out).
 +        if (hostId.equals(StorageService.instance.getLocalHostUUID()))
 +            hint.apply();
 +        else
 +            HintsService.instance.write(hostId, hint);
 +
 +        reply(id, message.from);
 +    }
 +
 +    private static void reply(int id, InetAddress to)
 +    {
 +        MessagingService.instance().sendReply(HintResponse.message, id, to);
 +    }
 +}
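
The routing decision in doVerb() above reduces to one comparison: a hint whose host id matches the local node is applied immediately, while a hint addressed to another node (the decommission case from the class javadoc) is written to the local hint store for later dispatch. A tiny sketch of just that branch; HintRouterSketch and the Action enum are invented names, and the real handler additionally validates the mutation and always replies to the sender.

import java.util.UUID;

// Hypothetical sketch of the apply-or-store decision made by HintVerbHandler.doVerb().
public class HintRouterSketch
{
    enum Action { APPLY_LOCALLY, STORE_FOR_LATER_DISPATCH }

    static Action route(UUID hintHostId, UUID localHostId)
    {
        return hintHostId.equals(localHostId) ? Action.APPLY_LOCALLY : Action.STORE_FOR_LATER_DISPATCH;
    }

    public static void main(String[] args)
    {
        UUID local = UUID.randomUUID();
        UUID other = UUID.randomUUID();
        System.out.println(route(local, local)); // APPLY_LOCALLY
        System.out.println(route(other, local)); // STORE_FOR_LATER_DISPATCH
    }
}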

