cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sn...@apache.org
Subject [5/5] cassandra git commit: Merge branch 'cassandra-2.2' into trunk
Date Sun, 12 Jul 2015 09:02:13 GMT
Merge branch 'cassandra-2.2' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/418c7936
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/418c7936
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/418c7936

Branch: refs/heads/trunk
Commit: 418c7936fb61ca3e385326bddb55ee4a81e97d53
Parents: c734cb8 fc202a7
Author: Robert Stupp <snazy@snazy.de>
Authored: Sun Jul 12 10:54:36 2015 +0200
Committer: Robert Stupp <snazy@snazy.de>
Committed: Sun Jul 12 10:54:36 2015 +0200

----------------------------------------------------------------------
 doc/cql3/CQL.textile                            |  4 +-
 .../statements/CreateAggregateStatement.java    | 13 ++----
 .../cassandra/schema/LegacySchemaMigrator.java  | 21 +--------
 .../apache/cassandra/schema/SchemaKeyspace.java | 16 ++-----
 .../validation/operations/AggregationTest.java  | 49 --------------------
 .../schema/LegacySchemaMigratorTest.java        |  4 +-
 6 files changed, 13 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/418c7936/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
index 16d9fc5,1d73e3f..0bb13e5
--- a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
@@@ -88,9 -88,9 +88,9 @@@ public final class CreateAggregateState
          AbstractType<?> stateType = prepareType("state type", stateTypeRaw);
  
          List<AbstractType<?>> stateArgs = stateArguments(stateType, argTypes);
-         stateFunc = validateFunctionKeyspace(stateFunc, stateArgs);
+         stateFunc = validateFunctionKeyspace(stateFunc);
  
 -        Function f = Functions.find(stateFunc, stateArgs);
 +        Function f = Schema.instance.findFunction(stateFunc, stateArgs).orElse(null);
          if (!(f instanceof ScalarFunction))
              throw new InvalidRequestException("State function " + stateFuncSig(stateFunc,
stateTypeRaw, argRawTypes) + " does not exist or is not a scalar function");
          stateFunction = (ScalarFunction)f;
@@@ -102,8 -102,8 +102,8 @@@
          if (finalFunc != null)
          {
              List<AbstractType<?>> finalArgs = Collections.<AbstractType<?>>singletonList(stateType);
-             finalFunc = validateFunctionKeyspace(finalFunc, finalArgs);
+             finalFunc = validateFunctionKeyspace(finalFunc);
 -            f = Functions.find(finalFunc, finalArgs);
 +            f = Schema.instance.findFunction(finalFunc, finalArgs).orElse(null);
              if (!(f instanceof ScalarFunction))
                  throw new InvalidRequestException("Final function " + finalFunc + '(' +
stateTypeRaw + ") does not exist or is not a scalar function");
              finalFunction = (ScalarFunction) f;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/418c7936/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 996b5ff,0000000..dc9e168
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@@ -1,806 -1,0 +1,789 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.ImmutableList;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.functions.FunctionName;
 +import org.apache.cassandra.cql3.functions.UDAggregate;
 +import org.apache.cassandra.cql3.functions.UDFunction;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.db.rows.RowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.io.compress.CompressionParameters;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +
 +import static java.lang.String.format;
 +import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 +import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
 +
 +/**
 + * This majestic class performs migration from legacy (pre-3.0) system.schema_* schema tables
to the new and glorious
 + * system_schema keyspace.
 + *
 + * The goal is to not lose any information in the migration - including the timestamps.
 + */
 +@SuppressWarnings("deprecation")
 +public final class LegacySchemaMigrator
 +{
 +    private LegacySchemaMigrator()
 +    {
 +    }
 +
 +    private static final Logger logger = LoggerFactory.getLogger(LegacySchemaMigrator.class);
 +
 +    static final List<CFMetaData> LegacySchemaTables =
 +        ImmutableList.of(SystemKeyspace.LegacyKeyspaces,
 +                         SystemKeyspace.LegacyColumnfamilies,
 +                         SystemKeyspace.LegacyColumns,
 +                         SystemKeyspace.LegacyTriggers,
 +                         SystemKeyspace.LegacyUsertypes,
 +                         SystemKeyspace.LegacyFunctions,
 +                         SystemKeyspace.LegacyAggregates);
 +
 +    public static void migrate()
 +    {
 +        // read metadata from the legacy schema tables
 +        Collection<Keyspace> keyspaces = readSchema();
 +
 +        // if already upgraded, or starting a new 3.0 node, abort early
 +        if (keyspaces.isEmpty())
 +        {
 +            unloadLegacySchemaTables();
 +            return;
 +        }
 +
 +        // write metadata to the new schema tables
 +        logger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace
({})",
 +                    keyspaces.size(),
 +                    SchemaKeyspace.NAME);
 +        keyspaces.forEach(LegacySchemaMigrator::storeKeyspaceInNewSchemaTables);
 +
 +        // flush the new tables before truncating the old ones
 +        SchemaKeyspace.flush();
 +
 +        // truncate the original tables (will be snapshotted now, and will have been snapshotted
by pre-flight checks)
 +        logger.info("Truncating legacy schema tables");
 +        truncateLegacySchemaTables();
 +
 +        // remove legacy schema tables from Schema, so that their presence doesn't give
the users any wrong ideas
 +        unloadLegacySchemaTables();
 +
 +        logger.info("Completed migration of legacy schema tables");
 +    }
 +
 +    static void unloadLegacySchemaTables()
 +    {
 +        KeyspaceMetadata systemKeyspace = Schema.instance.getKSMetaData(SystemKeyspace.NAME);
 +
 +        Tables systemTables = systemKeyspace.tables;
 +        for (CFMetaData table : LegacySchemaTables)
 +            systemTables = systemTables.without(table.cfName);
 +
 +        LegacySchemaTables.forEach(Schema.instance::unload);
 +
 +        Schema.instance.setKeyspaceMetadata(systemKeyspace.withSwapped(systemTables));
 +    }
 +
 +    private static void truncateLegacySchemaTables()
 +    {
 +        LegacySchemaTables.forEach(table -> Schema.instance.getColumnFamilyStoreInstance(table.cfId).truncateBlocking());
 +    }
 +
 +    private static void storeKeyspaceInNewSchemaTables(Keyspace keyspace)
 +    {
 +        Mutation mutation = SchemaKeyspace.makeCreateKeyspaceMutation(keyspace.name, keyspace.params,
keyspace.timestamp);
 +
 +        for (Table table : keyspace.tables)
 +            SchemaKeyspace.addTableToSchemaMutation(table.metadata, table.timestamp, true,
mutation);
 +
 +        for (Type type : keyspace.types)
 +            SchemaKeyspace.addTypeToSchemaMutation(type.metadata, type.timestamp, mutation);
 +
 +        for (Function function : keyspace.functions)
 +            SchemaKeyspace.addFunctionToSchemaMutation(function.metadata, function.timestamp,
mutation);
 +
 +        for (Aggregate aggregate : keyspace.aggregates)
 +            SchemaKeyspace.addAggregateToSchemaMutation(aggregate.metadata, aggregate.timestamp,
mutation);
 +
 +        mutation.apply();
 +    }
 +
 +    /*
 +     * Read all keyspaces metadata (including nested tables, types, and functions), with
their modification timestamps
 +     */
 +    private static Collection<Keyspace> readSchema()
 +    {
 +        String query = format("SELECT keyspace_name FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.LEGACY_KEYSPACES);
 +        Collection<String> keyspaceNames = new ArrayList<>();
 +        query(query).forEach(row -> keyspaceNames.add(row.getString("keyspace_name")));
 +        keyspaceNames.removeAll(Schema.SYSTEM_KEYSPACE_NAMES);
 +
 +        Collection<Keyspace> keyspaces = new ArrayList<>();
 +        keyspaceNames.forEach(name -> keyspaces.add(readKeyspace(name)));
 +        return keyspaces;
 +    }
 +
 +    private static Keyspace readKeyspace(String keyspaceName)
 +    {
 +        long timestamp = readKeyspaceTimestamp(keyspaceName);
 +        KeyspaceParams params = readKeyspaceParams(keyspaceName);
 +
 +        Collection<Table> tables = readTables(keyspaceName);
 +        Collection<Type> types = readTypes(keyspaceName);
 +        Collection<Function> functions = readFunctions(keyspaceName);
 +        Collection<Aggregate> aggregates = readAggregates(keyspaceName);
 +
 +        return new Keyspace(timestamp, keyspaceName, params, tables, types, functions, aggregates);
 +    }
 +
 +    /*
 +     * Reading keyspace params
 +     */
 +
 +    private static long readKeyspaceTimestamp(String keyspaceName)
 +    {
 +        String query = format("SELECT writeTime(durable_writes) AS timestamp FROM %s.%s
WHERE keyspace_name = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_KEYSPACES);
 +        return query(query, keyspaceName).one().getLong("timestamp");
 +    }
 +
 +    private static KeyspaceParams readKeyspaceParams(String keyspaceName)
 +    {
 +        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_KEYSPACES);
 +        UntypedResultSet.Row row = query(query, keyspaceName).one();
 +
 +        boolean durableWrites = row.getBoolean("durable_writes");
 +
 +        Map<String, String> replication = new HashMap<>();
 +        replication.putAll(fromJsonMap(row.getString("strategy_options")));
 +        replication.put(KeyspaceParams.Replication.CLASS, row.getString("strategy_class"));
 +
 +        return KeyspaceParams.create(durableWrites, replication);
 +    }
 +
 +    /*
 +     * Reading tables
 +     */
 +
 +    private static Collection<Table> readTables(String keyspaceName)
 +    {
 +        String query = format("SELECT columnfamily_name FROM %s.%s WHERE keyspace_name =
?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_COLUMNFAMILIES);
 +        Collection<String> tableNames = new ArrayList<>();
 +        query(query, keyspaceName).forEach(row -> tableNames.add(row.getString("columnfamily_name")));
 +
 +        Collection<Table> tables = new ArrayList<>();
 +        tableNames.forEach(name -> tables.add(readTable(keyspaceName, name)));
 +        return tables;
 +    }
 +
 +    private static Table readTable(String keyspaceName, String tableName)
 +    {
 +        long timestamp = readTableTimestamp(keyspaceName, tableName);
 +        CFMetaData metadata = readTableMetadata(keyspaceName, tableName);
 +        return new Table(timestamp, metadata);
 +    }
 +
 +    private static long readTableTimestamp(String keyspaceName, String tableName)
 +    {
 +        String query = format("SELECT writeTime(type) AS timestamp FROM %s.%s WHERE keyspace_name
= ? AND columnfamily_name = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_COLUMNFAMILIES);
 +        return query(query, keyspaceName, tableName).one().getLong("timestamp");
 +    }
 +
 +    private static CFMetaData readTableMetadata(String keyspaceName, String tableName)
 +    {
 +        String tableQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name
= ?",
 +                                   SystemKeyspace.NAME,
 +                                   SystemKeyspace.LEGACY_COLUMNFAMILIES);
 +        UntypedResultSet.Row tableRow = query(tableQuery, keyspaceName, tableName).one();
 +
 +        String columnsQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name
= ?",
 +                                     SystemKeyspace.NAME,
 +                                     SystemKeyspace.LEGACY_COLUMNS);
 +        UntypedResultSet columnRows = query(columnsQuery, keyspaceName, tableName);
 +
 +        String triggersQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name
= ?",
 +                                      SystemKeyspace.NAME,
 +                                      SystemKeyspace.LEGACY_TRIGGERS);
 +        UntypedResultSet triggerRows = query(triggersQuery, keyspaceName, tableName);
 +
 +        return decodeTableMetadata(tableRow, columnRows, triggerRows);
 +    }
 +
 +    private static CFMetaData decodeTableMetadata(UntypedResultSet.Row tableRow,
 +                                                  UntypedResultSet columnRows,
 +                                                  UntypedResultSet triggerRows)
 +    {
 +        String ksName = tableRow.getString("keyspace_name");
 +        String cfName = tableRow.getString("columnfamily_name");
 +
 +        AbstractType<?> rawComparator = TypeParser.parse(tableRow.getString("comparator"));
 +        AbstractType<?> subComparator = tableRow.has("subcomparator") ? TypeParser.parse(tableRow.getString("subcomparator"))
: null;
 +
 +        boolean isSuper = "super".equals(tableRow.getString("type").toLowerCase());
 +        boolean isDense = tableRow.getBoolean("is_dense");
 +        boolean isCompound = rawComparator instanceof CompositeType;
 +
 +        // We don't really use the default validator but as we have it for backward compatibility,
we use it to know if it's a counter table
 +        AbstractType<?> defaultValidator = TypeParser.parse(tableRow.getString("default_validator"));
 +        boolean isCounter = defaultValidator instanceof CounterColumnType;
 +
 +        /*
 +         * With CASSANDRA-5202 we stopped inferring the cf id from the combination of keyspace/table
names,
 +         * and started storing the generated uuids in system.schema_columnfamilies.
 +         *
 +         * In 3.0 we SHOULD NOT see tables like that (2.0-created, non-upgraded).
 +         * But in the off-chance that we do, we generate the deterministic uuid here.
 +         */
 +        UUID cfId = tableRow.has("cf_id")
 +                  ? tableRow.getUUID("cf_id")
 +                  : CFMetaData.generateLegacyCfId(ksName, cfName);
 +
 +        boolean isCQLTable = !isSuper && !isDense && isCompound;
 +        boolean isStaticCompactTable = !isDense && !isCompound;
 +
 +        // Internally, compact tables have a specific layout, see CompactTables. But when
upgrading from
 +        // previous versions, they may not have the expected schema, so detect if we need
to upgrade and do
 +        // it in createColumnsFromColumnRows.
 +        // We can remove this once we don't support upgrade from versions < 3.0.
 +        boolean needsUpgrade = !isCQLTable && checkNeedsUpgrade(columnRows, isSuper,
isStaticCompactTable);
 +
 +        List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(columnRows,
 +                                                                        ksName,
 +                                                                        cfName,
 +                                                                        rawComparator,
 +                                                                        subComparator,
 +                                                                        isSuper,
 +                                                                        isCQLTable,
 +                                                                        isStaticCompactTable,
 +                                                                        needsUpgrade);
 +
 +        if (needsUpgrade)
 +            addDefinitionForUpgrade(columnDefs, ksName, cfName, isStaticCompactTable, isSuper,
rawComparator, subComparator, defaultValidator);
 +
 +        CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper,
isCounter, columnDefs);
 +
 +        cfm.readRepairChance(tableRow.getDouble("read_repair_chance"));
 +        cfm.dcLocalReadRepairChance(tableRow.getDouble("local_read_repair_chance"));
 +        cfm.gcGraceSeconds(tableRow.getInt("gc_grace_seconds"));
 +        cfm.minCompactionThreshold(tableRow.getInt("min_compaction_threshold"));
 +        cfm.maxCompactionThreshold(tableRow.getInt("max_compaction_threshold"));
 +        if (tableRow.has("comment"))
 +            cfm.comment(tableRow.getString("comment"));
 +        if (tableRow.has("memtable_flush_period_in_ms"))
 +            cfm.memtableFlushPeriod(tableRow.getInt("memtable_flush_period_in_ms"));
 +        cfm.caching(CachingOptions.fromString(tableRow.getString("caching")));
 +        if (tableRow.has("default_time_to_live"))
 +            cfm.defaultTimeToLive(tableRow.getInt("default_time_to_live"));
 +        if (tableRow.has("speculative_retry"))
 +            cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(tableRow.getString("speculative_retry")));
 +        cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(tableRow.getString("compaction_strategy_class")));
 +        cfm.compressionParameters(CompressionParameters.create(fromJsonMap(tableRow.getString("compression_parameters"))));
 +        cfm.compactionStrategyOptions(fromJsonMap(tableRow.getString("compaction_strategy_options")));
 +
 +        if (tableRow.has("min_index_interval"))
 +            cfm.minIndexInterval(tableRow.getInt("min_index_interval"));
 +
 +        if (tableRow.has("max_index_interval"))
 +            cfm.maxIndexInterval(tableRow.getInt("max_index_interval"));
 +
 +        if (tableRow.has("bloom_filter_fp_chance"))
 +            cfm.bloomFilterFpChance(tableRow.getDouble("bloom_filter_fp_chance"));
 +        else
 +            cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
 +
 +        if (tableRow.has("dropped_columns"))
 +            addDroppedColumns(cfm, tableRow.getMap("dropped_columns", UTF8Type.instance,
LongType.instance), Collections.emptyMap());
 +
 +        cfm.triggers(createTriggersFromTriggerRows(triggerRows));
 +
 +        return cfm;
 +    }
 +
 +    // Should only be called on compact tables
 +    private static boolean checkNeedsUpgrade(UntypedResultSet defs, boolean isSuper, boolean
isStaticCompactTable)
 +    {
 +        if (isSuper)
 +        {
 +            // Check if we've added the "supercolumn map" column yet or not
 +            for (UntypedResultSet.Row row : defs)
 +                if (row.getString("column_name").isEmpty())
 +                    return false;
 +            return true;
 +        }
 +
 +        // For static compact tables, we need to upgrade if the regular definitions haven't
been converted to static yet,
 +        // i.e. if we don't have a static definition yet.
 +        if (isStaticCompactTable)
 +            return !hasKind(defs, ColumnDefinition.Kind.STATIC);
 +
 +        // For dense compact tables, we need to upgrade if we don't have a compact value
definition
 +        return !hasKind(defs, ColumnDefinition.Kind.REGULAR);
 +    }
 +
 +    private static void addDefinitionForUpgrade(List<ColumnDefinition> defs,
 +                                                String ksName,
 +                                                String cfName,
 +                                                boolean isStaticCompactTable,
 +                                                boolean isSuper,
 +                                                AbstractType<?> rawComparator,
 +                                                AbstractType<?> subComparator,
 +                                                AbstractType<?> defaultValidator)
 +    {
 +        CompactTables.DefaultNames names = CompactTables.defaultNameGenerator(defs);
 +
 +        if (isSuper)
 +        {
 +            defs.add(ColumnDefinition.regularDef(ksName, cfName, CompactTables.SUPER_COLUMN_MAP_COLUMN_STR,
MapType.getInstance(subComparator, defaultValidator, true), null));
 +        }
 +        else if (isStaticCompactTable)
 +        {
 +            defs.add(ColumnDefinition.clusteringKeyDef(ksName, cfName, names.defaultClusteringName(),
rawComparator, null));
 +            defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(),
defaultValidator, null));
 +        }
 +        else
 +        {
 +            // For dense compact tables, we get here if we don't have a compact value column,
in which case we should add it
 +            // (we use EmptyType to recognize that the compact value was not declared by
the user (see CreateTableStatement too))
 +            defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(),
EmptyType.instance, null));
 +        }
 +    }
 +
 +    private static boolean hasKind(UntypedResultSet defs, ColumnDefinition.Kind kind)
 +    {
 +        for (UntypedResultSet.Row row : defs)
 +        {
 +            if (deserializeKind(row.getString("type")) == kind)
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    private static void addDroppedColumns(CFMetaData cfm, Map<String, Long> droppedTimes,
Map<String, String> types)
 +    {
 +        for (Map.Entry<String, Long> entry : droppedTimes.entrySet())
 +        {
 +            String name = entry.getKey();
 +            long time = entry.getValue();
 +            AbstractType<?> type = types.containsKey(name) ? TypeParser.parse(types.get(name))
: null;
 +            cfm.getDroppedColumns().put(ColumnIdentifier.getInterned(name, true), new CFMetaData.DroppedColumn(type,
time));
 +        }
 +    }
 +
 +    private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet
rows,
 +                                                                      String keyspace,
 +                                                                      String table,
 +                                                                      AbstractType<?>
rawComparator,
 +                                                                      AbstractType<?>
rawSubComparator,
 +                                                                      boolean isSuper,
 +                                                                      boolean isCQLTable,
 +                                                                      boolean isStaticCompactTable,
 +                                                                      boolean needsUpgrade)
 +    {
 +        List<ColumnDefinition> columns = new ArrayList<>();
 +        for (UntypedResultSet.Row row : rows)
 +            columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, rawSubComparator,
isSuper, isCQLTable, isStaticCompactTable, needsUpgrade));
 +        return columns;
 +    }
 +
 +    private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row,
 +                                                              String keyspace,
 +                                                              String table,
 +                                                              AbstractType<?> rawComparator,
 +                                                              AbstractType<?> rawSubComparator,
 +                                                              boolean isSuper,
 +                                                              boolean isCQLTable,
 +                                                              boolean isStaticCompactTable,
 +                                                              boolean needsUpgrade)
 +    {
 +        ColumnDefinition.Kind kind = deserializeKind(row.getString("type"));
 +        if (needsUpgrade && isStaticCompactTable && kind == ColumnDefinition.Kind.REGULAR)
 +            kind = ColumnDefinition.Kind.STATIC;
 +
 +        Integer componentIndex = null;
 +        if (row.has("component_index"))
 +            componentIndex = row.getInt("component_index");
 +
 +        // Note: we save the column name as string, but we should not assume that it is
a UTF8 name,
 +        // we need to use the comparator fromString method
 +        AbstractType<?> comparator = isCQLTable
 +                                   ? UTF8Type.instance
 +                                   : CompactTables.columnDefinitionComparator(kind, isSuper,
rawComparator, rawSubComparator);
 +        ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")),
comparator);
 +
 +        AbstractType<?> validator = parseType(row.getString("validator"));
 +
 +        IndexType indexType = null;
 +        if (row.has("index_type"))
 +            indexType = IndexType.valueOf(row.getString("index_type"));
 +
 +        Map<String, String> indexOptions = null;
 +        if (row.has("index_options"))
 +            indexOptions = fromJsonMap(row.getString("index_options"));
 +
 +        String indexName = null;
 +        if (row.has("index_name"))
 +            indexName = row.getString("index_name");
 +
 +        return new ColumnDefinition(keyspace, table, name, validator, indexType, indexOptions,
indexName, componentIndex, kind);
 +    }
 +
 +    private static ColumnDefinition.Kind deserializeKind(String kind)
 +    {
 +        if ("clustering_key".equalsIgnoreCase(kind))
 +            return ColumnDefinition.Kind.CLUSTERING_COLUMN;
 +        if ("compact_value".equalsIgnoreCase(kind))
 +            return ColumnDefinition.Kind.REGULAR;
 +        return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase());
 +    }
 +
 +    private static Triggers createTriggersFromTriggerRows(UntypedResultSet rows)
 +    {
 +        Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder();
 +        rows.forEach(row -> triggers.add(createTriggerFromTriggerRow(row)));
 +        return triggers.build();
 +    }
 +
 +    private static TriggerMetadata createTriggerFromTriggerRow(UntypedResultSet.Row row)
 +    {
 +        String name = row.getString("trigger_name");
 +        String classOption = row.getMap("trigger_options", UTF8Type.instance, UTF8Type.instance).get("class");
 +        return new TriggerMetadata(name, classOption);
 +    }
 +
 +    /*
 +     * Reading user types
 +     */
 +
 +    private static Collection<Type> readTypes(String keyspaceName)
 +    {
 +        String query = format("SELECT type_name FROM %s.%s WHERE keyspace_name = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_USERTYPES);
 +        Collection<String> typeNames = new ArrayList<>();
 +        query(query, keyspaceName).forEach(row -> typeNames.add(row.getString("type_name")));
 +
 +        Collection<Type> types = new ArrayList<>();
 +        typeNames.forEach(name -> types.add(readType(keyspaceName, name)));
 +        return types;
 +    }
 +
 +    private static Type readType(String keyspaceName, String typeName)
 +    {
 +        long timestamp = readTypeTimestamp(keyspaceName, typeName);
 +        UserType metadata = readTypeMetadata(keyspaceName, typeName);
 +        return new Type(timestamp, metadata);
 +    }
 +
 +    /*
 +     * Unfortunately there is not a single REGULAR column in system.schema_usertypes, so
annoyingly we cannot
 +     * use the writeTime() CQL function, and must resort to a lower level.
 +     */
 +    private static long readTypeTimestamp(String keyspaceName, String typeName)
 +    {
 +        ColumnFamilyStore store = org.apache.cassandra.db.Keyspace.open(SystemKeyspace.NAME)
 +                                                                  .getColumnFamilyStore(SystemKeyspace.LEGACY_USERTYPES);
 +
 +        ClusteringComparator comparator = store.metadata.comparator;
 +        Slices slices = Slices.with(comparator, Slice.make(comparator, typeName));
 +        int nowInSec = FBUtilities.nowInSeconds();
 +        DecoratedKey key = StorageService.getPartitioner().decorateKey(AsciiType.instance.fromString(keyspaceName));
 +        SinglePartitionReadCommand command = SinglePartitionSliceCommand.create(store.metadata,
nowInSec, key, slices);
 +
 +        try (OpOrder.Group op = store.readOrdering.start();
 +             RowIterator partition = UnfilteredRowIterators.filter(command.queryMemtableAndDisk(store,
op), nowInSec))
 +        {
 +            return partition.next().primaryKeyLivenessInfo().timestamp();
 +        }
 +    }
 +
 +    private static UserType readTypeMetadata(String keyspaceName, String typeName)
 +    {
 +        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND type_name
= ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_USERTYPES);
 +        UntypedResultSet.Row row = query(query, keyspaceName, typeName).one();
 +
 +        List<ByteBuffer> names =
 +            row.getList("field_names", UTF8Type.instance)
 +               .stream()
 +               .map(ByteBufferUtil::bytes)
 +               .collect(Collectors.toList());
 +
 +        List<AbstractType<?>> types =
 +            row.getList("field_types", UTF8Type.instance)
 +               .stream()
 +               .map(LegacySchemaMigrator::parseType)
 +               .collect(Collectors.toList());
 +
 +        return new UserType(keyspaceName, bytes(typeName), names, types);
 +    }
 +
 +    /*
 +     * Reading UDFs
 +     */
 +
 +    private static Collection<Function> readFunctions(String keyspaceName)
 +    {
 +        String query = format("SELECT function_name, signature FROM %s.%s WHERE keyspace_name
= ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_FUNCTIONS);
 +        HashMultimap<String, List<String>> functionSignatures = HashMultimap.create();
 +        query(query, keyspaceName).forEach(row ->
 +        {
 +            functionSignatures.put(row.getString("function_name"), row.getList("signature",
UTF8Type.instance));
 +        });
 +
 +        Collection<Function> functions = new ArrayList<>();
 +        functionSignatures.entries().forEach(pair -> functions.add(readFunction(keyspaceName,
pair.getKey(), pair.getValue())));
 +        return functions;
 +    }
 +
 +    private static Function readFunction(String keyspaceName, String functionName, List<String>
signature)
 +    {
 +        long timestamp = readFunctionTimestamp(keyspaceName, functionName, signature);
 +        UDFunction metadata = readFunctionMetadata(keyspaceName, functionName, signature);
 +        return new Function(timestamp, metadata);
 +    }
 +
 +    private static long readFunctionTimestamp(String keyspaceName, String functionName,
List<String> signature)
 +    {
 +        String query = format("SELECT writeTime(return_type) AS timestamp " +
 +                              "FROM %s.%s " +
 +                              "WHERE keyspace_name = ? AND function_name = ? AND signature
= ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_FUNCTIONS);
 +        return query(query, keyspaceName, functionName, signature).one().getLong("timestamp");
 +    }
 +
 +    private static UDFunction readFunctionMetadata(String keyspaceName, String functionName,
List<String> signature)
 +    {
 +        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND function_name
= ? AND signature = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_FUNCTIONS);
 +        UntypedResultSet.Row row = query(query, keyspaceName, functionName, signature).one();
 +
 +        FunctionName name = new FunctionName(keyspaceName, functionName);
 +
 +        List<ColumnIdentifier> argNames = new ArrayList<>();
 +        if (row.has("argument_names"))
 +            for (String arg : row.getList("argument_names", UTF8Type.instance))
 +                argNames.add(new ColumnIdentifier(arg, true));
 +
 +        List<AbstractType<?>> argTypes = new ArrayList<>();
 +        if (row.has("argument_types"))
 +            for (String type : row.getList("argument_types", UTF8Type.instance))
 +                argTypes.add(parseType(type));
 +
 +        AbstractType<?> returnType = parseType(row.getString("return_type"));
 +
 +        String language = row.getString("language");
 +        String body = row.getString("body");
 +        boolean calledOnNullInput = row.getBoolean("called_on_null_input");
 +
 +        try
 +        {
 +            return UDFunction.create(name, argNames, argTypes, returnType, calledOnNullInput,
language, body);
 +        }
 +        catch (InvalidRequestException e)
 +        {
 +            return UDFunction.createBrokenFunction(name, argNames, argTypes, returnType,
calledOnNullInput, language, body, e);
 +        }
 +    }
 +
 +    /*
 +     * Reading UDAs
 +     */
 +
 +    private static Collection<Aggregate> readAggregates(String keyspaceName)
 +    {
 +        String query = format("SELECT aggregate_name, signature FROM %s.%s WHERE keyspace_name
= ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_AGGREGATES);
 +        HashMultimap<String, List<String>> aggregateSignatures = HashMultimap.create();
 +        query(query, keyspaceName).forEach(row ->
 +        {
 +            aggregateSignatures.put(row.getString("aggregate_name"), row.getList("signature",
UTF8Type.instance));
 +        });
 +
 +        Collection<Aggregate> aggregates = new ArrayList<>();
 +        aggregateSignatures.entries().forEach(pair -> aggregates.add(readAggregate(keyspaceName,
pair.getKey(), pair.getValue())));
 +        return aggregates;
 +    }
 +
 +    private static Aggregate readAggregate(String keyspaceName, String aggregateName, List<String>
signature)
 +    {
 +        long timestamp = readAggregateTimestamp(keyspaceName, aggregateName, signature);
 +        UDAggregate metadata = readAggregateMetadata(keyspaceName, aggregateName, signature);
 +        return new Aggregate(timestamp, metadata);
 +    }
 +
 +    private static long readAggregateTimestamp(String keyspaceName, String aggregateName,
List<String> signature)
 +    {
 +        String query = format("SELECT writeTime(return_type) AS timestamp " +
 +                              "FROM %s.%s " +
 +                              "WHERE keyspace_name = ? AND aggregate_name = ? AND signature
= ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_AGGREGATES);
 +        return query(query, keyspaceName, aggregateName, signature).one().getLong("timestamp");
 +    }
 +
 +    private static UDAggregate readAggregateMetadata(String keyspaceName, String functionName,
List<String> signature)
 +    {
 +        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND function_name
= ? AND signature = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_AGGREGATES);
 +        UntypedResultSet.Row row = query(query, keyspaceName, functionName, signature).one();
 +
 +        FunctionName name = new FunctionName(keyspaceName, functionName);
 +
 +        List<String> types = row.getList("argument_types", UTF8Type.instance);
 +
 +        List<AbstractType<?>> argTypes = new ArrayList<>();
 +        if (types != null)
 +        {
 +            argTypes = new ArrayList<>(types.size());
 +            for (String type : types)
 +                argTypes.add(parseType(type));
 +        }
 +
 +        AbstractType<?> returnType = parseType(row.getString("return_type"));
 +
-         FunctionName stateFunc = parseAggregateFunctionName(keyspaceName, row.getString("state_func"));
-         FunctionName finalFunc = row.has("final_func") ? parseAggregateFunctionName(keyspaceName,
row.getString("final_func")) : null;
++        FunctionName stateFunc = new FunctionName(keyspaceName, row.getString("state_func"));
++        FunctionName finalFunc = row.has("final_func") ? new FunctionName(keyspaceName,
row.getString("final_func")) : null;
 +        AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type"))
: null;
 +        ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
 +
 +        try
 +        {
 +            return UDAggregate.create(name, argTypes, returnType, stateFunc, finalFunc,
stateType, initcond);
 +        }
 +        catch (InvalidRequestException reason)
 +        {
 +            return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason);
 +        }
 +    }
 +
-     private static FunctionName parseAggregateFunctionName(String ksName, String func)
-     {
-         int i = func.indexOf('.');
- 
-         // function name can be abbreviated (pre 2.2rc2) - it is in the same keyspace as
the aggregate
-         if (i == -1)
-             return new FunctionName(ksName, func);
- 
-         String ks = func.substring(0, i);
-         String f = func.substring(i + 1);
- 
-         // only aggregate's function keyspace and system keyspace are allowed
-         assert ks.equals(ksName) || ks.equals(SystemKeyspace.NAME);
- 
-         return new FunctionName(ks, f);
-     }
- 
    // Convenience wrapper: runs an internal CQL query via QueryProcessor.executeOnceInternal
    // with the given bind values and returns the untyped result set.
    private static UntypedResultSet query(String query, Object... values)
    {
        return QueryProcessor.executeOnceInternal(query, values);
    }
 +
    // Parses a stored type string (as written by the legacy schema tables) into an AbstractType.
    private static AbstractType<?> parseType(String str)
    {
        return TypeParser.parse(str);
    }
 +
    /**
     * Immutable in-memory snapshot of one legacy keyspace: its write timestamp,
     * replication params, and all schema entities (tables, types, UDFs, UDAs)
     * read from the pre-3.0 system tables.
     */
    private static final class Keyspace
    {
        final long timestamp;
        final String name;
        final KeyspaceParams params;
        final Collection<Table> tables;
        final Collection<Type> types;
        final Collection<Function> functions;
        final Collection<Aggregate> aggregates;

        Keyspace(long timestamp,
                 String name,
                 KeyspaceParams params,
                 Collection<Table> tables,
                 Collection<Type> types,
                 Collection<Function> functions,
                 Collection<Aggregate> aggregates)
        {
            this.timestamp = timestamp;
            this.name = name;
            this.params = params;
            this.tables = tables;
            this.types = types;
            this.functions = functions;
            this.aggregates = aggregates;
        }
    }
 +
    /**
     * Immutable holder pairing a legacy table's write timestamp with its parsed metadata.
     */
    private static final class Table
    {
        final long timestamp;
        final CFMetaData metadata;

        Table(long timestamp, CFMetaData metadata)
        {
            this.timestamp = timestamp;
            this.metadata = metadata;
        }
    }
 +
    /**
     * Immutable holder pairing a legacy user-defined type's write timestamp with its parsed metadata.
     */
    private static final class Type
    {
        final long timestamp;
        final UserType metadata;

        Type(long timestamp, UserType metadata)
        {
            this.timestamp = timestamp;
            this.metadata = metadata;
        }
    }
 +
    /**
     * Immutable holder pairing a legacy user-defined function's write timestamp with its parsed metadata.
     */
    private static final class Function
    {
        final long timestamp;
        final UDFunction metadata;

        Function(long timestamp, UDFunction metadata)
        {
            this.timestamp = timestamp;
            this.metadata = metadata;
        }
    }
 +
    /**
     * Immutable holder pairing a legacy user-defined aggregate's write timestamp with its parsed metadata.
     */
    private static final class Aggregate
    {
        final long timestamp;
        final UDAggregate metadata;

        Aggregate(long timestamp, UDAggregate metadata)
        {
            this.timestamp = timestamp;
            this.metadata = metadata;
        }
    }
 +}


Mime
View raw message