cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [3/3] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Date Mon, 10 Aug 2015 07:20:46 GMT
Merge branch 'cassandra-2.2' into cassandra-3.0

Conflicts:
	src/java/org/apache/cassandra/config/CFMetaData.java
	src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
	test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/929438b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/929438b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/929438b8

Branch: refs/heads/cassandra-3.0
Commit: 929438b8be32e38f6d921bfdc4a011cd526dfeb7
Parents: e389dc4 9c3b967
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Mon Aug 10 09:12:52 2015 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Mon Aug 10 09:13:01 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  3 +-
 .../org/apache/cassandra/config/CFMetaData.java |  7 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 29 ++++++-
 .../cassandra/db/ColumnFamilyStoreMBean.java    | 19 +++--
 .../compaction/CompactionStrategyManager.java   | 51 +++++++++---
 .../db/compaction/CompactionsCQLTest.java       | 81 +++++++++++++++++++-
 7 files changed, 166 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7cb9f16,772455c..639dd59
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -19,39 -13,6 +19,40 @@@ Merged from 2.1
     when both exist (CASSANDRA-9777)
   * Release snapshot selfRef when doing snapshot repair (CASSANDRA-9998)
   * Cannot replace token does not exist - DN node removed as Fat Client (CASSANDRA-9871)
 +Merged from 2.0:
 + * Don't cast expected bf size to an int (CASSANDRA-9959)
 +
 +
 +3.0.0-alpha1
 + * Implement proper sandboxing for UDFs (CASSANDRA-9402)
 + * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)
 + * Allow extra schema definitions in cassandra-stress yaml (CASSANDRA-9850)
 + * Metrics should use up to date nomenclature (CASSANDRA-9448)
 + * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384)
 + * Cleanup crc and adler code for java 8 (CASSANDRA-9650)
 + * Storage engine refactor (CASSANDRA-8099, 9743, 9746, 9759, 9781, 9808, 9825,
 +   9848, 9705, 9859, 9867, 9874, 9828, 9801)
 + * Update Guava to 18.0 (CASSANDRA-9653)
 + * Bloom filter false positive ratio is not honoured (CASSANDRA-8413)
 + * New option for cassandra-stress to leave a ratio of columns null (CASSANDRA-9522)
 + * Change hinted_handoff_enabled yaml setting, JMX (CASSANDRA-9035)
 + * Add algorithmic token allocation (CASSANDRA-7032)
 + * Add nodetool command to replay batchlog (CASSANDRA-9547)
 + * Make file buffer cache independent of paths being read (CASSANDRA-8897)
 + * Remove deprecated legacy Hadoop code (CASSANDRA-9353)
 + * Decommissioned nodes will not rejoin the cluster (CASSANDRA-8801)
 + * Change gossip stabilization to use endpoit size (CASSANDRA-9401)
 + * Change default garbage collector to G1 (CASSANDRA-7486)
 + * Populate TokenMetadata early during startup (CASSANDRA-9317)
 + * Undeprecate cache recentHitRate (CASSANDRA-6591)
 + * Add support for selectively varint encoding fields (CASSANDRA-9499, 9865)
 + * Materialized Views (CASSANDRA-6477)
 +Merged from 2.2:
 + * Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)
 + * UDF / UDA execution time in trace (CASSANDRA-9723)
 + * Fix broken internode SSL (CASSANDRA-9884)
 +Merged from 2.1:
++ * Add new JMX methods to change local compaction strategy (CASSANDRA-9965)
   * Fix handling of enable/disable autocompaction (CASSANDRA-9899)
   * Add consistency level to tracing ouput (CASSANDRA-9827)
   * Remove repair snapshot leftover on startup (CASSANDRA-7357)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 0fa4ded,ccc5cc8..5a690bd
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,55 -13,6 +13,56 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 +3.0
 +===
 +
 +New features
 +------------
 +   - Materialized Views, which allow for server-side denormalization, is now
 +     available. Materialized views provide an alternative to secondary indexes
 +     for non-primary key queries, and perform much better for indexing high
 +     cardinality columns.
 +     See http://www.datastax.com/dev/blog/new-in-cassandra-3-0-materialized-views
 +
 +
 +Upgrading
 +---------
 +   - 3.0 requires Java 8u20 or later.
 +   - The default JVM GC has been changed to G1GC.
 +   - The default JVM flag -XX:+PerfDisableSharedMem will cause the following tools JVM
 +     to stop working: jps, jstack, jinfo, jmc, jcmd as well as 3rd party tools like Jolokia.
 +     If you wish to use these tools you can comment this flag out in cassandra-env.{sh,ps1}
 +   - New write stages have been added for batchlog and materialized view mutations
 +     you can set their size in cassandra.yaml
 +   - User defined functions are now executed in a sandbox.
 +     To use UDFs and UDAs, you have to enable them in cassandra.yaml.
 +   - New SSTable version 'la' with improved bloom-filter false-positive handling
 +     compared to previous version 'ka' used in 2.2 and 2.1. Running sstableupgrade
 +     is not necessary but recommended.
 +   - Before upgrading to 3.0, make sure that your cluster is in complete agreement
 +     (schema versions outputted by `nodetool describecluster` are all the same).
 +   - Schema metadata is now stored in the new `system_schema` keyspace, and
 +     legacy `system.schema_*` tables are now gone; see CASSANDRA-6717 for details.
 +   - Pig's CassandraStorage has been removed. Use CqlNativeStorage instead.
 +   - Hadoop BulkOutputFormat and BulkRecordWriter have been removed; use
 +     CqlBulkOutputFormat and CqlBulkRecordWriter instead.
 +   - Hadoop ColumnFamilyInputFormat and ColumnFamilyOutputFormat have been removed;
 +     use CqlInputFormat and CqlOutputFormat instead.
 +   - Hadoop ColumnFamilyRecordReader and ColumnFamilyRecordWriter have been removed;
 +     use CqlRecordReader and CqlRecordWriter instead.
 +   - hinted_handoff_enabled in cassandra.yaml no longer supports a list of data centers.
 +     To specify a list of excluded data centers when hinted_handoff_enabled is set to true,
 +     use hinted_handoff_disabled_datacenters, see CASSANDRA-9035 for details.
 +   - The `sstable_compression` and `chunk_length_kb` compression options have been deprecated.
 +     The new options are `class` and `chunk_length_in_kb`. Disabling compression should
now
 +     be done by setting the new option `enabled` to `false`.
 +   - Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY syntax
 +     has been deprecated since 2.1.0 and is being removed in 3.0.0.
 +   - Batchlog entries are now stored in a new table - system.batches.
 +     The old one has been deprecated.
- 
++   - JMX methods set/getCompactionStrategyClass have been removed, use
++     set/getLocalCompactionStrategy or set/getLocalCompactionStrategyJson instead.
 +
  2.2
  ===
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index 1d38274,6468973..7719587
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -772,25 -819,60 +772,26 @@@ public final class CFMetaDat
              throw new ConfigurationException(String.format("Column family comparators do
not match or are not compatible (found %s; expected %s).", cfm.comparator.getClass().getSimpleName(),
comparator.getClass().getSimpleName()));
      }
  
 -    public static void validateCompactionOptions(Class<? extends AbstractCompactionStrategy>
strategyClass, Map<String, String> options) throws ConfigurationException
 +    public static Class<? extends AbstractCompactionStrategy> createCompactionStrategy(String
className) throws ConfigurationException
 +    {
 +        className = className.contains(".") ? className : "org.apache.cassandra.db.compaction."
+ className;
 +        Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className,
"compaction strategy");
 +        if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass))
 +            throw new ConfigurationException(String.format("Specified compaction strategy
class (%s) is not derived from AbstractReplicationStrategy", className));
 +
 +        return strategyClass;
 +    }
 +
-     public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore
cfs)
++    public static AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore
cfs,
++                                                                              CompactionParams
compactionParams)
      {
          try
          {
 -            if (options == null)
 -                return;
 -
 -            Map<?,?> unknownOptions = (Map) strategyClass.getMethod("validateOptions",
Map.class).invoke(null, options);
 -            if (!unknownOptions.isEmpty())
 -                throw new ConfigurationException(String.format("Properties specified %s
are not understood by %s", unknownOptions.keySet(), strategyClass.getSimpleName()));
 +            Constructor<? extends AbstractCompactionStrategy> constructor =
-                 params.compaction.klass().getConstructor(ColumnFamilyStore.class, Map.class);
-             return constructor.newInstance(cfs, params.compaction.options());
++                compactionParams.klass().getConstructor(ColumnFamilyStore.class, Map.class);
++            return constructor.newInstance(cfs, compactionParams.options());
          }
 -        catch (NoSuchMethodException e)
 -        {
 -            logger.warn("Compaction Strategy {} does not have a static validateOptions method.
Validation ignored", strategyClass.getName());
 -        }
 -        catch (InvocationTargetException e)
 -        {
 -            if (e.getTargetException() instanceof ConfigurationException)
 -                throw (ConfigurationException) e.getTargetException();
 -            throw new ConfigurationException("Failed to validate compaction options: " +
options);
 -        }
 -        catch (ConfigurationException e)
 -        {
 -            throw e;
 -        }
 -        catch (Exception e)
 -        {
 -            throw new ConfigurationException("Failed to validate compaction options: " +
options);
 -        }
 -    }
 -
 -    public static Class<? extends AbstractCompactionStrategy> createCompactionStrategy(String
className) throws ConfigurationException
 -    {
 -        className = className.contains(".") ? className : "org.apache.cassandra.db.compaction."
+ className;
 -        Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className,
"compaction strategy");
 -        if (className.equals(WrappingCompactionStrategy.class.getName()))
 -            throw new ConfigurationException("You can't set WrappingCompactionStrategy as
the compaction strategy!");
 -        if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass))
 -            throw new ConfigurationException(String.format("Specified compaction strategy
class (%s) is not derived from AbstractReplicationStrategy", className));
 -
 -        return strategyClass;
 -    }
 -
 -    public static AbstractCompactionStrategy createCompactionStrategyInstance(Class<?
extends AbstractCompactionStrategy> compactionStrategyClass,
 -                                                                              ColumnFamilyStore
cfs,
 -                                                                              Map<String,
String> compactionStrategyOptions)
 -    {
 -        try
 -        {
 -            Constructor<? extends AbstractCompactionStrategy> constructor =
 -                compactionStrategyClass.getConstructor(ColumnFamilyStore.class, Map.class);
 -            return constructor.newInstance(cfs, compactionStrategyOptions);
 -        }
 -        catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException
| InstantiationException e)
 +        catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException
| InstantiationException e)
          {
              throw new RuntimeException(e);
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index beb2b93,6b71be9..4ae6694
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -248,14 -259,57 +248,35 @@@ public class ColumnFamilyStore implemen
          };
      }
  
-     public void setCompactionStrategyClass(String compactionStrategyClass)
+     public void setLocalCompactionStrategyJson(String options)
      {
-         throw new UnsupportedOperationException("ColumnFamilyStore.setCompactionStrategyClass()
method is no longer supported");
+         setLocalCompactionStrategy(FBUtilities.fromJsonMap(options));
      }
  
-     public String getCompactionStrategyClass()
+     public String getLocalCompactionStrategyJson()
      {
-         return metadata.params.compaction.klass().getName();
+         return FBUtilities.json(getLocalCompactionStrategy());
+     }
+ 
+     public void setLocalCompactionStrategy(Map<String, String> options)
+     {
+         try
+         {
 -            Map<String, String> optionsCopy = new HashMap<>(options);
 -            Class<? extends AbstractCompactionStrategy> compactionStrategyClass =
CFMetaData.createCompactionStrategy(optionsCopy.get("class"));
 -            optionsCopy.remove("class");
 -            CFMetaData.validateCompactionOptions(compactionStrategyClass, optionsCopy);
 -            compactionStrategyWrapper.setNewLocalCompactionStrategy(compactionStrategyClass,
optionsCopy);
++            CompactionParams compactionParams = CompactionParams.fromMap(options);
++            compactionParams.validate();
++            compactionStrategyManager.setNewLocalCompactionStrategy(compactionParams);
+         }
+         catch (Throwable t)
+         {
+             logger.error("Could not set new local compaction strategy", t);
+             // dont propagate the ConfigurationException over jmx, user will only see a
ClassNotFoundException
+             throw new IllegalArgumentException("Could not set new local compaction strategy:
"+t.getMessage());
+         }
+     }
+ 
+     public Map<String, String> getLocalCompactionStrategy()
+     {
 -        Map<String, String> options = new HashMap<>(compactionStrategyWrapper.options);
 -        options.put("class", compactionStrategyWrapper.getName());
 -        return options;
 -    }
 -
 -    public void setCompactionStrategyClass(String compactionStrategyClass)
 -    {
 -        try
 -        {
 -            metadata.compactionStrategyClass = CFMetaData.createCompactionStrategy(compactionStrategyClass);
 -            compactionStrategyWrapper.maybeReloadCompactionStrategy(metadata);
 -        }
 -        catch (ConfigurationException e)
 -        {
 -            throw new IllegalArgumentException(e.getMessage());
 -        }
 -    }
 -
 -    public String getCompactionStrategyClass()
 -    {
 -        return metadata.compactionStrategyClass.getName();
++        return compactionStrategyManager.getCompactionParams().asMap();
      }
  
      public Map<String,String> getCompressionParameters()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index c23df74,1a8ba1d..84c6dd1
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@@ -70,15 -67,36 +70,24 @@@ public interface ColumnFamilyStoreMBea
      public void setMaximumCompactionThreshold(int threshold);
  
      /**
-      * Sets the compaction strategy by class name
-      * @param className the name of the compaction strategy class
+      * Sets the compaction strategy locally for this node
+      *
+      * Note that this will be set until an ALTER with compaction = {..} is executed or the
node is restarted
+      *
+      * @param options compaction options with the same syntax as when doing ALTER ... WITH
compaction = {..}
       */
-     public void setCompactionStrategyClass(String className);
+     public void setLocalCompactionStrategyJson(String options);
+     public String getLocalCompactionStrategyJson();
  
      /**
-      * Gets the compaction strategy class name
+      * Sets the compaction strategy locally for this node
+      *
+      * Note that this will be set until an ALTER with compaction = {..} is executed or the
node is restarted
+      *
+      * @param options compaction options map
       */
-     public String getCompactionStrategyClass();
+     public void setLocalCompactionStrategy(Map<String, String> options);
+     public Map<String, String> getLocalCompactionStrategy();
 -    /**
 -     * Sets the compaction strategy by class name
 -     * @param className the name of the compaction strategy class
 -     */
 -    @Deprecated
 -    public void setCompactionStrategyClass(String className);
 -
 -    /**
 -     * Gets the compaction strategy class name
 -     */
 -    @Deprecated
 -    public String getCompactionStrategyClass();
  
      /**
       * Get the compression parameters

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 4f6dfa2,0000000..7204da0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -1,453 -1,0 +1,482 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.compaction;
 +
 +
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +
 +import com.google.common.collect.Iterables;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.notifications.*;
 +import org.apache.cassandra.schema.CompactionParams;
 +
 +/**
 + * Manages the compaction strategies.
 + *
 + * Currently has two instances of actual compaction strategies - one for repaired data and
one for
 + * unrepaired data. This is done to be able to totally separate the different sets of sstables.
 + */
 +public class CompactionStrategyManager implements INotificationConsumer
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
 +    private final ColumnFamilyStore cfs;
 +    private volatile AbstractCompactionStrategy repaired;
 +    private volatile AbstractCompactionStrategy unrepaired;
 +    private volatile boolean enabled = true;
 +    public boolean isActive = true;
 +    private volatile CompactionParams params;
++    /*
++        We keep a copy of the schema compaction parameters here to be able to decide if
we
++        should update the compaction strategy in maybeReloadCompactionStrategy() due to
an ALTER.
++
++        If a user changes the local compaction strategy and then later ALTERs a compaction
parameter,
++        we will use the new compaction parameters.
++     */
++    private CompactionParams schemaCompactionParams;
 +
 +    public CompactionStrategyManager(ColumnFamilyStore cfs)
 +    {
 +        cfs.getTracker().subscribe(this);
 +        logger.debug("{} subscribed to the data tracker.", this);
 +        this.cfs = cfs;
 +        reload(cfs.metadata);
 +        params = cfs.metadata.params.compaction;
 +        enabled = params.isEnabled();
 +    }
 +
 +    /**
 +     * Return the next background task
 +     *
 +     * Returns a task for the compaction strategy that needs it the most (most estimated
remaining tasks)
 +     *
 +     */
 +    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
 +    {
 +        if (!isEnabled())
 +            return null;
 +
 +        maybeReload(cfs.metadata);
 +
 +        if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
 +        {
 +            AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
 +            if (repairedTask != null)
 +                return repairedTask;
 +            return unrepaired.getNextBackgroundTask(gcBefore);
 +        }
 +        else
 +        {
 +            AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
 +            if (unrepairedTask != null)
 +                return unrepairedTask;
 +            return repaired.getNextBackgroundTask(gcBefore);
 +        }
 +    }
 +
 +    public boolean isEnabled()
 +    {
 +        return enabled && isActive;
 +    }
 +
 +    public synchronized void resume()
 +    {
 +        isActive = true;
 +    }
 +
 +    /**
 +     * pause compaction while we cancel all ongoing compactions
 +     *
 +     * Separate call from enable/disable to not have to save the enabled-state externally
 +      */
 +    public synchronized void pause()
 +    {
 +        isActive = false;
 +    }
 +
 +
 +    private void startup()
 +    {
 +        for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
 +        {
 +            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
 +                getCompactionStrategyFor(sstable).addSSTable(sstable);
 +        }
 +        repaired.startup();
 +        unrepaired.startup();
 +    }
 +
 +    /**
 +     * return the compaction strategy for the given sstable
 +     *
 +     * returns differently based on the repaired status
 +     * @param sstable
 +     * @return
 +     */
 +    private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
 +    {
 +        if (sstable.isRepaired())
 +            return repaired;
 +        else
 +            return unrepaired;
 +    }
 +
 +    public void shutdown()
 +    {
 +        isActive = false;
 +        repaired.shutdown();
 +        unrepaired.shutdown();
 +    }
 +
 +    public synchronized void maybeReload(CFMetaData metadata)
 +    {
-         if (repaired != null && repaired.getClass().equals(metadata.params.compaction.klass())
-                 && unrepaired != null && unrepaired.getClass().equals(metadata.params.compaction.klass())
-                 && repaired.options.equals(metadata.params.compaction.options())
// todo: assumes all have the same options
-                 && unrepaired.options.equals(metadata.params.compaction.options()))
++        // compare the old schema configuration to the new one, ignore any locally set changes.
++        if (metadata.params.compaction.equals(schemaCompactionParams))
 +            return;
 +        reload(metadata);
 +    }
 +
 +    /**
 +     * Reload the compaction strategies
 +     *
 +     * Called after changing configuration and at startup.
 +     * @param metadata
 +     */
 +    public synchronized void reload(CFMetaData metadata)
 +    {
 +        boolean disabledWithJMX = !enabled && shouldBeEnabled();
-         if (repaired != null)
-             repaired.shutdown();
-         if (unrepaired != null)
-             unrepaired.shutdown();
-         repaired = metadata.createCompactionStrategyInstance(cfs);
-         unrepaired = metadata.createCompactionStrategyInstance(cfs);
-         params = metadata.params.compaction;
++        setStrategy(metadata.params.compaction);
++        schemaCompactionParams = metadata.params.compaction;
++
 +        if (disabledWithJMX || !shouldBeEnabled())
 +            disable();
 +        else
 +            enable();
 +        startup();
 +    }
 +
 +    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
 +    {
 +        cfs.getTracker().replaceFlushed(memtable, sstable);
 +        if (sstable != null)
 +            CompactionManager.instance.submitBackground(cfs);
 +    }
 +
 +    public int getUnleveledSSTables()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof
LeveledCompactionStrategy)
 +        {
 +            int count = 0;
 +            count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
 +            count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
 +            return count;
 +        }
 +        return 0;
 +    }
 +
 +    public synchronized int[] getSSTableCountPerLevel()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof
LeveledCompactionStrategy)
 +        {
 +            int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
 +            int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
 +            res = sumArrays(res, repairedCountPerLevel);
 +            int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
 +            res = sumArrays(res, unrepairedCountPerLevel);
 +            return res;
 +        }
 +        return null;
 +    }
 +
 +    private static int[] sumArrays(int[] a, int[] b)
 +    {
 +        int[] res = new int[Math.max(a.length, b.length)];
 +        for (int i = 0; i < res.length; i++)
 +        {
 +            if (i < a.length && i < b.length)
 +                res[i] = a[i] + b[i];
 +            else if (i < a.length)
 +                res[i] = a[i];
 +            else
 +                res[i] = b[i];
 +        }
 +        return res;
 +    }
 +
 +    public boolean shouldDefragment()
 +    {
 +        assert repaired.getClass().equals(unrepaired.getClass());
 +        return repaired.shouldDefragment();
 +    }
 +
 +
 +    public synchronized void handleNotification(INotification notification, Object sender)
 +    {
 +        if (notification instanceof SSTableAddedNotification)
 +        {
 +            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
 +            if (flushedNotification.added.isRepaired())
 +                repaired.addSSTable(flushedNotification.added);
 +            else
 +                unrepaired.addSSTable(flushedNotification.added);
 +        }
 +        else if (notification instanceof SSTableListChangedNotification)
 +        {
 +            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification)
notification;
 +            Set<SSTableReader> repairedRemoved = new HashSet<>();
 +            Set<SSTableReader> repairedAdded = new HashSet<>();
 +            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
 +            Set<SSTableReader> unrepairedAdded = new HashSet<>();
 +
 +            for (SSTableReader sstable : listChangedNotification.removed)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedRemoved.add(sstable);
 +                else
 +                    unrepairedRemoved.add(sstable);
 +            }
 +            for (SSTableReader sstable : listChangedNotification.added)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedAdded.add(sstable);
 +                else
 +                    unrepairedAdded.add(sstable);
 +            }
 +            if (!repairedRemoved.isEmpty())
 +            {
 +                repaired.replaceSSTables(repairedRemoved, repairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : repairedAdded)
 +                    repaired.addSSTable(sstable);
 +            }
 +
 +            if (!unrepairedRemoved.isEmpty())
 +            {
 +                unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : unrepairedAdded)
 +                    unrepaired.addSSTable(sstable);
 +            }
 +        }
 +        else if (notification instanceof SSTableRepairStatusChanged)
 +        {
 +            for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
 +            {
 +                if (sstable.isRepaired())
 +                {
 +                    unrepaired.removeSSTable(sstable);
 +                    repaired.addSSTable(sstable);
 +                }
 +                else
 +                {
 +                    repaired.removeSSTable(sstable);
 +                    unrepaired.addSSTable(sstable);
 +                }
 +            }
 +        }
 +        else if (notification instanceof SSTableDeletingNotification)
 +        {
 +            SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
 +            if (sstable.isRepaired())
 +                repaired.removeSSTable(sstable);
 +            else
 +                unrepaired.removeSSTable(sstable);
 +        }
 +    }
 +
 +    public void enable()
 +    {
 +        if (repaired != null)
 +            repaired.enable();
 +        if (unrepaired != null)
 +            unrepaired.enable();
 +        // enable this last to make sure the strategies are ready to get calls.
 +        enabled = true;
 +    }
 +
 +    public void disable()
 +    {
 +        // disable this first avoid asking disabled strategies for compaction tasks
 +        enabled = false;
 +        if (repaired != null)
 +            repaired.disable();
 +        if (unrepaired != null)
 +            unrepaired.disable();
 +    }
 +
 +    /**
 +     * Create ISSTableScanner from the given sstables
 +     *
 +     * Delegates the call to the compaction strategies to allow LCS to create a scanner
 +     * @param sstables
 +     * @param range
 +     * @return
 +     */
 +    @SuppressWarnings("resource")
 +    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader>
sstables,  Collection<Range<Token>> ranges)
 +    {
 +        List<SSTableReader> repairedSSTables = new ArrayList<>();
 +        List<SSTableReader> unrepairedSSTables = new ArrayList<>();
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable.isRepaired())
 +                repairedSSTables.add(sstable);
 +            else
 +                unrepairedSSTables.add(sstable);
 +        }
 +
 +        Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
 +
 +        for (Range<Token> range : ranges)
 +        {
 +            AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables,
range);
 +            AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables,
range);
 +
 +            for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners))
 +            {
 +                if (!scanners.add(scanner))
 +                    scanner.close();
 +            }
 +        }
 +
 +        return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
 +    }
 +
 +    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader>
sstables)
 +    {
 +        return getScanners(sstables, Collections.singleton(null));
 +    }
 +
 +    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader>
sstablesToGroup)
 +    {
 +        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
 +    }
 +
 +    public long getMaxSSTableBytes()
 +    {
 +        return unrepaired.getMaxSSTableBytes();
 +    }
 +
 +    public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore,
long maxSSTableBytes)
 +    {
 +        return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn,
gcBefore, maxSSTableBytes);
 +    }
 +
 +    public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore,
final boolean splitOutput)
 +    {
 +        // runWithCompactionsDisabled cancels active compactions and disables them, then
we are able
 +        // to make the repaired/unrepaired strategies mark their own sstables as compacting.
Once the
 +        // sstables are marked the compactions are re-enabled
 +        return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
 +        {
 +            @Override
 +            public Collection<AbstractCompactionTask> call() throws Exception
 +            {
 +                synchronized (CompactionStrategyManager.this)
 +                {
 +                    Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore,
splitOutput);
 +                    Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore,
splitOutput);
 +
 +                    if (repairedTasks == null && unrepairedTasks == null)
 +                        return null;
 +
 +                    if (repairedTasks == null)
 +                        return unrepairedTasks;
 +                    if (unrepairedTasks == null)
 +                        return repairedTasks;
 +
 +                    List<AbstractCompactionTask> tasks = new ArrayList<>();
 +                    tasks.addAll(repairedTasks);
 +                    tasks.addAll(unrepairedTasks);
 +                    return tasks;
 +                }
 +            }
 +        }, false, false);
 +    }
 +
 +    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables,
int gcBefore)
 +    {
 +        return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables,
gcBefore);
 +    }
 +
 +    public int getEstimatedRemainingTasks()
 +    {
 +        int tasks = 0;
 +        tasks += repaired.getEstimatedRemainingTasks();
 +        tasks += unrepaired.getEstimatedRemainingTasks();
 +
 +        return tasks;
 +    }
 +
 +    public boolean shouldBeEnabled()
 +    {
 +        return params.isEnabled();
 +    }
 +
 +    public String getName()
 +    {
 +        return unrepaired.getName();
 +    }
 +
 +    public List<AbstractCompactionStrategy> getStrategies()
 +    {
 +        return Arrays.asList(repaired, unrepaired);
 +    }
++
++    public synchronized void setNewLocalCompactionStrategy(CompactionParams params)
++    {
++        logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
++        setStrategy(params);
++        if (shouldBeEnabled())
++            enable();
++        else
++            disable();
++        startup();
++    }
++
++    private void setStrategy(CompactionParams params)
++    {
++        if (repaired != null)
++            repaired.shutdown();
++        if (unrepaired != null)
++            unrepaired.shutdown();
++        repaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
++        unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
++        this.params = params;
++    }
++
++    public CompactionParams getCompactionParams()
++    {
++        return params;
++    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 0db231e,2798689..63b21df
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@@ -141,7 -145,88 +145,82 @@@ public class CompactionsCQLTest extend
          assertTrue(minorWasTriggered(KEYSPACE, currentTable()));
      }
  
-     public boolean minorWasTriggered(String keyspace, String cf) throws Throwable
+     @Test
+     public void testSetLocalCompactionStrategy() throws Throwable
+     {
+         createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+         Map<String, String> localOptions = new HashMap<>();
+         localOptions.put("class", "DateTieredCompactionStrategy");
+         getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
 -        WrappingCompactionStrategy wrappingCompactionStrategy = (WrappingCompactionStrategy)
getCurrentColumnFamilyStore().getCompactionStrategy();
 -        assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class));
++        assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(),
DateTieredCompactionStrategy.class));
+         // altering something non-compaction related
+         execute("ALTER TABLE %s WITH gc_grace_seconds = 1000");
+         // should keep the local compaction strat
 -        assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class));
++        assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(),
DateTieredCompactionStrategy.class));
+         // altering a compaction option
+         execute("ALTER TABLE %s WITH compaction = {'class':'SizeTieredCompactionStrategy',
'min_threshold':3}");
+         // will use the new option
 -        assertTrue(verifyStrategies(wrappingCompactionStrategy, SizeTieredCompactionStrategy.class));
++        assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(),
SizeTieredCompactionStrategy.class));
+     }
+ 
+ 
+     @Test
+     public void testSetLocalCompactionStrategyDisable() throws Throwable
+     {
+         createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+         Map<String, String> localOptions = new HashMap<>();
+         localOptions.put("class", "DateTieredCompactionStrategy");
+         localOptions.put("enabled", "false");
+         getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
 -        assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++        assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+         localOptions.clear();
+         localOptions.put("class", "DateTieredCompactionStrategy");
+         // localOptions.put("enabled", "true"); - this is default!
+         getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
 -        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+     }
+ 
+ 
+     @Test
+     public void testSetLocalCompactionStrategyEnable() throws Throwable
+     {
+         createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+         Map<String, String> localOptions = new HashMap<>();
+         localOptions.put("class", "DateTieredCompactionStrategy");
+ 
+         getCurrentColumnFamilyStore().disableAutoCompaction();
 -        assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++        assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+ 
+         getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
 -        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+ 
+     }
+ 
+ 
+ 
+     @Test(expected = IllegalArgumentException.class)
+     public void testBadLocalCompactionStrategyOptions()
+     {
+         createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+         Map<String, String> localOptions = new HashMap<>();
+         localOptions.put("class","SizeTieredCompactionStrategy");
+         localOptions.put("sstable_size_in_mb","1234"); // not for STCS
+         getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+     }
+ 
 -    public boolean verifyStrategies(WrappingCompactionStrategy wrappingStrategy, Class<?
extends AbstractCompactionStrategy> expected)
++    public boolean verifyStrategies(CompactionStrategyManager manager, Class<? extends
AbstractCompactionStrategy> expected)
+     {
+         boolean found = false;
 -        for (AbstractCompactionStrategy actualStrategy : wrappingStrategy.getWrappedStrategies())
++        for (AbstractCompactionStrategy actualStrategy : manager.getStrategies())
+         {
+             if (!actualStrategy.getClass().equals(expected))
+                 return false;
+             found = true;
+         }
+         return found;
+     }
+ 
 -    private ColumnFamilyStore getCurrentColumnFamilyStore()
 -    {
 -        return Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
 -    }
 -
+     private boolean minorWasTriggered(String keyspace, String cf) throws Throwable
      {
          UntypedResultSet res = execute("SELECT * FROM system.compaction_history");
          boolean minorWasTriggered = false;


Mime
View raw message