Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 65CDE200BD0 for ; Wed, 30 Nov 2016 10:49:59 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 6456B160B13; Wed, 30 Nov 2016 09:49:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1E0D1160B2C for ; Wed, 30 Nov 2016 10:49:56 +0100 (CET) Received: (qmail 13581 invoked by uid 500); 30 Nov 2016 09:49:55 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 13210 invoked by uid 99); 30 Nov 2016 09:49:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Nov 2016 09:49:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 78DF6F16B1; Wed, 30 Nov 2016 09:49:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: slebresne@apache.org To: commits@cassandra.apache.org Date: Wed, 30 Nov 2016 09:49:59 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/11] cassandra git commit: Remove pre-3.0 compatibility code for 4.0 archived-at: Wed, 30 Nov 2016 09:49:59 -0000 http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java deleted file mode 100644 index d0fc151..0000000 --- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java +++ /dev/null @@ -1,1099 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.schema; - -import java.nio.ByteBuffer; -import java.util.*; -import java.util.stream.Collectors; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.*; -import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.cql3.FieldIdentifier; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.cql3.functions.FunctionName; -import org.apache.cassandra.cql3.functions.UDAggregate; -import org.apache.cassandra.cql3.functions.UDFunction; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; -import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.db.rows.RowIterator; -import org.apache.cassandra.db.rows.UnfilteredRowIterators; -import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.utils.FBUtilities; - -import static java.lang.String.format; -import static org.apache.cassandra.utils.ByteBufferUtil.bytes; -import static org.apache.cassandra.utils.FBUtilities.fromJsonMap; - -/** - * This majestic class performs migration from legacy (pre-3.0) system.schema_* schema tables to the new and glorious - * system_schema keyspace. - * - * The goal is to not lose any information in the migration - including the timestamps. - */ -@SuppressWarnings("deprecation") -public final class LegacySchemaMigrator -{ - private LegacySchemaMigrator() - { - } - - private static final Logger logger = LoggerFactory.getLogger(LegacySchemaMigrator.class); - - static final List LegacySchemaTables = - ImmutableList.of(SystemKeyspace.LegacyKeyspaces, - SystemKeyspace.LegacyColumnfamilies, - SystemKeyspace.LegacyColumns, - SystemKeyspace.LegacyTriggers, - SystemKeyspace.LegacyUsertypes, - SystemKeyspace.LegacyFunctions, - SystemKeyspace.LegacyAggregates); - - public static void migrate() - { - // read metadata from the legacy schema tables - Collection keyspaces = readSchema(); - - // if already upgraded, or starting a new 3.0 node, abort early - if (keyspaces.isEmpty()) - { - unloadLegacySchemaTables(); - return; - } - - // write metadata to the new schema tables - logger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})", - keyspaces.size(), - SchemaConstants.SCHEMA_KEYSPACE_NAME); - keyspaces.forEach(LegacySchemaMigrator::storeKeyspaceInNewSchemaTables); - keyspaces.forEach(LegacySchemaMigrator::migrateBuiltIndexesForKeyspace); - - // flush the new tables before truncating the old ones - SchemaKeyspace.flush(); - - // truncate the original tables (will be snapshotted now, and will have been snapshotted by pre-flight checks) - logger.info("Truncating legacy schema tables"); - truncateLegacySchemaTables(); - - // remove legacy schema tables from Schema, so that their presence doesn't give the users any wrong ideas - unloadLegacySchemaTables(); - - logger.info("Completed migration of legacy schema tables"); - } - - private static void migrateBuiltIndexesForKeyspace(Keyspace keyspace) - { - keyspace.tables.forEach(LegacySchemaMigrator::migrateBuiltIndexesForTable); - } - - private static void migrateBuiltIndexesForTable(Table table) - { - table.metadata.getIndexes().forEach((index) -> migrateIndexBuildStatus(table.metadata.ksName, - table.metadata.cfName, - index)); - } - - private static void migrateIndexBuildStatus(String keyspace, String table, IndexMetadata index) - { - if (SystemKeyspace.isIndexBuilt(keyspace, table + '.' + index.name)) - { - SystemKeyspace.setIndexBuilt(keyspace, index.name); - SystemKeyspace.setIndexRemoved(keyspace, table + '.' + index.name); - } - } - - static void unloadLegacySchemaTables() - { - KeyspaceMetadata systemKeyspace = Schema.instance.getKSMetaData(SchemaConstants.SYSTEM_KEYSPACE_NAME); - - Tables systemTables = systemKeyspace.tables; - for (CFMetaData table : LegacySchemaTables) - systemTables = systemTables.without(table.cfName); - - LegacySchemaTables.forEach(Schema.instance::unload); - LegacySchemaTables.forEach((cfm) -> org.apache.cassandra.db.Keyspace.openAndGetStore(cfm).invalidate()); - - Schema.instance.setKeyspaceMetadata(systemKeyspace.withSwapped(systemTables)); - } - - private static void truncateLegacySchemaTables() - { - LegacySchemaTables.forEach(table -> Schema.instance.getColumnFamilyStoreInstance(table.cfId).truncateBlocking()); - } - - private static void storeKeyspaceInNewSchemaTables(Keyspace keyspace) - { - logger.info("Migrating keyspace {}", keyspace); - - Mutation.SimpleBuilder builder = SchemaKeyspace.makeCreateKeyspaceMutation(keyspace.name, keyspace.params, keyspace.timestamp); - for (Table table : keyspace.tables) - SchemaKeyspace.addTableToSchemaMutation(table.metadata, true, builder.timestamp(table.timestamp)); - - for (Type type : keyspace.types) - SchemaKeyspace.addTypeToSchemaMutation(type.metadata, builder.timestamp(type.timestamp)); - - for (Function function : keyspace.functions) - SchemaKeyspace.addFunctionToSchemaMutation(function.metadata, builder.timestamp(function.timestamp)); - - for (Aggregate aggregate : keyspace.aggregates) - SchemaKeyspace.addAggregateToSchemaMutation(aggregate.metadata, builder.timestamp(aggregate.timestamp)); - - builder.build().apply(); - } - - /* - * Read all keyspaces metadata (including nested tables, types, and functions), with their modification timestamps - */ - private static Collection readSchema() - { - String query = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_KEYSPACES); - Collection keyspaceNames = new ArrayList<>(); - query(query).forEach(row -> keyspaceNames.add(row.getString("keyspace_name"))); - keyspaceNames.removeAll(SchemaConstants.SYSTEM_KEYSPACE_NAMES); - - Collection keyspaces = new ArrayList<>(); - keyspaceNames.forEach(name -> keyspaces.add(readKeyspace(name))); - return keyspaces; - } - - private static Keyspace readKeyspace(String keyspaceName) - { - long timestamp = readKeyspaceTimestamp(keyspaceName); - KeyspaceParams params = readKeyspaceParams(keyspaceName); - - Collection tables = readTables(keyspaceName); - Collection types = readTypes(keyspaceName); - Collection functions = readFunctions(keyspaceName); - Functions.Builder functionsBuilder = Functions.builder(); - functions.forEach(udf -> functionsBuilder.add(udf.metadata)); - Collection aggregates = readAggregates(functionsBuilder.build(), keyspaceName); - - return new Keyspace(timestamp, keyspaceName, params, tables, types, functions, aggregates); - } - - /* - * Reading keyspace params - */ - - private static long readKeyspaceTimestamp(String keyspaceName) - { - String query = format("SELECT writeTime(durable_writes) AS timestamp FROM %s.%s WHERE keyspace_name = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_KEYSPACES); - return query(query, keyspaceName).one().getLong("timestamp"); - } - - private static KeyspaceParams readKeyspaceParams(String keyspaceName) - { - String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_KEYSPACES); - UntypedResultSet.Row row = query(query, keyspaceName).one(); - - boolean durableWrites = row.getBoolean("durable_writes"); - - Map replication = new HashMap<>(); - replication.putAll(fromJsonMap(row.getString("strategy_options"))); - replication.put(ReplicationParams.CLASS, row.getString("strategy_class")); - - return KeyspaceParams.create(durableWrites, replication); - } - - /* - * Reading tables - */ - - private static Collection
readTables(String keyspaceName) - { - String query = format("SELECT columnfamily_name FROM %s.%s WHERE keyspace_name = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_COLUMNFAMILIES); - Collection tableNames = new ArrayList<>(); - query(query, keyspaceName).forEach(row -> tableNames.add(row.getString("columnfamily_name"))); - - Collection
tables = new ArrayList<>(); - tableNames.forEach(name -> tables.add(readTable(keyspaceName, name))); - return tables; - } - - private static Table readTable(String keyspaceName, String tableName) - { - long timestamp = readTableTimestamp(keyspaceName, tableName); - CFMetaData metadata = readTableMetadata(keyspaceName, tableName); - return new Table(timestamp, metadata); - } - - private static long readTableTimestamp(String keyspaceName, String tableName) - { - String query = format("SELECT writeTime(type) AS timestamp FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_COLUMNFAMILIES); - return query(query, keyspaceName, tableName).one().getLong("timestamp"); - } - - private static CFMetaData readTableMetadata(String keyspaceName, String tableName) - { - String tableQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_COLUMNFAMILIES); - UntypedResultSet.Row tableRow = query(tableQuery, keyspaceName, tableName).one(); - - String columnsQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_COLUMNS); - UntypedResultSet columnRows = query(columnsQuery, keyspaceName, tableName); - - String triggersQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_TRIGGERS); - UntypedResultSet triggerRows = query(triggersQuery, keyspaceName, tableName); - - return decodeTableMetadata(tableRow, columnRows, triggerRows); - } - - private static CFMetaData decodeTableMetadata(UntypedResultSet.Row tableRow, - UntypedResultSet columnRows, - UntypedResultSet triggerRows) - { - String ksName = tableRow.getString("keyspace_name"); - String cfName = tableRow.getString("columnfamily_name"); - - AbstractType rawComparator = TypeParser.parse(tableRow.getString("comparator")); - AbstractType subComparator = tableRow.has("subcomparator") ? TypeParser.parse(tableRow.getString("subcomparator")) : null; - - boolean isSuper = "super".equals(tableRow.getString("type").toLowerCase(Locale.ENGLISH)); - boolean isCompound = rawComparator instanceof CompositeType || isSuper; - - /* - * Determine whether or not the table is *really* dense - * We cannot trust is_dense value of true (see CASSANDRA-11502, that fixed the issue for 2.2 only, and not retroactively), - * but we can trust is_dense value of false. - */ - Boolean rawIsDense = tableRow.has("is_dense") ? tableRow.getBoolean("is_dense") : null; - boolean isDense; - if (rawIsDense != null && !rawIsDense) - isDense = false; - else - isDense = calculateIsDense(rawComparator, columnRows); - - // now, if switched to sparse, remove redundant compact_value column and the last clustering column, - // directly copying CASSANDRA-11502 logic. See CASSANDRA-11315. - Iterable filteredColumnRows = !isDense && (rawIsDense == null || rawIsDense) - ? filterOutRedundantRowsForSparse(columnRows, isSuper, isCompound) - : columnRows; - - // We don't really use the default validator but as we have it for backward compatibility, we use it to know if it's a counter table - AbstractType defaultValidator = TypeParser.parse(tableRow.getString("default_validator")); - boolean isCounter = defaultValidator instanceof CounterColumnType; - - /* - * With CASSANDRA-5202 we stopped inferring the cf id from the combination of keyspace/table names, - * and started storing the generated uuids in system.schema_columnfamilies. - * - * In 3.0 we SHOULD NOT see tables like that (2.0-created, non-upgraded). - * But in the off-chance that we do, we generate the deterministic uuid here. - */ - UUID cfId = tableRow.has("cf_id") - ? tableRow.getUUID("cf_id") - : CFMetaData.generateLegacyCfId(ksName, cfName); - - boolean isCQLTable = !isSuper && !isDense && isCompound; - boolean isStaticCompactTable = !isDense && !isCompound; - - // Internally, compact tables have a specific layout, see CompactTables. But when upgrading from - // previous versions, they may not have the expected schema, so detect if we need to upgrade and do - // it in createColumnsFromColumnRows. - // We can remove this once we don't support upgrade from versions < 3.0. - boolean needsUpgrade = !isCQLTable && checkNeedsUpgrade(filteredColumnRows, isSuper, isStaticCompactTable); - - List columnDefs = createColumnsFromColumnRows(filteredColumnRows, - ksName, - cfName, - rawComparator, - subComparator, - isSuper, - isCQLTable, - isStaticCompactTable, - needsUpgrade); - - if (needsUpgrade) - { - addDefinitionForUpgrade(columnDefs, - ksName, - cfName, - isStaticCompactTable, - isSuper, - rawComparator, - subComparator, - defaultValidator); - } - - CFMetaData cfm = CFMetaData.create(ksName, - cfName, - cfId, - isDense, - isCompound, - isSuper, - isCounter, - false, // legacy schema did not contain views - columnDefs, - DatabaseDescriptor.getPartitioner()); - - Indexes indexes = createIndexesFromColumnRows(cfm, - filteredColumnRows, - ksName, - cfName, - rawComparator, - subComparator, - isSuper, - isCQLTable, - isStaticCompactTable, - needsUpgrade); - cfm.indexes(indexes); - - if (tableRow.has("dropped_columns")) - addDroppedColumns(cfm, rawComparator, tableRow.getMap("dropped_columns", UTF8Type.instance, LongType.instance)); - - return cfm.params(decodeTableParams(tableRow)) - .triggers(createTriggersFromTriggerRows(triggerRows)); - } - - /* - * We call dense a CF for which each component of the comparator is a clustering column, i.e. no - * component is used to store a regular column names. In other words, non-composite static "thrift" - * and CQL3 CF are *not* dense. - * We save whether the table is dense or not during table creation through CQL, but we don't have this - * information for table just created through thrift, nor for table prior to CASSANDRA-7744, so this - * method does its best to infer whether the table is dense or not based on other elements. - */ - private static boolean calculateIsDense(AbstractType comparator, UntypedResultSet columnRows) - { - /* - * As said above, this method is only here because we need to deal with thrift upgrades. - * Once a CF has been "upgraded", i.e. we've rebuilt and save its CQL3 metadata at least once, - * then we'll have saved the "is_dense" value and will be good to go. - * - * But non-upgraded thrift CF (and pre-7744 CF) will have no value for "is_dense", so we need - * to infer that information without relying on it in that case. And for the most part this is - * easy, a CF that has at least one REGULAR definition is not dense. But the subtlety is that not - * having a REGULAR definition may not mean dense because of CQL3 definitions that have only the - * PRIMARY KEY defined. - * - * So we need to recognize those special case CQL3 table with only a primary key. If we have some - * clustering columns, we're fine as said above. So the only problem is that we cannot decide for - * sure if a CF without REGULAR columns nor CLUSTERING_COLUMN definition is meant to be dense, or if it - * has been created in CQL3 by say: - * CREATE TABLE test (k int PRIMARY KEY) - * in which case it should not be dense. However, we can limit our margin of error by assuming we are - * in the latter case only if the comparator is exactly CompositeType(UTF8Type). - */ - for (UntypedResultSet.Row columnRow : columnRows) - if ("regular".equals(columnRow.getString("type"))) - return false; - - int maxClusteringIdx = -1; - for (UntypedResultSet.Row columnRow : columnRows) - if ("clustering_key".equals(columnRow.getString("type"))) - maxClusteringIdx = Math.max(maxClusteringIdx, columnRow.has("component_index") ? columnRow.getInt("component_index") : 0); - - return maxClusteringIdx >= 0 - ? maxClusteringIdx == comparator.componentsCount() - 1 - : !isCQL3OnlyPKComparator(comparator); - } - - private static Iterable filterOutRedundantRowsForSparse(UntypedResultSet columnRows, boolean isSuper, boolean isCompound) - { - Collection filteredRows = new ArrayList<>(); - for (UntypedResultSet.Row columnRow : columnRows) - { - String kind = columnRow.getString("type"); - - if ("compact_value".equals(kind)) - continue; - - if ("clustering_key".equals(kind)) - { - int position = columnRow.has("component_index") ? columnRow.getInt("component_index") : 0; - if (isSuper && position != 0) - continue; - - if (!isSuper && !isCompound) - continue; - } - - filteredRows.add(columnRow); - } - - return filteredRows; - } - - private static boolean isCQL3OnlyPKComparator(AbstractType comparator) - { - if (!(comparator instanceof CompositeType)) - return false; - - CompositeType ct = (CompositeType)comparator; - return ct.types.size() == 1 && ct.types.get(0) instanceof UTF8Type; - } - - private static TableParams decodeTableParams(UntypedResultSet.Row row) - { - TableParams.Builder params = TableParams.builder(); - - params.readRepairChance(row.getDouble("read_repair_chance")) - .dcLocalReadRepairChance(row.getDouble("local_read_repair_chance")) - .gcGraceSeconds(row.getInt("gc_grace_seconds")); - - if (row.has("comment")) - params.comment(row.getString("comment")); - - if (row.has("memtable_flush_period_in_ms")) - params.memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms")); - - params.caching(CachingParams.fromMap(fromJsonMap(row.getString("caching")))); - - if (row.has("default_time_to_live")) - params.defaultTimeToLive(row.getInt("default_time_to_live")); - - if (row.has("speculative_retry")) - params.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry"))); - - Map compressionParameters = fromJsonMap(row.getString("compression_parameters")); - String crcCheckChance = compressionParameters.remove("crc_check_chance"); - //crc_check_chance was promoted from a compression property to a top-level property - if (crcCheckChance != null) - params.crcCheckChance(Double.parseDouble(crcCheckChance)); - - params.compression(CompressionParams.fromMap(compressionParameters)); - - params.compaction(compactionFromRow(row)); - - if (row.has("min_index_interval")) - params.minIndexInterval(row.getInt("min_index_interval")); - - if (row.has("max_index_interval")) - params.maxIndexInterval(row.getInt("max_index_interval")); - - if (row.has("bloom_filter_fp_chance")) - params.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance")); - - return params.build(); - } - - /* - * The method is needed - to migrate max_compaction_threshold and min_compaction_threshold - * to the compaction map, where they belong. - * - * We must use reflection to validate the options because not every compaction strategy respects and supports - * the threshold params (LCS doesn't, STCS and DTCS do). - */ - @SuppressWarnings("unchecked") - private static CompactionParams compactionFromRow(UntypedResultSet.Row row) - { - Class klass = - CFMetaData.createCompactionStrategy(row.getString("compaction_strategy_class")); - Map options = fromJsonMap(row.getString("compaction_strategy_options")); - - int minThreshold = row.getInt("min_compaction_threshold"); - int maxThreshold = row.getInt("max_compaction_threshold"); - - Map optionsWithThresholds = new HashMap<>(options); - optionsWithThresholds.putIfAbsent(CompactionParams.Option.MIN_THRESHOLD.toString(), Integer.toString(minThreshold)); - optionsWithThresholds.putIfAbsent(CompactionParams.Option.MAX_THRESHOLD.toString(), Integer.toString(maxThreshold)); - - try - { - Map unrecognizedOptions = - (Map) klass.getMethod("validateOptions", Map.class).invoke(null, optionsWithThresholds); - - if (unrecognizedOptions.isEmpty()) - options = optionsWithThresholds; - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - return CompactionParams.create(klass, options); - } - - // Should only be called on compact tables - private static boolean checkNeedsUpgrade(Iterable defs, boolean isSuper, boolean isStaticCompactTable) - { - if (isSuper) - { - // Check if we've added the "supercolumn map" column yet or not - for (UntypedResultSet.Row row : defs) - if (row.getString("column_name").isEmpty()) - return false; - return true; - } - - // For static compact tables, we need to upgrade if the regular definitions haven't been converted to static yet, - // i.e. if we don't have a static definition yet. - if (isStaticCompactTable) - return !hasKind(defs, ColumnDefinition.Kind.STATIC); - - // For dense compact tables, we need to upgrade if we don't have a compact value definition - return !hasRegularColumns(defs); - } - - private static boolean hasRegularColumns(Iterable columnRows) - { - for (UntypedResultSet.Row row : columnRows) - { - /* - * We need to special case and ignore the empty compact column (pre-3.0, COMPACT STORAGE, primary-key only tables), - * since deserializeKind() will otherwise just return a REGULAR. - * We want the proper EmptyType regular column to be added by addDefinitionForUpgrade(), so we need - * checkNeedsUpgrade() to return true in this case. - * See CASSANDRA-9874. - */ - if (isEmptyCompactValueColumn(row)) - return false; - - if (deserializeKind(row.getString("type")) == ColumnDefinition.Kind.REGULAR) - return true; - } - - return false; - } - - private static boolean isEmptyCompactValueColumn(UntypedResultSet.Row row) - { - return "compact_value".equals(row.getString("type")) && row.getString("column_name").isEmpty(); - } - - private static void addDefinitionForUpgrade(List defs, - String ksName, - String cfName, - boolean isStaticCompactTable, - boolean isSuper, - AbstractType rawComparator, - AbstractType subComparator, - AbstractType defaultValidator) - { - CompactTables.DefaultNames names = CompactTables.defaultNameGenerator(defs); - - if (isSuper) - { - defs.add(ColumnDefinition.regularDef(ksName, cfName, CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, defaultValidator, true))); - } - else if (isStaticCompactTable) - { - defs.add(ColumnDefinition.clusteringDef(ksName, cfName, names.defaultClusteringName(), rawComparator, 0)); - defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), defaultValidator)); - } - else - { - // For dense compact tables, we get here if we don't have a compact value column, in which case we should add it - // (we use EmptyType to recognize that the compact value was not declared by the use (see CreateTableStatement too)) - defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), EmptyType.instance)); - } - } - - private static boolean hasKind(Iterable defs, ColumnDefinition.Kind kind) - { - for (UntypedResultSet.Row row : defs) - if (deserializeKind(row.getString("type")) == kind) - return true; - - return false; - } - - /* - * Prior to 3.0 we used to not store the type of the dropped columns, relying on all collection info being - * present in the comparator, forever. That allowed us to perform certain validations in AlterTableStatement - * (namely not allowing to re-add incompatible collection columns, with the same name, but a different type). - * - * In 3.0, we no longer preserve the original comparator, and reconstruct it from the columns instead. That means - * that we should preserve the type of the dropped columns now, and, during migration, fetch the types from - * the original comparator if necessary. - */ - private static void addDroppedColumns(CFMetaData cfm, AbstractType comparator, Map droppedTimes) - { - AbstractType last = comparator.getComponents().get(comparator.componentsCount() - 1); - Map collections = last instanceof ColumnToCollectionType - ? ((ColumnToCollectionType) last).defined - : Collections.emptyMap(); - - for (Map.Entry entry : droppedTimes.entrySet()) - { - String name = entry.getKey(); - ByteBuffer nameBytes = UTF8Type.instance.decompose(name); - long time = entry.getValue(); - - AbstractType type = collections.containsKey(nameBytes) - ? collections.get(nameBytes) - : BytesType.instance; - - cfm.getDroppedColumns().put(nameBytes, new CFMetaData.DroppedColumn(name, type, time, ColumnDefinition.Kind.REGULAR)); - } - } - - private static List createColumnsFromColumnRows(Iterable rows, - String keyspace, - String table, - AbstractType rawComparator, - AbstractType rawSubComparator, - boolean isSuper, - boolean isCQLTable, - boolean isStaticCompactTable, - boolean needsUpgrade) - { - List columns = new ArrayList<>(); - - for (UntypedResultSet.Row row : rows) - { - // Skip the empty compact value column. Make addDefinitionForUpgrade() re-add the proper REGULAR one. - if (isEmptyCompactValueColumn(row)) - continue; - - columns.add(createColumnFromColumnRow(row, - keyspace, - table, - rawComparator, - rawSubComparator, - isSuper, - isCQLTable, - isStaticCompactTable, - needsUpgrade)); - } - - return columns; - } - - private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row, - String keyspace, - String table, - AbstractType rawComparator, - AbstractType rawSubComparator, - boolean isSuper, - boolean isCQLTable, - boolean isStaticCompactTable, - boolean needsUpgrade) - { - String rawKind = row.getString("type"); - - ColumnDefinition.Kind kind = deserializeKind(rawKind); - if (needsUpgrade && isStaticCompactTable && kind == ColumnDefinition.Kind.REGULAR) - kind = ColumnDefinition.Kind.STATIC; - - int componentIndex = ColumnDefinition.NO_POSITION; - // Note that the component_index is not useful for non-primary key parts (it never really in fact since there is - // no particular ordering of non-PK columns, we only used to use it as a simplification but that's not needed - // anymore) - if (kind.isPrimaryKeyKind()) - // We use to not have a component index when there was a single partition key, we don't anymore (#10491) - componentIndex = row.has("component_index") ? row.getInt("component_index") : 0; - - // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we - // we need to use the comparator fromString method - AbstractType comparator = isCQLTable - ? UTF8Type.instance - : CompactTables.columnDefinitionComparator(rawKind, isSuper, rawComparator, rawSubComparator); - ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator); - - AbstractType validator = parseType(row.getString("validator")); - - // In the 2.x schema we didn't store UDT's with a FrozenType wrapper because they were implicitly frozen. After - // CASSANDRA-7423 (non-frozen UDTs), this is no longer true, so we need to freeze UDTs and nested freezable - // types (UDTs and collections) to properly migrate the schema. See CASSANDRA-11609 and CASSANDRA-11613. - if (validator.isUDT() && validator.isMultiCell()) - validator = validator.freeze(); - else - validator = validator.freezeNestedMulticellTypes(); - - return new ColumnDefinition(keyspace, table, name, validator, componentIndex, kind); - } - - private static Indexes createIndexesFromColumnRows(CFMetaData cfm, - Iterable rows, - String keyspace, - String table, - AbstractType rawComparator, - AbstractType rawSubComparator, - boolean isSuper, - boolean isCQLTable, - boolean isStaticCompactTable, - boolean needsUpgrade) - { - Indexes.Builder indexes = Indexes.builder(); - - for (UntypedResultSet.Row row : rows) - { - IndexMetadata.Kind kind = null; - if (row.has("index_type")) - kind = IndexMetadata.Kind.valueOf(row.getString("index_type")); - - if (kind == null) - continue; - - Map indexOptions = null; - if (row.has("index_options")) - indexOptions = fromJsonMap(row.getString("index_options")); - - if (row.has("index_name")) - { - String indexName = row.getString("index_name"); - - ColumnDefinition column = createColumnFromColumnRow(row, - keyspace, - table, - rawComparator, - rawSubComparator, - isSuper, - isCQLTable, - isStaticCompactTable, - needsUpgrade); - - indexes.add(IndexMetadata.fromLegacyMetadata(cfm, column, indexName, kind, indexOptions)); - } - else - { - logger.error("Failed to find index name for legacy migration of index on {}.{}", keyspace, table); - } - } - - return indexes.build(); - } - - private static ColumnDefinition.Kind deserializeKind(String kind) - { - if ("clustering_key".equalsIgnoreCase(kind)) - return ColumnDefinition.Kind.CLUSTERING; - - if ("compact_value".equalsIgnoreCase(kind)) - return ColumnDefinition.Kind.REGULAR; - - return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase()); - } - - private static Triggers createTriggersFromTriggerRows(UntypedResultSet rows) - { - Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder(); - rows.forEach(row -> triggers.add(createTriggerFromTriggerRow(row))); - return triggers.build(); - } - - private static TriggerMetadata createTriggerFromTriggerRow(UntypedResultSet.Row row) - { - String name = row.getString("trigger_name"); - String classOption = row.getTextMap("trigger_options").get("class"); - return new TriggerMetadata(name, classOption); - } - - /* - * Reading user types - */ - - private static Collection readTypes(String keyspaceName) - { - String query = format("SELECT type_name FROM %s.%s WHERE keyspace_name = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_USERTYPES); - Collection typeNames = new ArrayList<>(); - query(query, keyspaceName).forEach(row -> typeNames.add(row.getString("type_name"))); - - Collection types = new ArrayList<>(); - typeNames.forEach(name -> types.add(readType(keyspaceName, name))); - return types; - } - - private static Type readType(String keyspaceName, String typeName) - { - long timestamp = readTypeTimestamp(keyspaceName, typeName); - UserType metadata = readTypeMetadata(keyspaceName, typeName); - return new Type(timestamp, metadata); - } - - /* - * Unfortunately there is not a single REGULAR column in system.schema_usertypes, so annoyingly we cannot - * use the writeTime() CQL function, and must resort to a lower level. - */ - private static long readTypeTimestamp(String keyspaceName, String typeName) - { - ColumnFamilyStore store = org.apache.cassandra.db.Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME) - .getColumnFamilyStore(SystemKeyspace.LEGACY_USERTYPES); - - ClusteringComparator comparator = store.metadata.comparator; - Slices slices = Slices.with(comparator, Slice.make(comparator, typeName)); - int nowInSec = FBUtilities.nowInSeconds(); - DecoratedKey key = store.metadata.decorateKey(AsciiType.instance.fromString(keyspaceName)); - SinglePartitionReadCommand command = SinglePartitionReadCommand.create(store.metadata, nowInSec, key, slices); - - try (ReadExecutionController controller = command.executionController(); - RowIterator partition = UnfilteredRowIterators.filter(command.queryMemtableAndDisk(store, controller), nowInSec)) - { - return partition.next().primaryKeyLivenessInfo().timestamp(); - } - } - - private static UserType readTypeMetadata(String keyspaceName, String typeName) - { - String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND type_name = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_USERTYPES); - UntypedResultSet.Row row = query(query, keyspaceName, typeName).one(); - - List names = - row.getList("field_names", UTF8Type.instance) - .stream() - .map(t -> FieldIdentifier.forInternalString(t)) - .collect(Collectors.toList()); - - List> types = - row.getList("field_types", UTF8Type.instance) - .stream() - .map(LegacySchemaMigrator::parseType) - .collect(Collectors.toList()); - - return new UserType(keyspaceName, bytes(typeName), names, types, true); - } - - /* - * Reading UDFs - */ - - private static Collection readFunctions(String keyspaceName) - { - String query = format("SELECT function_name, signature FROM %s.%s WHERE keyspace_name = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_FUNCTIONS); - HashMultimap> functionSignatures = HashMultimap.create(); - query(query, keyspaceName).forEach(row -> functionSignatures.put(row.getString("function_name"), row.getList("signature", UTF8Type.instance))); - - Collection functions = new ArrayList<>(); - functionSignatures.entries().forEach(pair -> functions.add(readFunction(keyspaceName, pair.getKey(), pair.getValue()))); - return functions; - } - - private static Function readFunction(String keyspaceName, String functionName, List signature) - { - long timestamp = readFunctionTimestamp(keyspaceName, functionName, signature); - UDFunction metadata = readFunctionMetadata(keyspaceName, functionName, signature); - return new Function(timestamp, metadata); - } - - private static long readFunctionTimestamp(String keyspaceName, String functionName, List signature) - { - String query = format("SELECT writeTime(return_type) AS timestamp " + - "FROM %s.%s " + - "WHERE keyspace_name = ? AND function_name = ? AND signature = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_FUNCTIONS); - return query(query, keyspaceName, functionName, signature).one().getLong("timestamp"); - } - - private static UDFunction readFunctionMetadata(String keyspaceName, String functionName, List signature) - { - String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND function_name = ? AND signature = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_FUNCTIONS); - UntypedResultSet.Row row = query(query, keyspaceName, functionName, signature).one(); - - FunctionName name = new FunctionName(keyspaceName, functionName); - - List argNames = new ArrayList<>(); - if (row.has("argument_names")) - for (String arg : row.getList("argument_names", UTF8Type.instance)) - argNames.add(new ColumnIdentifier(arg, true)); - - List> argTypes = new ArrayList<>(); - if (row.has("argument_types")) - for (String type : row.getList("argument_types", UTF8Type.instance)) - argTypes.add(parseType(type)); - - AbstractType returnType = parseType(row.getString("return_type")); - - String language = row.getString("language"); - String body = row.getString("body"); - boolean calledOnNullInput = row.getBoolean("called_on_null_input"); - - try - { - return UDFunction.create(name, argNames, argTypes, returnType, calledOnNullInput, language, body); - } - catch (InvalidRequestException e) - { - return UDFunction.createBrokenFunction(name, argNames, argTypes, returnType, calledOnNullInput, language, body, e); - } - } - - /* - * Reading UDAs - */ - - private static Collection readAggregates(Functions functions, String keyspaceName) - { - String query = format("SELECT aggregate_name, signature FROM %s.%s WHERE keyspace_name = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_AGGREGATES); - HashMultimap> aggregateSignatures = HashMultimap.create(); - query(query, keyspaceName).forEach(row -> aggregateSignatures.put(row.getString("aggregate_name"), row.getList("signature", UTF8Type.instance))); - - Collection aggregates = new ArrayList<>(); - aggregateSignatures.entries().forEach(pair -> aggregates.add(readAggregate(functions, keyspaceName, pair.getKey(), pair.getValue()))); - return aggregates; - } - - private static Aggregate readAggregate(Functions functions, String keyspaceName, String aggregateName, List signature) - { - long timestamp = readAggregateTimestamp(keyspaceName, aggregateName, signature); - UDAggregate metadata = readAggregateMetadata(functions, keyspaceName, aggregateName, signature); - return new Aggregate(timestamp, metadata); - } - - private static long readAggregateTimestamp(String keyspaceName, String aggregateName, List signature) - { - String query = format("SELECT writeTime(return_type) AS timestamp " + - "FROM %s.%s " + - "WHERE keyspace_name = ? AND aggregate_name = ? AND signature = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_AGGREGATES); - return query(query, keyspaceName, aggregateName, signature).one().getLong("timestamp"); - } - - private static UDAggregate readAggregateMetadata(Functions functions, String keyspaceName, String functionName, List signature) - { - String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND aggregate_name = ? AND signature = ?", - SchemaConstants.SYSTEM_KEYSPACE_NAME, - SystemKeyspace.LEGACY_AGGREGATES); - UntypedResultSet.Row row = query(query, keyspaceName, functionName, signature).one(); - - FunctionName name = new FunctionName(keyspaceName, functionName); - - List types = row.getList("argument_types", UTF8Type.instance); - - List> argTypes = new ArrayList<>(); - if (types != null) - { - argTypes = new ArrayList<>(types.size()); - for (String type : types) - argTypes.add(parseType(type)); - } - - AbstractType returnType = parseType(row.getString("return_type")); - - FunctionName stateFunc = new FunctionName(keyspaceName, row.getString("state_func")); - AbstractType stateType = parseType(row.getString("state_type")); - FunctionName finalFunc = row.has("final_func") ? new FunctionName(keyspaceName, row.getString("final_func")) : null; - ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null; - - try - { - return UDAggregate.create(functions, name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond); - } - catch (InvalidRequestException reason) - { - return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason); - } - } - - private static UntypedResultSet query(String query, Object... values) - { - return QueryProcessor.executeOnceInternal(query, values); - } - - private static AbstractType parseType(String str) - { - return TypeParser.parse(str); - } - - private static final class Keyspace - { - final long timestamp; - final String name; - final KeyspaceParams params; - final Collection
tables; - final Collection types; - final Collection functions; - final Collection aggregates; - - Keyspace(long timestamp, - String name, - KeyspaceParams params, - Collection
tables, - Collection types, - Collection functions, - Collection aggregates) - { - this.timestamp = timestamp; - this.name = name; - this.params = params; - this.tables = tables; - this.types = types; - this.functions = functions; - this.aggregates = aggregates; - } - } - - private static final class Table - { - final long timestamp; - final CFMetaData metadata; - - Table(long timestamp, CFMetaData metadata) - { - this.timestamp = timestamp; - this.metadata = metadata; - } - } - - private static final class Type - { - final long timestamp; - final UserType metadata; - - Type(long timestamp, UserType metadata) - { - this.timestamp = timestamp; - this.metadata = metadata; - } - } - - private static final class Function - { - final long timestamp; - final UDFunction metadata; - - Function(long timestamp, UDFunction metadata) - { - this.timestamp = timestamp; - this.metadata = metadata; - } - } - - private static final class Aggregate - { - final long timestamp; - final UDAggregate metadata; - - Aggregate(long timestamp, UDAggregate metadata) - { - this.timestamp = timestamp; - this.metadata = metadata; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 7aa926e..8944b7c 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -106,7 +106,7 @@ public abstract class AbstractReadExecutor if (traceState != null) traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); - MessageOut message = readCommand.createMessage(MessagingService.instance().getVersion(endpoint)); + MessageOut message = readCommand.createMessage(); MessagingService.instance().sendRRWithFailure(message, endpoint, handler); } @@ -291,8 +291,7 @@ public abstract class AbstractReadExecutor if (traceState != null) traceState.trace("speculating read retry on {}", extraReplica); logger.trace("speculating read retry on {}", extraReplica); - int version = MessagingService.instance().getVersion(extraReplica); - MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(version), extraReplica, handler); + MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler); speculated = true; cfs.metric.speculativeRetries.inc(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index a4e18c0..54fa7e2 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -454,10 +454,6 @@ public class CacheService implements CacheServiceMBean { public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException { - //Don't serialize old format entries since we didn't bother to implement serialization of both for simplicity - //https://issues.apache.org/jira/browse/CASSANDRA-10778 - if (!key.desc.version.storeRows()) return; - RowIndexEntry entry = CacheService.instance.keyCache.getInternal(key); if (entry == null) return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 5a97dfe..b41cc00 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -46,7 +46,6 @@ import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.batchlog.LegacyBatchlogMigrator; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; @@ -59,14 +58,12 @@ import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.StartupException; import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.hints.LegacyHintsMigrator; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.DefaultNameFactory; import org.apache.cassandra.metrics.StorageMetrics; -import org.apache.cassandra.schema.LegacySchemaMigrator; import org.apache.cassandra.thrift.ThriftServer; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.*; @@ -205,18 +202,6 @@ public class CassandraDaemon exitOrFail(e.returnCode, e.getMessage(), e.getCause()); } - try - { - if (SystemKeyspace.snapshotOnVersionChange()) - { - SystemKeyspace.migrateDataDirs(); - } - } - catch (IOException e) - { - exitOrFail(3, e.getMessage(), e.getCause()); - } - // We need to persist this as soon as possible after startup checks. // This should be the first write to SystemKeyspace (CASSANDRA-11742) SystemKeyspace.persistLocalMetadata(); @@ -249,13 +234,6 @@ public class CassandraDaemon } }); - /* - * Migrate pre-3.0 keyspaces, tables, types, functions, and aggregates, to their new 3.0 storage. - * We don't (and can't) wait for commit log replay here, but we don't need to - all schema changes force - * explicit memtable flushes. - */ - LegacySchemaMigrator.migrate(); - // Populate token metadata before flushing, for token-aware sstable partitioning (#6696) StorageService.instance.populateTokenMetadata(); @@ -333,12 +311,6 @@ public class CassandraDaemon // Re-populate token metadata after commit log recover (new peers might be loaded onto system keyspace #10293) StorageService.instance.populateTokenMetadata(); - // migrate any legacy (pre-3.0) hints from system.hints table into the new store - new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate(); - - // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format) - LegacyBatchlogMigrator.migrate(); - // enable auto compaction for (Keyspace keyspace : Keyspace.all()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index be8eca1..48ad2c6 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -512,7 +512,7 @@ public class DataResolver extends ResponseResolver if (StorageProxy.canDoLocalRequest(source)) StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler)); else - MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler); + MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler); // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. handler.awaitResults(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 11c0b12..6e0fadb 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -247,10 +247,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size()); for (InetAddress endpoint : endpoints) - { - MessageOut message = command.createMessage(MessagingService.instance().getVersion(endpoint)); - MessagingService.instance().sendRR(message, endpoint, repairHandler); - } + MessagingService.instance().sendRR(command.createMessage(), endpoint, repairHandler); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/StartupChecks.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java index 83971dd..75f7788 100644 --- a/src/java/org/apache/cassandra/service/StartupChecks.java +++ b/src/java/org/apache/cassandra/service/StartupChecks.java @@ -259,14 +259,15 @@ public class StartupChecks FileVisitor sstableVisitor = new SimpleFileVisitor() { - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) + public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) { - if (!Descriptor.isValidFile(file.getFileName().toString())) + File file = path.toFile(); + if (!Descriptor.isValidFile(file)) return FileVisitResult.CONTINUE; try { - if (!Descriptor.fromFilename(file.toString()).isCompatible()) + if (!Descriptor.fromFilename(file).isCompatible()) invalid.add(file.toString()); } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index e0be68c..77862d6 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.batchlog.Batch; import org.apache.cassandra.batchlog.BatchlogManager; -import org.apache.cassandra.batchlog.LegacyBatchlogMigrator; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; @@ -909,10 +908,10 @@ public class StorageProxy implements StorageProxyMBean batchConsistencyLevel = consistency_level; } - final BatchlogEndpoints batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel); + final Collection batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel); final UUID batchUUID = UUIDGen.getTimeUUID(); BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), - () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID, queryStartNanoTime)); + () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (Mutation mutation : mutations) @@ -969,33 +968,19 @@ public class StorageProxy implements StorageProxyMBean return replica.equals(FBUtilities.getBroadcastAddress()); } - private static void syncWriteToBatchlog(Collection mutations, BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime) + private static void syncWriteToBatchlog(Collection mutations, Collection endpoints, UUID uuid, long queryStartNanoTime) throws WriteTimeoutException, WriteFailureException { - WriteResponseHandler handler = new WriteResponseHandler<>(endpoints.all, + WriteResponseHandler handler = new WriteResponseHandler<>(endpoints, Collections.emptyList(), - endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO, + endpoints.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO, Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME), null, WriteType.BATCH_LOG, queryStartNanoTime); Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations); - - if (!endpoints.current.isEmpty()) - syncWriteToBatchlog(handler, batch, endpoints.current); - - if (!endpoints.legacy.isEmpty()) - LegacyBatchlogMigrator.syncWriteToBatchlog(handler, batch, endpoints.legacy); - - handler.get(); - } - - private static void syncWriteToBatchlog(WriteResponseHandler handler, Batch batch, Collection endpoints) - throws WriteTimeoutException, WriteFailureException - { MessageOut message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer); - for (InetAddress target : endpoints) { logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size()); @@ -1005,15 +990,7 @@ public class StorageProxy implements StorageProxyMBean else MessagingService.instance().sendRR(message, target, handler); } - } - - private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime) - { - if (!endpoints.current.isEmpty()) - asyncRemoveFromBatchlog(endpoints.current, uuid); - - if (!endpoints.legacy.isEmpty()) - LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid, queryStartNanoTime); + handler.get(); } private static void asyncRemoveFromBatchlog(Collection endpoints, UUID uuid) @@ -1160,38 +1137,13 @@ public class StorageProxy implements StorageProxyMBean } /* - * A class to filter batchlog endpoints into legacy endpoints (version < 3.0) or not. - */ - private static final class BatchlogEndpoints - { - public final Collection all; - public final Collection current; - public final Collection legacy; - - BatchlogEndpoints(Collection endpoints) - { - all = endpoints; - current = new ArrayList<>(2); - legacy = new ArrayList<>(2); - - for (InetAddress ep : endpoints) - { - if (MessagingService.instance().getVersion(ep) >= MessagingService.VERSION_30) - current.add(ep); - else - legacy.add(ep); - } - } - } - - /* * Replicas are picked manually: * - replicas should be alive according to the failure detector * - replicas should be in the local datacenter * - choose min(2, number of qualifying candiates above) * - allow the local node to be the only replica only if it's a single-node DC */ - private static BatchlogEndpoints getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) + private static Collection getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException { TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); @@ -1202,12 +1154,12 @@ public class StorageProxy implements StorageProxyMBean if (chosenEndpoints.isEmpty()) { if (consistencyLevel == ConsistencyLevel.ANY) - return new BatchlogEndpoints(Collections.singleton(FBUtilities.getBroadcastAddress())); + return Collections.singleton(FBUtilities.getBroadcastAddress()); throw new UnavailableException(ConsistencyLevel.ONE, 1, 0); } - return new BatchlogEndpoints(chosenEndpoints); + return chosenEndpoints; } /** @@ -1816,9 +1768,8 @@ public class StorageProxy implements StorageProxyMBean for (InetAddress endpoint : executor.getContactedReplicas()) { - MessageOut message = command.createMessage(MessagingService.instance().getVersion(endpoint)); Tracing.trace("Enqueuing full data read to {}", endpoint); - MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler); + MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, repairHandler); } } } @@ -2218,9 +2169,8 @@ public class StorageProxy implements StorageProxyMBean { for (InetAddress endpoint : toQuery.filteredEndpoints) { - MessageOut message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint)); Tracing.trace("Enqueuing request to {}", endpoint); - MessagingService.instance().sendRRWithFailure(message, endpoint, handler); + MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), endpoint, handler); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 07eb1d8..62efed2 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -257,12 +257,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE legacyProgressSupport = new LegacyJMXProgressSupport(this, jmxObjectName); + ReadCommandVerbHandler readHandler = new ReadCommandVerbHandler(); + /* register the verb handlers */ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new RangeSliceVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, readHandler); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, readHandler); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, readHandler); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.COUNTER_MUTATION, new CounterMutationVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TRUNCATE, new TruncateVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler()); @@ -2082,8 +2084,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public boolean isRpcReady(InetAddress endpoint) { - return MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_22 || - Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady(); + return Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady(); } public void setRpcReady(boolean value) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/paxos/Commit.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java index af94869..3b0364c 100644 --- a/src/java/org/apache/cassandra/service/paxos/Commit.java +++ b/src/java/org/apache/cassandra/service/paxos/Commit.java @@ -113,32 +113,20 @@ public class Commit { public void serialize(Commit commit, DataOutputPlus out, int version) throws IOException { - if (version < MessagingService.VERSION_30) - ByteBufferUtil.writeWithShortLength(commit.update.partitionKey().getKey(), out); - UUIDSerializer.serializer.serialize(commit.ballot, out, version); PartitionUpdate.serializer.serialize(commit.update, out, version); } public Commit deserialize(DataInputPlus in, int version) throws IOException { - ByteBuffer key = null; - if (version < MessagingService.VERSION_30) - key = ByteBufferUtil.readWithShortLength(in); - UUID ballot = UUIDSerializer.serializer.deserialize(in, version); - PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, key); + PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL); return new Commit(ballot, update); } public long serializedSize(Commit commit, int version) { - int size = 0; - if (version < MessagingService.VERSION_30) - size += ByteBufferUtil.serializedSizeWithShortLength(commit.update.partitionKey().getKey()); - - return size - + UUIDSerializer.serializer.serializedSize(commit.ballot, version) + return UUIDSerializer.serializer.serializedSize(commit.ballot, version) + PartitionUpdate.serializer.serializedSize(commit.update, version); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java index f843b8d..d8699c8 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java @@ -69,51 +69,22 @@ public class PrepareResponse { out.writeBoolean(response.promised); Commit.serializer.serialize(response.inProgressCommit, out, version); - - if (version < MessagingService.VERSION_30) - { - UUIDSerializer.serializer.serialize(response.mostRecentCommit.ballot, out, version); - PartitionUpdate.serializer.serialize(response.mostRecentCommit.update, out, version); - } - else - { - Commit.serializer.serialize(response.mostRecentCommit, out, version); - } + Commit.serializer.serialize(response.mostRecentCommit, out, version); } public PrepareResponse deserialize(DataInputPlus in, int version) throws IOException { boolean success = in.readBoolean(); Commit inProgress = Commit.serializer.deserialize(in, version); - Commit mostRecent; - if (version < MessagingService.VERSION_30) - { - UUID ballot = UUIDSerializer.serializer.deserialize(in, version); - PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, inProgress.update.partitionKey()); - mostRecent = new Commit(ballot, update); - } - else - { - mostRecent = Commit.serializer.deserialize(in, version); - } + Commit mostRecent = Commit.serializer.deserialize(in, version); return new PrepareResponse(success, inProgress, mostRecent); } public long serializedSize(PrepareResponse response, int version) { - long size = TypeSizes.sizeof(response.promised) - + Commit.serializer.serializedSize(response.inProgressCommit, version); - - if (version < MessagingService.VERSION_30) - { - size += UUIDSerializer.serializer.serializedSize(response.mostRecentCommit.ballot, version); - size += PartitionUpdate.serializer.serializedSize(response.mostRecentCommit.update, version); - } - else - { - size += Commit.serializer.serializedSize(response.mostRecentCommit, version); - } - return size; + return TypeSizes.sizeof(response.promised) + + Commit.serializer.serializedSize(response.inProgressCommit, version) + + Commit.serializer.serializedSize(response.mostRecentCommit, version); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index 6465bf7..fab9372 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -196,16 +196,7 @@ public class StreamReader long totalSize, UUID sessionId) throws IOException { this.metadata = metadata; - // streaming pre-3.0 sstables require mark/reset support from source stream - if (version.correspondingMessagingVersion() < MessagingService.VERSION_30) - { - logger.trace("Initializing rewindable input stream for reading legacy sstable with {} bytes with following " + - "parameters: initial_mem_buffer_size={}, max_mem_buffer_size={}, max_spill_file_size={}.", - totalSize, INITIAL_MEM_BUFFER_SIZE, MAX_MEM_BUFFER_SIZE, MAX_SPILL_FILE_SIZE); - File bufferFile = getTempBufferFile(metadata, totalSize, sessionId); - this.in = new RewindableDataInputStreamPlus(in, INITIAL_MEM_BUFFER_SIZE, MAX_MEM_BUFFER_SIZE, bufferFile, MAX_SPILL_FILE_SIZE); - } else - this.in = new DataInputPlus.DataInputStreamPlus(in); + this.in = new DataInputPlus.DataInputStreamPlus(in); this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE); this.header = header; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 70b5765..2044d4d 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -35,6 +35,7 @@ import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.StreamReader; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.messages.FileMessageHeader; +import org.apache.cassandra.utils.ChecksumType; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -81,7 +82,7 @@ public class CompressedStreamReader extends StreamReader cfs.getColumnFamilyName()); CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, - inputVersion.compressedChecksumType(), cfs::getCrcCheckChance); + ChecksumType.CRC32, cfs::getCrcCheckChance); TrackedInputStream in = new TrackedInputStream(cis); StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata), http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index 232727d..b0639ea 100644 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@ -189,13 +189,7 @@ public class FileMessageHeader UUIDSerializer.serializer.serialize(header.cfId, out, version); out.writeInt(header.sequenceNumber); out.writeUTF(header.version.toString()); - - //We can't stream to a node that doesn't understand a new sstable format - if (version < StreamMessage.VERSION_22 && header.format != SSTableFormat.Type.LEGACY && header.format != SSTableFormat.Type.BIG) - throw new UnsupportedOperationException("Can't stream non-legacy sstables to nodes < 2.2"); - - if (version >= StreamMessage.VERSION_22) - out.writeUTF(header.format.name); + out.writeUTF(header.format.name); out.writeLong(header.estimatedKeys); out.writeInt(header.sections.size()); @@ -212,8 +206,7 @@ public class FileMessageHeader out.writeLong(header.repairedAt); out.writeInt(header.sstableLevel); - if (version >= StreamMessage.VERSION_30 && header.version.storeRows()) - SerializationHeader.serializer.serialize(header.version, header.header, out); + SerializationHeader.serializer.serialize(header.version, header.header, out); return compressionInfo; } @@ -222,10 +215,7 @@ public class FileMessageHeader UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); int sequenceNumber = in.readInt(); Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF()); - - SSTableFormat.Type format = SSTableFormat.Type.LEGACY; - if (version >= StreamMessage.VERSION_22) - format = SSTableFormat.Type.validate(in.readUTF()); + SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF()); long estimatedKeys = in.readLong(); int count = in.readInt(); @@ -235,9 +225,7 @@ public class FileMessageHeader CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version); long repairedAt = in.readLong(); int sstableLevel = in.readInt(); - SerializationHeader.Component header = version >= StreamMessage.VERSION_30 && sstableVersion.storeRows() - ? SerializationHeader.serializer.deserialize(sstableVersion, in) - : null; + SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header); } @@ -247,10 +235,7 @@ public class FileMessageHeader long size = UUIDSerializer.serializer.serializedSize(header.cfId, version); size += TypeSizes.sizeof(header.sequenceNumber); size += TypeSizes.sizeof(header.version.toString()); - - if (version >= StreamMessage.VERSION_22) - size += TypeSizes.sizeof(header.format.name); - + size += TypeSizes.sizeof(header.format.name); size += TypeSizes.sizeof(header.estimatedKeys); size += TypeSizes.sizeof(header.sections.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java index 7487aaf..3ce1958 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java @@ -33,8 +33,6 @@ import org.apache.cassandra.streaming.StreamSession; public abstract class StreamMessage { /** Streaming protocol version */ - public static final int VERSION_20 = 2; - public static final int VERSION_22 = 3; public static final int VERSION_30 = 4; public static final int CURRENT_VERSION = VERSION_30; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/tools/SSTableExport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java index 070434d..52d5ecf 100644 --- a/src/java/org/apache/cassandra/tools/SSTableExport.java +++ b/src/java/org/apache/cassandra/tools/SSTableExport.java @@ -93,8 +93,8 @@ public class SSTableExport */ public static CFMetaData metadataFromSSTable(Descriptor desc) throws IOException { - if (!desc.version.storeRows()) - throw new IOException("pre-3.0 SSTable is not supported."); + if (!desc.version.isCompatible()) + throw new IOException("Cannot process old and unsupported SSTable version."); EnumSet types = EnumSet.of(MetadataType.STATS, MetadataType.HEADER); Map sstableMetadata = desc.getMetadataSerializer().deserialize(desc, types); @@ -162,11 +162,6 @@ public class SSTableExport : cmd.getOptionValues(EXCLUDE_KEY_OPTION))); String ssTableFileName = new File(cmd.getArgs()[0]).getAbsolutePath(); - if (Descriptor.isLegacyFile(new File(ssTableFileName))) - { - System.err.println("Unsupported legacy sstable"); - System.exit(1); - } if (!new File(ssTableFileName).exists()) { System.err.println("Cannot find file " + ssTableFileName);