cassandra-commits mailing list archives

From alek...@apache.org
Subject [3/6] cassandra git commit: Factor out TableParams from CFMetaData
Date Tue, 04 Aug 2015 09:16:34 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/CachingParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/CachingParams.java b/src/java/org/apache/cassandra/schema/CachingParams.java
new file mode 100644
index 0000000..2b5ab7c
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/CachingParams.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+import static java.lang.String.format;
+
+// CQL: {'keys' : 'ALL'|'NONE', 'rows_per_partition': '200'|'NONE'|'ALL'}
+public final class CachingParams
+{
+    public enum Option
+    {
+        KEYS,
+        ROWS_PER_PARTITION;
+
+        @Override
+        public String toString()
+        {
+            return name().toLowerCase();
+        }
+    }
+
+    private static final String ALL = "ALL";
+    private static final String NONE = "NONE";
+
+    static final boolean DEFAULT_CACHE_KEYS = true;
+    static final int DEFAULT_ROWS_PER_PARTITION_TO_CACHE = 0;
+
+    public static final CachingParams CACHE_NOTHING = new CachingParams(false, 0);
+    public static final CachingParams CACHE_KEYS = new CachingParams(true, 0);
+    public static final CachingParams CACHE_EVERYTHING = new CachingParams(true, Integer.MAX_VALUE);
+
+    static final CachingParams DEFAULT = new CachingParams(DEFAULT_CACHE_KEYS, DEFAULT_ROWS_PER_PARTITION_TO_CACHE);
+
+    final boolean cacheKeys;
+    final int rowsPerPartitionToCache;
+
+    public CachingParams(boolean cacheKeys, int rowsPerPartitionToCache)
+    {
+        this.cacheKeys = cacheKeys;
+        this.rowsPerPartitionToCache = rowsPerPartitionToCache;
+    }
+
+    public boolean cacheKeys()
+    {
+        return cacheKeys;
+    }
+
+    public boolean cacheRows()
+    {
+        return rowsPerPartitionToCache > 0;
+    }
+
+    public boolean cacheAllRows()
+    {
+        return rowsPerPartitionToCache == Integer.MAX_VALUE;
+    }
+
+    public int rowsPerPartitionToCache()
+    {
+        return rowsPerPartitionToCache;
+    }
+
+    public static CachingParams fromMap(Map<String, String> map)
+    {
+        Map<String, String> copy = new HashMap<>(map);
+
+        String keys = copy.remove(Option.KEYS.toString());
+        boolean cacheKeys = keys != null && keysFromString(keys);
+
+        String rows = copy.remove(Option.ROWS_PER_PARTITION.toString());
+        int rowsPerPartitionToCache = rows == null
+                                    ? 0
+                                    : rowsPerPartitionFromString(rows);
+
+        if (!copy.isEmpty())
+        {
+            throw new ConfigurationException(format("Invalid caching sub-options %s: only '%s' and '%s' are allowed",
+                                                    copy.keySet(),
+                                                    Option.KEYS,
+                                                    Option.ROWS_PER_PARTITION));
+        }
+
+        return new CachingParams(cacheKeys, rowsPerPartitionToCache);
+    }
+
+    public Map<String, String> asMap()
+    {
+        return ImmutableMap.of(Option.KEYS.toString(),
+                               keysAsString(),
+                               Option.ROWS_PER_PARTITION.toString(),
+                               rowsPerPartitionAsString());
+    }
+
+    private static boolean keysFromString(String value)
+    {
+        if (value.equalsIgnoreCase(ALL))
+            return true;
+
+        if (value.equalsIgnoreCase(NONE))
+            return false;
+
+        throw new ConfigurationException(format("Invalid value '%s' for caching sub-option '%s': only '%s' and '%s' are allowed",
+                                                value,
+                                                Option.KEYS,
+                                                ALL,
+                                                NONE));
+    }
+
+    String keysAsString()
+    {
+        return cacheKeys ? ALL : NONE;
+    }
+
+    private static int rowsPerPartitionFromString(String value)
+    {
+        if (value.equalsIgnoreCase(ALL))
+            return Integer.MAX_VALUE;
+
+        if (value.equalsIgnoreCase(NONE))
+            return 0;
+
+        if (StringUtils.isNumeric(value))
+            return Integer.parseInt(value);
+
+        throw new ConfigurationException(format("Invalid value '%s' for caching sub-option '%s':"
+                                                + " only '%s', '%s', and integer values are allowed",
+                                                value,
+                                                Option.ROWS_PER_PARTITION,
+                                                ALL,
+                                                NONE));
+    }
+
+    String rowsPerPartitionAsString()
+    {
+        if (rowsPerPartitionToCache == 0)
+            return NONE;
+        else if (rowsPerPartitionToCache == Integer.MAX_VALUE)
+            return ALL;
+        else
+            return Integer.toString(rowsPerPartitionToCache);
+    }
+
+    @Override
+    public String toString()
+    {
+        return format("{'%s' : '%s', '%s' : '%s'}",
+                      Option.KEYS,
+                      keysAsString(),
+                      Option.ROWS_PER_PARTITION,
+                      rowsPerPartitionAsString());
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof CachingParams))
+            return false;
+
+        CachingParams c = (CachingParams) o;
+
+        return cacheKeys == c.cacheKeys && rowsPerPartitionToCache == c.rowsPerPartitionToCache;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(cacheKeys, rowsPerPartitionToCache);
+    }
+}
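
For reference (not part of the commit): a minimal sketch of how the new CachingParams factory above could be exercised, assuming the classes added by this patch are on the classpath. The wrapper class name is hypothetical.

import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.schema.CachingParams;

public class CachingParamsExample
{
    public static void main(String[] args)
    {
        // CQL-style caching map: cache all partition keys, cache 200 rows per partition
        CachingParams params = CachingParams.fromMap(ImmutableMap.of("keys", "ALL",
                                                                     "rows_per_partition", "200"));

        System.out.println(params.cacheKeys());               // true
        System.out.println(params.rowsPerPartitionToCache()); // 200
        System.out.println(params);                           // {'keys' : 'ALL', 'rows_per_partition' : '200'}
    }
}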

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/CompactionParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/CompactionParams.java b/src/java/org/apache/cassandra/schema/CompactionParams.java
new file mode 100644
index 0000000..720efa3
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/CompactionParams.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
+import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static java.lang.String.format;
+
+public final class CompactionParams
+{
+    private static final Logger logger = LoggerFactory.getLogger(CompactionParams.class);
+
+    public enum Option
+    {
+        CLASS,
+        ENABLED,
+        MIN_THRESHOLD,
+        MAX_THRESHOLD;
+
+        @Override
+        public String toString()
+        {
+            return name().toLowerCase();
+        }
+    }
+
+    public static final int DEFAULT_MIN_THRESHOLD = 4;
+    public static final int DEFAULT_MAX_THRESHOLD = 32;
+
+    public static final boolean DEFAULT_ENABLED = true;
+
+    public static final Map<String, String> DEFAULT_THRESHOLDS =
+        ImmutableMap.of(Option.MIN_THRESHOLD.toString(), Integer.toString(DEFAULT_MIN_THRESHOLD),
+                        Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD));
+
+    public static final CompactionParams DEFAULT =
+        new CompactionParams(SizeTieredCompactionStrategy.class, DEFAULT_THRESHOLDS, DEFAULT_ENABLED);
+
+    private final Class<? extends AbstractCompactionStrategy> klass;
+    private final ImmutableMap<String, String> options;
+    private final boolean isEnabled;
+
+    private CompactionParams(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options, boolean isEnabled)
+    {
+        this.klass = klass;
+        this.options = ImmutableMap.copyOf(options);
+        this.isEnabled = isEnabled;
+    }
+
+    public static CompactionParams create(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options)
+    {
+        boolean isEnabled = options.containsKey(Option.ENABLED.toString())
+                          ? Boolean.parseBoolean(options.get(Option.ENABLED.toString()))
+                          : DEFAULT_ENABLED;
+
+        Map<String, String> allOptions = new HashMap<>(options);
+        if (supportsThresholdParams(klass))
+        {
+            allOptions.putIfAbsent(Option.MIN_THRESHOLD.toString(), Integer.toString(DEFAULT_MIN_THRESHOLD));
+            allOptions.putIfAbsent(Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD));
+        }
+
+        return new CompactionParams(klass, allOptions, isEnabled);
+    }
+
+    public static CompactionParams scts(Map<String, String> options)
+    {
+        return create(SizeTieredCompactionStrategy.class, options);
+    }
+
+    public static CompactionParams lcs(Map<String, String> options)
+    {
+        return create(LeveledCompactionStrategy.class, options);
+    }
+
+    public int minCompactionThreshold()
+    {
+        String threshold = options.get(Option.MIN_THRESHOLD.toString());
+        return threshold == null
+             ? DEFAULT_MIN_THRESHOLD
+             : Integer.parseInt(threshold);
+    }
+
+    public int maxCompactionThreshold()
+    {
+        String threshold = options.get(Option.MAX_THRESHOLD.toString());
+        return threshold == null
+             ? DEFAULT_MAX_THRESHOLD
+             : Integer.parseInt(threshold);
+    }
+
+    public void validate()
+    {
+        try
+        {
+            Map<?, ?> unknownOptions = (Map) klass.getMethod("validateOptions", Map.class).invoke(null, options);
+            if (!unknownOptions.isEmpty())
+            {
+                throw new ConfigurationException(format("Properties specified %s are not understood by %s",
+                                                        unknownOptions.keySet(),
+                                                        klass.getSimpleName()));
+            }
+        }
+        catch (NoSuchMethodException e)
+        {
+            logger.warn("Compaction strategy {} does not have a static validateOptions method. Validation ignored",
+                        klass.getName());
+        }
+        catch (InvocationTargetException e)
+        {
+            if (e.getTargetException() instanceof ConfigurationException)
+                throw (ConfigurationException) e.getTargetException();
+
+            Throwable cause = e.getCause() == null
+                            ? e
+                            : e.getCause();
+
+            throw new ConfigurationException(format("%s.validateOptions() threw an error: %s %s",
+                                                    klass.getName(),
+                                                    cause.getClass().getName(),
+                                                    cause.getMessage()),
+                                             e);
+        }
+        catch (IllegalAccessException e)
+        {
+            throw new ConfigurationException("Cannot access method validateOptions in " + klass.getName(), e);
+        }
+
+        String minThreshold = options.get(Option.MIN_THRESHOLD.toString());
+        if (minThreshold != null && !StringUtils.isNumeric(minThreshold))
+        {
+            throw new ConfigurationException(format("Invalid value %s for '%s' compaction sub-option - must be an integer",
+                                                    minThreshold,
+                                                    Option.MIN_THRESHOLD));
+        }
+
+        String maxThreshold = options.get(Option.MAX_THRESHOLD.toString());
+        if (maxThreshold != null && !StringUtils.isNumeric(maxThreshold))
+        {
+            throw new ConfigurationException(format("Invalid value %s for '%s' compaction sub-option - must be an integer",
+                                                    maxThreshold,
+                                                    Option.MAX_THRESHOLD));
+        }
+
+        if (minCompactionThreshold() <= 0 || maxCompactionThreshold() <= 0)
+        {
+            throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been removed,"
+                                             + " set the compaction option 'enabled' to false instead.");
+        }
+
+        if (minCompactionThreshold() <= 1)
+        {
+            throw new ConfigurationException(format("Min compaction threshold cannot be less than 2 (got %d)",
+                                                    minCompactionThreshold()));
+        }
+
+        if (minCompactionThreshold() > maxCompactionThreshold())
+        {
+            throw new ConfigurationException(format("Min compaction threshold (got %d) cannot be greater than max compaction threshold (got %d)",
+                                                    minCompactionThreshold(),
+                                                    maxCompactionThreshold()));
+        }
+    }
+
+    double defaultBloomFilterFbChance()
+    {
+        return klass.equals(LeveledCompactionStrategy.class) ? 0.1 : 0.01;
+    }
+
+    public Class<? extends AbstractCompactionStrategy> klass()
+    {
+        return klass;
+    }
+
+    /**
+     * All strategy options - excluding 'class'.
+     */
+    public Map<String, String> options()
+    {
+        return options;
+    }
+
+    public boolean isEnabled()
+    {
+        return isEnabled;
+    }
+
+    public static CompactionParams fromMap(Map<String, String> map)
+    {
+        Map<String, String> options = new HashMap<>(map);
+
+        String className = options.remove(Option.CLASS.toString());
+        if (className == null)
+        {
+            throw new ConfigurationException(format("Missing sub-option '%s' for the '%s' option",
+                                                    Option.CLASS,
+                                                    TableParams.Option.COMPACTION));
+        }
+
+        return create(classFromName(className), options);
+    }
+
+    private static Class<? extends AbstractCompactionStrategy> classFromName(String name)
+    {
+        String className = name.contains(".")
+                         ? name
+                         : "org.apache.cassandra.db.compaction." + name;
+        Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy");
+
+        if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass))
+        {
+            throw new ConfigurationException(format("Compaction strategy class %s is not derived from AbstractReplicationStrategy",
+                                                    className));
+        }
+
+        return strategyClass;
+    }
+
+    /*
+     * LCS doesn't support the min_threshold/max_threshold options, STCS and DTCS do
+     */
+    @SuppressWarnings("unchecked")
+    public static boolean supportsThresholdParams(Class<? extends AbstractCompactionStrategy> klass)
+    {
+        try
+        {
+            Map<String, String> unrecognizedOptions =
+                (Map<String, String>) klass.getMethod("validateOptions", Map.class)
+                                           .invoke(null, DEFAULT_THRESHOLDS);
+
+            return unrecognizedOptions.isEmpty();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Map<String, String> asMap()
+    {
+        Map<String, String> map = new HashMap<>(options());
+        map.put(Option.CLASS.toString(), klass.getName());
+        return map;
+    }
+
+    @Override
+    public String toString()
+    {
+        return MoreObjects.toStringHelper(this)
+                          .add("class", klass.getName())
+                          .add("options", options)
+                          .toString();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof CompactionParams))
+            return false;
+
+        CompactionParams cp = (CompactionParams) o;
+
+        return klass.equals(cp.klass) && options.equals(cp.options);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(klass, options);
+    }
+}
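
For reference (not part of the commit): a sketch of the CompactionParams factories above in use. It assumes a Cassandra classpath and that the strategies' static validateOptions methods accept the sub-options shown, as supportsThresholdParams() expects; the wrapper class name is hypothetical.

import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.schema.CompactionParams;

public class CompactionParamsExample
{
    public static void main(String[] args)
    {
        // The default: SizeTieredCompactionStrategy, enabled, thresholds 4/32
        CompactionParams stcs = CompactionParams.DEFAULT;
        System.out.println(stcs.klass().getSimpleName());  // SizeTieredCompactionStrategy
        System.out.println(stcs.minCompactionThreshold()); // 4
        System.out.println(stcs.maxCompactionThreshold()); // 32

        // fromMap mirrors the CQL 'compaction' map; 'class' may be a short name
        CompactionParams lcs = CompactionParams.fromMap(ImmutableMap.of("class", "LeveledCompactionStrategy",
                                                                        "sstable_size_in_mb", "160"));
        System.out.println(lcs.isEnabled()); // true
        lcs.validate();                      // throws ConfigurationException for invalid options
    }
}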

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/CompressionParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java
new file mode 100644
index 0000000..a73fcd1
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/CompressionParams.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.*;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+import static java.lang.String.format;
+
+@SuppressWarnings("deprecation")
+public final class CompressionParams
+{
+    private static final Logger logger = LoggerFactory.getLogger(CompressionParams.class);
+
+    private static volatile boolean hasLoggedSsTableCompressionWarning;
+    private static volatile boolean hasLoggedChunkLengthWarning;
+
+    public static final int DEFAULT_CHUNK_LENGTH = 65536;
+    public static final double DEFAULT_CRC_CHECK_CHANCE = 1.0;
+    public static final IVersionedSerializer<CompressionParams> serializer = new Serializer();
+
+    public static final String CLASS = "class";
+    public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb";
+    public static final String ENABLED = "enabled";
+
+    public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.instance,
+                                                                          DEFAULT_CHUNK_LENGTH,
+                                                                          Collections.emptyMap());
+
+    @Deprecated public static final String SSTABLE_COMPRESSION = "sstable_compression";
+    @Deprecated public static final String CHUNK_LENGTH_KB = "chunk_length_kb";
+
+    public static final String CRC_CHECK_CHANCE = "crc_check_chance";
+
+    public static final Set<String> GLOBAL_OPTIONS = ImmutableSet.of(CRC_CHECK_CHANCE);
+
+    private final ICompressor sstableCompressor;
+    private final Integer chunkLength;
+    private volatile double crcCheckChance;
+    private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be use by the compressor
+    private CFMetaData liveMetadata;
+
+    public static CompressionParams fromMap(Map<String, String> opts)
+    {
+        Map<String, String> options = copyOptions(opts);
+
+        String sstableCompressionClass;
+
+        if (!opts.isEmpty() && isEnabled(opts) && !containsSstableCompressionClass(opts))
+            throw new ConfigurationException(format("Missing sub-option '%s' for the 'compression' option.", CLASS));
+
+        if (!removeEnabled(options))
+        {
+            sstableCompressionClass = null;
+
+            if (!options.isEmpty())
+                throw new ConfigurationException(format("If the '%s' option is set to false no other options must be specified", ENABLED));
+        }
+        else
+        {
+            sstableCompressionClass = removeSstableCompressionClass(options);
+        }
+
+        Integer chunkLength = removeChunkLength(options);
+
+        CompressionParams cp = new CompressionParams(sstableCompressionClass, chunkLength, options);
+        cp.validate();
+
+        return cp;
+    }
+
+    public Class<? extends ICompressor> klass()
+    {
+        return sstableCompressor.getClass();
+    }
+
+    public static CompressionParams noCompression()
+    {
+        return new CompressionParams((ICompressor) null, DEFAULT_CHUNK_LENGTH, Collections.emptyMap());
+    }
+
+    public static CompressionParams snappy()
+    {
+        return snappy(null);
+    }
+
+    public static CompressionParams snappy(Integer chunkLength)
+    {
+        return new CompressionParams(SnappyCompressor.instance, chunkLength, Collections.emptyMap());
+    }
+
+    public static CompressionParams deflate()
+    {
+        return deflate(null);
+    }
+
+    public static CompressionParams deflate(Integer chunkLength)
+    {
+        return new CompressionParams(DeflateCompressor.instance, chunkLength, Collections.emptyMap());
+    }
+
+    public static CompressionParams lz4()
+    {
+        return lz4(null);
+    }
+
+    public static CompressionParams lz4(Integer chunkLength)
+    {
+        return new CompressionParams(LZ4Compressor.instance, chunkLength, Collections.emptyMap());
+    }
+
+    public CompressionParams(String sstableCompressorClass, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException
+    {
+        this(createCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, otherOptions);
+    }
+
+    private CompressionParams(ICompressor sstableCompressor, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException
+    {
+        this.sstableCompressor = sstableCompressor;
+        this.chunkLength = chunkLength;
+        this.otherOptions = ImmutableMap.copyOf(otherOptions);
+        String chance = otherOptions.get(CRC_CHECK_CHANCE);
+        this.crcCheckChance = (chance == null) ? DEFAULT_CRC_CHECK_CHANCE : parseCrcCheckChance(chance);
+    }
+
+    public CompressionParams copy()
+    {
+        return new CompressionParams(sstableCompressor, chunkLength, otherOptions);
+    }
+
+    public void setLiveMetadata(final CFMetaData liveMetadata)
+    {
+        if (liveMetadata == null)
+            return;
+
+        this.liveMetadata = liveMetadata;
+    }
+
+    public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException
+    {
+        validateCrcCheckChance(crcCheckChance);
+        this.crcCheckChance = crcCheckChance;
+
+        if (liveMetadata != null && this != liveMetadata.params.compression)
+            liveMetadata.params.compression.setCrcCheckChance(crcCheckChance);
+    }
+
+    /**
+     * Checks if compression is enabled.
+     * @return {@code true} if compression is enabled, {@code false} otherwise.
+     */
+    public boolean isEnabled()
+    {
+        return sstableCompressor != null;
+    }
+
+    /**
+     * Returns the SSTable compressor.
+     * @return the SSTable compressor or {@code null} if compression is disabled.
+     */
+    public ICompressor getSstableCompressor()
+    {
+        return sstableCompressor;
+    }
+
+    public ImmutableMap<String, String> getOtherOptions()
+    {
+        return otherOptions;
+    }
+
+    public double getCrcCheckChance()
+    {
+        return liveMetadata == null ? this.crcCheckChance : liveMetadata.params.compression.crcCheckChance;
+    }
+
+    private static double parseCrcCheckChance(String crcCheckChance) throws ConfigurationException
+    {
+        try
+        {
+            double chance = Double.parseDouble(crcCheckChance);
+            validateCrcCheckChance(chance);
+            return chance;
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("crc_check_chance should be a double");
+        }
+    }
+
+    private static void validateCrcCheckChance(double crcCheckChance) throws ConfigurationException
+    {
+        if (crcCheckChance < 0.0d || crcCheckChance > 1.0d)
+            throw new ConfigurationException("crc_check_chance should be between 0.0 and 1.0");
+    }
+
+    public int chunkLength()
+    {
+        return chunkLength == null ? DEFAULT_CHUNK_LENGTH : chunkLength;
+    }
+
+    private static Class<?> parseCompressorClass(String className) throws ConfigurationException
+    {
+        if (className == null || className.isEmpty())
+            return null;
+
+        className = className.contains(".") ? className : "org.apache.cassandra.io.compress." + className;
+        try
+        {
+            return Class.forName(className);
+        }
+        catch (Exception e)
+        {
+            throw new ConfigurationException("Could not create Compression for type " + className, e);
+        }
+    }
+
+    private static ICompressor createCompressor(Class<?> compressorClass, Map<String, String> compressionOptions) throws ConfigurationException
+    {
+        if (compressorClass == null)
+        {
+            if (!compressionOptions.isEmpty())
+                throw new ConfigurationException("Unknown compression options (" + compressionOptions.keySet() + ") since no compression class found");
+            return null;
+        }
+
+        try
+        {
+            Method method = compressorClass.getMethod("create", Map.class);
+            ICompressor compressor = (ICompressor)method.invoke(null, compressionOptions);
+            // Check for unknown options
+            AbstractSet<String> supportedOpts = Sets.union(compressor.supportedOptions(), GLOBAL_OPTIONS);
+            for (String provided : compressionOptions.keySet())
+                if (!supportedOpts.contains(provided))
+                    throw new ConfigurationException("Unknown compression options " + provided);
+            return compressor;
+        }
+        catch (NoSuchMethodException e)
+        {
+            throw new ConfigurationException("create method not found", e);
+        }
+        catch (SecurityException e)
+        {
+            throw new ConfigurationException("Access forbiden", e);
+        }
+        catch (IllegalAccessException e)
+        {
+            throw new ConfigurationException("Cannot access method create in " + compressorClass.getName(), e);
+        }
+        catch (InvocationTargetException e)
+        {
+            if (e.getTargetException() instanceof ConfigurationException)
+                throw (ConfigurationException) e.getTargetException();
+
+            Throwable cause = e.getCause() == null
+                            ? e
+                            : e.getCause();
+
+            throw new ConfigurationException(format("%s.create() threw an error: %s %s",
+                                                    compressorClass.getSimpleName(),
+                                                    cause.getClass().getName(),
+                                                    cause.getMessage()),
+                                             e);
+        }
+        catch (ExceptionInInitializerError e)
+        {
+            throw new ConfigurationException("Cannot initialize class " + compressorClass.getName());
+        }
+    }
+
+    public static ICompressor createCompressor(ParameterizedClass compression) throws ConfigurationException {
+        return createCompressor(parseCompressorClass(compression.class_name), copyOptions(compression.parameters));
+    }
+
+    private static Map<String, String> copyOptions(Map<? extends CharSequence, ? extends CharSequence> co)
+    {
+        if (co == null || co.isEmpty())
+            return Collections.<String, String>emptyMap();
+
+        Map<String, String> compressionOptions = new HashMap<>();
+        for (Map.Entry<? extends CharSequence, ? extends CharSequence> entry : co.entrySet())
+            compressionOptions.put(entry.getKey().toString(), entry.getValue().toString());
+        return compressionOptions;
+    }
+
+    /**
+     * Parses the chunk length (in KB) and returns it in bytes.
+     * 
+     * @param chLengthKB the length of the chunk to parse
+     * @return the chunk length in bytes
+     * @throws ConfigurationException if the chunk length is not a valid number or is too large
+     */
+    private static Integer parseChunkLength(String chLengthKB) throws ConfigurationException
+    {
+        if (chLengthKB == null)
+            return null;
+
+        try
+        {
+            int parsed = Integer.parseInt(chLengthKB);
+            if (parsed > Integer.MAX_VALUE / 1024)
+                throw new ConfigurationException(format("Value of %s is too large (%s)", CHUNK_LENGTH_IN_KB,parsed));
+            return 1024 * parsed;
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH_IN_KB, e);
+        }
+    }
+
+    /**
+     * Removes the chunk length option from the specified set of options.
+     *
+     * @param options the options
+     * @return the chunk length value
+     */
+    private static Integer removeChunkLength(Map<String, String> options)
+    {
+        if (options.containsKey(CHUNK_LENGTH_IN_KB))
+        {
+            if (options.containsKey(CHUNK_LENGTH_KB))
+            {
+                throw new ConfigurationException(format("The '%s' option must not be used if the chunk length is already specified by the '%s' option",
+                                                        CHUNK_LENGTH_KB,
+                                                        CHUNK_LENGTH_IN_KB));
+            }
+
+            return parseChunkLength(options.remove(CHUNK_LENGTH_IN_KB));
+        }
+
+        if (options.containsKey(CHUNK_LENGTH_KB))
+        {
+            if (options.containsKey(CHUNK_LENGTH_KB) && !hasLoggedChunkLengthWarning)
+            {
+                hasLoggedChunkLengthWarning = true;
+                logger.warn(format("The %s option has been deprecated. You should use %s instead",
+                                   CHUNK_LENGTH_KB,
+                                   CHUNK_LENGTH_IN_KB));
+            }
+
+            return parseChunkLength(options.remove(CHUNK_LENGTH_KB));
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns {@code true} if the specified options contains the name of the compression class to be used,
+     * {@code false} otherwise.
+     *
+     * @param options the options
+     * @return {@code true} if the specified options contains the name of the compression class to be used,
+     * {@code false} otherwise.
+     */
+    public static boolean containsSstableCompressionClass(Map<String, String> options)
+    {
+        return options.containsKey(CLASS) || options.containsKey(SSTABLE_COMPRESSION);
+    }
+
+    /**
+     * Removes the option specifying the name of the compression class
+     *
+     * @param options the options
+     * @return the name of the compression class
+     */
+    private static String removeSstableCompressionClass(Map<String, String> options)
+    {
+        if (options.containsKey(CLASS))
+        {
+            if (options.containsKey(SSTABLE_COMPRESSION))
+                throw new ConfigurationException(format("The '%s' option must not be used if the compression algorithm is already specified by the '%s' option",
+                                                        SSTABLE_COMPRESSION,
+                                                        CLASS));
+
+            String clazz = options.remove(CLASS);
+            if (clazz.isEmpty())
+                throw new ConfigurationException(format("The '%s' option must not be empty. To disable compression use 'enabled' : false", CLASS));
+
+            return clazz;
+        }
+
+        if (options.containsKey(SSTABLE_COMPRESSION) && !hasLoggedSsTableCompressionWarning)
+        {
+            hasLoggedSsTableCompressionWarning = true;
+            logger.warn(format("The %s option has been deprecated. You should use %s instead",
+                               SSTABLE_COMPRESSION,
+                               CLASS));
+        }
+
+        return options.remove(SSTABLE_COMPRESSION);
+    }
+
+    /**
+     * Returns {@code true} if the options contain the {@code enabled} option and its value is
+     * {@code true}, {@code false} otherwise.
+     *
+     * @param options the options
+     * @return {@code true} if the options contain the {@code enabled} option and its value is
+     * {@code true}, {@code false} otherwise.
+     */
+    public static boolean isEnabled(Map<String, String> options)
+    {
+        String enabled = options.get(ENABLED);
+        return enabled == null || Boolean.parseBoolean(enabled);
+    }
+
+    /**
+     * Removes the {@code enabled} option from the specified options.
+     *
+     * @param options the options
+     * @return the value of the {@code enabled} option
+     */
+    private static boolean removeEnabled(Map<String, String> options)
+    {
+        String enabled = options.remove(ENABLED);
+        return enabled == null || Boolean.parseBoolean(enabled);
+    }
+
+    // chunkLength must be a power of 2 because we assume so when
+    // computing the chunk number from an uncompressed file offset (see
+    // CompressedRandomAccessReader.decompressChunk())
+    public void validate() throws ConfigurationException
+    {
+        // if chunk length was not set (chunkLength == null), this is fine, default will be used
+        if (chunkLength != null)
+        {
+            if (chunkLength <= 0)
+                throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH_IN_KB);
+
+            int c = chunkLength;
+            boolean found = false;
+            while (c != 0)
+            {
+                if ((c & 0x01) != 0)
+                {
+                    if (found)
+                        throw new ConfigurationException(CHUNK_LENGTH_IN_KB + " must be a power of 2");
+                    else
+                        found = true;
+                }
+                c >>= 1;
+            }
+        }
+
+        validateCrcCheckChance(crcCheckChance);
+    }
+
+    public Map<String, String> asMap()
+    {
+        if (!isEnabled())
+            return Collections.singletonMap(ENABLED, "false");
+
+        Map<String, String> options = new HashMap<>(otherOptions);
+        options.put(CLASS, sstableCompressor.getClass().getName());
+        options.put(CHUNK_LENGTH_IN_KB, chunkLengthInKB());
+
+        return options;
+    }
+
+    public String chunkLengthInKB()
+    {
+        return String.valueOf(chunkLength() / 1024);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj == this)
+        {
+            return true;
+        }
+        else if (obj == null || obj.getClass() != getClass())
+        {
+            return false;
+        }
+
+        CompressionParams cp = (CompressionParams) obj;
+        return new EqualsBuilder()
+            .append(sstableCompressor, cp.sstableCompressor)
+            .append(chunkLength(), cp.chunkLength())
+            .append(otherOptions, cp.otherOptions)
+            .isEquals();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return new HashCodeBuilder(29, 1597)
+            .append(sstableCompressor)
+            .append(chunkLength())
+            .append(otherOptions)
+            .toHashCode();
+    }
+
+    static class Serializer implements IVersionedSerializer<CompressionParams>
+    {
+        public void serialize(CompressionParams parameters, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
+            out.writeInt(parameters.otherOptions.size());
+            for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet())
+            {
+                out.writeUTF(entry.getKey());
+                out.writeUTF(entry.getValue());
+            }
+            out.writeInt(parameters.chunkLength());
+        }
+
+        public CompressionParams deserialize(DataInputPlus in, int version) throws IOException
+        {
+            String compressorName = in.readUTF();
+            int optionCount = in.readInt();
+            Map<String, String> options = new HashMap<>();
+            for (int i = 0; i < optionCount; ++i)
+            {
+                String key = in.readUTF();
+                String value = in.readUTF();
+                options.put(key, value);
+            }
+            int chunkLength = in.readInt();
+            CompressionParams parameters;
+            try
+            {
+                parameters = new CompressionParams(compressorName, chunkLength, options);
+            }
+            catch (ConfigurationException e)
+            {
+                throw new RuntimeException("Cannot create CompressionParams for parameters", e);
+            }
+            return parameters;
+        }
+
+        public long serializedSize(CompressionParams parameters, int version)
+        {
+            long size = TypeSizes.sizeof(parameters.sstableCompressor.getClass().getSimpleName());
+            size += TypeSizes.sizeof(parameters.otherOptions.size());
+            for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet())
+            {
+                size += TypeSizes.sizeof(entry.getKey());
+                size += TypeSizes.sizeof(entry.getValue());
+            }
+            size += TypeSizes.sizeof(parameters.chunkLength());
+            return size;
+        }
+    }
+}
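
For reference (not part of the commit): a sketch of parsing a CQL-style compression map with the CompressionParams factory above, assuming a Cassandra classpath; the wrapper class name is hypothetical.

import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.schema.CompressionParams;

public class CompressionParamsExample
{
    public static void main(String[] args)
    {
        // 'class' may be a short name resolved against org.apache.cassandra.io.compress
        CompressionParams lz4 = CompressionParams.fromMap(ImmutableMap.of("class", "LZ4Compressor",
                                                                          "chunk_length_in_kb", "64"));
        System.out.println(lz4.isEnabled());   // true
        System.out.println(lz4.chunkLength()); // 65536 (bytes)

        // Disabling compression: 'enabled': false with no other sub-options
        CompressionParams off = CompressionParams.fromMap(ImmutableMap.of("enabled", "false"));
        System.out.println(off.asMap());       // {enabled=false}
    }
}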

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/KeyspaceParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceParams.java b/src/java/org/apache/cassandra/schema/KeyspaceParams.java
index a8de2bd..6cdf27f 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceParams.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceParams.java
@@ -17,15 +17,9 @@
  */
 package org.apache.cassandra.schema;
 
-import java.util.HashMap;
 import java.util.Map;
 
 import com.google.common.base.Objects;
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.*;
-import org.apache.cassandra.service.StorageService;
 
 /**
  * An immutable class representing keyspace parameters (durability and replication).
@@ -47,9 +41,9 @@ public final class KeyspaceParams
     }
 
     public final boolean durableWrites;
-    public final Replication replication;
+    public final ReplicationParams replication;
 
-    public KeyspaceParams(boolean durableWrites, Replication replication)
+    public KeyspaceParams(boolean durableWrites, ReplicationParams replication)
     {
         this.durableWrites = durableWrites;
         this.replication = replication;
@@ -57,22 +51,22 @@ public final class KeyspaceParams
 
     public static KeyspaceParams create(boolean durableWrites, Map<String, String> replication)
     {
-        return new KeyspaceParams(durableWrites, Replication.fromMap(replication));
+        return new KeyspaceParams(durableWrites, ReplicationParams.fromMap(replication));
     }
 
     public static KeyspaceParams local()
     {
-        return new KeyspaceParams(true, Replication.local());
+        return new KeyspaceParams(true, ReplicationParams.local());
     }
 
     public static KeyspaceParams simple(int replicationFactor)
     {
-        return new KeyspaceParams(true, Replication.simple(replicationFactor));
+        return new KeyspaceParams(true, ReplicationParams.simple(replicationFactor));
     }
 
     public static KeyspaceParams simpleTransient(int replicationFactor)
     {
-        return new KeyspaceParams(false, Replication.simple(replicationFactor));
+        return new KeyspaceParams(false, ReplicationParams.simple(replicationFactor));
     }
 
     public void validate(String name)
@@ -108,81 +102,4 @@ public final class KeyspaceParams
                       .add(Option.REPLICATION.toString(), replication)
                       .toString();
     }
-
-    public static final class Replication
-    {
-        public static String CLASS = "class";
-
-        public final Class<? extends AbstractReplicationStrategy> klass;
-        public final ImmutableMap<String, String> options;
-
-        private Replication(Class<? extends AbstractReplicationStrategy> klass, Map<String, String> options)
-        {
-            this.klass = klass;
-            this.options = ImmutableMap.copyOf(options);
-        }
-
-        private static Replication local()
-        {
-            return new Replication(LocalStrategy.class, ImmutableMap.of());
-        }
-
-        private static Replication simple(int replicationFactor)
-        {
-            return new Replication(SimpleStrategy.class, ImmutableMap.of("replication_factor", Integer.toString(replicationFactor)));
-        }
-
-        public void validate(String name)
-        {
-            // Attempt to instantiate the ARS, which will throw a ConfigurationException if the options aren't valid.
-            TokenMetadata tmd = StorageService.instance.getTokenMetadata();
-            IEndpointSnitch eps = DatabaseDescriptor.getEndpointSnitch();
-            AbstractReplicationStrategy.validateReplicationStrategy(name, klass, tmd, eps, options);
-        }
-
-        public static Replication fromMap(Map<String, String> map)
-        {
-            Map<String, String> options = new HashMap<>(map);
-            String className = options.remove(CLASS);
-            Class<? extends AbstractReplicationStrategy> klass = AbstractReplicationStrategy.getClass(className);
-            return new Replication(klass, options);
-        }
-
-        public Map<String, String> asMap()
-        {
-            Map<String, String> map = new HashMap<>(options);
-            map.put(CLASS, klass.getName());
-            return map;
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o)
-                return true;
-
-            if (!(o instanceof Replication))
-                return false;
-
-            Replication r = (Replication) o;
-
-            return klass.equals(r.klass) && options.equals(r.options);
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return Objects.hashCode(klass, options);
-        }
-
-        @Override
-        public String toString()
-        {
-            Objects.ToStringHelper helper = Objects.toStringHelper(this);
-            helper.add(CLASS, klass.getName());
-            for (Map.Entry<String, String> entry : options.entrySet())
-                helper.add(entry.getKey(), entry.getValue());
-            return helper.toString();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 41da481..8dac03b 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -26,18 +26,17 @@ import com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -189,7 +188,7 @@ public final class LegacySchemaMigrator
 
         Map<String, String> replication = new HashMap<>();
         replication.putAll(fromJsonMap(row.getString("strategy_options")));
-        replication.put(KeyspaceParams.Replication.CLASS, row.getString("strategy_class"));
+        replication.put(ReplicationParams.CLASS, row.getString("strategy_class"));
 
         return KeyspaceParams.create(durableWrites, replication);
     }
@@ -317,41 +316,86 @@ public final class LegacySchemaMigrator
                                            columnDefs,
                                            DatabaseDescriptor.getPartitioner());
 
-        cfm.readRepairChance(tableRow.getDouble("read_repair_chance"));
-        cfm.dcLocalReadRepairChance(tableRow.getDouble("local_read_repair_chance"));
-        cfm.gcGraceSeconds(tableRow.getInt("gc_grace_seconds"));
-        cfm.minCompactionThreshold(tableRow.getInt("min_compaction_threshold"));
-        cfm.maxCompactionThreshold(tableRow.getInt("max_compaction_threshold"));
-        if (tableRow.has("comment"))
-            cfm.comment(tableRow.getString("comment"));
-        if (tableRow.has("memtable_flush_period_in_ms"))
-            cfm.memtableFlushPeriod(tableRow.getInt("memtable_flush_period_in_ms"));
-        cfm.caching(CachingOptions.fromString(tableRow.getString("caching")));
-        if (tableRow.has("default_time_to_live"))
-            cfm.defaultTimeToLive(tableRow.getInt("default_time_to_live"));
-        if (tableRow.has("speculative_retry"))
-            cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(tableRow.getString("speculative_retry")));
-        cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(tableRow.getString("compaction_strategy_class")));
-        cfm.compressionParameters(CompressionParameters.fromMap(fromJsonMap(tableRow.getString("compression_parameters"))));
-        cfm.compactionStrategyOptions(fromJsonMap(tableRow.getString("compaction_strategy_options")));
-
-        if (tableRow.has("min_index_interval"))
-            cfm.minIndexInterval(tableRow.getInt("min_index_interval"));
-
-        if (tableRow.has("max_index_interval"))
-            cfm.maxIndexInterval(tableRow.getInt("max_index_interval"));
-
-        if (tableRow.has("bloom_filter_fp_chance"))
-            cfm.bloomFilterFpChance(tableRow.getDouble("bloom_filter_fp_chance"));
-        else
-            cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
-
         if (tableRow.has("dropped_columns"))
             addDroppedColumns(cfm, rawComparator, tableRow.getMap("dropped_columns", UTF8Type.instance, LongType.instance));
 
-        cfm.triggers(createTriggersFromTriggerRows(triggerRows));
+        return cfm.params(decodeTableParams(tableRow))
+                  .triggers(createTriggersFromTriggerRows(triggerRows));
+    }
+
+    private static TableParams decodeTableParams(UntypedResultSet.Row row)
+    {
+        TableParams.Builder params = TableParams.builder();
+
+        params.readRepairChance(row.getDouble("read_repair_chance"))
+              .dcLocalReadRepairChance(row.getDouble("local_read_repair_chance"))
+              .gcGraceSeconds(row.getInt("gc_grace_seconds"));
+
+        if (row.has("comment"))
+            params.comment(row.getString("comment"));
+
+        if (row.has("memtable_flush_period_in_ms"))
+            params.memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"));
+
+        params.caching(CachingParams.fromMap(fromJsonMap(row.getString("caching"))));
+
+        if (row.has("default_time_to_live"))
+            params.defaultTimeToLive(row.getInt("default_time_to_live"));
+
+        if (row.has("speculative_retry"))
+            params.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
+
+        params.compression(CompressionParams.fromMap(fromJsonMap(row.getString("compression_parameters"))));
+
+        params.compaction(compactionFromRow(row));
+
+        if (row.has("min_index_interval"))
+            params.minIndexInterval(row.getInt("min_index_interval"));
+
+        if (row.has("max_index_interval"))
+            params.maxIndexInterval(row.getInt("max_index_interval"));
+
+        if (row.has("bloom_filter_fp_chance"))
+            params.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"));
+
+        return params.build();
+    }
 
-        return cfm;
+    /*
+     * This method is needed to migrate max_compaction_threshold and min_compaction_threshold
+     * to the compaction map, where they belong.
+     *
+     * We must use reflection to validate the options because not every compaction strategy respects and supports
+     * the threshold params (LCS doesn't, STCS and DTCS do).
+     */
+    @SuppressWarnings("unchecked")
+    private static CompactionParams compactionFromRow(UntypedResultSet.Row row)
+    {
+        Class<? extends AbstractCompactionStrategy> klass =
+            CFMetaData.createCompactionStrategy(row.getString("compaction_strategy_class"));
+        Map<String, String> options = fromJsonMap(row.getString("compaction_strategy_options"));
+
+        int minThreshold = row.getInt("min_compaction_threshold");
+        int maxThreshold = row.getInt("max_compaction_threshold");
+
+        Map<String, String> optionsWithThresholds = new HashMap<>(options);
+        optionsWithThresholds.putIfAbsent(CompactionParams.Option.MIN_THRESHOLD.toString(), Integer.toString(minThreshold));
+        optionsWithThresholds.putIfAbsent(CompactionParams.Option.MAX_THRESHOLD.toString(), Integer.toString(maxThreshold));
+
+        try
+        {
+            Map<String, String> unrecognizedOptions =
+                (Map<String, String>) klass.getMethod("validateOptions", Map.class).invoke(null, optionsWithThresholds);
+
+            if (unrecognizedOptions.isEmpty())
+                options = optionsWithThresholds;
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        return CompactionParams.create(klass, options);
     }
 
     // Should only be called on compact tables
@@ -627,10 +671,7 @@ public final class LegacySchemaMigrator
                               SystemKeyspace.NAME,
                               SystemKeyspace.LEGACY_FUNCTIONS);
         HashMultimap<String, List<String>> functionSignatures = HashMultimap.create();
-        query(query, keyspaceName).forEach(row ->
-        {
-            functionSignatures.put(row.getString("function_name"), row.getList("signature", UTF8Type.instance));
-        });
+        query(query, keyspaceName).forEach(row -> functionSignatures.put(row.getString("function_name"), row.getList("signature", UTF8Type.instance)));
 
         Collection<Function> functions = new ArrayList<>();
         functionSignatures.entries().forEach(pair -> functions.add(readFunction(keyspaceName, pair.getKey(), pair.getValue())));
@@ -699,10 +740,7 @@ public final class LegacySchemaMigrator
                               SystemKeyspace.NAME,
                               SystemKeyspace.LEGACY_AGGREGATES);
         HashMultimap<String, List<String>> aggregateSignatures = HashMultimap.create();
-        query(query, keyspaceName).forEach(row ->
-        {
-            aggregateSignatures.put(row.getString("aggregate_name"), row.getList("signature", UTF8Type.instance));
-        });
+        query(query, keyspaceName).forEach(row -> aggregateSignatures.put(row.getString("aggregate_name"), row.getList("signature", UTF8Type.instance)));
 
         Collection<Aggregate> aggregates = new ArrayList<>();
         aggregateSignatures.entries().forEach(pair -> aggregates.add(readAggregate(keyspaceName, pair.getKey(), pair.getValue())));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/ReplicationParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/ReplicationParams.java b/src/java/org/apache/cassandra/schema/ReplicationParams.java
new file mode 100644
index 0000000..cdeb4c2
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/ReplicationParams.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.*;
+import org.apache.cassandra.service.StorageService;
+
+public final class ReplicationParams
+{
+    public static final String CLASS = "class";
+
+    public final Class<? extends AbstractReplicationStrategy> klass;
+    public final ImmutableMap<String, String> options;
+
+    private ReplicationParams(Class<? extends AbstractReplicationStrategy> klass, Map<String, String> options)
+    {
+        this.klass = klass;
+        this.options = ImmutableMap.copyOf(options);
+    }
+
+    static ReplicationParams local()
+    {
+        return new ReplicationParams(LocalStrategy.class, ImmutableMap.of());
+    }
+
+    static ReplicationParams simple(int replicationFactor)
+    {
+        return new ReplicationParams(SimpleStrategy.class, ImmutableMap.of("replication_factor", Integer.toString(replicationFactor)));
+    }
+
+    public void validate(String name)
+    {
+        // Attempt to instantiate the ARS, which will throw a ConfigurationException if the options aren't valid.
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        IEndpointSnitch eps = DatabaseDescriptor.getEndpointSnitch();
+        AbstractReplicationStrategy.validateReplicationStrategy(name, klass, tmd, eps, options);
+    }
+
+    public static ReplicationParams fromMap(Map<String, String> map)
+    {
+        Map<String, String> options = new HashMap<>(map);
+        String className = options.remove(CLASS);
+        Class<? extends AbstractReplicationStrategy> klass = AbstractReplicationStrategy.getClass(className);
+        return new ReplicationParams(klass, options);
+    }
+
+    public Map<String, String> asMap()
+    {
+        Map<String, String> map = new HashMap<>(options);
+        map.put(CLASS, klass.getName());
+        return map;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof ReplicationParams))
+            return false;
+
+        ReplicationParams r = (ReplicationParams) o;
+
+        return klass.equals(r.klass) && options.equals(r.options);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(klass, options);
+    }
+
+    @Override
+    public String toString()
+    {
+        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
+        helper.add(CLASS, klass.getName());
+        for (Map.Entry<String, String> entry : options.entrySet())
+            helper.add(entry.getKey(), entry.getValue());
+        return helper.toString();
+    }
+}
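
For illustration only (not part of the patch): a minimal sketch of how the new ReplicationParams round-trips a CQL replication map, assuming java.util.Map/HashMap imports and the Cassandra classes above on the classpath.

    // Sketch: round-trip of a CQL replication map through ReplicationParams
    Map<String, String> replication = new HashMap<>();
    replication.put(ReplicationParams.CLASS, "org.apache.cassandra.locator.SimpleStrategy");
    replication.put("replication_factor", "3");

    // fromMap() strips the 'class' entry and resolves the strategy class;
    // asMap() adds it back next to the remaining options.
    ReplicationParams params = ReplicationParams.fromMap(replication);
    Map<String, String> roundTrip = params.asMap();
    // roundTrip -> {replication_factor=3, class=org.apache.cassandra.locator.SimpleStrategy}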

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index ba6a2e1..5791db7 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -31,21 +31,18 @@ import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.cql3.statements.CFPropDefs;
+import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -819,20 +816,9 @@ public final class SchemaKeyspace
     {
         RowUpdateBuilder adder = new RowUpdateBuilder(Tables, timestamp, mutation).clustering(table.cfName);
 
-        adder.add("bloom_filter_fp_chance", table.getBloomFilterFpChance())
-             .add("comment", table.getComment())
-             .add("dclocal_read_repair_chance", table.getDcLocalReadRepairChance())
-             .add("default_time_to_live", table.getDefaultTimeToLive())
-             .add("gc_grace_seconds", table.getGcGraceSeconds())
-             .add("id", table.cfId)
-             .add("max_index_interval", table.getMaxIndexInterval())
-             .add("memtable_flush_period_in_ms", table.getMemtableFlushPeriod())
-             .add("min_index_interval", table.getMinIndexInterval())
-             .add("read_repair_chance", table.getReadRepairChance())
-             .add("speculative_retry", table.getSpeculativeRetry().toString())
-             .map("caching", table.getCaching().asMap())
-             .map("compaction", buildCompactionMap(table))
-             .map("compression", table.compressionParameters().asMap())
+        addTableParamsToSchemaMutation(table.params, adder);
+
+        adder.add("id", table.cfId)
              .set("flags", CFMetaData.flagsToStrings(table.flags()))
              .build();
 
@@ -852,38 +838,21 @@ public final class SchemaKeyspace
         }
     }
 
-    /*
-     * The method is needed - temporarily - to migrate max_compaction_threshold and min_compaction_threshold
-     * to the compaction map, where they belong.
-     *
-     * We must use reflection to validate the options because not every compaction strategy respects and supports
-     * the threshold params (LCS doesn't, STCS and DTCS don't).
-     */
-    @SuppressWarnings("unchecked")
-    private static Map<String, String> buildCompactionMap(CFMetaData cfm)
+    private static void addTableParamsToSchemaMutation(TableParams params, RowUpdateBuilder adder)
     {
-        Map<String, String> options = new HashMap<>(cfm.compactionStrategyOptions);
-
-        Map<String, String> optionsWithThresholds = new HashMap<>(options);
-        options.putIfAbsent(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, Integer.toString(cfm.getMinCompactionThreshold()));
-        options.putIfAbsent(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, Integer.toString(cfm.getMaxCompactionThreshold()));
-
-        try
-        {
-            Map<String, String> unrecognizedOptions = (Map<String, String>) cfm.compactionStrategyClass
-                                                                               .getMethod("validateOptions", Map.class)
-                                                                               .invoke(null, optionsWithThresholds);
-            if (unrecognizedOptions.isEmpty())
-                options = optionsWithThresholds;
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        options.put("class", cfm.compactionStrategyClass.getName());
-
-        return options;
+        adder.add("bloom_filter_fp_chance", params.bloomFilterFpChance)
+             .add("comment", params.comment)
+             .add("dclocal_read_repair_chance", params.dcLocalReadRepairChance)
+             .add("default_time_to_live", params.defaultTimeToLive)
+             .add("gc_grace_seconds", params.gcGraceSeconds)
+             .add("max_index_interval", params.maxIndexInterval)
+             .add("memtable_flush_period_in_ms", params.memtableFlushPeriodInMs)
+             .add("min_index_interval", params.minIndexInterval)
+             .add("read_repair_chance", params.readRepairChance)
+             .add("speculative_retry", params.speculativeRetry.toString())
+             .map("caching", params.caching.asMap())
+             .map("compaction", params.compaction.asMap())
+             .map("compression", params.compression.asMap());
     }
 
     public static Mutation makeUpdateTableMutation(KeyspaceMetadata keyspace,
@@ -1085,49 +1054,38 @@ public final class SchemaKeyspace
         boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
         boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
         boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
-        boolean isMaterializedView = flags.contains(CFMetaData.Flag.MATERIALIZEDVIEW);
-
-        CFMetaData cfm = CFMetaData.create(keyspace,
-                                           table,
-                                           id,
-                                           isDense,
-                                           isCompound,
-                                           isSuper,
-                                           isCounter,
-                                           isMaterializedView,
-                                           columns,
-                                           DatabaseDescriptor.getPartitioner());
-
-        Map<String, String> compaction = new HashMap<>(row.getTextMap("compaction"));
-        Class<? extends AbstractCompactionStrategy> compactionStrategyClass =
-            CFMetaData.createCompactionStrategy(compaction.remove("class"));
-
-        int minCompactionThreshold = compaction.containsKey(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD)
-                                   ? Integer.parseInt(compaction.get(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD))
-                                   : CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD;
-
-        int maxCompactionThreshold = compaction.containsKey(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD)
-                                   ? Integer.parseInt(compaction.get(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD))
-                                   : CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD;
-
-        cfm.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"))
-           .caching(CachingOptions.fromMap(row.getTextMap("caching")))
-           .comment(row.getString("comment"))
-           .compactionStrategyClass(compactionStrategyClass)
-           .compactionStrategyOptions(compaction)
-           .compressionParameters(CompressionParameters.fromMap(row.getTextMap("compression")))
-           .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance"))
-           .defaultTimeToLive(row.getInt("default_time_to_live"))
-           .gcGraceSeconds(row.getInt("gc_grace_seconds"))
-           .maxCompactionThreshold(maxCompactionThreshold)
-           .maxIndexInterval(row.getInt("max_index_interval"))
-           .memtableFlushPeriod(row.getInt("memtable_flush_period_in_ms"))
-           .minCompactionThreshold(minCompactionThreshold)
-           .minIndexInterval(row.getInt("min_index_interval"))
-           .readRepairChance(row.getDouble("read_repair_chance"))
-           .speculativeRetry(CFMetaData.SpeculativeRetry.fromString(row.getString("speculative_retry")));
-
-        return cfm;
+        boolean isMaterializedView = flags.contains(CFMetaData.Flag.VIEW);
+
+        return CFMetaData.create(keyspace,
+                                 table,
+                                 id,
+                                 isDense,
+                                 isCompound,
+                                 isSuper,
+                                 isCounter,
+                                 isMaterializedView,
+                                 columns,
+                                 DatabaseDescriptor.getPartitioner())
+                         .params(createTableParamsFromRow(row));
+    }
+
+    private static TableParams createTableParamsFromRow(UntypedResultSet.Row row)
+    {
+        return TableParams.builder()
+                          .bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"))
+                          .caching(CachingParams.fromMap(row.getTextMap("caching")))
+                          .comment(row.getString("comment"))
+                          .compaction(CompactionParams.fromMap(row.getTextMap("compaction")))
+                          .compression(CompressionParams.fromMap(row.getTextMap("compression")))
+                          .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance"))
+                          .defaultTimeToLive(row.getInt("default_time_to_live"))
+                          .gcGraceSeconds(row.getInt("gc_grace_seconds"))
+                          .maxIndexInterval(row.getInt("max_index_interval"))
+                          .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"))
+                          .minIndexInterval(row.getInt("min_index_interval"))
+                          .readRepairChance(row.getDouble("read_repair_chance"))
+                          .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")))
+                          .build();
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/SpeculativeRetryParam.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SpeculativeRetryParam.java b/src/java/org/apache/cassandra/schema/SpeculativeRetryParam.java
new file mode 100644
index 0000000..58c6375
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SpeculativeRetryParam.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.text.DecimalFormat;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+import static java.lang.String.format;
+
+public final class SpeculativeRetryParam
+{
+    public enum Kind
+    {
+        NONE, CUSTOM, PERCENTILE, ALWAYS
+    }
+
+    public static final SpeculativeRetryParam NONE = none();
+    public static final SpeculativeRetryParam ALWAYS = always();
+    public static final SpeculativeRetryParam DEFAULT = percentile(99);
+
+    private final Kind kind;
+    private final double value;
+
+    // pre-processed value: divided by 100 for PERCENTILE, converted from milliseconds to nanoseconds for CUSTOM
+    private final double threshold;
+
+    private SpeculativeRetryParam(Kind kind, double value)
+    {
+        this.kind = kind;
+        this.value = value;
+
+        if (kind == Kind.PERCENTILE)
+            threshold = value / 100;
+        else if (kind == Kind.CUSTOM)
+            threshold = TimeUnit.MILLISECONDS.toNanos((long) value);
+        else
+            threshold = value;
+    }
+
+    public Kind kind()
+    {
+        return kind;
+    }
+
+    public double threshold()
+    {
+        return threshold;
+    }
+
+    public static SpeculativeRetryParam none()
+    {
+        return new SpeculativeRetryParam(Kind.NONE, 0);
+    }
+
+    public static SpeculativeRetryParam always()
+    {
+        return new SpeculativeRetryParam(Kind.ALWAYS, 0);
+    }
+
+    public static SpeculativeRetryParam custom(double value)
+    {
+        return new SpeculativeRetryParam(Kind.CUSTOM, value);
+    }
+
+    public static SpeculativeRetryParam percentile(double value)
+    {
+        return new SpeculativeRetryParam(Kind.PERCENTILE, value);
+    }
+
+    public static SpeculativeRetryParam fromString(String value)
+    {
+        if (value.toLowerCase().endsWith("ms"))
+        {
+            try
+            {
+                return custom(Double.parseDouble(value.substring(0, value.length() - "ms".length())));
+            }
+            catch (IllegalArgumentException e)
+            {
+                throw new ConfigurationException(format("Invalid value %s for option '%s'", value, TableParams.Option.SPECULATIVE_RETRY));
+            }
+        }
+
+        if (value.toUpperCase().endsWith(Kind.PERCENTILE.toString()))
+        {
+            double threshold;
+            try
+            {
+                threshold = Double.parseDouble(value.substring(0, value.length() - Kind.PERCENTILE.toString().length()));
+            }
+            catch (IllegalArgumentException e)
+            {
+                throw new ConfigurationException(format("Invalid value %s for option '%s'", value, TableParams.Option.SPECULATIVE_RETRY));
+            }
+
+            if (threshold >= 0.0 && threshold <= 100.0)
+                return percentile(threshold);
+
+            throw new ConfigurationException(format("Invalid value %s for PERCENTILE option '%s': must be between 0.0 and 100.0",
+                                                    value,
+                                                    TableParams.Option.SPECULATIVE_RETRY));
+        }
+
+        if (value.equals(Kind.NONE.toString()))
+            return NONE;
+
+        if (value.equals(Kind.ALWAYS.toString()))
+            return ALWAYS;
+
+        throw new ConfigurationException(format("Invalid value %s for option '%s'", value, TableParams.Option.SPECULATIVE_RETRY));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof SpeculativeRetryParam))
+            return false;
+        SpeculativeRetryParam srp = (SpeculativeRetryParam) o;
+        return kind == srp.kind && threshold == srp.threshold;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(kind, threshold);
+    }
+
+    @Override
+    public String toString()
+    {
+        switch (kind)
+        {
+            case CUSTOM:
+                return format("%sms", value);
+            case PERCENTILE:
+                return format("%sPERCENTILE", new DecimalFormat("#.#####").format(value));
+            default: // NONE and ALWAYS
+                return kind.toString();
+        }
+    }
+}
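
A few illustrative inputs for the parser above (a sketch, not part of the patch; the results follow the fromString() branches shown in the diff):

    SpeculativeRetryParam p1 = SpeculativeRetryParam.fromString("99PERCENTILE"); // Kind.PERCENTILE, threshold() == 0.99
    SpeculativeRetryParam p2 = SpeculativeRetryParam.fromString("10ms");         // Kind.CUSTOM, threshold() == 10,000,000 ns
    SpeculativeRetryParam p3 = SpeculativeRetryParam.fromString("NONE");         // the NONE singleton
    SpeculativeRetryParam p4 = SpeculativeRetryParam.fromString("ALWAYS");       // the ALWAYS singleton
    // any other value, e.g. "fast", throws ConfigurationException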

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/TableParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java
new file mode 100644
index 0000000..3b3a88e
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+import static java.lang.String.format;
+
+public final class TableParams
+{
+    public static final TableParams DEFAULT = TableParams.builder().build();
+
+    public enum Option
+    {
+        BLOOM_FILTER_FP_CHANCE,
+        CACHING,
+        COMMENT,
+        COMPACTION,
+        COMPRESSION,
+        DCLOCAL_READ_REPAIR_CHANCE,
+        DEFAULT_TIME_TO_LIVE,
+        GC_GRACE_SECONDS,
+        MAX_INDEX_INTERVAL,
+        MEMTABLE_FLUSH_PERIOD_IN_MS,
+        MIN_INDEX_INTERVAL,
+        READ_REPAIR_CHANCE,
+        SPECULATIVE_RETRY;
+
+        @Override
+        public String toString()
+        {
+            return name().toLowerCase();
+        }
+    }
+
+    public static final String DEFAULT_COMMENT = "";
+    public static final double DEFAULT_READ_REPAIR_CHANCE = 0.0;
+    public static final double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.1;
+    public static final int DEFAULT_GC_GRACE_SECONDS = 864000; // 10 days
+    public static final int DEFAULT_DEFAULT_TIME_TO_LIVE = 0;
+    public static final int DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS = 0;
+    public static final int DEFAULT_MIN_INDEX_INTERVAL = 128;
+    public static final int DEFAULT_MAX_INDEX_INTERVAL = 2048;
+
+    public final String comment;
+    public final double readRepairChance;
+    public final double dcLocalReadRepairChance;
+    public final double bloomFilterFpChance;
+    public final int gcGraceSeconds;
+    public final int defaultTimeToLive;
+    public final int memtableFlushPeriodInMs;
+    public final int minIndexInterval;
+    public final int maxIndexInterval;
+    public final SpeculativeRetryParam speculativeRetry;
+    public final CachingParams caching;
+    public final CompactionParams compaction;
+    public final CompressionParams compression;
+
+    private TableParams(Builder builder)
+    {
+        comment = builder.comment;
+        readRepairChance = builder.readRepairChance;
+        dcLocalReadRepairChance = builder.dcLocalReadRepairChance;
+        bloomFilterFpChance = builder.bloomFilterFpChance == null
+                            ? builder.compaction.defaultBloomFilterFbChance()
+                            : builder.bloomFilterFpChance;
+        gcGraceSeconds = builder.gcGraceSeconds;
+        defaultTimeToLive = builder.defaultTimeToLive;
+        memtableFlushPeriodInMs = builder.memtableFlushPeriodInMs;
+        minIndexInterval = builder.minIndexInterval;
+        maxIndexInterval = builder.maxIndexInterval;
+        speculativeRetry = builder.speculativeRetry;
+        caching = builder.caching;
+        compaction = builder.compaction;
+        compression = builder.compression;
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static Builder builder(TableParams params)
+    {
+        return new Builder().bloomFilterFpChance(params.bloomFilterFpChance)
+                            .caching(params.caching)
+                            .comment(params.comment)
+                            .compaction(params.compaction)
+                            .compression(params.compression)
+                            .dcLocalReadRepairChance(params.dcLocalReadRepairChance)
+                            .defaultTimeToLive(params.defaultTimeToLive)
+                            .gcGraceSeconds(params.gcGraceSeconds)
+                            .maxIndexInterval(params.maxIndexInterval)
+                            .memtableFlushPeriodInMs(params.memtableFlushPeriodInMs)
+                            .minIndexInterval(params.minIndexInterval)
+                            .readRepairChance(params.readRepairChance)
+                            .speculativeRetry(params.speculativeRetry);
+    }
+
+    public void validate()
+    {
+        compaction.validate();
+        compression.validate();
+
+        if (bloomFilterFpChance <= 0 || bloomFilterFpChance > 1)
+        {
+            fail("%s must be larger than 0.0 and less than or equal to 1.0 (got %s)",
+                 Option.BLOOM_FILTER_FP_CHANCE,
+                 bloomFilterFpChance);
+        }
+
+        if (dcLocalReadRepairChance < 0 || dcLocalReadRepairChance > 1.0)
+        {
+            fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)",
+                 Option.DCLOCAL_READ_REPAIR_CHANCE,
+                 dcLocalReadRepairChance);
+        }
+
+        if (readRepairChance < 0 || readRepairChance > 1.0)
+        {
+            fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)",
+                 Option.READ_REPAIR_CHANCE,
+                 readRepairChance);
+        }
+
+        if (defaultTimeToLive < 0)
+            fail("%s must be greater than or equal to 0 (got %s)", Option.DEFAULT_TIME_TO_LIVE, defaultTimeToLive);
+
+        if (gcGraceSeconds < 0)
+            fail("%s must be greater than or equal to 0 (got %s)", Option.GC_GRACE_SECONDS, gcGraceSeconds);
+
+        if (minIndexInterval < 1)
+            fail("%s must be greater than or equal to 1 (got %s)", Option.MIN_INDEX_INTERVAL, minIndexInterval);
+
+        if (maxIndexInterval < minIndexInterval)
+        {
+            fail("%s must be greater than or equal to %s (%s) (got %s)",
+                 Option.MAX_INDEX_INTERVAL,
+                 Option.MIN_INDEX_INTERVAL,
+                 minIndexInterval,
+                 maxIndexInterval);
+        }
+
+        if (memtableFlushPeriodInMs < 0)
+            fail("%s must be greater than or equal to 0 (got %s)", Option.MEMTABLE_FLUSH_PERIOD_IN_MS, memtableFlushPeriodInMs);
+    }
+
+    private static void fail(String format, Object... args)
+    {
+        throw new ConfigurationException(format(format, args));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof TableParams))
+            return false;
+
+        TableParams p = (TableParams) o;
+
+        return comment.equals(p.comment)
+            && readRepairChance == p.readRepairChance
+            && dcLocalReadRepairChance == p.dcLocalReadRepairChance
+            && bloomFilterFpChance == p.bloomFilterFpChance
+            && gcGraceSeconds == p.gcGraceSeconds
+            && defaultTimeToLive == p.defaultTimeToLive
+            && memtableFlushPeriodInMs == p.memtableFlushPeriodInMs
+            && minIndexInterval == p.minIndexInterval
+            && maxIndexInterval == p.maxIndexInterval
+            && speculativeRetry.equals(p.speculativeRetry)
+            && caching.equals(p.caching)
+            && compaction.equals(p.compaction)
+            && compression.equals(p.compression);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(comment,
+                                readRepairChance,
+                                dcLocalReadRepairChance,
+                                bloomFilterFpChance,
+                                gcGraceSeconds,
+                                defaultTimeToLive,
+                                memtableFlushPeriodInMs,
+                                minIndexInterval,
+                                maxIndexInterval,
+                                speculativeRetry,
+                                caching,
+                                compaction,
+                                compression);
+    }
+
+    @Override
+    public String toString()
+    {
+        return MoreObjects.toStringHelper(this)
+                          .add(Option.COMMENT.toString(), comment)
+                          .add(Option.READ_REPAIR_CHANCE.toString(), readRepairChance)
+                          .add(Option.DCLOCAL_READ_REPAIR_CHANCE.toString(), dcLocalReadRepairChance)
+                          .add(Option.BLOOM_FILTER_FP_CHANCE.toString(), bloomFilterFpChance)
+                          .add(Option.GC_GRACE_SECONDS.toString(), gcGraceSeconds)
+                          .add(Option.DEFAULT_TIME_TO_LIVE.toString(), defaultTimeToLive)
+                          .add(Option.MEMTABLE_FLUSH_PERIOD_IN_MS.toString(), memtableFlushPeriodInMs)
+                          .add(Option.MIN_INDEX_INTERVAL.toString(), minIndexInterval)
+                          .add(Option.MAX_INDEX_INTERVAL.toString(), maxIndexInterval)
+                          .add(Option.SPECULATIVE_RETRY.toString(), speculativeRetry)
+                          .add(Option.CACHING.toString(), caching)
+                          .add(Option.COMPACTION.toString(), compaction)
+                          .add(Option.COMPRESSION.toString(), compression)
+                          .toString();
+    }
+
+    public static final class Builder
+    {
+        private String comment = DEFAULT_COMMENT;
+        private double readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
+        private double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
+        private Double bloomFilterFpChance;
+        private int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
+        private int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
+        private int memtableFlushPeriodInMs = DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS;
+        private int minIndexInterval = DEFAULT_MIN_INDEX_INTERVAL;
+        private int maxIndexInterval = DEFAULT_MAX_INDEX_INTERVAL;
+        private SpeculativeRetryParam speculativeRetry = SpeculativeRetryParam.DEFAULT;
+        private CachingParams caching = CachingParams.DEFAULT;
+        private CompactionParams compaction = CompactionParams.DEFAULT;
+        private CompressionParams compression = CompressionParams.DEFAULT;
+
+        public Builder()
+        {
+        }
+
+        public TableParams build()
+        {
+            return new TableParams(this);
+        }
+
+        public Builder comment(String val)
+        {
+            comment = val;
+            return this;
+        }
+
+        public Builder readRepairChance(double val)
+        {
+            readRepairChance = val;
+            return this;
+        }
+
+        public Builder dcLocalReadRepairChance(double val)
+        {
+            dcLocalReadRepairChance = val;
+            return this;
+        }
+
+        public Builder bloomFilterFpChance(double val)
+        {
+            bloomFilterFpChance = val;
+            return this;
+        }
+
+        public Builder gcGraceSeconds(int val)
+        {
+            gcGraceSeconds = val;
+            return this;
+        }
+
+        public Builder defaultTimeToLive(int val)
+        {
+            defaultTimeToLive = val;
+            return this;
+        }
+
+        public Builder memtableFlushPeriodInMs(int val)
+        {
+            memtableFlushPeriodInMs = val;
+            return this;
+        }
+
+        public Builder minIndexInterval(int val)
+        {
+            minIndexInterval = val;
+            return this;
+        }
+
+        public Builder maxIndexInterval(int val)
+        {
+            maxIndexInterval = val;
+            return this;
+        }
+
+        public Builder speculativeRetry(SpeculativeRetryParam val)
+        {
+            speculativeRetry = val;
+            return this;
+        }
+
+        public Builder caching(CachingParams val)
+        {
+            caching = val;
+            return this;
+        }
+
+        public Builder compaction(CompactionParams val)
+        {
+            compaction = val;
+            return this;
+        }
+
+        public Builder compression(CompressionParams val)
+        {
+            compression = val;
+            return this;
+        }
+    }
+}
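
A minimal builder sketch (for illustration, not part of the patch; assumes the schema classes above are on the classpath):

    TableParams params = TableParams.builder()
                                    .comment("user activity")
                                    .gcGraceSeconds(3600)
                                    .caching(CachingParams.CACHE_EVERYTHING)
                                    .build();

    // Unset options keep their defaults; bloom_filter_fp_chance falls back to the
    // compaction strategy's default because it was not set explicitly.
    params.validate(); // throws ConfigurationException if any option is out of range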

