cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [13/15] git commit: merge from 1.2
Date Mon, 26 Aug 2013 23:02:10 GMT
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37c7d239
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37c7d239
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37c7d239

Branch: refs/heads/cassandra-2.0
Commit: 37c7d2396e7c25ed39ce1c7c5548b180b822c701
Parents: 5d7e8c8 a4dd7aa
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Mon Aug 26 18:01:43 2013 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Mon Aug 26 18:01:43 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 ++++
 NEWS.txt                                        |  9 +++++++++
 .../org/apache/cassandra/config/Config.java     |  2 ++
 .../cassandra/config/DatabaseDescriptor.java    | 20 ++++++++++++++++++++
 .../cql3/statements/AlterTableStatement.java    | 15 +++++++++++++++
 src/java/org/apache/cassandra/db/Memtable.java  | 17 ++++++++---------
 .../db/compaction/CompactionManager.java        | 12 +++++-------
 .../org/apache/cassandra/utils/Allocator.java   |  4 ++++
 .../apache/cassandra/utils/HeapAllocator.java   | 16 +++++++++++++++-
 9 files changed, 82 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/37c7d239/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index da98872,45e6497..3f8815e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,54 -1,13 +1,57 @@@
 -1.2.10
 +2.0.1
 + * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
 + * Log Merkle tree stats (CASSANDRA-2698)
 + * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
 + * Improve offheap memcpy performance (CASSANDRA-5884)
 + * Use a range aware scanner for cleanup (CASSANDRA-2524)
 + * Cleanup doesn't need to inspect sstables that contain only local data 
 +   (CASSANDRA-5722)
 + * Add ability for CQL3 to list partition keys (CASSANDRA-4536)
 + * Improve native protocol serialization (CASSANDRA-5664)
++Merged from 1.2:
+  * Allow disabling SlabAllocator (CASSANDRA-5935)
+  * Make user-defined compaction JMX blocking (CASSANDRA-4952)
  
  
 -1.2.9
 +2.0.0
 + * Fix thrift validation when inserting into CQL3 tables (CASSANDRA-5138)
 + * Fix periodic memtable flushing behavior with clean memtables (CASSANDRA-5931)
 + * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928)
 + * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938)
 +Merged from 1.2:
   * Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900)
 - * migrate 1.1 schema_columnfamilies.key_alias column to key_aliases
 -   (CASSANDRA-5800)
 - * add --migrate option to sstableupgrade and sstablescrub (CASSANDRA-5831)
 + * Don't announce schema version until we've loaded the changes locally
 +   (CASSANDRA-5904)
 + * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903)
 + * Properly handle parsing huge map and set literals (CASSANDRA-5893)
 +
 +
 +2.0.0-rc2
 + * enable vnodes by default (CASSANDRA-5869)
 + * fix CAS contention timeout (CASSANDRA-5830)
 + * fix HsHa to respect max frame size (CASSANDRA-4573)
 + * Fix (some) 2i on composite components omissions (CASSANDRA-5851)
 + * cqlsh: add DESCRIBE FULL SCHEMA variant (CASSANDRA-5880)
 +Merged from 1.2:
 + * Correctly validate sparse composite cells in scrub (CASSANDRA-5855)
 + * Add KeyCacheHitRate metric to CF metrics (CASSANDRA-5868)
 + * cqlsh: add support for multiline comments (CASSANDRA-5798)
 + * Handle CQL3 SELECT duplicate IN restrictions on clustering columns
 +   (CASSANDRA-5856)
 +
 +
 +2.0.0-rc1
 + * improve DecimalSerializer performance (CASSANDRA-5837)
 + * fix potential spurious wakeup in AsyncOneResponse (CASSANDRA-5690)
 + * fix schema-related trigger issues (CASSANDRA-5774)
 + * Better validation when accessing CQL3 table from thrift (CASSANDRA-5138)
 + * Fix assertion error during repair (CASSANDRA-5801)
 + * Fix range tombstone bug (CASSANDRA-5805)
 + * DC-local CAS (CASSANDRA-5797)
 + * Add a native_protocol_version column to the system.local table (CASSANRDA-5819)
 + * Use index_interval from cassandra.yaml when upgraded (CASSANDRA-5822)
 + * Fix buffer underflow on socket close (CASSANDRA-5792)
 +Merged from 1.2:
   * fix bulk-loading compressed sstables (CASSANDRA-5820)
   * (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter 
     (CASSANDRA-5824)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37c7d239/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index f896671,1f3f675..51d9d4c
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -101,8 -23,21 +101,17 @@@ Feature
        (cli and cqlsh history, .cqlshrc) are now centralized in ~/.cassandra, as well.
      - A new sstablesplit utility allows to split large sstables offline.
  
 -Defaults
 ---------
 -    - After performance testing for CASSANDRA-5727, the default LCS filesize
 -      has been changed from 5MB to 160MB.
  
  
+ 1.2.8
+ =====
+ 
+ Upgrading
+ ---------
+     - Nothing specific to this release, but please see 1.2.7 if you are upgrading
+       from a previous version.
+ 
+ 
  1.2.7
  =====
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37c7d239/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index 61d1d51,74b941d..99fd833
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -163,15 -165,14 +163,17 @@@ public class Confi
      public long row_cache_size_in_mb = 0;
      public volatile int row_cache_save_period = 0;
      public int row_cache_keys_to_save = Integer.MAX_VALUE;
 -    public String row_cache_provider = SerializingCacheProvider.class.getSimpleName();
 +    public String memory_allocator = NativeAllocator.class.getSimpleName();
      public boolean populate_io_cache_on_flush = false;
  
 -    public boolean inter_dc_tcp_nodelay = true;
 +    public boolean inter_dc_tcp_nodelay = false;
 +
 +    private static boolean isClientMode = false;
 +
 +    public boolean preheat_kernel_page_cache = false;
  
+     public String memtable_allocator = "SlabAllocator";
+ 
 -    private static boolean loadYaml = true;
      private static boolean outboundBindAny = false;
  
      public static boolean getOutboundBindAny()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37c7d239/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a9b0d41,1412888..1e1b9a2
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -49,7 -50,13 +49,8 @@@ import org.apache.cassandra.net.Messagi
  import org.apache.cassandra.scheduler.IRequestScheduler;
  import org.apache.cassandra.scheduler.NoScheduler;
  import org.apache.cassandra.service.CacheService;
 -import org.apache.cassandra.service.MigrationManager;
+ import org.apache.cassandra.utils.Allocator;
  import org.apache.cassandra.utils.FBUtilities;
 -import org.yaml.snakeyaml.Loader;
 -import org.yaml.snakeyaml.TypeDescription;
 -import org.yaml.snakeyaml.Yaml;
 -import org.yaml.snakeyaml.error.YAMLException;
  
  public class DatabaseDescriptor
  {
@@@ -83,393 -92,432 +84,400 @@@
      private static String localDC;
      private static Comparator<InetAddress> localComparator;
  
+     private static Class<? extends Allocator> memtableAllocator;
+ 
 -    /**
 -     * Inspect the classpath to find storage configuration file
 -     */
 -    static URL getStorageConfigURL() throws ConfigurationException
 +    static
      {
 -        String configUrl = System.getProperty("cassandra.config");
 -        if (configUrl == null)
 -            configUrl = DEFAULT_CONFIGURATION;
 -
 -        URL url;
 +        // In client mode, we use a default configuration. Note that the fields of this class will be
 +        // left unconfigured however (the partitioner or localDC will be null for instance) so this
 +        // should be used with care.
          try
          {
 -            url = new URL(configUrl);
 -            url.openStream().close(); // catches well-formed but bogus URLs
 +            if (Config.isClientMode())
 +            {
 +                conf = new Config();
 +                // at least we have to set memoryAllocator to open SSTable in client mode
 +                memoryAllocator = FBUtilities.newOffHeapAllocator(conf.memory_allocator);
 +            }
 +            else
 +            {
 +                applyConfig(loadConfig());
 +            }
 +        }
 +        catch (ConfigurationException e)
 +        {
 +            logger.error("Fatal configuration error", e);
 +            System.err.println(e.getMessage() + "\nFatal configuration error; unable to start. See log for stacktrace.");
 +            System.exit(1);
          }
          catch (Exception e)
          {
 -            ClassLoader loader = DatabaseDescriptor.class.getClassLoader();
 -            url = loader.getResource(configUrl);
 -            if (url == null)
 -                throw new ConfigurationException("Cannot locate " + configUrl);
 +            logger.error("Fatal error during configuration loading", e);
 +            System.err.println(e.getMessage() + "\nFatal error during configuration loading; unable to start. See log for stacktrace.");
 +            System.exit(1);
          }
 -
 -        return url;
      }
  
 -    static
 +    @VisibleForTesting
 +    static Config loadConfig() throws ConfigurationException
      {
 -        if (Config.getLoadYaml())
 -            loadYaml();
 -        else
 -            conf = new Config();
 +        String loaderClass = System.getProperty("cassandra.config.loader");
 +        ConfigurationLoader loader = loaderClass == null
 +                                   ? new YamlConfigurationLoader()
 +                                   : FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
 +        return loader.loadConfig();
      }
 -    static void loadYaml()
 +
 +    private static void applyConfig(Config config) throws ConfigurationException
      {
 -        try
 -        {
 -            URL url = getStorageConfigURL();
 -            logger.info("Loading settings from " + url);
 -            InputStream input;
 -            try
 -            {
 -                input = url.openStream();
 -            }
 -            catch (IOException e)
 -            {
 -                // getStorageConfigURL should have ruled this out
 -                throw new AssertionError(e);
 -            }
 -            org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
 -            TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
 -            seedDesc.putMapPropertyType("parameters", String.class, String.class);
 -            constructor.addTypeDescription(seedDesc);
 -            Yaml yaml = new Yaml(new Loader(constructor));
 -            conf = (Config)yaml.load(input);
 +        conf = config;
  
 -            logger.info("Data files directories: " + Arrays.toString(conf.data_file_directories));
 -            logger.info("Commit log directory: " + conf.commitlog_directory);
 +        logger.info("Data files directories: " + Arrays.toString(conf.data_file_directories));
 +        logger.info("Commit log directory: " + conf.commitlog_directory);
  
 -            if (conf.commitlog_sync == null)
 -            {
 -                throw new ConfigurationException("Missing required directive CommitLogSync");
 -            }
 +        if (conf.commitlog_sync == null)
 +        {
 +            throw new ConfigurationException("Missing required directive CommitLogSync");
 +        }
  
 -            if (conf.commitlog_sync == Config.CommitLogSync.batch)
 -            {
 -                if (conf.commitlog_sync_batch_window_in_ms == null)
 -                {
 -                    throw new ConfigurationException("Missing value for commitlog_sync_batch_window_in_ms: Double expected.");
 -                }
 -                else if (conf.commitlog_sync_period_in_ms != null)
 -                {
 -                    throw new ConfigurationException("Batch sync specified, but commitlog_sync_period_in_ms found. Only specify commitlog_sync_batch_window_in_ms when using batch sync");
 -                }
 -                logger.debug("Syncing log with a batch window of " + conf.commitlog_sync_batch_window_in_ms);
 -            }
 -            else
 +        if (conf.commitlog_sync == Config.CommitLogSync.batch)
 +        {
 +            if (conf.commitlog_sync_batch_window_in_ms == null)
              {
 -                if (conf.commitlog_sync_period_in_ms == null)
 -                {
 -                    throw new ConfigurationException("Missing value for commitlog_sync_period_in_ms: Integer expected");
 -                }
 -                else if (conf.commitlog_sync_batch_window_in_ms != null)
 -                {
 -                    throw new ConfigurationException("commitlog_sync_period_in_ms specified, but commitlog_sync_batch_window_in_ms found.  Only specify commitlog_sync_period_in_ms when using periodic sync.");
 -                }
 -                logger.debug("Syncing log with a period of " + conf.commitlog_sync_period_in_ms);
 +                throw new ConfigurationException("Missing value for commitlog_sync_batch_window_in_ms: Double expected.");
              }
 -
 -            if (conf.commitlog_total_space_in_mb == null)
 -                conf.commitlog_total_space_in_mb = System.getProperty("os.arch").contains("64") ? 1024 : 32;
 -
 -            /* evaluate the DiskAccessMode Config directive, which also affects indexAccessMode selection */
 -            if (conf.disk_access_mode == Config.DiskAccessMode.auto)
 +            else if (conf.commitlog_sync_period_in_ms != null)
              {
 -                conf.disk_access_mode = System.getProperty("os.arch").contains("64") ? Config.DiskAccessMode.mmap : Config.DiskAccessMode.standard;
 -                indexAccessMode = conf.disk_access_mode;
 -                logger.info("DiskAccessMode 'auto' determined to be " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +                throw new ConfigurationException("Batch sync specified, but commitlog_sync_period_in_ms found. Only specify commitlog_sync_batch_window_in_ms when using batch sync");
              }
 -            else if (conf.disk_access_mode == Config.DiskAccessMode.mmap_index_only)
 +            logger.debug("Syncing log with a batch window of " + conf.commitlog_sync_batch_window_in_ms);
 +        }
 +        else
 +        {
 +            if (conf.commitlog_sync_period_in_ms == null)
              {
 -                conf.disk_access_mode = Config.DiskAccessMode.standard;
 -                indexAccessMode = Config.DiskAccessMode.mmap;
 -                logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +                throw new ConfigurationException("Missing value for commitlog_sync_period_in_ms: Integer expected");
              }
 -            else
 +            else if (conf.commitlog_sync_batch_window_in_ms != null)
              {
 -                indexAccessMode = conf.disk_access_mode;
 -                logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +                throw new ConfigurationException("commitlog_sync_period_in_ms specified, but commitlog_sync_batch_window_in_ms found.  Only specify commitlog_sync_period_in_ms when using periodic sync.");
              }
 +            logger.debug("Syncing log with a period of " + conf.commitlog_sync_period_in_ms);
 +        }
  
 -            logger.info("disk_failure_policy is " + conf.disk_failure_policy);
 +        if (conf.commitlog_total_space_in_mb == null)
 +            conf.commitlog_total_space_in_mb = System.getProperty("os.arch").contains("64") ? 1024 : 32;
  
 -            /* Authentication and authorization backend, implementing IAuthenticator and IAuthorizer */
 -            if (conf.authenticator != null)
 -                authenticator = FBUtilities.newAuthenticator(conf.authenticator);
 +        /* evaluate the DiskAccessMode Config directive, which also affects indexAccessMode selection */
 +        if (conf.disk_access_mode == Config.DiskAccessMode.auto)
 +        {
 +            conf.disk_access_mode = System.getProperty("os.arch").contains("64") ? Config.DiskAccessMode.mmap : Config.DiskAccessMode.standard;
 +            indexAccessMode = conf.disk_access_mode;
 +            logger.info("DiskAccessMode 'auto' determined to be " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +        }
 +        else if (conf.disk_access_mode == Config.DiskAccessMode.mmap_index_only)
 +        {
 +            conf.disk_access_mode = Config.DiskAccessMode.standard;
 +            indexAccessMode = Config.DiskAccessMode.mmap;
 +            logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +        }
 +        else
 +        {
 +            indexAccessMode = conf.disk_access_mode;
 +            logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +        }
  
 -            if (conf.authority != null)
 -            {
 -                logger.warn("Please rename 'authority' to 'authorizer' in cassandra.yaml");
 -                if (!conf.authority.equals("org.apache.cassandra.auth.AllowAllAuthority"))
 -                    throw new ConfigurationException("IAuthority interface has been deprecated,"
 -                                                     + " please implement IAuthorizer instead.");
 -            }
 +        logger.info("disk_failure_policy is " + conf.disk_failure_policy);
  
 -            if (conf.authorizer != null)
 -                authorizer = FBUtilities.newAuthorizer(conf.authorizer);
 +        /* Authentication and authorization backend, implementing IAuthenticator and IAuthorizer */
 +        if (conf.authenticator != null)
 +            authenticator = FBUtilities.newAuthenticator(conf.authenticator);
  
 -            if (authenticator instanceof AllowAllAuthenticator && !(authorizer instanceof AllowAllAuthorizer))
 -                throw new ConfigurationException("AllowAllAuthenticator can't be used with " +  conf.authorizer);
 +        if (conf.authorizer != null)
 +            authorizer = FBUtilities.newAuthorizer(conf.authorizer);
  
 -            if (conf.internode_authenticator != null)
 -                internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator");
 -            else
 -                internodeAuthenticator = new AllowAllInternodeAuthenticator();
 +        if (authenticator instanceof AllowAllAuthenticator && !(authorizer instanceof AllowAllAuthorizer))
 +            throw new ConfigurationException("AllowAllAuthenticator can't be used with " +  conf.authorizer);
  
 -            authenticator.validateConfiguration();
 -            authorizer.validateConfiguration();
 -            internodeAuthenticator.validateConfiguration();
 +        if (conf.internode_authenticator != null)
 +            internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator");
 +        else
 +            internodeAuthenticator = new AllowAllInternodeAuthenticator();
  
 -            /* Hashing strategy */
 -            if (conf.partitioner == null)
 -            {
 -                throw new ConfigurationException("Missing directive: partitioner");
 -            }
 +        authenticator.validateConfiguration();
 +        authorizer.validateConfiguration();
 +        internodeAuthenticator.validateConfiguration();
  
 -            try
 -            {
 -                partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner));
 -            }
 -            catch (Exception e)
 -            {
 -                throw new ConfigurationException("Invalid partitioner class " + conf.partitioner);
 -            }
 -            paritionerName = partitioner.getClass().getCanonicalName();
 +        /* Hashing strategy */
 +        if (conf.partitioner == null)
 +        {
 +            throw new ConfigurationException("Missing directive: partitioner");
 +        }
 +        try
 +        {
 +            partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner));
 +        }
 +        catch (Exception e)
 +        {
 +            throw new ConfigurationException("Invalid partitioner class " + conf.partitioner);
 +        }
 +        paritionerName = partitioner.getClass().getCanonicalName();
  
 -            /* phi convict threshold for FailureDetector */
 -            if (conf.phi_convict_threshold < 5 || conf.phi_convict_threshold > 16)
 -            {
 -                throw new ConfigurationException("phi_convict_threshold must be between 5 and 16");
 -            }
 +        /* phi convict threshold for FailureDetector */
 +        if (conf.phi_convict_threshold < 5 || conf.phi_convict_threshold > 16)
 +        {
 +            throw new ConfigurationException("phi_convict_threshold must be between 5 and 16");
 +        }
  
 -            /* Thread per pool */
 -            if (conf.concurrent_reads != null && conf.concurrent_reads < 2)
 -            {
 -                throw new ConfigurationException("concurrent_reads must be at least 2");
 -            }
 +        /* Thread per pool */
 +        if (conf.concurrent_reads != null && conf.concurrent_reads < 2)
 +        {
 +            throw new ConfigurationException("concurrent_reads must be at least 2");
 +        }
  
 -            if (conf.concurrent_writes != null && conf.concurrent_writes < 2)
 -            {
 -                throw new ConfigurationException("concurrent_writes must be at least 2");
 -            }
 +        if (conf.concurrent_writes != null && conf.concurrent_writes < 2)
 +        {
 +            throw new ConfigurationException("concurrent_writes must be at least 2");
 +        }
  
 -            if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2)
 -            {
 -                throw new ConfigurationException("concurrent_replicates must be at least 2");
 -            }
 +        if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2)
 +        {
 +            throw new ConfigurationException("concurrent_replicates must be at least 2");
 +        }
 +
 +        if (conf.memtable_total_space_in_mb == null)
 +            conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
 +        if (conf.memtable_total_space_in_mb <= 0)
 +            throw new ConfigurationException("memtable_total_space_in_mb must be positive");
 +        logger.info("Global memtable threshold is enabled at {}MB", conf.memtable_total_space_in_mb);
  
 -            if (conf.memtable_total_space_in_mb == null)
 -                conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
 -            if (conf.memtable_total_space_in_mb <= 0)
 -                throw new ConfigurationException("memtable_total_space_in_mb must be positive");
 -            logger.info("Global memtable threshold is enabled at {}MB", conf.memtable_total_space_in_mb);
 +        /* Memtable flush writer threads */
 +        if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1)
 +        {
 +            throw new ConfigurationException("memtable_flush_writers must be at least 1");
 +        }
 +        else if (conf.memtable_flush_writers == null)
 +        {
 +            conf.memtable_flush_writers = conf.data_file_directories.length;
 +        }
  
 -            /* Memtable flush writer threads */
 -            if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1)
 +        /* Local IP or hostname to bind services to */
 +        if (conf.listen_address != null)
 +        {
 +            try
              {
 -                throw new ConfigurationException("memtable_flush_writers must be at least 1");
 +                listenAddress = InetAddress.getByName(conf.listen_address);
              }
 -            else if (conf.memtable_flush_writers == null)
 +            catch (UnknownHostException e)
              {
 -                conf.memtable_flush_writers = conf.data_file_directories.length;
 +                throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'");
              }
 +        }
  
 -            /* Local IP or hostname to bind services to */
 -            if (conf.listen_address != null)
 +        /* Gossip Address to broadcast */
 +        if (conf.broadcast_address != null)
 +        {
 +            if (conf.broadcast_address.equals("0.0.0.0"))
              {
 -                try
 -                {
 -                    listenAddress = InetAddress.getByName(conf.listen_address);
 -                }
 -                catch (UnknownHostException e)
 -                {
 -                    throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'");
 -                }
 +                throw new ConfigurationException("broadcast_address cannot be 0.0.0.0!");
              }
  
 -            /* Gossip Address to broadcast */
 -            if (conf.broadcast_address != null)
 +            try
              {
 -                if (conf.broadcast_address.equals("0.0.0.0"))
 -                {
 -                    throw new ConfigurationException("broadcast_address cannot be 0.0.0.0!");
 -                }
 -
 -                try
 -                {
 -                    broadcastAddress = InetAddress.getByName(conf.broadcast_address);
 -                }
 -                catch (UnknownHostException e)
 -                {
 -                    throw new ConfigurationException("Unknown broadcast_address '" + conf.broadcast_address + "'");
 -                }
 +                broadcastAddress = InetAddress.getByName(conf.broadcast_address);
              }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown broadcast_address '" + conf.broadcast_address + "'");
 +            }
 +        }
  
 -            /* Local IP or hostname to bind RPC server to */
 -            if (conf.rpc_address != null)
 +        /* Local IP or hostname to bind RPC server to */
 +        if (conf.rpc_address != null)
 +        {
 +            try
              {
 -                try
 -                {
 -                    rpcAddress = InetAddress.getByName(conf.rpc_address);
 -                }
 -                catch (UnknownHostException e)
 -                {
 -                    throw new ConfigurationException("Unknown host in rpc_address " + conf.rpc_address);
 -                }
 +                rpcAddress = InetAddress.getByName(conf.rpc_address);
              }
 -            else
 +            catch (UnknownHostException e)
              {
 -                rpcAddress = FBUtilities.getLocalAddress();
 +                throw new ConfigurationException("Unknown host in rpc_address " + conf.rpc_address);
              }
 +        }
 +        else
 +        {
 +            rpcAddress = FBUtilities.getLocalAddress();
 +        }
  
 -            if (conf.thrift_framed_transport_size_in_mb <= 0)
 -                throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive");
 +        if (conf.thrift_framed_transport_size_in_mb <= 0)
 +            throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive");
  
 -            /* end point snitch */
 -            if (conf.endpoint_snitch == null)
 -            {
 -                throw new ConfigurationException("Missing endpoint_snitch directive");
 -            }
 -            snitch = createEndpointSnitch(conf.endpoint_snitch);
 -            EndpointSnitchInfo.create();
 +        /* end point snitch */
 +        if (conf.endpoint_snitch == null)
 +        {
 +            throw new ConfigurationException("Missing endpoint_snitch directive");
 +        }
 +        snitch = createEndpointSnitch(conf.endpoint_snitch);
 +        EndpointSnitchInfo.create();
  
 -            localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 -            localComparator = new Comparator<InetAddress>()
 +        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 +        localComparator = new Comparator<InetAddress>()
 +        {
 +            public int compare(InetAddress endpoint1, InetAddress endpoint2)
              {
 -                public int compare(InetAddress endpoint1, InetAddress endpoint2)
 -                {
 -                    boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
 -                    boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
 -                    if (local1 && !local2)
 -                        return -1;
 -                    if (local2 && !local1)
 -                        return 1;
 -                    return 0;
 -                }
 -            };
 +                boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
 +                boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
 +                if (local1 && !local2)
 +                    return -1;
 +                if (local2 && !local1)
 +                    return 1;
 +                return 0;
 +            }
 +        };
  
 -            /* Request Scheduler setup */
 -            requestSchedulerOptions = conf.request_scheduler_options;
 -            if (conf.request_scheduler != null)
 +        /* Request Scheduler setup */
 +        requestSchedulerOptions = conf.request_scheduler_options;
 +        if (conf.request_scheduler != null)
 +        {
 +            try
              {
 -                try
 +                if (requestSchedulerOptions == null)
                  {
 -                    if (requestSchedulerOptions == null)
 -                    {
 -                        requestSchedulerOptions = new RequestSchedulerOptions();
 -                    }
 -                    Class<?> cls = Class.forName(conf.request_scheduler);
 -                    requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
 +                    requestSchedulerOptions = new RequestSchedulerOptions();
                  }
 -                catch (ClassNotFoundException e)
 -                {
 -                    throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler);
 -                }
 -                catch (Exception e)
 -                {
 -                    throw new ConfigurationException("Unable to instantiate request scheduler", e);
 -                }
 -            }
 -            else
 -            {
 -                requestScheduler = new NoScheduler();
 +                Class<?> cls = Class.forName(conf.request_scheduler);
 +                requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
              }
 -
 -            if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
 +            catch (ClassNotFoundException e)
              {
 -                requestSchedulerId = conf.request_scheduler_id;
 +                throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler);
              }
 -            else
 +            catch (Exception e)
              {
 -                // Default to Keyspace
 -                requestSchedulerId = RequestSchedulerId.keyspace;
 +                throw new ConfigurationException("Unable to instantiate request scheduler", e);
              }
 +        }
 +        else
 +        {
 +            requestScheduler = new NoScheduler();
 +        }
  
 -            if (logger.isDebugEnabled() && conf.auto_bootstrap != null)
 -            {
 -                logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap);
 -            }
 +        if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
 +        {
 +            requestSchedulerId = conf.request_scheduler_id;
 +        }
 +        else
 +        {
 +            // Default to Keyspace
 +            requestSchedulerId = RequestSchedulerId.keyspace;
 +        }
  
 -            logger.info((conf.multithreaded_compaction ? "" : "Not ") + "using multi-threaded compaction");
 +        if (logger.isDebugEnabled() && conf.auto_bootstrap != null)
 +        {
 +            logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap);
 +        }
  
 -            if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0)
 -            {
 -                throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer");
 -            }
 +        logger.info((conf.multithreaded_compaction ? "" : "Not ") + "using multi-threaded compaction");
  
 -            if (conf.concurrent_compactors == null)
 -                conf.concurrent_compactors = FBUtilities.getAvailableProcessors();
 +        if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0)
 +        {
 +            throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer");
 +        }
  
 -            if (conf.concurrent_compactors <= 0)
 -                throw new ConfigurationException("concurrent_compactors should be strictly greater than 0");
 +        if (conf.concurrent_compactors == null)
 +            conf.concurrent_compactors = FBUtilities.getAvailableProcessors();
  
 -            /* data file and commit log directories. they get created later, when they're needed. */
 -            if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
 -            {
 -                for (String datadir : conf.data_file_directories)
 -                {
 -                    if (datadir.equals(conf.commitlog_directory))
 -                        throw new ConfigurationException("commitlog_directory must not be the same as any data_file_directories");
 -                    if (datadir.equals(conf.saved_caches_directory))
 -                        throw new ConfigurationException("saved_caches_directory must not be the same as any data_file_directories");
 -                }
 +        if (conf.concurrent_compactors <= 0)
 +            throw new ConfigurationException("concurrent_compactors should be strictly greater than 0");
  
 -                if (conf.commitlog_directory.equals(conf.saved_caches_directory))
 -                    throw new ConfigurationException("saved_caches_directory must not be the same as the commitlog_directory");
 -            }
 -            else
 +        /* data file and commit log directories. they get created later, when they're needed. */
 +        if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
 +        {
 +            for (String datadir : conf.data_file_directories)
              {
 -                if (conf.commitlog_directory == null)
 -                    throw new ConfigurationException("commitlog_directory missing");
 -                if (conf.data_file_directories == null)
 -                    throw new ConfigurationException("data_file_directories missing; at least one data directory must be specified");
 -                if (conf.saved_caches_directory == null)
 -                    throw new ConfigurationException("saved_caches_directory missing");
 +                if (datadir.equals(conf.commitlog_directory))
 +                    throw new ConfigurationException("commitlog_directory must not be the same as any data_file_directories");
 +                if (datadir.equals(conf.saved_caches_directory))
 +                    throw new ConfigurationException("saved_caches_directory must not be the same as any data_file_directories");
              }
  
 -            if (conf.initial_token != null)
 -                for (String token : tokensFromString(conf.initial_token))
 -                    partitioner.getTokenFactory().validate(token);
 +            if (conf.commitlog_directory.equals(conf.saved_caches_directory))
 +                throw new ConfigurationException("saved_caches_directory must not be the same as the commitlog_directory");
 +        }
 +        else
 +        {
 +            if (conf.commitlog_directory == null)
 +                throw new ConfigurationException("commitlog_directory missing");
 +            if (conf.data_file_directories == null)
 +                throw new ConfigurationException("data_file_directories missing; at least one data directory must be specified");
 +            if (conf.saved_caches_directory == null)
 +                throw new ConfigurationException("saved_caches_directory missing");
 +        }
  
 -            try
 -            {
 -                // if key_cache_size_in_mb option was set to "auto" then size of the cache should be "min(5% of Heap (in MB), 100MB)
 -                keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
 -                                    ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
 -                                    : conf.key_cache_size_in_mb;
 +        if (conf.initial_token != null)
 +            for (String token : tokensFromString(conf.initial_token))
 +                partitioner.getTokenFactory().validate(token);
  
 -                if (keyCacheSizeInMB < 0)
 -                    throw new NumberFormatException(); // to escape duplicating error message
 -            }
 -            catch (NumberFormatException e)
 -            {
 -                throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
 -                                                 + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.");
 -            }
 +        try
 +        {
 +            // if key_cache_size_in_mb option was set to "auto" then size of the cache should be "min(5% of Heap (in MB), 100MB)
 +            keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
 +                ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
 +                : conf.key_cache_size_in_mb;
 +
 +            if (keyCacheSizeInMB < 0)
 +                throw new NumberFormatException(); // to escape duplicating error message
 +        }
 +        catch (NumberFormatException e)
 +        {
 +            throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
 +                    + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.");
 +        }
  
 -            rowCacheProvider = FBUtilities.newCacheProvider(conf.row_cache_provider);
 +        memoryAllocator = FBUtilities.newOffHeapAllocator(conf.memory_allocator);
  
 -            if(conf.encryption_options != null)
 -            {
 -                logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
 -                //operate under the assumption that server_encryption_options is not set in yaml rather than both
 -                conf.server_encryption_options = conf.encryption_options;
 -            }
 +        if(conf.encryption_options != null)
 +        {
 +            logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
 +            //operate under the assumption that server_encryption_options is not set in yaml rather than both
 +            conf.server_encryption_options = conf.encryption_options;
 +        }
  
 -            String allocatorClass = conf.memtable_allocator;
 -            if (!allocatorClass.contains("."))
 -                allocatorClass = "org.apache.cassandra.utils." + allocatorClass;
 -            memtableAllocator = FBUtilities.classForName(allocatorClass, "allocator");
++        String allocatorClass = conf.memtable_allocator;
++        if (!allocatorClass.contains("."))
++            allocatorClass = "org.apache.cassandra.utils." + allocatorClass;
++        memtableAllocator = FBUtilities.classForName(allocatorClass, "allocator");
+ 
 -            // Hardcoded system tables
 -            List<KSMetaData> systemKeyspaces = Arrays.asList(KSMetaData.systemKeyspace(), KSMetaData.traceKeyspace());
 -            assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size();
 -            for (KSMetaData ksmd : systemKeyspaces)
 -            {
 -                // install the definition
 -                for (CFMetaData cfm : ksmd.cfMetaData().values())
 -                    Schema.instance.load(cfm);
 -                Schema.instance.setTableDefinition(ksmd);
 -            }
 +        // Hardcoded system keyspaces
 +        List<KSMetaData> systemKeyspaces = Arrays.asList(KSMetaData.systemKeyspace(), KSMetaData.traceKeyspace());
 +        assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size();
 +        for (KSMetaData ksmd : systemKeyspaces)
 +        {
 +            // install the definition
 +            for (CFMetaData cfm : ksmd.cfMetaData().values())
 +                Schema.instance.load(cfm);
 +            Schema.instance.setKeyspaceDefinition(ksmd);
 +        }
  
 -            /* Load the seeds for node contact points */
 -            if (conf.seed_provider == null)
 -            {
 -                throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.");
 -            }
 -            try
 -            {
 -                Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name);
 -                seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
 -            }
 -            // there are about 5 checked exceptions that could be thrown here.
 -            catch (Exception e)
 -            {
 -                logger.error("Fatal configuration error", e);
 -                System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
 -                System.exit(1);
 -            }
 -            if (seedProvider.getSeeds().size() == 0)
 -                throw new ConfigurationException("The seed provider lists no seeds.");
 +        /* Load the seeds for node contact points */
 +        if (conf.seed_provider == null)
 +        {
 +            throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.");
          }
 -        catch (ConfigurationException e)
 +        try
          {
 -            logger.error("Fatal configuration error", e);
 -            System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
 -            System.exit(1);
 +            Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name);
 +            seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
          }
 -        catch (YAMLException e)
 +        // there are about 5 checked exceptions that could be thrown here.
 +        catch (Exception e)
          {
 -            logger.error("Fatal configuration error error", e);
 -            System.err.println(e.getMessage() + "\nInvalid yaml; unable to start server.  See log for stacktrace.");
 +            logger.error("Fatal configuration error", e);
 +            System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
              System.exit(1);
          }
 +        if (seedProvider.getSeeds().size() == 0)
 +            throw new ConfigurationException("The seed provider lists no seeds.");
      }
  
      private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException
@@@ -1215,8 -1295,19 +1223,20 @@@
          return conf.inter_dc_tcp_nodelay;
      }
  
 +    public static boolean shouldPreheatPageCache()
 +    {
 +        return conf.preheat_kernel_page_cache;
 +    }
++
+     public static Allocator getMemtableAllocator()
+     {
+         try
+         {
+             return memtableAllocator.newInstance();
+         }
 -        catch (InstantiationException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 -        catch (IllegalAccessException e)
++        catch (InstantiationException | IllegalAccessException e)
+         {
+             throw new RuntimeException(e);
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37c7d239/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 68bbd94,a247a4d..3c88a33
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@@ -152,7 -160,17 +158,16 @@@ public class AlterTableStatement extend
                          break;
                      case COLUMN_METADATA:
                          ColumnDefinition column = cfm.getColumnDefinition(columnName.key);
+                         // Thrift allows to change a column validator so CFMetaData.validateCompatility will let it slide
+                         // if we change to an incompatible type (contrarily to the comparator case). But we don't want to
+                         // allow it for CQL3 (see #5882) so validating it explicitly here
+                         if (!validator.getType().isCompatibleWith(column.getValidator()))
+                             throw new ConfigurationException(String.format("Cannot change %s from type %s to type %s: types are incompatible.",
+                                                                            columnName,
+                                                                            column.getValidator().asCQL3Type(),
+                                                                            validator));
+ 
                          column.setValidator(validator.getType());
 -                        cfm.addColumnDefinition(column);
                          break;
                  }
                  break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37c7d239/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 3490a35,df06cfb..2b3ca1e
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -36,8 -30,17 +29,12 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
  import org.apache.cassandra.concurrent.NamedThreadFactory;
+ import org.apache.cassandra.concurrent.StageManager;
+ import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 -import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
  import org.apache.cassandra.db.commitlog.ReplayPosition;
 -import org.apache.cassandra.db.filter.AbstractColumnIterator;
 -import org.apache.cassandra.db.filter.NamesQueryFilter;
 -import org.apache.cassandra.db.filter.SliceQueryFilter;
+ import org.apache.cassandra.db.index.SecondaryIndexManager;
  import org.apache.cassandra.db.marshal.AbstractType;
  import org.apache.cassandra.io.sstable.SSTableMetadata;
  import org.apache.cassandra.io.sstable.SSTableReader;
@@@ -104,19 -109,18 +103,19 @@@ public class Memtabl
      // We index the memtable by RowPosition only for the purpose of being able
      // to select key range using Token.KeyBound. However put() ensures that we
      // actually only store DecoratedKey.
 -    private final ConcurrentNavigableMap<RowPosition, ColumnFamily> columnFamilies = new ConcurrentSkipListMap<RowPosition, ColumnFamily>();
 +    private final ConcurrentNavigableMap<RowPosition, AtomicSortedColumns> rows = new ConcurrentSkipListMap<RowPosition, AtomicSortedColumns>();
      public final ColumnFamilyStore cfs;
 -    private final long creationTime;
 +    private final long creationTime = System.currentTimeMillis();
 +    private final long creationNano = System.nanoTime();
  
-     private final SlabAllocator allocator = new SlabAllocator();
+     private final Allocator allocator = DatabaseDescriptor.getMemtableAllocator();
      // We really only need one column by allocator but one by memtable is not a big waste and avoids needing allocators to know about CFS
 -    private final Function<IColumn, IColumn> localCopyFunction = new Function<IColumn, IColumn>()
 +    private final Function<Column, Column> localCopyFunction = new Function<Column, Column>()
      {
 -        public IColumn apply(IColumn c)
 +        public Column apply(Column c)
          {
              return c.localCopy(cfs, allocator);
 -        };
 +        }
      };
  
      // Record the comparator of the CFS at the creation of the memtable. This

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37c7d239/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 8e8220f,44b973f..2384e0b
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -38,8 -43,9 +38,7 @@@ import org.apache.cassandra.config.CFMe
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.db.commitlog.ReplayPosition;
  import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
 -import org.apache.cassandra.db.index.SecondaryIndex;
  import org.apache.cassandra.db.index.SecondaryIndexBuilder;
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Range;
@@@ -47,16 -53,16 +46,13 @@@ import org.apache.cassandra.dht.Token
  import org.apache.cassandra.io.sstable.*;
  import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.metrics.CompactionMetrics;
- import org.apache.cassandra.service.StorageService;
 -import org.apache.cassandra.service.AntiEntropyService;
 -import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.repair.Validator;
- import org.apache.cassandra.utils.CloseableIterator;
- import org.apache.cassandra.utils.CounterId;
- import org.apache.cassandra.utils.Pair;
- import org.apache.cassandra.utils.WrappedRunnable;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.*;
  
  /**
 - * A singleton which manages a private executor of ongoing compactions. A readwrite lock
 - * controls whether compactions can proceed: an external consumer can completely stop
 - * compactions by acquiring the write half of the lock via getCompactionLock().
 - *
 + * A singleton which manages a private executor of ongoing compactions.
 + * <p/>
   * Scheduling for compaction is accomplished by swapping sstables to be compacted into
   * a set via DataTracker. New scheduling attempts will ignore currently compacting
   * sstables.
@@@ -310,23 -380,29 +306,25 @@@ public class CompactionManager implemen
          {
              // extract keyspace and columnfamily name from filename
              Descriptor desc = Descriptor.fromFilename(filename.trim());
 -            if (!desc.ksname.equals(ksname))
 -            {
 -                throw new IllegalArgumentException("Given keyspace " + ksname + " does not match with file " + filename);
 -            }
 -            if (cfname == null)
 -            {
 -                cfname = desc.cfname;
 -            }
 -            else if (!cfname.equals(desc.cfname))
 +            if (Schema.instance.getCFMetaData(desc) == null)
              {
 -                throw new IllegalArgumentException("All provided sstables should be for the same column family");
 +                logger.warn("Schema does not exist for file {}. Skipping.", filename);
 +                continue;
              }
 -            File directory = new File(ksname + File.separator + cfname);
 +            File directory = new File(desc.ksname + File.separator + desc.cfname);
 +            // group by keyspace/columnfamily
              Pair<Descriptor, String> p = Descriptor.fromFilename(directory, filename.trim());
 -            if (!p.right.equals(Component.DATA.name()))
 -            {
 -                throw new IllegalArgumentException(filename + " does not appear to be a data file");
 -            }
 -            descriptors.add(p.left);
 +            Pair<String, String> key = Pair.create(p.left.ksname, p.left.cfname);
 +            descriptors.put(key, p.left);
          }
  
 -        ColumnFamilyStore cfs = Table.open(ksname).getColumnFamilyStore(cfname);
 -        FBUtilities.waitOnFuture(submitUserDefined(cfs, descriptors, getDefaultGcBefore(cfs)));
++        List<Future<?>> futures = new ArrayList<>();
 +        for (Pair<String, String> key : descriptors.keySet())
 +        {
 +            ColumnFamilyStore cfs = Keyspace.open(key.left).getColumnFamilyStore(key.right);
-             submitUserDefined(cfs, descriptors.get(key), getDefaultGcBefore(cfs));
++            futures.add(submitUserDefined(cfs, descriptors.get(key), getDefaultGcBefore(cfs)));
 +        }
++        FBUtilities.waitOnFutures(futures);
      }
  
      public Future<?> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor> dataFiles, final int gcBefore)


Mime
View raw message