From: blerer@apache.org
To: commits@cassandra.apache.org
Date: Tue, 22 Sep 2015 20:13:16 -0000
Subject: [5/9] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index c4ef239,87891ae..8d23597
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -409,14 -399,9 +409,14 @@@ public abstract class SSTableReader ext
              System.exit(1);
          }

-         logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
+         logger.debug("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());

-         SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
-                                              statsMetadata, OpenReason.NORMAL);
+         SSTableReader sstable = internalOpen(descriptor,
+                                              components,
+                                              metadata,
+                                              System.currentTimeMillis(),
+                                              statsMetadata,
+                                              OpenReason.NORMAL,
+                                              header.toHeader(metadata));

          // special implementation of load to use non-pooled SegmentedFile builders
          try(SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
@@@ -465,15 -446,9 +465,15 @@@
              System.exit(1);
          }

-         logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
+         logger.debug("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());

-         SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
-                                              statsMetadata, OpenReason.NORMAL);
+         SSTableReader sstable = internalOpen(descriptor,
+                                              components,
+                                              metadata,
+                                              System.currentTimeMillis(),
+                                              statsMetadata,
+                                              OpenReason.NORMAL,
+                                              header == null ? null : header.toHeader(metadata));
+
          try
          {
              // load index and filter
@@@ -1656,10 -1631,10 +1656,10 @@@
       * @return true if this is the first time the file was marked obsolete.  Calling this
       * multiple times is usually buggy (see exceptions in Tracker.unmarkCompacting and removeOldSSTablesSize).
       */
-     public boolean markObsolete(Tracker tracker)
+     public void markObsolete(Runnable tidier)
      {
-         if (logger.isDebugEnabled())
-             logger.debug("Marking {} compacted", getFilename());
+         if (logger.isTraceEnabled())
+             logger.trace("Marking {} compacted", getFilename());

          synchronized (tidy.global)
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 9197b7a,30ed85b..635adcd
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@@ -76,10 -75,10 +76,10 @@@ public class MetadataSerializer impleme
          }
      }

-     public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
+     public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
      {
          Map<MetadataType, MetadataComponent> components;
-         logger.debug("Load metadata for {}", descriptor);
+         logger.trace("Load metadata for {}", descriptor);
          File statsFile = new File(descriptor.filenameFor(Component.STATS));
          if (!statsFile.exists())
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 8e8ce15,f6652b0..7054bcc
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@@ -149,10 -148,10 +149,10 @@@ public class IncomingTcpConnection exte

          if (compressed)
          {
-             logger.debug("Upgrading incoming connection to be compressed");
+             logger.trace("Upgrading incoming connection to be compressed");
              if (version < MessagingService.VERSION_21)
              {
-                 in = new DataInputStream(new SnappyInputStream(socket.getInputStream()));
+                 in = new DataInputStreamPlus(new SnappyInputStream(socket.getInputStream()));
              }
              else
              {
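For context: this merge demotes a number of log statements from debug to trace. A minimal, self-contained sketch of the SLF4J level-guard idiom these hunks rely on, assuming only slf4j-api on the classpath (the class and message below are illustrative, not part of the patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class TraceGuardExample
    {
        private static final Logger logger = LoggerFactory.getLogger(TraceGuardExample.class);

        public void open(String filename, long bytes)
        {
            // The guard skips argument boxing and varargs allocation entirely when
            // TRACE is disabled; the parameterized message itself is formatted lazily.
            if (logger.isTraceEnabled())
                logger.trace("Opening {} ({} bytes)", filename, bytes);
        }
    }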
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 4fb67ec,2a63553..810d086
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -874,24 -847,17 +874,24 @@@ public final class MessagingService imp
      public void resetVersion(InetAddress endpoint)
      {
-         logger.debug("Resetting version for {}", endpoint);
+         logger.trace("Resetting version for {}", endpoint);
          Integer removed = versions.remove(endpoint);
-         if (removed != null && removed <= VERSION_22)
-             refreshAllNodesAtLeast22();
+         if (removed != null && removed <= VERSION_30)
+             refreshAllNodeMinVersions();
      }

-     private void refreshAllNodesAtLeast22()
+     private void refreshAllNodeMinVersions()
      {
-         for (Integer version: versions.values())
+         boolean anyNodeLowerThan30 = false;
+         for (Integer version : versions.values())
          {
-             if (version < VERSION_22)
+             if (version < MessagingService.VERSION_30)
+             {
+                 anyNodeLowerThan30 = true;
+                 allNodesAtLeast30 = false;
+             }
+
+             if (version < MessagingService.VERSION_22)
              {
                  allNodesAtLeast22 = false;
                  return;
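The MessagingService change above keeps cluster-wide minimum-version flags up to date when a node's version entry is removed. A self-contained sketch of that bookkeeping under simplified types (VersionTracker and its String endpoints are hypothetical stand-ins, not the patch's code):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class VersionTracker
    {
        // Illustrative protocol version constants, assumed for this sketch.
        public static final int VERSION_22 = 8;
        public static final int VERSION_30 = 10;

        private final Map<String, Integer> versions = new ConcurrentHashMap<>();
        private volatile boolean allNodesAtLeast22 = true;
        private volatile boolean allNodesAtLeast30 = true;

        public void resetVersion(String endpoint)
        {
            Integer removed = versions.remove(endpoint);
            // A node at or below 3.0 may have been holding the floor down; recompute it.
            if (removed != null && removed <= VERSION_30)
                refreshAllNodeMinVersions();
        }

        private void refreshAllNodeMinVersions()
        {
            boolean atLeast22 = true;
            boolean atLeast30 = true;
            for (int version : versions.values())
            {
                if (version < VERSION_30)
                    atLeast30 = false;
                if (version < VERSION_22)
                    atLeast22 = false;
            }
            allNodesAtLeast22 = atLeast22;
            allNodesAtLeast30 = atLeast30;
        }
    }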
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 5f27d82,0000000..bc9da31
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@@ -1,1708 -1,0 +1,1708 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
+
+/**
+ * system_schema.* tables and methods for manipulating them.
+ */
+public final class SchemaKeyspace
+{
+    private SchemaKeyspace()
+    {
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(SchemaKeyspace.class);
+
+    public static final String NAME = "system_schema";
+
+    public static final String KEYSPACES = "keyspaces";
+    public static final String TABLES = "tables";
+    public static final String COLUMNS = "columns";
+    public static final String DROPPED_COLUMNS = "dropped_columns";
+    public static final String TRIGGERS = "triggers";
+    public static final String VIEWS = "views";
+    public static final String TYPES = "types";
+    public static final String FUNCTIONS = "functions";
+    public static final String AGGREGATES = "aggregates";
+    public static final String INDEXES = "indexes";
+
+    public static final List<String> ALL =
+        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
+
+    private static final CFMetaData Keyspaces =
+        compile(KEYSPACES,
+                "keyspace definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "durable_writes boolean,"
+                + "replication frozen<map<text, text>>,"
+                + "PRIMARY KEY ((keyspace_name)))");
+
+    private static final CFMetaData Tables =
+        compile(TABLES,
+                "table definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "table_name text,"
+                + "bloom_filter_fp_chance double,"
+                + "caching frozen<map<text, text>>,"
+                + "comment text,"
+                + "compaction frozen<map<text, text>>,"
+                + "compression frozen<map<text, text>>,"
+                + "crc_check_chance double,"
+                + "dclocal_read_repair_chance double,"
+                + "default_time_to_live int,"
+                + "extensions frozen<map<text, blob>>,"
+                + "flags frozen<set<text>>," // SUPER, COUNTER, DENSE, COMPOUND
+                + "gc_grace_seconds int,"
+                + "id uuid,"
+                + "max_index_interval int,"
+                + "memtable_flush_period_in_ms int,"
+                + "min_index_interval int,"
+                + "read_repair_chance double,"
+                + "speculative_retry text,"
+                + "PRIMARY KEY ((keyspace_name), table_name))");
+
+    private static final CFMetaData Columns =
+        compile(COLUMNS,
+                "column definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "table_name text,"
+                + "column_name text,"
+                + "clustering_order text,"
+                + "column_name_bytes blob,"
+                + "kind text,"
+                + "position int,"
+                + "type text,"
+                + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
+
+    private static final CFMetaData DroppedColumns =
+        compile(DROPPED_COLUMNS,
+                "dropped column registry",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "table_name text,"
+                + "column_name text,"
+                + "dropped_time timestamp,"
+                + "type text,"
+                + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
+
+    private static final CFMetaData Triggers =
+        compile(TRIGGERS,
+                "trigger definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "table_name text,"
+                + "trigger_name text,"
+                + "options frozen<map<text, text>>,"
+                + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
+
+    private static final CFMetaData Views =
+        compile(VIEWS,
+                "view definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "view_name text,"
+                + "base_table_id uuid,"
+                + "base_table_name text,"
+                + "where_clause text,"
+                + "bloom_filter_fp_chance double,"
+                + "caching frozen<map<text, text>>,"
+                + "comment text,"
+                + "compaction frozen<map<text, text>>,"
+                + "compression frozen<map<text, text>>,"
+                + "crc_check_chance double,"
+                + "dclocal_read_repair_chance double,"
+                + "default_time_to_live int,"
+                + "extensions frozen<map<text, blob>>,"
+                + "gc_grace_seconds int,"
+                + "id uuid,"
+                + "include_all_columns boolean,"
+                + "max_index_interval int,"
+                + "memtable_flush_period_in_ms int,"
+                + "min_index_interval int,"
+                + "read_repair_chance double,"
+                + "speculative_retry text,"
+                + "PRIMARY KEY ((keyspace_name), view_name))");
+
+    private static final CFMetaData Indexes =
+        compile(INDEXES,
+                "secondary index definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "table_name text,"
+                + "index_name text,"
+                + "kind text,"
+                + "options frozen<map<text, text>>,"
+                + "PRIMARY KEY ((keyspace_name), table_name, index_name))");
+
+    private static final CFMetaData Types =
+        compile(TYPES,
+                "user defined type definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "type_name text,"
+                + "field_names frozen<list<text>>,"
+                + "field_types frozen<list<text>>,"
+                + "PRIMARY KEY ((keyspace_name), type_name))");
+
+    private static final CFMetaData Functions =
+        compile(FUNCTIONS,
+                "user defined function definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "function_name text,"
+                + "signature frozen<list<text>>,"
+                + "argument_names frozen<list<text>>,"
+                + "argument_types frozen<list<text>>,"
+                + "body text,"
+                + "language text,"
+                + "return_type text,"
+                + "called_on_null_input boolean,"
+                + "PRIMARY KEY ((keyspace_name), function_name, signature))");
+
+    private static final CFMetaData Aggregates =
+        compile(AGGREGATES,
+                "user defined aggregate definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "aggregate_name text,"
+                + "signature frozen<list<text>>,"
+                + "argument_types frozen<list<text>>,"
+                + "final_func text,"
+                + "initcond blob,"
+                + "return_type text,"
+                + "state_func text,"
+                + "state_type text,"
+                + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
+
+    public static final List<CFMetaData> ALL_TABLE_METADATA =
+        ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes);
+
+    private static CFMetaData compile(String name, String description, String schema)
+    {
+        return CFMetaData.compile(String.format(schema, name), NAME)
+                         .comment(description)
+                         .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7));
+    }
+
+    public static KeyspaceMetadata metadata()
+    {
+        return KeyspaceMetadata.create(NAME, KeyspaceParams.local(), org.apache.cassandra.schema.Tables.of(ALL_TABLE_METADATA));
+    }
+
+    /**
+     * Add entries to system_schema.* for the hardcoded system keyspaces
+     */
+    public static void saveSystemKeyspacesSchema()
+    {
+        KeyspaceMetadata system = Schema.instance.getKSMetaData(SystemKeyspace.NAME);
+        KeyspaceMetadata schema = Schema.instance.getKSMetaData(NAME);
+
+        long timestamp = FBUtilities.timestampMicros();
+
+        // delete old, possibly obsolete entries in schema tables
+        for (String schemaTable : ALL)
+        {
+            String query = String.format("DELETE FROM %s.%s USING TIMESTAMP ? WHERE keyspace_name = ?", NAME, schemaTable);
+            for (String systemKeyspace : Schema.SYSTEM_KEYSPACE_NAMES)
+                executeOnceInternal(query, timestamp, systemKeyspace);
+        }
+
+        // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
+        makeCreateKeyspaceMutation(system, timestamp + 1).apply();
+        makeCreateKeyspaceMutation(schema, timestamp + 1).apply();
+    }
+
+    public static List<KeyspaceMetadata> readSchemaFromSystemTables()
+    {
+        ReadCommand cmd = getReadCommandForTableSchema(KEYSPACES);
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator schema = cmd.executeInternal(orderGroup))
+        {
+            List<KeyspaceMetadata> keyspaces = new ArrayList<>();
+
+            while (schema.hasNext())
+            {
+                try (RowIterator partition = schema.next())
+                {
+                    if (isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+                        continue;
+
+                    DecoratedKey key = partition.partitionKey();
+
+                    readSchemaPartitionForKeyspaceAndApply(TYPES, key,
+                        types -> readSchemaPartitionForKeyspaceAndApply(TABLES, key,
+                        tables -> readSchemaPartitionForKeyspaceAndApply(VIEWS, key,
+                        views -> readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key,
+                        functions -> readSchemaPartitionForKeyspaceAndApply(AGGREGATES, key,
+                        aggregates -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, views, types, functions, aggregates))))))
+                    );
+                }
+            }
+            return keyspaces;
+        }
+    }
+
+    public static void truncate()
+    {
+        ALL.forEach(table -> getSchemaCFS(table).truncateBlocking());
+    }
+
+    static void flush()
+    {
+        if (!Boolean.getBoolean("cassandra.unsafesystem"))
+            ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush()));
+    }
+
+    /**
+     * Read schema from system keyspace and calculate MD5 digest of every row; the resulting digest
+     * is converted into a UUID which acts as a content-based version of the schema.
+     */
+    public static UUID calculateSchemaDigest()
+    {
+        MessageDigest digest;
+        try
+        {
+            digest = MessageDigest.getInstance("MD5");
+        }
+        catch (NoSuchAlgorithmException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        for (String table : ALL)
+        {
+            ReadCommand cmd = getReadCommandForTableSchema(table);
+            try (ReadOrderGroup orderGroup = cmd.startOrderGroup();
+                 PartitionIterator schema = cmd.executeInternal(orderGroup))
+            {
+                while (schema.hasNext())
+                {
+                    try (RowIterator partition = schema.next())
+                    {
+                        if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+                            RowIterators.digest(partition, digest);
+                    }
+                }
+            }
+        }
+        return UUID.nameUUIDFromBytes(digest.digest());
+    }
+
+    /**
+     * @param schemaTableName The name of the table responsible for part of the schema
+     * @return CFS responsible to hold low-level serialized schema
+     */
+    private static ColumnFamilyStore getSchemaCFS(String schemaTableName)
+    {
+        return Keyspace.open(NAME).getColumnFamilyStore(schemaTableName);
+    }
+
+    /**
+     * @param schemaTableName The name of the table responsible for part of the schema.
+     * @return low-level schema representation
+     */
+    private static ReadCommand getReadCommandForTableSchema(String schemaTableName)
+    {
+        ColumnFamilyStore cfs = getSchemaCFS(schemaTableName);
+        return PartitionRangeReadCommand.allDataRead(cfs.metadata, FBUtilities.nowInSeconds());
+    }
+
+    public static Collection<Mutation> convertSchemaToMutations()
+    {
+        Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
+
+        for (String table : ALL)
+            convertSchemaToMutations(mutationMap, table);
+
+        return mutationMap.values();
+    }
+
+    private static void convertSchemaToMutations(Map<DecoratedKey, Mutation> mutationMap, String schemaTableName)
+    {
+        ReadCommand cmd = getReadCommandForTableSchema(schemaTableName);
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
+        {
+            while (iter.hasNext())
+            {
+                try (UnfilteredRowIterator partition = iter.next())
+                {
+                    if (isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+                        continue;
+
+                    DecoratedKey key = partition.partitionKey();
+                    Mutation mutation = mutationMap.get(key);
+                    if (mutation == null)
+                    {
+                        mutation = new Mutation(NAME, key);
+                        mutationMap.put(key, mutation);
+                    }
+
+                    mutation.add(PartitionUpdate.fromIterator(partition));
+                }
+            }
+        }
+    }
+
+    private static Map<DecoratedKey, FilteredPartition> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
+    {
+        Map<DecoratedKey, FilteredPartition> schema = new HashMap<>();
+
+        for (String keyspaceName : keyspaceNames)
+        {
+            // We don't want to return the RowIterator directly because we should guarantee that this iterator
+            // will be closed, and putting it in a Map makes that harder/more awkward.
+            readSchemaPartitionForKeyspaceAndApply(schemaTableName, keyspaceName,
+                partition -> {
+                    if (!partition.isEmpty())
+                        schema.put(partition.partitionKey(), FilteredPartition.create(partition));
+                    return null;
+                }
+            );
+        }
+
+        return schema;
+    }
+
+    private static ByteBuffer getSchemaKSKey(String ksName)
+    {
+        return AsciiType.instance.fromString(ksName);
+    }
+
+    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, String keyspaceName, Function<RowIterator, T> fct)
+    {
+        return readSchemaPartitionForKeyspaceAndApply(schemaTableName, getSchemaKSKey(keyspaceName), fct);
+    }
+
+    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, ByteBuffer keyspaceKey, Function<RowIterator, T> fct)
+    {
+        ColumnFamilyStore store = getSchemaCFS(schemaTableName);
+        return readSchemaPartitionForKeyspaceAndApply(store, store.decorateKey(keyspaceKey), fct);
+    }
+
+    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
+    {
+        return readSchemaPartitionForKeyspaceAndApply(getSchemaCFS(schemaTableName), keyspaceKey, fct);
+    }
+
+    private static <T> T readSchemaPartitionForKeyspaceAndApply(ColumnFamilyStore store, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
+    {
+        int nowInSec = FBUtilities.nowInSeconds();
+        try (OpOrder.Group op = store.readOrdering.start();
+             RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(store.metadata, nowInSec, keyspaceKey)
+                                                                                             .queryMemtableAndDisk(store, op), nowInSec))
+        {
+            return fct.apply(partition);
+        }
+    }
+
+    private static <T> T readSchemaPartitionForTableAndApply(String schemaTableName, String keyspaceName, String tableName, Function<RowIterator, T> fct)
+    {
+        ColumnFamilyStore store = getSchemaCFS(schemaTableName);
+
+        ClusteringComparator comparator = store.metadata.comparator;
+        Slices slices = Slices.with(comparator, Slice.make(comparator, tableName));
+        int nowInSec = FBUtilities.nowInSeconds();
+        try (OpOrder.Group op = store.readOrdering.start();
+             RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionSliceCommand.create(store.metadata, nowInSec, getSchemaKSKey(keyspaceName), slices)
+                                                                                              .queryMemtableAndDisk(store, op), nowInSec))
+        {
+            return fct.apply(partition);
+        }
+    }
+
+    private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey)
+    {
+        return Schema.isSystemKeyspace(UTF8Type.instance.compose(partitionKey.getKey()));
+    }
+
+    /**
+     * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
+     * (which also involves fs operations on add/drop ks/cf)
+     *
+     * @param mutations the schema changes to apply
+     *
+     * @throws ConfigurationException If one of metadata attributes has invalid value
+     * @throws IOException If data was corrupted during transportation or failed to apply fs operations
+     */
+    public static synchronized void mergeSchema(Collection<Mutation> mutations) throws ConfigurationException, IOException
+    {
+        mergeSchema(mutations, true);
+        Schema.instance.updateVersionAndAnnounce();
+    }
+
+    public static synchronized void mergeSchema(Collection<Mutation> mutations, boolean doFlush) throws IOException
+    {
+        // compare before/after schemas of the affected keyspaces only
+        Set<String> keyspaces = new HashSet<>(mutations.size());
+        for (Mutation mutation : mutations)
+            keyspaces.add(ByteBufferUtil.string(mutation.key().getKey()));
+
+        // current state of the schema
+        Map<DecoratedKey, FilteredPartition> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldViews = readSchemaForKeyspaces(VIEWS, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldTypes = readSchemaForKeyspaces(TYPES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
+
+        mutations.forEach(Mutation::apply);
+
+        if (doFlush)
+            flush();
+
+        // with new data applied
+        Map<DecoratedKey, FilteredPartition> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newViews = readSchemaForKeyspaces(VIEWS, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newTypes = readSchemaForKeyspaces(TYPES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
+
+        Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
+        mergeTables(oldColumnFamilies, newColumnFamilies);
+        mergeViews(oldViews, newViews);
+        mergeTypes(oldTypes, newTypes);
+        mergeFunctions(oldFunctions, newFunctions);
+        mergeAggregates(oldAggregates, newAggregates);
+
+        // it is safe to drop a keyspace only when all nested ColumnFamilies were deleted
+        keyspacesToDrop.forEach(Schema.instance::dropKeyspace);
+    }
+
+    private static Set<String> mergeKeyspaces(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+    {
+        for (FilteredPartition newPartition : after.values())
+        {
+            String name = AsciiType.instance.compose(newPartition.partitionKey().getKey());
+            KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(newPartition.rowIterator());
+
+            FilteredPartition oldPartition = before.remove(newPartition.partitionKey());
+            if (oldPartition == null || oldPartition.isEmpty())
+                Schema.instance.addKeyspace(KeyspaceMetadata.create(name, params));
+            else
+                Schema.instance.updateKeyspace(name, params);
+        }
+
+        // What remains in "before" are the keyspaces that are not in "after", i.e. the dropped ones.
+        return asKeyspaceNamesSet(before.keySet());
+    }
+
+    private static Set<String> asKeyspaceNamesSet(Set<DecoratedKey> keys)
+    {
+        Set<String> names = new HashSet<>(keys.size());
+        for (DecoratedKey key : keys)
+            names.add(AsciiType.instance.compose(key.getKey()));
+        return names;
+    }
+
+    private static void mergeTables(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+    {
+        diffSchema(before, after, new Differ()
+        {
+            public void onDropped(UntypedResultSet.Row oldRow)
+            {
+                Schema.instance.dropTable(oldRow.getString("keyspace_name"), oldRow.getString("table_name"));
+            }
+
+            public void onAdded(UntypedResultSet.Row newRow)
+            {
+                Schema.instance.addTable(createTableFromTableRow(newRow));
+            }
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
+            {
+                Schema.instance.updateTable(newRow.getString("keyspace_name"), newRow.getString("table_name"));
+            }
+        });
+    }
+
+    private static void mergeViews(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+    {
+        diffSchema(before, after, new Differ()
+        {
+            public void onDropped(UntypedResultSet.Row oldRow)
+            {
+                Schema.instance.dropView(oldRow.getString("keyspace_name"), oldRow.getString("view_name"));
+            }
+
+            public void onAdded(UntypedResultSet.Row newRow)
+            {
+                Schema.instance.addView(createViewFromViewRow(newRow));
+            }
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
+            {
+                Schema.instance.updateView(newRow.getString("keyspace_name"), newRow.getString("view_name"));
+            }
+        });
+    }
+
+    private static void mergeTypes(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+    {
+        diffSchema(before, after, new Differ()
+        {
+            public void onDropped(UntypedResultSet.Row oldRow)
+            {
+                Schema.instance.dropType(createTypeFromRow(oldRow));
+            }
+
+            public void onAdded(UntypedResultSet.Row newRow)
+            {
+                Schema.instance.addType(createTypeFromRow(newRow));
+            }
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
+            {
+                Schema.instance.updateType(createTypeFromRow(newRow));
+            }
+        });
+    }
+
+    private static void mergeFunctions(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+    {
+        diffSchema(before, after, new Differ()
+        {
+            public void onDropped(UntypedResultSet.Row oldRow)
+            {
+                Schema.instance.dropFunction(createFunctionFromFunctionRow(oldRow));
+            }
+
+            public void onAdded(UntypedResultSet.Row newRow)
+            {
+                Schema.instance.addFunction(createFunctionFromFunctionRow(newRow));
+            }
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
+            {
+                Schema.instance.updateFunction(createFunctionFromFunctionRow(newRow));
+            }
+        });
+    }
+
+    private static void mergeAggregates(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+    {
+        diffSchema(before, after, new Differ()
+        {
+            public void onDropped(UntypedResultSet.Row oldRow)
+            {
+                Schema.instance.dropAggregate(createAggregateFromAggregateRow(oldRow));
+            }
+
+            public void onAdded(UntypedResultSet.Row newRow)
+            {
+                Schema.instance.addAggregate(createAggregateFromAggregateRow(newRow));
+            }
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
+            {
+                Schema.instance.updateAggregate(createAggregateFromAggregateRow(newRow));
+            }
+        });
+    }
+
+    public interface Differ
+    {
+        void onDropped(UntypedResultSet.Row oldRow);
+        void onAdded(UntypedResultSet.Row newRow);
+        void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow);
+    }
+
+    private static void diffSchema(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after, Differ differ)
+    {
+        for (FilteredPartition newPartition : after.values())
+        {
+            CFMetaData metadata = newPartition.metadata();
+            DecoratedKey key = newPartition.partitionKey();
+
+            FilteredPartition oldPartition = before.remove(key);
+
+            if (oldPartition == null || oldPartition.isEmpty())
+            {
+                // Means everything is to be added
+                for (Row row : newPartition)
+                    differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, row));
+                continue;
+            }
+
+            Iterator<Row> oldIter = oldPartition.iterator();
+            Iterator<Row> newIter = newPartition.iterator();
+
+            Row oldRow = oldIter.hasNext() ? oldIter.next() : null;
+            Row newRow = newIter.hasNext() ? newIter.next() : null;
+            while (oldRow != null && newRow != null)
+            {
+                int cmp = metadata.comparator.compare(oldRow.clustering(), newRow.clustering());
+                if (cmp < 0)
+                {
+                    differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
+                    oldRow = oldIter.hasNext() ? oldIter.next() : null;
+                }
+                else if (cmp > 0)
+                {
+                    differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+                    newRow = newIter.hasNext() ? newIter.next() : null;
+                }
+                else
+                {
+                    if (!oldRow.equals(newRow))
+                        differ.onUpdated(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow), UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+
+                    oldRow = oldIter.hasNext() ? oldIter.next() : null;
+                    newRow = newIter.hasNext() ? newIter.next() : null;
+                }
+            }
+
+            while (oldRow != null)
+            {
+                differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
+                oldRow = oldIter.hasNext() ? oldIter.next() : null;
+            }
+            while (newRow != null)
+            {
+                differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+                newRow = newIter.hasNext() ? newIter.next() : null;
+            }
+        }
+
+        // What remains is those keys that were only in before.
+        for (FilteredPartition partition : before.values())
+            for (Row row : partition)
+                differ.onDropped(UntypedResultSet.Row.fromInternalRow(partition.metadata(), partition.partitionKey(), row));
+    }
+
+    /*
+     * Keyspace metadata serialization/deserialization.
+     */
+
+    public static Mutation makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp)
+    {
+        RowUpdateBuilder adder = new RowUpdateBuilder(Keyspaces, timestamp, name).clustering();
+        return adder.add(KeyspaceParams.Option.DURABLE_WRITES.toString(), params.durableWrites)
+                    .frozenMap(KeyspaceParams.Option.REPLICATION.toString(), params.replication.asMap())
+                    .build();
+    }
+
+    public static Mutation makeCreateKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp)
+    {
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+
+        keyspace.tables.forEach(table -> addTableToSchemaMutation(table, timestamp, true, mutation));
+        keyspace.views.forEach(view -> addViewToSchemaMutation(view, timestamp, true, mutation));
+        keyspace.types.forEach(type -> addTypeToSchemaMutation(type, timestamp, mutation));
+        keyspace.functions.udfs().forEach(udf -> addFunctionToSchemaMutation(udf, timestamp, mutation));
+        keyspace.functions.udas().forEach(uda -> addAggregateToSchemaMutation(uda, timestamp, mutation));
+
+        return mutation;
+    }
+
+    public static Mutation makeDropKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp)
+    {
+        int nowInSec = FBUtilities.nowInSeconds();
+        Mutation mutation = new Mutation(NAME, Keyspaces.decorateKey(getSchemaKSKey(keyspace.name)));
+
+        for (CFMetaData schemaTable : ALL_TABLE_METADATA)
+            mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec));
+
+        return mutation;
+    }
+
+    private static KeyspaceMetadata createKeyspaceFromSchemaPartitions(RowIterator serializedParams,
+                                                                       RowIterator serializedTables,
+                                                                       RowIterator serializedViews,
+                                                                       RowIterator serializedTypes,
+                                                                       RowIterator serializedFunctions,
+                                                                       RowIterator serializedAggregates)
+    {
+        String name = AsciiType.instance.compose(serializedParams.partitionKey().getKey());
+
+        KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(serializedParams);
+        Tables tables = createTablesFromTablesPartition(serializedTables);
+        Views views = createViewsFromViewsPartition(serializedViews);
+        Types types = createTypesFromPartition(serializedTypes);
+
+        Collection<UDFunction> udfs = createFunctionsFromFunctionsPartition(serializedFunctions);
+        Collection<UDAggregate> udas = createAggregatesFromAggregatesPartition(serializedAggregates);
+        Functions functions = org.apache.cassandra.schema.Functions.builder().add(udfs).add(udas).build();
+
+        return KeyspaceMetadata.create(name, params, tables, views, types, functions);
+    }
+
+    /**
+     * Deserialize only Keyspace attributes without nested tables or types
+     *
+     * @param partition Keyspace attributes in serialized form
+     */
+    private static KeyspaceParams createKeyspaceParamsFromSchemaPartition(RowIterator partition)
+    {
+        String query = String.format("SELECT * FROM %s.%s", NAME, KEYSPACES);
+        UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one();
+
+        return KeyspaceParams.create(row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString()),
+                                     row.getFrozenTextMap(KeyspaceParams.Option.REPLICATION.toString()));
+    }
+
+    /*
+     * User type metadata serialization/deserialization.
+     */
+
+    public static Mutation makeCreateTypeMutation(KeyspaceMetadata keyspace, UserType type, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addTypeToSchemaMutation(type, timestamp, mutation);
+        return mutation;
+    }
+
+    static void addTypeToSchemaMutation(UserType type, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder adder = new RowUpdateBuilder(Types, timestamp, mutation)
+                                 .clustering(type.getNameAsString())
+                                 .frozenList("field_names", type.fieldNames().stream().map(SchemaKeyspace::bbToString).collect(Collectors.toList()))
+                                 .frozenList("field_types", type.fieldTypes().stream().map(AbstractType::toString).collect(Collectors.toList()));
+
+        adder.build();
+    }
+
+    private static String bbToString(ByteBuffer bb)
+    {
+        try
+        {
+            return ByteBufferUtil.string(bb);
+        }
+        catch (CharacterCodingException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Mutation dropTypeFromSchemaMutation(KeyspaceMetadata keyspace, UserType type, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        return RowUpdateBuilder.deleteRow(Types, timestamp, mutation, type.name);
+    }
+
+    private static Types createTypesFromPartition(RowIterator partition)
+    {
+        String query = String.format("SELECT * FROM %s.%s", NAME, TYPES);
+        Types.Builder types = org.apache.cassandra.schema.Types.builder();
+        QueryProcessor.resultify(query, partition).forEach(row -> types.add(createTypeFromRow(row)));
+        return types.build();
+    }
+
+    private static UserType createTypeFromRow(UntypedResultSet.Row row)
+    {
+        String keyspace = row.getString("keyspace_name");
+        ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name"));
+        List<String> rawColumns = row.getFrozenList("field_names", UTF8Type.instance);
+        List<String> rawTypes = row.getFrozenList("field_types", UTF8Type.instance);
+
+        List<ByteBuffer> columns = new ArrayList<>(rawColumns.size());
+        for (String rawColumn : rawColumns)
+            columns.add(ByteBufferUtil.bytes(rawColumn));
+
+        List<AbstractType<?>> types = new ArrayList<>(rawTypes.size());
+        for (String rawType : rawTypes)
+            types.add(parseType(rawType));
+
+        return new UserType(keyspace, name, columns, types);
+    }
+
+    /*
+     * Table metadata serialization/deserialization.
+     */
+
+    public static Mutation makeCreateTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addTableToSchemaMutation(table, timestamp, true, mutation);
+        return mutation;
+    }
+
+    static void addTableToSchemaMutation(CFMetaData table, long timestamp, boolean withColumnsAndTriggers, Mutation mutation)
+    {
+        RowUpdateBuilder adder = new RowUpdateBuilder(Tables, timestamp, mutation).clustering(table.cfName);
+
+        addTableParamsToSchemaMutation(table.params, adder);
+
+        adder.add("id", table.cfId)
+             .frozenSet("flags", CFMetaData.flagsToStrings(table.flags()))
+             .build();
+
+        if (withColumnsAndTriggers)
+        {
+            for (ColumnDefinition column : table.allColumns())
+                addColumnToSchemaMutation(table, column, timestamp, mutation);
+
+            for (CFMetaData.DroppedColumn column : table.getDroppedColumns().values())
+                addDroppedColumnToSchemaMutation(table, column, timestamp, mutation);
+
+            for (TriggerMetadata trigger : table.getTriggers())
+                addTriggerToSchemaMutation(table, trigger, timestamp, mutation);
+
+            for (IndexMetadata index : table.getIndexes())
+                addIndexToSchemaMutation(table, index, timestamp, mutation);
+        }
+    }
+
+    private static void addTableParamsToSchemaMutation(TableParams params, RowUpdateBuilder adder)
+    {
+        adder.add("bloom_filter_fp_chance", params.bloomFilterFpChance)
+             .add("comment", params.comment)
+             .add("dclocal_read_repair_chance", params.dcLocalReadRepairChance)
+             .add("default_time_to_live", params.defaultTimeToLive)
+             .add("gc_grace_seconds", params.gcGraceSeconds)
+             .add("max_index_interval", params.maxIndexInterval)
+             .add("memtable_flush_period_in_ms", params.memtableFlushPeriodInMs)
+             .add("min_index_interval", params.minIndexInterval)
+             .add("read_repair_chance", params.readRepairChance)
+             .add("speculative_retry", params.speculativeRetry.toString())
+             .add("crc_check_chance", params.crcCheckChance)
+             .frozenMap("caching", params.caching.asMap())
+             .frozenMap("compaction", params.compaction.asMap())
+             .frozenMap("compression", params.compression.asMap())
+             .frozenMap("extensions", params.extensions);
+    }
+
+    public static Mutation makeUpdateTableMutation(KeyspaceMetadata keyspace,
+                                                   CFMetaData oldTable,
+                                                   CFMetaData newTable,
+                                                   long timestamp,
+                                                   boolean fromThrift)
+    {
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+
+        addTableToSchemaMutation(newTable, timestamp, false, mutation);
+
+        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(oldTable.getColumnMetadata(),
+                                                                                 newTable.getColumnMetadata());
+
+        // columns that are no longer needed
+        for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values())
+        {
+            // Thrift only knows about the REGULAR ColumnDefinition type, so don't consider other types
+            // as being deleted just because they are not here.
+            if (!fromThrift ||
+                column.kind == ColumnDefinition.Kind.REGULAR ||
+                (newTable.isStaticCompactTable() && column.kind == ColumnDefinition.Kind.STATIC))
+            {
+                dropColumnFromSchemaMutation(oldTable, column, timestamp, mutation);
+            }
+        }
+
+        // newly added columns
+        for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values())
+            addColumnToSchemaMutation(newTable, column, timestamp, mutation);
+
+        // old columns with updated attributes
+        for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
+            addColumnToSchemaMutation(newTable, newTable.getColumnDefinition(name), timestamp, mutation);
+
+        // dropped columns
+        MapDifference<ByteBuffer, CFMetaData.DroppedColumn> droppedColumnDiff =
+            Maps.difference(oldTable.getDroppedColumns(), newTable.getDroppedColumns());
+
+        // newly dropped columns
+        for (CFMetaData.DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values())
+            addDroppedColumnToSchemaMutation(newTable, column, timestamp, mutation);
+
+        // columns added then dropped again
+        for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet())
+            addDroppedColumnToSchemaMutation(newTable, newTable.getDroppedColumns().get(name), timestamp, mutation);
+
+        MapDifference<String, TriggerMetadata> triggerDiff = triggersDiff(oldTable.getTriggers(), newTable.getTriggers());
+
+        // dropped triggers
+        for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnLeft().values())
+            dropTriggerFromSchemaMutation(oldTable, trigger, timestamp, mutation);
+
+        // newly created triggers
+        for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnRight().values())
+            addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation);
+
+        MapDifference<String, IndexMetadata> indexesDiff = indexesDiff(oldTable.getIndexes(),
+                                                                       newTable.getIndexes());
+
+        // dropped indexes
+        for (IndexMetadata index : indexesDiff.entriesOnlyOnLeft().values())
+            dropIndexFromSchemaMutation(oldTable, index, timestamp, mutation);
+
+        // newly created indexes
+        for (IndexMetadata index : indexesDiff.entriesOnlyOnRight().values())
+            addIndexToSchemaMutation(newTable, index, timestamp, mutation);
+
+        // updated indexes need to be updated
+        for (MapDifference.ValueDifference<IndexMetadata> diff : indexesDiff.entriesDiffering().values())
+        {
+            addUpdatedIndexToSchemaMutation(newTable, diff.rightValue(), timestamp, mutation);
+        }
+
+        return mutation;
+    }
+
+    private static MapDifference<String, IndexMetadata> indexesDiff(Indexes before, Indexes after)
+    {
+        Map<String, IndexMetadata> beforeMap = new HashMap<>();
+        before.forEach(i -> beforeMap.put(i.name, i));
+
+        Map<String, IndexMetadata> afterMap = new HashMap<>();
+        after.forEach(i -> afterMap.put(i.name, i));
+
+        return Maps.difference(beforeMap, afterMap);
+    }
+
+    private static MapDifference<String, TriggerMetadata> triggersDiff(Triggers before, Triggers after)
+    {
+        Map<String, TriggerMetadata> beforeMap = new HashMap<>();
+        before.forEach(t -> beforeMap.put(t.name, t));
+
+        Map<String, TriggerMetadata> afterMap = new HashMap<>();
+        after.forEach(t -> afterMap.put(t.name, t));
+
+        return Maps.difference(beforeMap, afterMap);
+    }
+
+    public static Mutation makeDropTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+
+        RowUpdateBuilder.deleteRow(Tables, timestamp, mutation, table.cfName);
+
+        for (ColumnDefinition column : table.allColumns())
+            dropColumnFromSchemaMutation(table, column, timestamp, mutation);
+
+        for (TriggerMetadata trigger : table.getTriggers())
+            dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation);
+
+        for (IndexMetadata index : table.getIndexes())
+            dropIndexFromSchemaMutation(table, index, timestamp, mutation);
+
+        return mutation;
+    }
+
+    public static CFMetaData createTableFromName(String keyspace, String table)
+    {
+        return readSchemaPartitionForTableAndApply(TABLES, keyspace, table, partition ->
+        {
+            if (partition.isEmpty())
+                throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table));
+
+            return createTableFromTablePartition(partition);
+        });
+    }
+
+    /**
+     * Deserialize tables from low-level schema representation; all of them belong to the same keyspace
+     */
+    private static Tables createTablesFromTablesPartition(RowIterator partition)
+    {
+        String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
+        Tables.Builder tables = org.apache.cassandra.schema.Tables.builder();
+        QueryProcessor.resultify(query, partition).forEach(row -> tables.add(createTableFromTableRow(row)));
+        return tables.build();
+    }
+
+    private static List<ColumnDefinition> createColumnsFromColumnsPartition(RowIterator serializedColumns)
+    {
+        String query = String.format("SELECT * FROM %s.%s", NAME, COLUMNS);
+        return createColumnsFromColumnRows(QueryProcessor.resultify(query, serializedColumns));
+    }
+
+    private static CFMetaData createTableFromTablePartition(RowIterator partition)
+    {
+        String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
+        return createTableFromTableRow(QueryProcessor.resultify(query, partition).one());
+    }
+
+    public static CFMetaData createTableFromTablePartitionAndColumnsPartition(RowIterator tablePartition,
+                                                                              RowIterator columnsPartition)
+    {
+        List<ColumnDefinition> columns = createColumnsFromColumnsPartition(columnsPartition);
+        String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
+        return createTableFromTableRowAndColumns(QueryProcessor.resultify(query, tablePartition).one(), columns);
+    }
+
+    /**
+     * Deserialize table metadata from low-level representation
+     *
+     * @return Metadata deserialized from schema
+     */
+    private static CFMetaData createTableFromTableRow(UntypedResultSet.Row row)
+    {
+        String keyspace = row.getString("keyspace_name");
+        String table = row.getString("table_name");
+
+        List<ColumnDefinition> columns =
+            readSchemaPartitionForTableAndApply(COLUMNS, keyspace, table, SchemaKeyspace::createColumnsFromColumnsPartition);
+
+        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns =
+            readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, table, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition);
+
+        Triggers triggers =
+            readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition);
+
+        CFMetaData cfm = createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns)
+                                                                        .triggers(triggers);
+
+        // the CFMetaData itself is required to build the collection of indexes, as
+        // the column definitions are needed because we store only the name of each
+        // index's target columns and this is not enough to reconstruct a ColumnIdentifier
+        org.apache.cassandra.schema.Indexes indexes =
+            readSchemaPartitionForTableAndApply(INDEXES, keyspace, table, rowIterator -> createIndexesFromIndexesPartition(cfm, rowIterator));
+        cfm.indexes(indexes);
+
+        return cfm;
+    }
+
+    public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row row, List<ColumnDefinition> columns)
+    {
+        String keyspace = row.getString("keyspace_name");
+        String table = row.getString("table_name");
+        UUID id = row.getUUID("id");
+
+        Set<CFMetaData.Flag> flags = row.has("flags")
+                                   ? CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance))
+                                   : Collections.emptySet();
+
+        boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
+        boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
+        boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
+        boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
+
+        return CFMetaData.create(keyspace,
+                                 table,
+                                 id,
+                                 isDense,
+                                 isCompound,
+                                 isSuper,
+                                 isCounter,
+                                 false,
+                                 columns,
+                                 DatabaseDescriptor.getPartitioner())
+                         .params(createTableParamsFromRow(row));
+    }
+
+    private static TableParams createTableParamsFromRow(UntypedResultSet.Row row)
+    {
+        TableParams.Builder builder = TableParams.builder();
+
+        builder.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"))
+               .caching(CachingParams.fromMap(row.getFrozenTextMap("caching")))
+               .comment(row.getString("comment"))
+               .compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction")))
+               .compression(CompressionParams.fromMap(row.getFrozenTextMap("compression")))
+               .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance"))
+               .defaultTimeToLive(row.getInt("default_time_to_live"))
+               .gcGraceSeconds(row.getInt("gc_grace_seconds"))
+               .maxIndexInterval(row.getInt("max_index_interval"))
+               .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"))
+               .minIndexInterval(row.getInt("min_index_interval"))
+               .readRepairChance(row.getDouble("read_repair_chance"))
+               .crcCheckChance(row.getDouble("crc_check_chance"))
+               .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
+
+        if (row.has("extensions"))
+            builder.extensions(row.getFrozenMap("extensions", UTF8Type.instance, BytesType.instance));
+
+        return builder.build();
+    }
+
+    /*
+     * Column metadata serialization/deserialization.
+     */
+
+    private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation).clustering(table.cfName, column.name.toString());
+
+        AbstractType<?> type = column.type;
+        if (type instanceof ReversedType)
+            type = ((ReversedType) type).baseType;
+
+        adder.add("column_name_bytes", column.name.bytes)
+             .add("kind", column.kind.toString().toLowerCase())
+             .add("position", column.isOnAllComponents() ? ColumnDefinition.NO_POSITION : column.position())
+             .add("clustering_order", column.clusteringOrder().toString().toLowerCase())
+             .add("type", type.toString())
+             .build();
+    }
+
+    private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
+    {
+        // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (for CQL3, this won't make a difference).
+        RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString());
+    }
+
+    private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows)
+    {
+        List<ColumnDefinition> columns = new ArrayList<>(rows.size());
+        rows.forEach(row -> columns.add(createColumnFromColumnRow(row)));
+        return columns;
+    }
+
+    private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row)
+    {
+        String keyspace = row.getString("keyspace_name");
+        String table = row.getString("table_name");
+
+        ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name"));
+
+        ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase());
+
+        int position = row.getInt("position");
+        ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase());
+
+        AbstractType<?> type = parseType(row.getString("type"));
+        if (order == ClusteringOrder.DESC)
+            type = ReversedType.getInstance(type);
+
+        return new ColumnDefinition(keyspace, table, name, type, position, kind);
+    }
+
+    /*
+     * Dropped column metadata serialization/deserialization.
+     */
+
+    private static void addDroppedColumnToSchemaMutation(CFMetaData table, CFMetaData.DroppedColumn column, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder adder = new RowUpdateBuilder(DroppedColumns, timestamp, mutation).clustering(table.cfName, column.name);
+
+        adder.add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime)))
+             .add("type", column.type.toString())
+             .build();
+    }
+
+    private static Map<ByteBuffer, CFMetaData.DroppedColumn> createDroppedColumnsFromDroppedColumnsPartition(RowIterator serializedColumns)
+    {
+        String query = String.format("SELECT * FROM %s.%s", NAME, DROPPED_COLUMNS);
+        Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>();
+        for (CFMetaData.DroppedColumn column : createDroppedColumnsFromDroppedColumnRows(QueryProcessor.resultify(query, serializedColumns)))
+            columns.put(UTF8Type.instance.decompose(column.name), column);
+        return columns;
+    }
+
+    private static List<CFMetaData.DroppedColumn> createDroppedColumnsFromDroppedColumnRows(UntypedResultSet rows)
+    {
+        List<CFMetaData.DroppedColumn> columns = new ArrayList<>(rows.size());
+        rows.forEach(row -> columns.add(createDroppedColumnFromDroppedColumnRow(row)));
+        return columns;
+    }
+
+    private static CFMetaData.DroppedColumn createDroppedColumnFromDroppedColumnRow(UntypedResultSet.Row row)
+    {
+        String name = row.getString("column_name");
+        AbstractType<?> type = TypeParser.parse(row.getString("type"));
+        long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time"));
+
+        return new CFMetaData.DroppedColumn(name, type, droppedTime);
+    }
+
+    /*
+     * Trigger metadata serialization/deserialization.
+     */
+
+    private static void addTriggerToSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation)
+    {
+        new RowUpdateBuilder(Triggers, timestamp, mutation)
+            .clustering(table.cfName, trigger.name)
+            .frozenMap("options", Collections.singletonMap("class", trigger.classOption))
+            .build();
+    }
+
+    private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, table.cfName, trigger.name);
+    }
+
+    /**
+     * Deserialize triggers from storage-level representation.
+     *
+     * @param partition storage-level partition containing the trigger definitions
+     * @return the list of processed TriggerDefinitions
+     */
+    private static Triggers createTriggersFromTriggersPartition(RowIterator partition)
+    {
+        Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder();
+        String query = String.format("SELECT * FROM %s.%s", NAME, TRIGGERS);
+        QueryProcessor.resultify(query, partition).forEach(row -> triggers.add(createTriggerFromTriggerRow(row)));
+        return triggers.build();
+    }
+
+    private static TriggerMetadata createTriggerFromTriggerRow(UntypedResultSet.Row row)
+    {
+        String name = row.getString("trigger_name");
+        String classOption = row.getFrozenTextMap("options").get("class");
+        return new TriggerMetadata(name, classOption);
+    }
+
+    /*
+     * View metadata serialization/deserialization.
+     */
+
+    public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addViewToSchemaMutation(view, timestamp, true, mutation);
+        return mutation;
+    }
+
+    private static void addViewToSchemaMutation(ViewDefinition view, long timestamp, boolean includeColumns, Mutation mutation)
+    {
+        RowUpdateBuilder builder = new RowUpdateBuilder(Views, timestamp, mutation)
+                                   .clustering(view.viewName);
+
+        CFMetaData table = view.metadata;
+
+        builder.add("include_all_columns", view.includeAllColumns)
+               .add("base_table_id", view.baseTableId)
+               .add("base_table_name", view.baseTableMetadata().cfName)
+               .add("where_clause", view.whereClause)
+               .add("id", table.cfId);
+
+        addTableParamsToSchemaMutation(table.params, builder);
+
+        if (includeColumns)
+        {
+            for (ColumnDefinition column : table.allColumns())
+                addColumnToSchemaMutation(table, column, timestamp, mutation);
+
+            for (CFMetaData.DroppedColumn column : table.getDroppedColumns().values())
+                addDroppedColumnToSchemaMutation(table, column, timestamp, mutation);
+        }
+
+        builder.build();
+    }
+
+    public static Mutation makeDropViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+ Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + + RowUpdateBuilder.deleteRow(Views, timestamp, mutation, view.viewName); + + CFMetaData table = view.metadata; + for (ColumnDefinition column : table.allColumns()) + dropColumnFromSchemaMutation(table, column, timestamp, mutation); + + for (IndexMetadata index : table.getIndexes()) + dropIndexFromSchemaMutation(table, index, timestamp, mutation); + + return mutation; + } + + public static Mutation makeUpdateViewMutation(KeyspaceMetadata keyspace, + ViewDefinition oldView, + ViewDefinition newView, + long timestamp) + { + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + + addViewToSchemaMutation(newView, timestamp, false, mutation); + + MapDifference columnDiff = Maps.difference(oldView.metadata.getColumnMetadata(), + newView.metadata.getColumnMetadata()); + + // columns that are no longer needed + for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values()) + { + dropColumnFromSchemaMutation(oldView.metadata, column, timestamp, mutation); + } + + // newly added columns + for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values()) + addColumnToSchemaMutation(newView.metadata, column, timestamp, mutation); + + // old columns with updated attributes + for (ByteBuffer name : columnDiff.entriesDiffering().keySet()) + addColumnToSchemaMutation(newView.metadata, newView.metadata.getColumnDefinition(name), timestamp, mutation); + + // dropped columns + MapDifference droppedColumnDiff = + Maps.difference(oldView.metadata.getDroppedColumns(), oldView.metadata.getDroppedColumns()); + + // newly dropped columns + for (CFMetaData.DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values()) + addDroppedColumnToSchemaMutation(oldView.metadata, column, timestamp, mutation); + + // columns added then dropped again + for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet()) + addDroppedColumnToSchemaMutation(newView.metadata, newView.metadata.getDroppedColumns().get(name), timestamp, mutation); + + return mutation; + } + + public static ViewDefinition createViewFromName(String keyspace, String view) + { + return readSchemaPartitionForTableAndApply(VIEWS, keyspace, view, partition -> + { + if (partition.isEmpty()) + throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, view)); + + return createViewFromViewPartition(partition); + }); + } + + private static ViewDefinition createViewFromViewPartition(RowIterator partition) + { + String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS); + return createViewFromViewRow(QueryProcessor.resultify(query, partition).one()); + } + + /** + * Deserialize views from storage-level representation. 
+     *
+     * @param partition storage-level partition containing the view definitions
+     * @return the list of processed ViewDefinitions
+     */
+    private static Views createViewsFromViewsPartition(RowIterator partition)
+    {
+        Views.Builder views = org.apache.cassandra.schema.Views.builder();
+        String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS);
+        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
+        {
+            ViewDefinition view = createViewFromViewRow(row);
+            views.add(view);
+        }
+        return views.build();
+    }
+
+    private static ViewDefinition createViewFromViewRow(UntypedResultSet.Row row)
+    {
+        String keyspace = row.getString("keyspace_name");
+        String view = row.getString("view_name");
+        UUID id = row.getUUID("id");
+        UUID baseTableId = row.getUUID("base_table_id");
+        String baseTableName = row.getString("base_table_name");
+        boolean includeAll = row.getBoolean("include_all_columns");
+        String whereClause = row.getString("where_clause");
+
+        List<ColumnDefinition> columns =
+            readSchemaPartitionForTableAndApply(COLUMNS, keyspace, view, SchemaKeyspace::createColumnsFromColumnsPartition);
+
+        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns =
+            readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, view, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition);
+
+        CFMetaData cfm = CFMetaData.create(keyspace,
+                                           view,
+                                           id,
+                                           false,
+                                           true,
+                                           false,
+                                           false,
+                                           true,
+                                           columns,
+                                           DatabaseDescriptor.getPartitioner())
+                                   .params(createTableParamsFromRow(row))
+                                   .droppedColumns(droppedColumns);
+
+        String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause);
+        SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
+
+        return new ViewDefinition(keyspace, view, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm);
+    }
+
+    /*
+     * Secondary Index metadata serialization/deserialization.
+     */
+
+    private static void addIndexToSchemaMutation(CFMetaData table,
+                                                 IndexMetadata index,
+                                                 long timestamp,
+                                                 Mutation mutation)
+    {
+        RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation).clustering(table.cfName, index.name);
+
+        builder.add("kind", index.kind.toString());
+        builder.frozenMap("options", index.options);
+        builder.build();
+    }
+
+    private static void dropIndexFromSchemaMutation(CFMetaData table,
+                                                    IndexMetadata index,
+                                                    long timestamp,
+                                                    Mutation mutation)
+    {
+        RowUpdateBuilder.deleteRow(Indexes, timestamp, mutation, table.cfName, index.name);
+    }
+
+    private static void addUpdatedIndexToSchemaMutation(CFMetaData table,
+                                                        IndexMetadata index,
+                                                        long timestamp,
+                                                        Mutation mutation)
+    {
+        addIndexToSchemaMutation(table, index, timestamp, mutation);
+    }
+
+    /**
+     * Deserialize secondary indexes from storage-level representation.
+     *
+     * @param partition storage-level partition containing the index definitions
+     * @return the list of processed IndexMetadata
+     */
+    private static Indexes createIndexesFromIndexesPartition(CFMetaData cfm, RowIterator partition)
+    {
+        Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder();
+        String query = String.format("SELECT * FROM %s.%s", NAME, INDEXES);
+        QueryProcessor.resultify(query, partition).forEach(row -> indexes.add(createIndexMetadataFromIndexesRow(row)));
+        return indexes.build();
+    }
+
+    private static IndexMetadata createIndexMetadataFromIndexesRow(UntypedResultSet.Row row)
+    {
+        String name = row.getString("index_name");
+        IndexMetadata.Kind type = IndexMetadata.Kind.valueOf(row.getString("kind"));
+        Map<String, String> options = row.getFrozenTextMap("options");
+        return IndexMetadata.fromSchemaMetadata(name, type, options);
+    }
+
+    /*
+     * UDF metadata serialization/deserialization.
+     */
+
+    public static Mutation makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addFunctionToSchemaMutation(function, timestamp, mutation);
+        return mutation;
+    }
+
+    static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder adder = new RowUpdateBuilder(Functions, timestamp, mutation)
+                                 .clustering(function.name().name, functionSignatureWithTypes(function));
+
+        adder.add("body", function.body())
+             .add("language", function.language())
+             .add("return_type", function.returnType().toString())
+             .add("called_on_null_input", function.isCalledOnNullInput())
+             .frozenList("argument_names", function.argNames().stream().map((c) -> bbToString(c.bytes)).collect(Collectors.toList()))
+             .frozenList("argument_types", function.argTypes().stream().map(AbstractType::toString).collect(Collectors.toList()));
+
+        adder.build();
+    }
+
+    public static Mutation makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionSignatureWithTypes(function));
+    }
+
+    private static Collection<UDFunction> createFunctionsFromFunctionsPartition(RowIterator partition)
+    {
+        List<UDFunction> functions = new ArrayList<>();
+        String query = String.format("SELECT * FROM %s.%s", NAME, FUNCTIONS);
+        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
+            functions.add(createFunctionFromFunctionRow(row));
+        return functions;
+    }
+
+    private static UDFunction createFunctionFromFunctionRow(UntypedResultSet.Row row)
+    {
+        String ksName = row.getString("keyspace_name");
+        String functionName = row.getString("function_name");
+        FunctionName name = new FunctionName(ksName, functionName);
+
+        List<ColumnIdentifier> argNames = new ArrayList<>();
+        if (row.has("argument_names"))
+            for (String arg : row.getFrozenList("argument_names", UTF8Type.instance))
+                argNames.add(new ColumnIdentifier(arg, true));
+
+        List<AbstractType<?>> argTypes = new ArrayList<>();
+        if (row.has("argument_types"))
+            for (String type : row.getFrozenList("argument_types", UTF8Type.instance))
+                argTypes.add(parseType(type));
+
+        AbstractType<?> returnType = parseType(row.getString("return_type"));
+
+        String language = row.getString("language");
+        String body = row.getString("body");
+        boolean calledOnNullInput = row.getBoolean("called_on_null_input");
+
+        org.apache.cassandra.cql3.functions.Function existing = Schema.instance.findFunction(name, argTypes).orElse(null);
+        if (existing instanceof UDFunction)
+        {
+            // This check prevents duplicate compilation of effectively the same UDF.
+            // Duplicate compilation attempts can occur on the coordinator node handling the CREATE FUNCTION
+            // statement, since CreateFunctionStatement needs to execute UDFunction.create but schema migration
+            // also needs that (since it needs to handle its own change).
+            UDFunction udf = (UDFunction) existing;
+            if (udf.argNames().equals(argNames) && // arg types checked in Functions.find call
+                udf.returnType().equals(returnType) &&
+                !udf.isAggregate() &&
+                udf.language().equals(language) &&
+                udf.body().equals(body) &&
+                udf.isCalledOnNullInput() == calledOnNullInput)
+            {
-                logger.debug("Skipping duplicate compilation of already existing UDF {}", name);
++                logger.trace("Skipping duplicate compilation of already existing UDF {}", name);
+                return udf;
+            }
+        }
+
+        try
+        {
+            return UDFunction.create(name, argNames, argTypes, returnType, calledOnNullInput, language, body);
+        }
+        catch (InvalidRequestException e)
+        {
+            logger.error(String.format("Cannot load function '%s' from schema: this function won't be available (on this node)", name), e);
+            return UDFunction.createBrokenFunction(name, argNames, argTypes, returnType, calledOnNullInput, language, body, e);
+        }
+    }
+
+    /*
+     * Aggregate UDF metadata serialization/deserialization.
+     */
+
+    public static Mutation makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addAggregateToSchemaMutation(aggregate, timestamp, mutation);
+        return mutation;
+    }
+
+    static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder adder = new RowUpdateBuilder(Aggregates, timestamp, mutation)
+                                 .clustering(aggregate.name().name, functionSignatureWithTypes(aggregate));
+
+        adder.add("return_type", aggregate.returnType().toString())
+             .add("state_func", aggregate.stateFunction().name().name)
+             .add("state_type", aggregate.stateType() != null ? aggregate.stateType().toString() : null)
+             .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null)
+             .add("initcond", aggregate.initialCondition())
+             .frozenList("argument_types", aggregate.argTypes().stream().map(AbstractType::toString).collect(Collectors.toList()))
+             .build();
+    }
+
+    private static Collection<UDAggregate> createAggregatesFromAggregatesPartition(RowIterator partition)
+    {
+        List<UDAggregate> aggregates = new ArrayList<>();
+        String query = String.format("SELECT * FROM %s.%s", NAME, AGGREGATES);
+        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
+            aggregates.add(createAggregateFromAggregateRow(row));
+        return aggregates;
+    }
+
+    private static UDAggregate createAggregateFromAggregateRow(UntypedResultSet.Row row)
+    {
+        String ksName = row.getString("keyspace_name");
+        String functionName = row.getString("aggregate_name");
+        FunctionName name = new FunctionName(ksName, functionName);
+
+        List<String> types = row.getFrozenList("argument_types", UTF8Type.instance);
+
+        List<AbstractType<?>> argTypes;
+        if (types == null)
+        {
+            argTypes = Collections.emptyList();
+        }
+        else
+        {
+            argTypes = new ArrayList<>(types.size());
+            for (String type : types)
+                argTypes.add(parseType(type));
+        }
+
+        AbstractType<?> returnType = parseType(row.getString("return_type"));
+
+        FunctionName stateFunc = new FunctionName(ksName, row.getString("state_func"));
+        FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null;
+        AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null;
+        ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
+
+        try
+        {
+            return UDAggregate.create(name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
+        }
+        catch (InvalidRequestException reason)
+        {
+            return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason);
+        }
+    }
+
+    public static Mutation makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionSignatureWithTypes(aggregate));
+    }
+
+    private static AbstractType<?> parseType(String str)
+    {
+        return TypeParser.parse(str);
+    }
+
+    // We allow method overloads, so a function is not uniquely identified by its name only, but
+    // also by its argument types. To distinguish overloads of a given function name in the schema,
+    // we use a "signature" which is just a list of its CQL argument types (we could replace that by
+    // using a "signature" UDT that would be comprised of the function name and argument types,
+    // which we could then use as a clustering column. But as we haven't yet used UDTs in system tables,
+    // we'll leave that decision to #6717).
+    public static ByteBuffer functionSignatureWithTypes(AbstractFunction fun)
+    {
+        ListType<String> list = ListType.getInstance(UTF8Type.instance, false);
+        List<String> strList = new ArrayList<>(fun.argTypes().size());
+        for (AbstractType<?> argType : fun.argTypes())
+            strList.add(argType.asCQL3Type().toString());
+        return list.decompose(strList);
+    }
+}
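
[Editor's sketch, not part of the patch] To illustrate the "signature" encoding used by functionSignatureWithTypes above: the clustering value is just the serialized frozen list<text> of CQL argument types, so two overloads of the same function name occupy distinct schema rows. A minimal sketch, assuming a hypothetical pair of overloads fun(int) and fun(text, int), and using only the ListType/UTF8Type calls that appear in the diff:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import org.apache.cassandra.db.marshal.ListType;
    import org.apache.cassandra.db.marshal.UTF8Type;

    public class SignatureSketch
    {
        public static void main(String[] args)
        {
            // Frozen (non-multi-cell) list<text>, as in functionSignatureWithTypes.
            ListType<String> list = ListType.getInstance(UTF8Type.instance, false);

            // Hypothetical overloads of ks.fun: fun(int) and fun(text, int).
            ByteBuffer sig1 = list.decompose(Arrays.asList("int"));
            ByteBuffer sig2 = list.decompose(Arrays.asList("text", "int"));

            // The serialized lists differ, so each overload gets its own clustering key.
            System.out.println(sig1.equals(sig2)); // false
        }
    }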
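[Editor's sketch, not part of the patch] The view-update path in makeUpdateViewMutation classifies columns with Guava's Maps.difference. A standalone sketch of that classification, with hypothetical column names and plain String values standing in for ColumnDefinition:

    import java.util.Map;
    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.MapDifference;
    import com.google.common.collect.Maps;

    public class ViewColumnDiffSketch
    {
        public static void main(String[] args)
        {
            // Old and new column sets of a hypothetical view.
            Map<String, String> oldColumns = ImmutableMap.of("a", "int", "b", "text");
            Map<String, String> newColumns = ImmutableMap.of("b", "ascii", "c", "int");

            MapDifference<String, String> diff = Maps.difference(oldColumns, newColumns);

            System.out.println(diff.entriesOnlyOnLeft());  // {a=int} -> dropColumnFromSchemaMutation
            System.out.println(diff.entriesOnlyOnRight()); // {c=int} -> addColumnToSchemaMutation
            System.out.println(diff.entriesDiffering());   // {b=(text, ascii)} -> re-add with new attributes
        }
    }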