cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From c...@apache.org
Subject cassandra git commit: Enhanced Compaction Logging
Date Mon, 02 May 2016 19:04:13 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 307890363 -> e16d8a7a6


Enhanced Compaction Logging

patch by Carl Yeksigian; reviewed by Marcus Eriksson for CASSANDRA-10805


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e16d8a7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e16d8a7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e16d8a7a

Branch: refs/heads/trunk
Commit: e16d8a7a667d50271a183a95be894126cb2a5414
Parents: 3078903
Author: Carl Yeksigian <carl@apache.org>
Authored: Mon May 2 15:01:39 2016 -0400
Committer: Carl Yeksigian <carl@apache.org>
Committed: Mon May 2 15:03:38 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  13 +-
 .../compaction/AbstractCompactionStrategy.java  |  23 +-
 .../db/compaction/CompactionLogger.java         | 342 +++++++++++++++++++
 .../compaction/CompactionStrategyManager.java   |  39 ++-
 .../cassandra/db/compaction/CompactionTask.java |   2 +
 .../DateTieredCompactionStrategy.java           |  31 ++
 .../DateTieredCompactionStrategyOptions.java    |   8 +-
 .../compaction/LeveledCompactionStrategy.java   |  27 +-
 .../SizeTieredCompactionStrategy.java           |   1 +
 10 files changed, 475 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c802031..1a3069c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Enhanced Compaction Logging (CASSANDRA-10805)
  * Make prepared statement cache size configurable (CASSANDRA-11555)
  * Integrated JMX authentication and authorization (CASSANDRA-10091)
  * Add units to stress ouput (CASSANDRA-11352)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b47cf85..6b841c2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1163,12 +1163,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
             memtable.cfs.replaceFlushed(memtable, sstables);
             reclaim(memtable);
-                logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
-                             sstables,
-                             sstables.size(),
-                             FBUtilities.prettyPrintMemory(totalBytesOnDisk),
-                             FBUtilities.prettyPrintMemory(maxBytesOnDisk),
-                             FBUtilities.prettyPrintMemory(minBytesOnDisk));
+            memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
+            logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
+                         sstables,
+                         sstables.size(),
+                         FBUtilities.prettyPrintMemory(totalBytesOnDisk),
+                         FBUtilities.prettyPrintMemory(maxBytesOnDisk),
+                         FBUtilities.prettyPrintMemory(minBytesOnDisk));
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 40f0ce2..668bc51 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -63,11 +63,13 @@ public abstract class AbstractCompactionStrategy
     // minimum interval needed to perform tombstone removal compaction in seconds, default 86400 or 1 day.
     protected static final long DEFAULT_TOMBSTONE_COMPACTION_INTERVAL = 86400;
     protected static final boolean DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION = false;
+    protected static final boolean DEFAULT_LOG_ALL_OPTION = false;
 
     protected static final String TOMBSTONE_THRESHOLD_OPTION = "tombstone_threshold";
     protected static final String TOMBSTONE_COMPACTION_INTERVAL_OPTION = "tombstone_compaction_interval";
     // disable range overlap check when deciding if an SSTable is candidate for tombstone compaction (CASSANDRA-6563)
     protected static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction";
+    protected static final String LOG_ALL_OPTION = "log_all";
     protected static final String COMPACTION_ENABLED = "enabled";
     public static final String ONLY_PURGE_REPAIRED_TOMBSTONES = "only_purge_repaired_tombstones";
 
@@ -78,6 +80,7 @@ public abstract class AbstractCompactionStrategy
     protected long tombstoneCompactionInterval;
     protected boolean uncheckedTombstoneCompaction;
     protected boolean disableTombstoneCompactions = false;
+    protected boolean logAll = true;
 
     private final Directories directories;
 
@@ -110,6 +113,8 @@ public abstract class AbstractCompactionStrategy
             tombstoneCompactionInterval = optionValue == null ? DEFAULT_TOMBSTONE_COMPACTION_INTERVAL : Long.parseLong(optionValue);
             optionValue = options.get(UNCHECKED_TOMBSTONE_COMPACTION_OPTION);
             uncheckedTombstoneCompaction = optionValue == null ? DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION : Boolean.parseBoolean(optionValue);
+            optionValue = options.get(LOG_ALL_OPTION);
+            logAll = optionValue == null ? DEFAULT_LOG_ALL_OPTION : Boolean.parseBoolean(optionValue);
             if (!shouldBeEnabled())
                 this.disable();
         }
@@ -463,7 +468,16 @@ public abstract class AbstractCompactionStrategy
         if (unchecked != null)
         {
             if (!unchecked.equalsIgnoreCase("true") && !unchecked.equalsIgnoreCase("false"))
-                throw new ConfigurationException(String.format("'%s' should be either 'true' or 'false', not '%s'",UNCHECKED_TOMBSTONE_COMPACTION_OPTION, unchecked));
+                throw new ConfigurationException(String.format("'%s' should be either 'true' or 'false', not '%s'", UNCHECKED_TOMBSTONE_COMPACTION_OPTION, unchecked));
+        }
+
+        String logAll = options.get(LOG_ALL_OPTION);
+        if (logAll != null)
+        {
+            if (!logAll.equalsIgnoreCase("true") && !logAll.equalsIgnoreCase("false"))
+            {
+                throw new ConfigurationException(String.format("'%s' should either be 'true' or 'false', not %s", LOG_ALL_OPTION, logAll));
+            }
         }
 
         String compactionEnabled = options.get(COMPACTION_ENABLED);
@@ -474,10 +488,12 @@ public abstract class AbstractCompactionStrategy
                 throw new ConfigurationException(String.format("enabled should either be 'true' or 'false', not %s", compactionEnabled));
             }
         }
+
         Map<String, String> uncheckedOptions = new HashMap<String, String>(options);
         uncheckedOptions.remove(TOMBSTONE_THRESHOLD_OPTION);
         uncheckedOptions.remove(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
         uncheckedOptions.remove(UNCHECKED_TOMBSTONE_COMPACTION_OPTION);
+        uncheckedOptions.remove(LOG_ALL_OPTION);
         uncheckedOptions.remove(COMPACTION_ENABLED);
         uncheckedOptions.remove(ONLY_PURGE_REPAIRED_TOMBSTONES);
         return uncheckedOptions;
@@ -521,6 +537,11 @@ public abstract class AbstractCompactionStrategy
         return groupedSSTables;
     }
 
+    public CompactionLogger.Strategy strategyLogger()
+    {
+        return CompactionLogger.Strategy.none;
+    }
+
     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
                                                        long keyCount,
                                                        long repairedAt,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java b/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java
new file mode 100644
index 0000000..16a7f2a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionLogger.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.file.*;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
+
+public class CompactionLogger
+{
+    public interface Strategy
+    {
+        JsonNode sstable(SSTableReader sstable);
+
+        JsonNode options();
+
+        static Strategy none = new Strategy()
+        {
+            public JsonNode sstable(SSTableReader sstable)
+            {
+                return null;
+            }
+
+            public JsonNode options()
+            {
+                return null;
+            }
+        };
+    }
+
+    /**
+     * This will produce the compaction strategy's starting information.
+     */
+    public interface StrategySummary
+    {
+        JsonNode getSummary();
+    }
+
+    /**
+     * This is an interface to allow writing to a different interface.
+     */
+    public interface Writer
+    {
+        /**
+         * This is used when the statement being written is already the starting summary for the tag.
+         * @param statement This should be written out to the medium capturing the logs
+         * @param tag       This is an identifier for a strategy; each strategy should have a distinct Object
+         */
+        void writeStart(JsonNode statement, Object tag);
+
+        /**
+         * @param statement This should be written out to the medium capturing the logs
+         * @param summary   This can be used when a tag is not recognized by this writer; this can be because the file
+         *                  has been rolled, or otherwise the writer had to start over
+         * @param tag       This is an identifier for a strategy; each strategy should have a distinct Object
+         */
+        void write(JsonNode statement, StrategySummary summary, Object tag);
+    }
+
+    private interface CompactionStrategyAndTableFunction
+    {
+        JsonNode apply(AbstractCompactionStrategy strategy, SSTableReader sstable);
+    }
+
+    private static final JsonNodeFactory json = JsonNodeFactory.instance;
+    private static final Logger logger = LoggerFactory.getLogger(CompactionLogger.class);
+    private static final Writer serializer = new CompactionLogSerializer();
+    private final ColumnFamilyStore cfs;
+    private final CompactionStrategyManager csm;
+    private final AtomicInteger identifier = new AtomicInteger(0);
+    private final Map<AbstractCompactionStrategy, String> compactionStrategyMapping = new ConcurrentHashMap<>();
+    private final AtomicBoolean enabled = new AtomicBoolean(false);
+
+    public CompactionLogger(ColumnFamilyStore cfs, CompactionStrategyManager csm)
+    {
+        this.csm = csm;
+        this.cfs = cfs;
+    }
+
+    private void forEach(Consumer<AbstractCompactionStrategy> consumer)
+    {
+        csm.getStrategies()
+           .forEach(l -> l.forEach(consumer));
+    }
+
+    private ArrayNode compactionStrategyMap(Function<AbstractCompactionStrategy, JsonNode> select)
+    {
+        ArrayNode node = json.arrayNode();
+        forEach(acs -> node.add(select.apply(acs)));
+        return node;
+    }
+
+    private ArrayNode sstableMap(Collection<SSTableReader> sstables, CompactionStrategyAndTableFunction csatf)
+    {
+        ArrayNode node = json.arrayNode();
+        sstables.forEach(t -> node.add(csatf.apply(csm.getCompactionStrategyFor(t), t)));
+        return node;
+    }
+
+    private String getId(AbstractCompactionStrategy strategy)
+    {
+        return compactionStrategyMapping.computeIfAbsent(strategy, s -> String.valueOf(identifier.getAndIncrement()));
+    }
+
+    private JsonNode formatSSTables(AbstractCompactionStrategy strategy)
+    {
+        ArrayNode node = json.arrayNode();
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+        {
+            if (csm.getCompactionStrategyFor(sstable) == strategy)
+                node.add(formatSSTable(strategy, sstable));
+        }
+        return node;
+    }
+
+    private JsonNode formatSSTable(AbstractCompactionStrategy strategy, SSTableReader sstable)
+    {
+        ObjectNode node = json.objectNode();
+        node.put("generation", sstable.descriptor.generation);
+        node.put("version", sstable.descriptor.version.getVersion());
+        node.put("size", sstable.onDiskLength());
+        JsonNode logResult = strategy.strategyLogger().sstable(sstable);
+        if (logResult != null)
+            node.put("details", logResult);
+        return node;
+    }
+
+    private JsonNode startStrategy(AbstractCompactionStrategy strategy)
+    {
+        ObjectNode node = json.objectNode();
+        node.put("strategyId", getId(strategy));
+        node.put("type", strategy.getName());
+        node.put("tables", formatSSTables(strategy));
+        node.put("repaired", csm.isRepaired(strategy));
+        List<String> folders = csm.getStrategyFolders(strategy);
+        ArrayNode folderNode = json.arrayNode();
+        for (String folder : folders)
+        {
+            folderNode.add(folder);
+        }
+        node.put("folders", folderNode);
+
+        JsonNode logResult = strategy.strategyLogger().options();
+        if (logResult != null)
+            node.put("options", logResult);
+        return node;
+    }
+
+    private JsonNode shutdownStrategy(AbstractCompactionStrategy strategy)
+    {
+        ObjectNode node = json.objectNode();
+        node.put("strategyId", getId(strategy));
+        return node;
+    }
+
+    private JsonNode describeSSTable(AbstractCompactionStrategy strategy, SSTableReader sstable)
+    {
+        ObjectNode node = json.objectNode();
+        node.put("strategyId", getId(strategy));
+        node.put("table", formatSSTable(strategy, sstable));
+        return node;
+    }
+
+    private void describeStrategy(ObjectNode node)
+    {
+        node.put("keyspace", cfs.keyspace.getName());
+        node.put("table", cfs.getTableName());
+        node.put("time", System.currentTimeMillis());
+    }
+
+    private JsonNode startStrategies()
+    {
+        ObjectNode node = json.objectNode();
+        node.put("type", "enable");
+        describeStrategy(node);
+        node.put("strategies", compactionStrategyMap(this::startStrategy));
+        return node;
+    }
+
+    public void enable()
+    {
+        if (enabled.compareAndSet(false, true))
+        {
+            serializer.writeStart(startStrategies(), this);
+        }
+    }
+
+    public void disable()
+    {
+        if (enabled.compareAndSet(true, false))
+        {
+            ObjectNode node = json.objectNode();
+            node.put("type", "disable");
+            describeStrategy(node);
+            node.put("strategies", compactionStrategyMap(this::shutdownStrategy));
+            serializer.write(node, this::startStrategies, this);
+        }
+    }
+
+    public void flush(Collection<SSTableReader> sstables)
+    {
+        if (enabled.get())
+        {
+            ObjectNode node = json.objectNode();
+            node.put("type", "flush");
+            describeStrategy(node);
+            node.put("tables", sstableMap(sstables, this::describeSSTable));
+            serializer.write(node, this::startStrategies, this);
+        }
+    }
+
+    public void compaction(long startTime, Collection<SSTableReader> input, long endTime, Collection<SSTableReader> output)
+    {
+        if (enabled.get())
+        {
+            ObjectNode node = json.objectNode();
+            node.put("type", "compaction");
+            describeStrategy(node);
+            node.put("start", String.valueOf(startTime));
+            node.put("end", String.valueOf(endTime));
+            node.put("input", sstableMap(input, this::describeSSTable));
+            node.put("output", sstableMap(output, this::describeSSTable));
+            serializer.write(node, this::startStrategies, this);
+        }
+    }
+
+    public void pending(AbstractCompactionStrategy strategy, int remaining)
+    {
+        if (remaining != 0 && enabled.get())
+        {
+            ObjectNode node = json.objectNode();
+            node.put("type", "pending");
+            describeStrategy(node);
+            node.put("strategyId", getId(strategy));
+            node.put("pending", remaining);
+            serializer.write(node, this::startStrategies, this);
+        }
+    }
+
+    private static class CompactionLogSerializer implements Writer
+    {
+        private static final String logDirectory = System.getProperty("cassandra.logdir", ".");
+        private final ExecutorService loggerService = Executors.newFixedThreadPool(1);
+        // This is only accessed on the logger service thread, so it does not need to be thread safe
+        private final Set<Object> rolled = new HashSet<>();
+        private OutputStreamWriter stream;
+
+        private static OutputStreamWriter createStream() throws IOException
+        {
+            int count = 0;
+            Path compactionLog = Paths.get(logDirectory, "compaction.log");
+            if (Files.exists(compactionLog))
+            {
+                Path tryPath = compactionLog;
+                while (Files.exists(tryPath))
+                {
+                    tryPath = Paths.get(logDirectory, String.format("compaction-%d.log", count++));
+                }
+                Files.move(compactionLog, tryPath);
+            }
+
+            return new OutputStreamWriter(Files.newOutputStream(compactionLog, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE));
+        }
+
+        private void writeLocal(String toWrite)
+        {
+            try
+            {
+                if (stream == null)
+                    stream = createStream();
+                stream.write(toWrite);
+                stream.flush();
+            }
+            catch (IOException ioe)
+            {
+                // We'll drop the change and log the error to the logger.
+                NoSpamLogger.log(logger, NoSpamLogger.Level.ERROR, 1, TimeUnit.MINUTES,
+                                 "Could not write to the log file: {}", ioe);
+            }
+
+        }
+
+        public void writeStart(JsonNode statement, Object tag)
+        {
+            final String toWrite = statement.toString() + System.lineSeparator();
+            loggerService.execute(() -> {
+                rolled.add(tag);
+                writeLocal(toWrite);
+            });
+        }
+
+        public void write(JsonNode statement, StrategySummary summary, Object tag)
+        {
+            final String toWrite = statement.toString() + System.lineSeparator();
+            loggerService.execute(() -> {
+                if (!rolled.contains(tag))
+                {
+                    writeLocal(summary.getSummary().toString() + System.lineSeparator());
+                    rolled.add(tag);
+                }
+                writeLocal(toWrite);
+            });
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index be861e1..4d93294 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -22,6 +22,7 @@ import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
@@ -61,6 +62,7 @@ import org.apache.cassandra.service.StorageService;
 public class CompactionStrategyManager implements INotificationConsumer
 {
     private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
+    public final CompactionLogger compactionLogger;
     private final ColumnFamilyStore cfs;
     private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
     private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
@@ -86,6 +88,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         cfs.getTracker().subscribe(this);
         logger.trace("{} subscribed to the data tracker.", this);
         this.cfs = cfs;
+        this.compactionLogger = new CompactionLogger(cfs, this);
         reload(cfs.metadata);
         params = cfs.metadata.params.compaction;
         locations = getDirectories().getWriteableLocations();
@@ -162,6 +165,10 @@ public class CompactionStrategyManager implements INotificationConsumer
         {
             writeLock.unlock();
         }
+        repaired.forEach(AbstractCompactionStrategy::startup);
+        unrepaired.forEach(AbstractCompactionStrategy::startup);
+        if (Stream.concat(repaired.stream(), unrepaired.stream()).anyMatch(cs -> cs.logAll))
+            compactionLogger.enable();
     }
 
     /**
@@ -171,7 +178,7 @@ public class CompactionStrategyManager implements INotificationConsumer
      * @param sstable
      * @return
      */
-    private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+    public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
     {
         int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
         readLock.lock();
@@ -234,6 +241,7 @@ public class CompactionStrategyManager implements INotificationConsumer
             isActive = false;
             repaired.forEach(AbstractCompactionStrategy::shutdown);
             unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+            compactionLogger.disable();
         }
         finally
         {
@@ -847,4 +855,33 @@ public class CompactionStrategyManager implements INotificationConsumer
             readLock.unlock();
         }
     }
+
+    public boolean isRepaired(AbstractCompactionStrategy strategy)
+    {
+        return repaired.contains(strategy);
+    }
+
+    public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
+    {
+        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+        if (cfs.getPartitioner().splitter().isPresent())
+        {
+            int unrepairedIndex = unrepaired.indexOf(strategy);
+            if (unrepairedIndex > 0)
+            {
+                return Collections.singletonList(locations[unrepairedIndex].location.getAbsolutePath());
+            }
+            int repairedIndex = repaired.indexOf(strategy);
+            if (repairedIndex > 0)
+            {
+                return Collections.singletonList(locations[repairedIndex].location.getAbsolutePath());
+            }
+        }
+        List<String> folders = new ArrayList<>(locations.length);
+        for (Directories.DataDirectory location : locations)
+        {
+            folders.add(location.location.getAbsolutePath());
+        }
+        return folders;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 1465ba4..5df91fd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -150,6 +150,7 @@ public class CompactionTask extends AbstractCompactionTask
         logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg);
 
         long start = System.nanoTime();
+        long startTime = System.currentTimeMillis();
         long totalKeysWritten = 0;
         long estimatedKeys = 0;
         try (CompactionController controller = getCompactionController(transaction.originals()))
@@ -234,6 +235,7 @@ public class CompactionTask extends AbstractCompactionTask
                                       mergeSummary));
             logger.trace(String.format("CF Total Bytes Compacted: %s", FBUtilities.prettyPrintMemory(CompactionTask.addToTotalBytesCompacted(endsize))));
             logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
+            cfs.getCompactionStrategyManager().compactionLogger.compaction(startTime, transaction.originals(), System.currentTimeMillis(), newSStables);
 
             // update the metrics
             cfs.metric.compactionBytesWritten.inc(endsize);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 9a17e06..7c1ff13 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
@@ -32,6 +33,9 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.utils.Pair;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
 
 import static com.google.common.collect.Iterables.filter;
 
@@ -342,6 +346,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
                     n += Math.ceil((double)stcsBucket.size() / cfs.getMaximumCompactionThreshold());
         }
         estimatedRemainingTasks = n;
+        cfs.getCompactionStrategyManager().compactionLogger.pending(this, n);
     }
 
 
@@ -453,6 +458,32 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         return uncheckedOptions;
     }
 
+    public CompactionLogger.Strategy strategyLogger() {
+        return new CompactionLogger.Strategy()
+        {
+            public JsonNode sstable(SSTableReader sstable)
+            {
+                ObjectNode node = JsonNodeFactory.instance.objectNode();
+                node.put("min_timestamp", sstable.getMinTimestamp());
+                node.put("max_timestamp", sstable.getMaxTimestamp());
+                return node;
+            }
+
+            public JsonNode options()
+            {
+                ObjectNode node = JsonNodeFactory.instance.objectNode();
+                TimeUnit resolution = DateTieredCompactionStrategy.this.options.timestampResolution;
+                node.put(DateTieredCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY,
+                         resolution.toString());
+                node.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY,
+                         resolution.toSeconds(DateTieredCompactionStrategy.this.options.baseTime));
+                node.put(DateTieredCompactionStrategyOptions.MAX_WINDOW_SIZE_KEY,
+                         resolution.toSeconds(DateTieredCompactionStrategy.this.options.maxWindowSize));
+                return node;
+            }
+        };
+    }
+
     public String toString()
     {
         return String.format("DateTieredCompactionStrategy[%s/%s]",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
index 78a0cab..fee9e34 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
@@ -44,6 +44,7 @@ public final class DateTieredCompactionStrategyOptions
 
     @Deprecated
     protected final long maxSSTableAge;
+    protected final TimeUnit timestampResolution;
     protected final long baseTime;
     protected final long expiredSSTableCheckFrequency;
     protected final long maxWindowSize;
@@ -51,7 +52,7 @@ public final class DateTieredCompactionStrategyOptions
     public DateTieredCompactionStrategyOptions(Map<String, String> options)
     {
         String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
-        TimeUnit timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue);
+        timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue);
         if (timestampResolution != DEFAULT_TIMESTAMP_RESOLUTION)
             logger.warn("Using a non-default timestamp_resolution {} - are you really doing inserts with USING TIMESTAMP <non_microsecond_timestamp> (or driver equivalent)?", timestampResolution.toString());
         optionValue = options.get(MAX_SSTABLE_AGE_KEY);
@@ -68,9 +69,10 @@ public final class DateTieredCompactionStrategyOptions
     public DateTieredCompactionStrategyOptions()
     {
         maxSSTableAge = Math.round(DEFAULT_MAX_SSTABLE_AGE_DAYS * DEFAULT_TIMESTAMP_RESOLUTION.convert((long) DEFAULT_MAX_SSTABLE_AGE_DAYS, TimeUnit.DAYS));
-        baseTime = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS);
+        timestampResolution = DEFAULT_TIMESTAMP_RESOLUTION;
+        baseTime = timestampResolution.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS);
         expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, TimeUnit.SECONDS);
-        maxWindowSize = DEFAULT_TIMESTAMP_RESOLUTION.convert(1, TimeUnit.DAYS);
+        maxWindowSize = timestampResolution.convert(1, TimeUnit.DAYS);
     }
 
     public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws  ConfigurationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 068d283..b6ad64c 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -38,6 +38,9 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.FBUtilities;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
 
 public class LeveledCompactionStrategy extends AbstractCompactionStrategy
 {
@@ -208,7 +211,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
 
     public int getEstimatedRemainingTasks()
     {
-        return manifest.getEstimatedTasks();
+        int n = manifest.getEstimatedTasks();
+        cfs.getCompactionStrategyManager().compactionLogger.pending(this, n);
+        return n;
     }
 
     public long getMaxSSTableBytes()
@@ -444,6 +449,26 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
         return null;
     }
 
+    public CompactionLogger.Strategy strategyLogger()
+    {
+        return new CompactionLogger.Strategy()
+        {
+            public JsonNode sstable(SSTableReader sstable)
+            {
+                ObjectNode node = JsonNodeFactory.instance.objectNode();
+                node.put("level", sstable.getSSTableLevel());
+                node.put("min_token", sstable.first.getToken().toString());
+                node.put("max_token", sstable.last.getToken().toString());
+                return node;
+            }
+
+            public JsonNode options()
+            {
+                return null;
+            }
+        };
+    }
+
     public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
     {
         Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16d8a7a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 28bdf5c..8ef2ac7 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -86,6 +86,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize);
         logger.trace("Compaction buckets are {}", buckets);
         estimatedRemainingTasks = getEstimatedCompactionsByTasks(cfs, buckets);
+        cfs.getCompactionStrategyManager().compactionLogger.pending(this, estimatedRemainingTasks);
         List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
         if (!mostInteresting.isEmpty())
             return mostInteresting;


Mime
View raw message