Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 282EF10151 for ; Tue, 16 Dec 2014 23:08:11 +0000 (UTC) Received: (qmail 69854 invoked by uid 500); 16 Dec 2014 23:08:10 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 69719 invoked by uid 500); 16 Dec 2014 23:08:10 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 69465 invoked by uid 99); 16 Dec 2014 23:08:10 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Dec 2014 23:08:10 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4DD89A2DDCC; Tue, 16 Dec 2014 23:08:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aleksey@apache.org To: commits@cassandra.apache.org Date: Tue, 16 Dec 2014 23:08:14 -0000 Message-Id: <7e91c5dab3b34f76a3174c05f313663c@git.apache.org> In-Reply-To: <939f051cc6b048098b5d1742793ba57e@git.apache.org> References: <939f051cc6b048098b5d1742793ba57e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/5] cassandra git commit: Isolate schema serializaton code Isolate schema serializaton code patch by Aleksey Yeschenko; reviewed by Tyler Hobbs for CASSANDRA-8261 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e9d345f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e9d345f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e9d345f Branch: refs/heads/trunk Commit: 3e9d345f0078922950157de4fd4c7992512b43b8 Parents: 32ac6af Author: Aleksey Yeschenko Authored: Wed Dec 17 01:12:19 2014 +0300 Committer: Aleksey Yeschenko Committed: Wed Dec 17 01:34:16 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/CFMetaData.java | 331 +--- .../cassandra/config/ColumnDefinition.java | 134 +- .../cassandra/config/DatabaseDescriptor.java | 87 +- .../org/apache/cassandra/config/KSMetaData.java | 155 +- .../org/apache/cassandra/config/Schema.java | 248 ++- .../cassandra/config/TriggerDefinition.java | 63 - .../org/apache/cassandra/config/UTMetaData.java | 91 +- .../cassandra/cql3/functions/Functions.java | 22 +- .../cql3/functions/JavaSourceUDFFactory.java | 5 +- .../cassandra/cql3/functions/UDAggregate.java | 206 +-- .../cassandra/cql3/functions/UDFunction.java | 193 +-- .../cassandra/cql3/functions/UDHelper.java | 12 +- .../cql3/statements/CreateTableStatement.java | 24 +- .../apache/cassandra/db/AtomicBTreeColumns.java | 3 +- .../apache/cassandra/db/BatchlogManager.java | 14 +- .../db/DefinitionsUpdateVerbHandler.java | 3 +- .../org/apache/cassandra/db/DefsTables.java | 622 -------- .../cassandra/db/HintedHandOffManager.java | 16 +- src/java/org/apache/cassandra/db/Keyspace.java | 2 +- src/java/org/apache/cassandra/db/Memtable.java | 2 +- .../db/MigrationRequestVerbHandler.java | 3 +- .../org/apache/cassandra/db/SystemKeyspace.java | 514 ++---- .../hadoop/ColumnFamilyRecordReader.java | 28 +- .../cassandra/hadoop/cql3/CqlRecordReader.java | 28 +- .../cassandra/hadoop/cql3/CqlRecordWriter.java | 23 +- .../hadoop/pig/AbstractCassandraStorage.java | 46 +- .../apache/cassandra/hadoop/pig/CqlStorage.java | 26 +- .../cassandra/io/sstable/CQLSSTableWriter.java | 10 +- .../cassandra/schema/LegacySchemaTables.java | 1480 ++++++++++++++++++ .../cassandra/service/CassandraDaemon.java | 8 +- .../apache/cassandra/service/ClientState.java | 3 +- .../cassandra/service/MigrationManager.java | 127 +- .../apache/cassandra/service/MigrationTask.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 2 +- .../cassandra/service/StorageService.java | 4 +- .../cassandra/thrift/ThriftConversion.java | 5 +- .../org/apache/cassandra/tools/BulkLoader.java | 6 +- .../apache/cassandra/tools/SSTableExport.java | 3 +- .../apache/cassandra/tools/SSTableImport.java | 2 +- .../cassandra/tools/SSTableLevelResetter.java | 3 +- .../cassandra/tools/StandaloneScrubber.java | 3 +- .../cassandra/tools/StandaloneSplitter.java | 4 +- .../cassandra/tools/StandaloneUpgrader.java | 2 +- .../apache/cassandra/config/CFMetaDataTest.java | 15 +- .../config/DatabaseDescriptorTest.java | 7 +- .../org/apache/cassandra/config/DefsTest.java | 564 ------- .../apache/cassandra/config/KSMetaDataTest.java | 6 +- .../org/apache/cassandra/cql3/CQLTester.java | 4 +- .../cassandra/db/BatchlogManagerTest.java | 4 +- .../apache/cassandra/db/HintedHandOffTest.java | 8 +- .../schema/LegacySchemaTablesTest.java | 568 +++++++ .../service/EmbeddedCassandraServiceTest.java | 2 +- .../service/StorageServiceServerTest.java | 3 +- 54 files changed, 2792 insertions(+), 2957 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3571c1e..6f4cdec 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Modernize schema tables (CASSANDRA-8261) * Support for user-defined aggregation functions (CASSANDRA-8053) * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419) * Refactor SelectStatement, return IN results in natural order instead http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index eb78ec7..0730ba7 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.statements.CFStatement; import org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.db.*; @@ -50,14 +49,12 @@ import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.compress.LZ4Compressor; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import org.github.jamm.Unmetered; -import static org.apache.cassandra.utils.FBUtilities.fromJsonMap; -import static org.apache.cassandra.utils.FBUtilities.json; - /** * This class can be tricky to modify. Please read http://wiki.apache.org/cassandra/ConfigurationNotes for how to do so safely. */ @@ -221,7 +218,7 @@ public final class CFMetaData public volatile CompressionParameters compressionParameters = new CompressionParameters(null); // attribute setters that return the modified CFMetaData instance - public CFMetaData comment(String prop) { comment = Strings.nullToEmpty(prop); return this;} + public CFMetaData comment(String prop) {comment = Strings.nullToEmpty(prop); return this;} public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;} public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;} public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;} @@ -344,8 +341,8 @@ public final class CFMetaData // Depends on parent's cache setting, turn on its index CF's cache. // Row caching is never enabled; see CASSANDRA-5732 CachingOptions indexCaching = parent.getCaching().keyCache.isEnabled() - ? CachingOptions.KEYS_ONLY - : CachingOptions.NONE; + ? CachingOptions.KEYS_ONLY + : CachingOptions.NONE; return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, indexComparator, parent.cfId) .keyValidator(info.type) @@ -386,7 +383,8 @@ public final class CFMetaData return copyOpts(new CFMetaData(ksName, cfName, cfType, comparator, newCfId), this); } - static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD) + @VisibleForTesting + public static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD) { List clonedColumns = new ArrayList<>(oldCFMD.allColumns().size()); for (ColumnDefinition cd : oldCFMD.allColumns()) @@ -449,6 +447,11 @@ public final class CFMetaData return cfName.contains("."); } + public Map getColumnMetadata() + { + return columnMetadata; + } + /** * * @return The name of the parent cf if this is a seconday index @@ -723,14 +726,9 @@ public final class CFMetaData public void reload() { - Row cfDefRow = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, ksName, cfName); - - if (cfDefRow.cf == null || !cfDefRow.cf.hasColumns()) - throw new RuntimeException(String.format("%s not found in the schema definitions keyspace.", ksName + ":" + cfName)); - try { - apply(fromSchema(cfDefRow)); + apply(LegacySchemaTables.createTableFromName(ksName, cfName)); } catch (ConfigurationException e) { @@ -739,13 +737,12 @@ public final class CFMetaData } /** - * Updates CFMetaData in-place to match cf_def - * - * *Note*: This method left package-private only for DefsTest, don't use directly! + * Updates CFMetaData in-place to match cfm * * @throws ConfigurationException if ks/cf names or cf ids didn't match */ - void apply(CFMetaData cfm) throws ConfigurationException + @VisibleForTesting + public void apply(CFMetaData cfm) throws ConfigurationException { logger.debug("applying {} to {}", cfm, this); @@ -1116,89 +1113,6 @@ public final class CFMetaData "interval (%d).", maxIndexInterval, minIndexInterval)); } - /** - * Create schema mutations to update this metadata to provided new state. - * - * @param newState The new metadata (for the same CF) - * @param modificationTimestamp Timestamp to use for mutation - * @param fromThrift whether the newState comes from thrift - * - * @return Difference between attributes in form of schema mutation - */ - public Mutation toSchemaUpdate(CFMetaData newState, long modificationTimestamp, boolean fromThrift) - { - Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(ksName)); - - newState.toSchemaNoColumnsNoTriggers(mutation, modificationTimestamp); - - MapDifference columnDiff = Maps.difference(columnMetadata, newState.columnMetadata); - - // columns that are no longer needed - for (ColumnDefinition cd : columnDiff.entriesOnlyOnLeft().values()) - { - // Thrift only knows about the REGULAR ColumnDefinition type, so don't consider other type - // are being deleted just because they are not here. - if (fromThrift && cd.kind != ColumnDefinition.Kind.REGULAR) - continue; - - cd.deleteFromSchema(mutation, modificationTimestamp); - } - - // newly added columns - for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values()) - cd.toSchema(mutation, modificationTimestamp); - - // old columns with updated attributes - for (ByteBuffer name : columnDiff.entriesDiffering().keySet()) - { - ColumnDefinition cd = newState.columnMetadata.get(name); - cd.toSchema(mutation, modificationTimestamp); - } - - MapDifference triggerDiff = Maps.difference(triggers, newState.triggers); - - // dropped triggers - for (TriggerDefinition td : triggerDiff.entriesOnlyOnLeft().values()) - td.deleteFromSchema(mutation, cfName, modificationTimestamp); - - // newly created triggers - for (TriggerDefinition td : triggerDiff.entriesOnlyOnRight().values()) - td.toSchema(mutation, cfName, modificationTimestamp); - - return mutation; - } - - /** - * Remove all CF attributes from schema - * - * @param timestamp Timestamp to use - * - * @return Mutation to use to completely remove cf from schema - */ - public Mutation dropFromSchema(long timestamp) - { - Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(ksName)); - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnFamiliesTable); - int ldt = (int) (System.currentTimeMillis() / 1000); - - Composite prefix = SystemKeyspace.SchemaColumnFamiliesTable.comparator.make(cfName); - cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); - - for (ColumnDefinition cd : allColumns()) - cd.deleteFromSchema(mutation, timestamp); - - for (TriggerDefinition td : triggers.values()) - td.deleteFromSchema(mutation, cfName, timestamp); - - for (String indexName : Keyspace.open(this.ksName).getColumnFamilyStore(this.cfName).getBuiltIndexes()) - { - ColumnFamily indexCf = mutation.addOrGet(SystemKeyspace.BuiltIndexesTable); - indexCf.addTombstone(indexCf.getComparator().makeCellName(indexName), ldt, timestamp); - } - - return mutation; - } - public boolean isPurged() { return isPurged; @@ -1209,215 +1123,6 @@ public final class CFMetaData isPurged = true; } - public void toSchema(Mutation mutation, long timestamp) - { - toSchemaNoColumnsNoTriggers(mutation, timestamp); - - for (TriggerDefinition td : triggers.values()) - td.toSchema(mutation, cfName, timestamp); - - for (ColumnDefinition cd : allColumns()) - cd.toSchema(mutation, timestamp); - } - - private void toSchemaNoColumnsNoTriggers(Mutation mutation, long timestamp) - { - // For property that can be null (and can be changed), we insert tombstones, to make sure - // we don't keep a property the user has removed - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnFamiliesTable); - Composite prefix = SystemKeyspace.SchemaColumnFamiliesTable.comparator.make(cfName); - CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp); - - adder.add("cf_id", cfId); - adder.add("type", cfType.toString()); - - if (isSuper()) - { - // We need to continue saving the comparator and subcomparator separatly, otherwise - // we won't know at deserialization if the subcomparator should be taken into account - // TODO: we should implement an on-start migration if we want to get rid of that. - adder.add("comparator", comparator.subtype(0).toString()); - adder.add("subcomparator", comparator.subtype(1).toString()); - } - else - { - adder.add("comparator", comparator.toString()); - } - - adder.add("comment", comment); - adder.add("read_repair_chance", readRepairChance); - adder.add("local_read_repair_chance", dcLocalReadRepairChance); - adder.add("gc_grace_seconds", gcGraceSeconds); - adder.add("default_validator", defaultValidator.toString()); - adder.add("key_validator", keyValidator.toString()); - adder.add("min_compaction_threshold", minCompactionThreshold); - adder.add("max_compaction_threshold", maxCompactionThreshold); - adder.add("bloom_filter_fp_chance", getBloomFilterFpChance()); - adder.add("memtable_flush_period_in_ms", memtableFlushPeriod); - adder.add("caching", caching.toString()); - adder.add("default_time_to_live", defaultTimeToLive); - adder.add("compaction_strategy_class", compactionStrategyClass.getName()); - adder.add("compression_parameters", json(compressionParameters.asThriftOptions())); - adder.add("compaction_strategy_options", json(compactionStrategyOptions)); - adder.add("min_index_interval", minIndexInterval); - adder.add("max_index_interval", maxIndexInterval); - adder.add("speculative_retry", speculativeRetry.toString()); - - for (Map.Entry entry : droppedColumns.entrySet()) - adder.addMapEntry("dropped_columns", entry.getKey().toString(), entry.getValue()); - - adder.add("is_dense", isDense); - } - - @VisibleForTesting - public static CFMetaData fromSchemaNoTriggers(UntypedResultSet.Row result, UntypedResultSet serializedColumnDefinitions) - { - try - { - String ksName = result.getString("keyspace_name"); - String cfName = result.getString("columnfamily_name"); - - AbstractType rawComparator = TypeParser.parse(result.getString("comparator")); - AbstractType subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null; - ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type")); - - AbstractType fullRawComparator = makeRawAbstractType(rawComparator, subComparator); - - List columnDefs = ColumnDefinition.fromSchema(serializedColumnDefinitions, - ksName, - cfName, - fullRawComparator, - cfType == ColumnFamilyType.Super); - - boolean isDense = result.has("is_dense") - ? result.getBoolean("is_dense") - : calculateIsDense(fullRawComparator, columnDefs); - - CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense); - - // if we are upgrading, we use id generated from names initially - UUID cfId = result.has("cf_id") - ? result.getUUID("cf_id") - : generateLegacyCfId(ksName, cfName); - - CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId); - cfm.isDense(isDense); - - cfm.readRepairChance(result.getDouble("read_repair_chance")); - cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance")); - cfm.gcGraceSeconds(result.getInt("gc_grace_seconds")); - cfm.defaultValidator(TypeParser.parse(result.getString("default_validator"))); - cfm.keyValidator(TypeParser.parse(result.getString("key_validator"))); - cfm.minCompactionThreshold(result.getInt("min_compaction_threshold")); - cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold")); - if (result.has("comment")) - cfm.comment(result.getString("comment")); - if (result.has("memtable_flush_period_in_ms")) - cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms")); - cfm.caching(CachingOptions.fromString(result.getString("caching"))); - if (result.has("default_time_to_live")) - cfm.defaultTimeToLive(result.getInt("default_time_to_live")); - if (result.has("speculative_retry")) - cfm.speculativeRetry(SpeculativeRetry.fromString(result.getString("speculative_retry"))); - cfm.compactionStrategyClass(createCompactionStrategy(result.getString("compaction_strategy_class"))); - cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters")))); - cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options"))); - - if (result.has("min_index_interval")) - cfm.minIndexInterval(result.getInt("min_index_interval")); - - if (result.has("max_index_interval")) - cfm.maxIndexInterval(result.getInt("max_index_interval")); - - if (result.has("bloom_filter_fp_chance")) - cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance")); - else - cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance()); - - if (result.has("dropped_columns")) - cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance))); - - for (ColumnDefinition cd : columnDefs) - cfm.addOrReplaceColumnDefinition(cd); - - return cfm.rebuild(); - } - catch (SyntaxException | ConfigurationException e) - { - throw new RuntimeException(e); - } - } - - public void addColumnMetadataFromAliases(List aliases, AbstractType comparator, ColumnDefinition.Kind kind) - { - if (comparator instanceof CompositeType) - { - CompositeType ct = (CompositeType)comparator; - for (int i = 0; i < aliases.size(); ++i) - { - if (aliases.get(i) != null) - { - addOrReplaceColumnDefinition(new ColumnDefinition(this, aliases.get(i), ct.types.get(i), i, kind)); - } - } - } - else - { - assert aliases.size() <= 1; - if (!aliases.isEmpty() && aliases.get(0) != null) - addOrReplaceColumnDefinition(new ColumnDefinition(this, aliases.get(0), comparator, null, kind)); - } - } - - /** - * Deserialize CF metadata from low-level representation - * - * @return Metadata deserialized from schema - */ - public static CFMetaData fromSchema(UntypedResultSet.Row result) - { - String ksName = result.getString("keyspace_name"); - String cfName = result.getString("columnfamily_name"); - - Row serializedColumns = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNS_TABLE, ksName, cfName); - CFMetaData cfm = fromSchemaNoTriggers(result, ColumnDefinition.resultify(serializedColumns)); - - Row serializedTriggers = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_TRIGGERS_TABLE, ksName, cfName); - addTriggerDefinitionsFromSchema(cfm, serializedTriggers); - - return cfm; - } - - private static CFMetaData fromSchema(Row row) - { - UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_columnfamilies", row).one(); - return fromSchema(result); - } - - private static Map convertDroppedColumns(Map raw) - { - Map converted = Maps.newHashMap(); - for (Map.Entry entry : raw.entrySet()) - converted.put(new ColumnIdentifier(entry.getKey(), true), entry.getValue()); - return converted; - } - - /** - * Convert current metadata into schema mutation - * - * @param timestamp Timestamp to use - * - * @return Low-level representation of the CF - * - * @throws ConfigurationException if any of the attributes didn't pass validation - */ - public Mutation toSchema(long timestamp) throws ConfigurationException - { - Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(ksName)); - toSchema(mutation, timestamp); - return mutation; - } - // The comparator to validate the definition name. public AbstractType getColumnDefinitionComparator(ColumnDefinition def) @@ -1474,12 +1179,6 @@ public final class CFMetaData return columnMetadata.remove(def.name.bytes) != null; } - private static void addTriggerDefinitionsFromSchema(CFMetaData cfDef, Row serializedTriggerDefinitions) - { - for (TriggerDefinition td : TriggerDefinition.fromSchema(serializedTriggerDefinitions)) - cfDef.triggers.put(td.name, td); - } - public void addTriggerDefinition(TriggerDefinition def) throws InvalidRequestException { if (containsTriggerDefinition(def)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/ColumnDefinition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java index 354a6f1..1cc7f1d 100644 --- a/src/java/org/apache/cassandra/config/ColumnDefinition.java +++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java @@ -26,25 +26,11 @@ import com.google.common.base.Objects; import com.google.common.collect.Lists; import org.apache.cassandra.cql3.*; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.exceptions.*; -import org.apache.cassandra.utils.FBUtilities; - -import static org.apache.cassandra.utils.FBUtilities.json; public class ColumnDefinition extends ColumnSpecification { - // system.schema_columns column names - private static final String COLUMN_NAME = "column_name"; - private static final String TYPE = "validator"; - private static final String INDEX_TYPE = "index_type"; - private static final String INDEX_OPTIONS = "index_options"; - private static final String INDEX_NAME = "index_name"; - private static final String COMPONENT_INDEX = "component_index"; - private static final String KIND = "type"; - /* * The type of CQL3 column this definition represents. * There is 3 main type of CQL3 columns: those parts of the partition key, @@ -62,20 +48,7 @@ public class ColumnDefinition extends ColumnSpecification CLUSTERING_COLUMN, REGULAR, STATIC, - COMPACT_VALUE; - - public String serialize() - { - // For backward compatibility we need to special case CLUSTERING_COLUMN - return this == CLUSTERING_COLUMN ? "clustering_key" : this.toString().toLowerCase(); - } - - public static Kind deserialize(String value) - { - if (value.equalsIgnoreCase("clustering_key")) - return CLUSTERING_COLUMN; - return Enum.valueOf(Kind.class, value.toUpperCase()); - } + COMPACT_VALUE } public final Kind kind; @@ -266,36 +239,6 @@ public class ColumnDefinition extends ColumnSpecification return kind == Kind.REGULAR || kind == Kind.STATIC; } - /** - * Drop specified column from the schema using given mutation. - * - * @param mutation The schema mutation - * @param timestamp The timestamp to use for column modification - */ - public void deleteFromSchema(Mutation mutation, long timestamp) - { - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnsTable); - int ldt = (int) (System.currentTimeMillis() / 1000); - - // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference). - Composite prefix = SystemKeyspace.SchemaColumnsTable.comparator.make(cfName, name.toString()); - cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); - } - - public void toSchema(Mutation mutation, long timestamp) - { - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnsTable); - Composite prefix = SystemKeyspace.SchemaColumnsTable.comparator.make(cfName, name.toString()); - CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp); - - adder.add(TYPE, type.toString()); - adder.add(INDEX_TYPE, indexType == null ? null : indexType.toString()); - adder.add(INDEX_OPTIONS, json(indexOptions)); - adder.add(INDEX_NAME, indexName); - adder.add(COMPONENT_INDEX, componentIndex); - adder.add(KIND, kind.serialize()); - } - public ColumnDefinition apply(ColumnDefinition def) throws ConfigurationException { assert kind == def.kind && Objects.equal(componentIndex, def.componentIndex); @@ -323,81 +266,6 @@ public class ColumnDefinition extends ColumnSpecification kind); } - public static UntypedResultSet resultify(Row serializedColumns) - { - String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.SCHEMA_COLUMNS_TABLE); - return QueryProcessor.resultify(query, serializedColumns); - } - - /** - * Deserialize columns from storage-level representation - * - * @param serializedColumns storage-level partition containing the column definitions - * @return the list of processed ColumnDefinitions - */ - public static List fromSchema(UntypedResultSet serializedColumns, String ksName, String cfName, AbstractType rawComparator, boolean isSuper) - { - List cds = new ArrayList<>(); - for (UntypedResultSet.Row row : serializedColumns) - { - Kind kind = row.has(KIND) - ? Kind.deserialize(row.getString(KIND)) - : Kind.REGULAR; - - Integer componentIndex = null; - if (row.has(COMPONENT_INDEX)) - componentIndex = row.getInt(COMPONENT_INDEX); - else if (kind == Kind.CLUSTERING_COLUMN && isSuper) - componentIndex = 1; // A ColumnDefinition for super columns applies to the column component - - // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we - // we need to use the comparator fromString method - AbstractType comparator = getComponentComparator(rawComparator, componentIndex, kind); - ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString(COLUMN_NAME)), comparator); - - AbstractType validator; - try - { - validator = TypeParser.parse(row.getString(TYPE)); - } - catch (RequestValidationException e) - { - throw new RuntimeException(e); - } - - IndexType indexType = null; - if (row.has(INDEX_TYPE)) - indexType = IndexType.valueOf(row.getString(INDEX_TYPE)); - - Map indexOptions = null; - if (row.has(INDEX_OPTIONS)) - indexOptions = FBUtilities.fromJsonMap(row.getString(INDEX_OPTIONS)); - - String indexName = null; - if (row.has(INDEX_NAME)) - indexName = row.getString(INDEX_NAME); - - cds.add(new ColumnDefinition(ksName, cfName, name, validator, indexType, indexOptions, indexName, componentIndex, kind)); - } - - return cds; - } - - public static AbstractType getComponentComparator(AbstractType rawComparator, Integer componentIndex, ColumnDefinition.Kind kind) - { - switch (kind) - { - case REGULAR: - if (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType))) - return rawComparator; - - return ((CompositeType)rawComparator).types.get(componentIndex); - default: - // CQL3 column names are UTF8 - return UTF8Type.instance; - } - } - public String getIndexName() { return indexName; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index a0e84f9..f2897ee 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -18,49 +18,29 @@ package org.apache.cassandra.config; import java.io.File; -import java.io.FileFilter; import java.io.IOException; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Enumeration; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.net.*; +import java.util.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Longs; -import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.auth.AllowAllAuthenticator; -import org.apache.cassandra.auth.AllowAllAuthorizer; -import org.apache.cassandra.auth.AllowAllInternodeAuthenticator; -import org.apache.cassandra.auth.IAuthenticator; -import org.apache.cassandra.auth.IAuthorizer; -import org.apache.cassandra.auth.IInternodeAuthenticator; + +import org.apache.cassandra.auth.*; import org.apache.cassandra.config.Config.RequestSchedulerId; import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DefsTables; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.IAllocator; -import org.apache.cassandra.locator.DynamicEndpointSnitch; -import org.apache.cassandra.locator.EndpointSnitchInfo; -import org.apache.cassandra.locator.IEndpointSnitch; -import org.apache.cassandra.locator.SeedProvider; +import org.apache.cassandra.locator.*; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.scheduler.NoScheduler; @@ -69,10 +49,7 @@ import org.apache.cassandra.thrift.ThriftServer; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.memory.HeapPool; -import org.apache.cassandra.utils.memory.NativePool; -import org.apache.cassandra.utils.memory.MemtablePool; -import org.apache.cassandra.utils.memory.SlabPool; +import org.apache.cassandra.utils.memory.*; public class DatabaseDescriptor { @@ -585,9 +562,6 @@ public class DatabaseDescriptor conf.server_encryption_options = conf.encryption_options; } - // hardcoded system keyspace - Schema.instance.load(SystemKeyspace.definition()); - // load the seeds for node contact points if (conf.seed_provider == null) { @@ -620,53 +594,6 @@ public class DatabaseDescriptor return conf.dynamic_snitch ? new DynamicEndpointSnitch(snitch) : snitch; } - /** load keyspace (keyspace) definitions, but do not initialize the keyspace instances. */ - public static void loadSchemas() - { - ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_KEYSPACES_TABLE); - - // if keyspace with definitions is empty try loading the old way - if (schemaCFS.estimateKeys() == 0) - { - logger.info("Couldn't detect any schema definitions in local storage."); - // peek around the data directories to see if anything is there. - if (hasExistingNoSystemTables()) - logger.info("Found keyspace data in data directories. Consider using cqlsh to define your schema."); - else - logger.info("To create keyspaces and column families, see 'help create' in cqlsh."); - } - else - { - Schema.instance.load(DefsTables.loadFromKeyspace()); - } - - Schema.instance.updateVersion(); - } - - private static boolean hasExistingNoSystemTables() - { - for (String dataDir : getAllDataFileLocations()) - { - File dataPath = new File(dataDir); - if (dataPath.exists() && dataPath.isDirectory()) - { - // see if there are other directories present. - int dirCount = dataPath.listFiles(new FileFilter() - { - public boolean accept(File pathname) - { - return pathname.isDirectory() && !pathname.getName().equals(SystemKeyspace.NAME); - } - }).length; - - if (dirCount > 0) - return true; - } - } - - return false; - } - public static IAuthenticator getAuthenticator() { return authenticator; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/KSMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java index e5576ad..1537aae 100644 --- a/src/java/org/apache/cassandra/config/KSMetaData.java +++ b/src/java/org/apache/cassandra/config/KSMetaData.java @@ -21,15 +21,10 @@ import java.util.*; import com.google.common.base.Objects; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.*; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.*; import org.apache.cassandra.service.StorageService; -import static org.apache.cassandra.utils.FBUtilities.*; - public final class KSMetaData { public final String name; @@ -43,18 +38,26 @@ public final class KSMetaData public KSMetaData(String name, Class strategyClass, Map strategyOptions, + boolean durableWrites) + { + this(name, strategyClass, strategyOptions, durableWrites, Collections.emptyList(), new UTMetaData()); + } + + public KSMetaData(String name, + Class strategyClass, + Map strategyOptions, boolean durableWrites, Iterable cfDefs) { this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData()); } - KSMetaData(String name, - Class strategyClass, - Map strategyOptions, - boolean durableWrites, - Iterable cfDefs, - UTMetaData userTypes) + private KSMetaData(String name, + Class strategyClass, + Map strategyOptions, + boolean durableWrites, + Iterable cfDefs, + UTMetaData userTypes) { this.name = name; this.strategyClass = strategyClass == null ? NetworkTopologyStrategy.class : strategyClass; @@ -82,9 +85,27 @@ public final class KSMetaData return new KSMetaData(name, strategyClass, options, durablesWrites, cfDefs, new UTMetaData()); } - public static KSMetaData cloneWith(KSMetaData ksm, Iterable cfDefs) + public KSMetaData cloneWithTableRemoved(CFMetaData table) + { + // clone ksm but do not include the new table + List newTables = new ArrayList<>(cfMetaData().values()); + newTables.remove(table); + assert newTables.size() == cfMetaData().size() - 1; + return cloneWith(newTables, userTypes); + } + + public KSMetaData cloneWithTableAdded(CFMetaData table) + { + // clone ksm but include the new table + List newTables = new ArrayList<>(cfMetaData().values()); + newTables.add(table); + assert newTables.size() == cfMetaData().size() + 1; + return cloneWith(newTables, userTypes); + } + + public KSMetaData cloneWith(Iterable tables, UTMetaData types) { - return new KSMetaData(ksm.name, ksm.strategyClass, ksm.strategyOptions, ksm.durableWrites, cfDefs, ksm.userTypes); + return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, tables, types); } public static KSMetaData testMetadata(String name, Class strategyClass, Map strategyOptions, CFMetaData... cfDefs) @@ -145,11 +166,6 @@ public final class KSMetaData return Collections.singletonMap("replication_factor", rf.toString()); } - public Mutation toSchemaUpdate(KSMetaData newState, long modificationTimestamp) - { - return newState.toSchema(modificationTimestamp); - } - public KSMetaData validate() throws ConfigurationException { if (!CFMetaData.isNameValid(name)) @@ -165,107 +181,4 @@ public final class KSMetaData return this; } - - public KSMetaData reloadAttributes() - { - Row ksDefRow = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, name); - - if (ksDefRow.cf == null) - throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", name, SystemKeyspace.SCHEMA_KEYSPACES_TABLE)); - - return fromSchema(ksDefRow, Collections.emptyList(), userTypes); - } - - public Mutation dropFromSchema(long timestamp) - { - Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(name)); - - mutation.delete(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, timestamp); - mutation.delete(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, timestamp); - mutation.delete(SystemKeyspace.SCHEMA_COLUMNS_TABLE, timestamp); - mutation.delete(SystemKeyspace.SCHEMA_TRIGGERS_TABLE, timestamp); - mutation.delete(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, timestamp); - mutation.delete(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, timestamp); - mutation.delete(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, timestamp); - mutation.delete(SystemKeyspace.BUILT_INDEXES_TABLE, timestamp); - - return mutation; - } - - public Mutation toSchema(long timestamp) - { - Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(name)); - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaKeyspacesTable); - CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.SchemaKeyspacesTable.comparator.builder().build(), timestamp); - - adder.add("durable_writes", durableWrites); - adder.add("strategy_class", strategyClass.getName()); - adder.add("strategy_options", json(strategyOptions)); - - for (CFMetaData cfm : cfMetaData.values()) - cfm.toSchema(mutation, timestamp); - - userTypes.toSchema(mutation, timestamp); - return mutation; - } - - /** - * Deserialize only Keyspace attributes without nested ColumnFamilies - * - * @param row Keyspace attributes in serialized form - * - * @return deserialized keyspace without cf_defs - */ - public static KSMetaData fromSchema(Row row, Iterable cfms, UTMetaData userTypes) - { - UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_keyspaces", row).one(); - try - { - return new KSMetaData(result.getString("keyspace_name"), - AbstractReplicationStrategy.getClass(result.getString("strategy_class")), - fromJsonMap(result.getString("strategy_options")), - result.getBoolean("durable_writes"), - cfms, - userTypes); - } - catch (ConfigurationException e) - { - throw new RuntimeException(e); - } - } - - /** - * Deserialize Keyspace with nested ColumnFamilies - * - * @param serializedKs Keyspace in serialized form - * @param serializedCFs Collection of the serialized ColumnFamilies - * - * @return deserialized keyspace with cf_defs - */ - public static KSMetaData fromSchema(Row serializedKs, Row serializedCFs, Row serializedUserTypes) - { - Map cfs = deserializeColumnFamilies(serializedCFs); - UTMetaData userTypes = new UTMetaData(UTMetaData.fromSchema(serializedUserTypes)); - return fromSchema(serializedKs, cfs.values(), userTypes); - } - - /** - * Deserialize ColumnFamilies from low-level schema representation, all of them belong to the same keyspace - * - * @return map containing name of the ColumnFamily and it's metadata for faster lookup - */ - public static Map deserializeColumnFamilies(Row row) - { - if (row.cf == null) - return Collections.emptyMap(); - - UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system.schema_columnfamilies", row); - Map cfms = new HashMap<>(results.size()); - for (UntypedResultSet.Row result : results) - { - CFMetaData cfm = CFMetaData.fromSchema(result); - cfms.put(cfm.cfName, cfm); - } - return cfms; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index 43cc6b5..21244ab 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.config; -import java.nio.charset.CharacterCodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.*; @@ -27,13 +26,18 @@ import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.cql3.functions.Functions; +import org.apache.cassandra.cql3.functions.UDAggregate; +import org.apache.cassandra.cql3.functions.UDFunction; import org.apache.cassandra.db.*; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.utils.ConcurrentBiMap; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -78,10 +82,20 @@ public class Schema } /** - * Initialize empty schema object + * Initialize empty schema object and load the hardcoded system tables */ public Schema() - {} + { + load(SystemKeyspace.definition()); + } + + /** load keyspace (keyspace) definitions, but do not initialize the keyspace instances. */ + public Schema loadFromDisk() + { + load(LegacySchemaTables.readSchemaFromSystemTables()); + updateVersion(); + return this; + } /** * Load up non-system keyspaces @@ -350,28 +364,8 @@ public class Schema */ public void updateVersion() { - try - { - MessageDigest versionDigest = MessageDigest.getInstance("MD5"); - - for (Row row : SystemKeyspace.serializedSchema()) - { - if (invalidSchemaRow(row) || ignoredSchemaRow(row)) - continue; - - // we want to digest only live columns - ColumnFamilyStore.removeDeletedColumnsOnly(row.cf, Integer.MAX_VALUE, SecondaryIndexManager.nullUpdater); - row.cf.purgeTombstones(Integer.MAX_VALUE); - row.cf.updateDigest(versionDigest); - } - - version = UUID.nameUUIDFromBytes(versionDigest.digest()); - SystemKeyspace.updateSchemaVersion(version); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + version = LegacySchemaTables.calculateSchemaDigest(); + SystemKeyspace.updateSchemaVersion(version); } /* @@ -399,20 +393,202 @@ public class Schema updateVersionAndAnnounce(); } - public static boolean invalidSchemaRow(Row row) + public void addKeyspace(KSMetaData ksm) { - return row.cf == null || (row.cf.isMarkedForDelete() && !row.cf.hasColumns()); + assert getKSMetaData(ksm.name) == null; + load(ksm); + + Keyspace.open(ksm.name); + MigrationManager.instance.notifyCreateKeyspace(ksm); } - public static boolean ignoredSchemaRow(Row row) + public void updateKeyspace(String ksName) { - try - { - return ByteBufferUtil.string(row.key.getKey()).equals(SystemKeyspace.NAME); - } - catch (CharacterCodingException e) + KSMetaData oldKsm = getKSMetaData(ksName); + assert oldKsm != null; + KSMetaData newKsm = LegacySchemaTables.createKeyspaceFromName(ksName).cloneWith(oldKsm.cfMetaData().values(), oldKsm.userTypes); + + setKeyspaceDefinition(newKsm); + + Keyspace.open(ksName).createReplicationStrategy(newKsm); + MigrationManager.instance.notifyUpdateKeyspace(newKsm); + } + + public void dropKeyspace(String ksName) + { + KSMetaData ksm = Schema.instance.getKSMetaData(ksName); + String snapshotName = Keyspace.getTimestampedSnapshotName(ksName); + + CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true); + + Keyspace keyspace = Keyspace.open(ksm.name); + + // remove all cfs from the keyspace instance. + List droppedCfs = new ArrayList<>(); + for (CFMetaData cfm : ksm.cfMetaData().values()) { - throw new RuntimeException(e); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfm.cfName); + + purge(cfm); + + if (DatabaseDescriptor.isAutoSnapshot()) + cfs.snapshot(snapshotName); + Keyspace.open(ksm.name).dropCf(cfm.cfId); + + droppedCfs.add(cfm.cfId); } + + // remove the keyspace from the static instances. + Keyspace.clear(ksm.name); + clearKeyspaceDefinition(ksm); + + keyspace.writeOrder.awaitNewBarrier(); + + // force a new segment in the CL + CommitLog.instance.forceRecycleAllSegments(droppedCfs); + + MigrationManager.instance.notifyDropKeyspace(ksm); + } + + public void addTable(CFMetaData cfm) + { + assert getCFMetaData(cfm.ksName, cfm.cfName) == null; + KSMetaData ksm = getKSMetaData(cfm.ksName).cloneWithTableAdded(cfm); + + logger.info("Loading {}", cfm); + + load(cfm); + + // make sure it's init-ed w/ the old definitions first, + // since we're going to call initCf on the new one manually + Keyspace.open(cfm.ksName); + + setKeyspaceDefinition(ksm); + Keyspace.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true); + MigrationManager.instance.notifyCreateColumnFamily(cfm); + } + + public void updateTable(String ksName, String tableName) + { + CFMetaData cfm = getCFMetaData(ksName, tableName); + assert cfm != null; + cfm.reload(); + + Keyspace keyspace = Keyspace.open(cfm.ksName); + keyspace.getColumnFamilyStore(cfm.cfName).reload(); + MigrationManager.instance.notifyUpdateColumnFamily(cfm); + } + + public void dropTable(String ksName, String tableName) + { + KSMetaData ksm = getKSMetaData(ksName); + assert ksm != null; + ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(tableName); + assert cfs != null; + + // reinitialize the keyspace. + CFMetaData cfm = ksm.cfMetaData().get(tableName); + + purge(cfm); + setKeyspaceDefinition(ksm.cloneWithTableRemoved(cfm)); + + CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true); + + if (DatabaseDescriptor.isAutoSnapshot()) + cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name)); + Keyspace.open(ksm.name).dropCf(cfm.cfId); + MigrationManager.instance.notifyDropColumnFamily(cfm); + + CommitLog.instance.forceRecycleAllSegments(Collections.singleton(cfm.cfId)); + } + + public void addType(UserType ut) + { + KSMetaData ksm = getKSMetaData(ut.keyspace); + assert ksm != null; + + logger.info("Loading {}", ut); + + ksm.userTypes.addType(ut); + + MigrationManager.instance.notifyCreateUserType(ut); + } + + public void updateType(UserType ut) + { + KSMetaData ksm = getKSMetaData(ut.keyspace); + assert ksm != null; + + logger.info("Updating {}", ut); + + ksm.userTypes.addType(ut); + + MigrationManager.instance.notifyUpdateUserType(ut); + } + + public void dropType(UserType ut) + { + KSMetaData ksm = getKSMetaData(ut.keyspace); + assert ksm != null; + + ksm.userTypes.removeType(ut); + + MigrationManager.instance.notifyDropUserType(ut); + } + + public void addFunction(UDFunction udf) + { + logger.info("Loading {}", udf); + + Functions.addFunction(udf); + + MigrationManager.instance.notifyCreateFunction(udf); + } + + public void updateFunction(UDFunction udf) + { + logger.info("Updating {}", udf); + + Functions.replaceFunction(udf); + + MigrationManager.instance.notifyUpdateFunction(udf); + } + + public void dropFunction(UDFunction udf) + { + logger.info("Drop {}", udf); + + // TODO: this is kind of broken as this remove all overloads of the function name + Functions.removeFunction(udf.name(), udf.argTypes()); + + MigrationManager.instance.notifyDropFunction(udf); + } + + public void addAggregate(UDAggregate udf) + { + logger.info("Loading {}", udf); + + Functions.addFunction(udf); + + MigrationManager.instance.notifyCreateAggregate(udf); + } + + public void updateAggregate(UDAggregate udf) + { + logger.info("Updating {}", udf); + + Functions.replaceFunction(udf); + + MigrationManager.instance.notifyUpdateAggregate(udf); + } + + public void dropAggregate(UDAggregate udf) + { + logger.info("Drop {}", udf); + + // TODO: this is kind of broken as this remove all overloads of the function name + Functions.removeFunction(udf.name(), udf.argTypes()); + + MigrationManager.instance.notifyDropAggregate(udf); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/TriggerDefinition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/TriggerDefinition.java b/src/java/org/apache/cassandra/config/TriggerDefinition.java index a395549..6a84379 100644 --- a/src/java/org/apache/cassandra/config/TriggerDefinition.java +++ b/src/java/org/apache/cassandra/config/TriggerDefinition.java @@ -18,20 +18,10 @@ */ package org.apache.cassandra.config; -import java.util.*; - import com.google.common.base.Objects; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.marshal.UTF8Type; - public class TriggerDefinition { - public static final String TRIGGER_NAME = "trigger_name"; - public static final String TRIGGER_OPTIONS = "trigger_options"; public static final String CLASS = "class"; public final String name; @@ -51,59 +41,6 @@ public class TriggerDefinition return new TriggerDefinition(name, classOption); } - /** - * Deserialize triggers from storage-level representation. - * - * @param serializedTriggers storage-level partition containing the trigger definitions - * @return the list of processed TriggerDefinitions - */ - public static List fromSchema(Row serializedTriggers) - { - List triggers = new ArrayList<>(); - String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.SCHEMA_TRIGGERS_TABLE); - for (UntypedResultSet.Row row : QueryProcessor.resultify(query, serializedTriggers)) - { - String name = row.getString(TRIGGER_NAME); - String classOption = row.getMap(TRIGGER_OPTIONS, UTF8Type.instance, UTF8Type.instance).get(CLASS); - triggers.add(new TriggerDefinition(name, classOption)); - } - return triggers; - } - - /** - * Add specified trigger to the schema using given mutation. - * - * @param mutation The schema mutation - * @param cfName The name of the parent ColumnFamily - * @param timestamp The timestamp to use for the columns - */ - public void toSchema(Mutation mutation, String cfName, long timestamp) - { - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_TABLE); - - CFMetaData cfm = SystemKeyspace.SchemaTriggersTable; - Composite prefix = cfm.comparator.make(cfName, name); - CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp); - - adder.addMapEntry(TRIGGER_OPTIONS, CLASS, classOption); - } - - /** - * Drop specified trigger from the schema using given mutation. - * - * @param mutation The schema mutation - * @param cfName The name of the parent ColumnFamily - * @param timestamp The timestamp to use for the tombstone - */ - public void deleteFromSchema(Mutation mutation, String cfName, long timestamp) - { - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_TABLE); - int ldt = (int) (System.currentTimeMillis() / 1000); - - Composite prefix = SystemKeyspace.SchemaTriggersTable.comparator.make(cfName, name); - cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); - } - @Override public boolean equals(Object o) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/UTMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/UTMetaData.java b/src/java/org/apache/cassandra/config/UTMetaData.java index 46a7a4f..08cedee 100644 --- a/src/java/org/apache/cassandra/config/UTMetaData.java +++ b/src/java/org/apache/cassandra/config/UTMetaData.java @@ -20,12 +20,7 @@ package org.apache.cassandra.config; import java.nio.ByteBuffer; import java.util.*; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.exceptions.RequestValidationException; -import org.apache.cassandra.cql3.*; -import org.apache.cassandra.utils.ByteBufferUtil; /** * Defined (and loaded) user types. @@ -42,91 +37,11 @@ public final class UTMetaData this(new HashMap()); } - UTMetaData(Map types) + public UTMetaData(Map types) { this.userTypes = types; } - private static UserType fromSchema(UntypedResultSet.Row row) - { - try - { - String keyspace = row.getString("keyspace_name"); - ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name")); - List rawColumns = row.getList("field_names", UTF8Type.instance); - List rawTypes = row.getList("field_types", UTF8Type.instance); - - List columns = new ArrayList<>(rawColumns.size()); - for (String rawColumn : rawColumns) - columns.add(ByteBufferUtil.bytes(rawColumn)); - - List> types = new ArrayList<>(rawTypes.size()); - for (String rawType : rawTypes) - types.add(TypeParser.parse(rawType)); - - return new UserType(keyspace, name, columns, types); - } - catch (RequestValidationException e) - { - // If it has been written in the schema, it should be valid - throw new AssertionError(); - } - } - - public static Map fromSchema(Row row) - { - UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_USER_TYPES_TABLE, row); - Map types = new HashMap<>(results.size()); - for (UntypedResultSet.Row result : results) - { - UserType type = fromSchema(result); - types.put(type.name, type); - } - return types; - } - - public static Mutation toSchema(UserType newType, long timestamp) - { - return toSchema(new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(newType.keyspace)), newType, timestamp); - } - - public static Mutation toSchema(Mutation mutation, UserType newType, long timestamp) - { - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_TABLE); - - Composite prefix = SystemKeyspace.SchemaUserTypesTable.comparator.make(newType.name); - CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp); - - adder.resetCollection("field_names"); - adder.resetCollection("field_types"); - - for (int i = 0; i < newType.size(); i++) - { - adder.addListEntry("field_names", newType.fieldName(i)); - adder.addListEntry("field_types", newType.fieldType(i).toString()); - } - return mutation; - } - - public Mutation toSchema(Mutation mutation, long timestamp) - { - for (UserType ut : userTypes.values()) - toSchema(mutation, ut, timestamp); - return mutation; - } - - public static Mutation dropFromSchema(UserType droppedType, long timestamp) - { - Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(droppedType.keyspace)); - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_TABLE); - int ldt = (int) (System.currentTimeMillis() / 1000); - - Composite prefix = SystemKeyspace.SchemaUserTypesTable.comparator.make(droppedType.name); - cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); - - return mutation; - } - public UserType getType(ByteBuffer typeName) { return userTypes.get(typeName); @@ -134,11 +49,11 @@ public final class UTMetaData public Map getAllTypes() { - // Copy to avoid concurrent modification while iterating. Not intended to be called on a criticial path anyway + // Copy to avoid concurrent modification while iterating. Not intended to be called on a critical path anyway return new HashMap<>(userTypes); } - // This is *not* thread safe but is only called in DefsTables that is synchronized. + // This is *not* thread safe but is only called in Schema that is synchronized. public void addType(UserType type) { UserType old = userTypes.get(type.name); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/Functions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java index 7d94e47..b55ebc5 100644 --- a/src/java/org/apache/cassandra/cql3/functions/Functions.java +++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java @@ -22,12 +22,9 @@ import java.util.Collection; import java.util.List; import com.google.common.collect.ArrayListMultimap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.*; -import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.service.IMigrationListener; @@ -35,16 +32,11 @@ import org.apache.cassandra.service.MigrationManager; public abstract class Functions { - private static final Logger logger = LoggerFactory.getLogger(Functions.class); - // We special case the token function because that's the only function whose argument types actually // depend on the table on which the function is called. Because it's the sole exception, it's easier // to handle it as a special case. private static final FunctionName TOKEN_FUNCTION_NAME = FunctionName.nativeFunction("token"); - private static final String SELECT_UD_FUNCTION = "SELECT * FROM " + SystemKeyspace.NAME + '.' + SystemKeyspace.SCHEMA_FUNCTIONS_TABLE; - private static final String SELECT_UD_AGGREGATE = "SELECT * FROM " + SystemKeyspace.NAME + '.' + SystemKeyspace.SCHEMA_AGGREGATES_TABLE; - private Functions() {} private static final ArrayListMultimap declared = ArrayListMultimap.create(); @@ -96,18 +88,6 @@ public abstract class Functions declared.put(fun.name(), fun); } - /** - * Loading existing UDFs from the schema. - */ - public static void loadUDFFromSchema() - { - logger.debug("Loading UDFs"); - for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_UD_FUNCTION)) - addFunction(UDFunction.fromSchema(row)); - for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_UD_AGGREGATE)) - addFunction(UDAggregate.fromSchema(row)); - } - public static ColumnSpecification makeArgSpec(String receiverKs, String receiverCf, Function fun, int i) { return new ColumnSpecification(receiverKs, @@ -270,7 +250,7 @@ public abstract class Functions return sb.toString(); } - // This is *not* thread safe but is only called in DefsTables that is synchronized. + // This is *not* thread safe but is only called in SchemaTables that is synchronized. public static void addFunction(AbstractFunction fun) { // We shouldn't get there unless that function don't exist http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java index 5b1f5bd..e4e6a55 100644 --- a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java +++ b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java @@ -76,9 +76,8 @@ public final class JavaSourceUDFFactory // It is separated to allow return type and argument type checks during compile time via javassist. String codeExecInt = generateExecuteInternalMethod(argNames, body, javaReturnType, javaParamTypes); - if (logger.isDebugEnabled()) - logger.debug("Generating java source UDF for {} with following c'tor and functions:\n{}\n{}\n{}", - name, codeCtor, codeExecInt, codeExec); + logger.debug("Generating java source UDF for {} with following c'tor and functions:\n{}\n{}\n{}", + name, codeCtor, codeExecInt, codeExec); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java index f259265..e9c33ba 100644 --- a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java +++ b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java @@ -24,12 +24,7 @@ import com.google.common.base.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cql3.*; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.TypeParser; -import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.exceptions.*; /** @@ -58,6 +53,45 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction this.initcond = initcond; } + public static UDAggregate create(FunctionName name, + List> argTypes, + AbstractType returnType, + FunctionName stateFunc, + FunctionName finalFunc, + AbstractType stateType, + ByteBuffer initcond) + throws InvalidRequestException + { + List> stateTypes = new ArrayList<>(argTypes.size() + 1); + stateTypes.add(stateType); + stateTypes.addAll(argTypes); + List> finalTypes = Collections.>singletonList(stateType); + return new UDAggregate(name, + argTypes, + returnType, + resolveScalar(name, stateFunc, stateTypes), + finalFunc != null ? resolveScalar(name, finalFunc, finalTypes) : null, + initcond); + } + + public static UDAggregate createBroken(FunctionName name, + List> argTypes, + AbstractType returnType, + ByteBuffer initcond, + final InvalidRequestException reason) + { + return new UDAggregate(name, argTypes, returnType, null, null, initcond) + { + public Aggregate newAggregate() throws InvalidRequestException + { + throw new InvalidRequestException(String.format("Aggregate '%s' exists but hasn't been loaded successfully for the following reason: %s. " + + "Please see the server log for more details", + this, + reason.getMessage())); + } + }; + } + public boolean hasReferenceTo(Function function) { return stateFunction == function || finalFunction == function; @@ -85,6 +119,26 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction return false; } + public ScalarFunction stateFunction() + { + return stateFunction; + } + + public ScalarFunction finalFunction() + { + return finalFunction; + } + + public ByteBuffer initialCondition() + { + return initcond; + } + + public AbstractType stateType() + { + return stateType; + } + public Aggregate newAggregate() throws InvalidRequestException { return new Aggregate() @@ -128,134 +182,6 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction return (ScalarFunction) func; } - private static Mutation makeSchemaMutation(FunctionName name) - { - UTF8Type kv = (UTF8Type)SystemKeyspace.SchemaAggregatesTable.getKeyValidator(); - return new Mutation(SystemKeyspace.NAME, kv.decompose(name.keyspace)); - } - - public Mutation toSchemaDrop(long timestamp) - { - Mutation mutation = makeSchemaMutation(name); - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_AGGREGATES_TABLE); - - Composite prefix = SystemKeyspace.SchemaAggregatesTable.comparator.make(name.name, UDHelper.computeSignature(argTypes)); - int ldt = (int) (System.currentTimeMillis() / 1000); - cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); - - return mutation; - } - - public static Map fromSchema(Row row) - { - UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_AGGREGATES_TABLE, row); - Map udfs = new HashMap<>(results.size()); - for (UntypedResultSet.Row result : results) - udfs.put(SystemKeyspace.SchemaAggregatesTable.comparator.make(result.getString("aggregate_name"), result.getBlob("signature")), - fromSchema(result)); - return udfs; - } - - public Mutation toSchemaUpdate(long timestamp) - { - Mutation mutation = makeSchemaMutation(name); - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_AGGREGATES_TABLE); - - Composite prefix = SystemKeyspace.SchemaAggregatesTable.comparator.make(name.name, UDHelper.computeSignature(argTypes)); - CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp); - - adder.resetCollection("argument_types"); - adder.add("return_type", returnType.toString()); - adder.add("state_func", stateFunction.name().name); - if (stateType != null) - adder.add("state_type", stateType.toString()); - if (finalFunction != null) - adder.add("final_func", finalFunction.name().name); - if (initcond != null) - adder.add("initcond", initcond); - - for (AbstractType argType : argTypes) - adder.addListEntry("argument_types", argType.toString()); - - return mutation; - } - - public static UDAggregate fromSchema(UntypedResultSet.Row row) - { - String ksName = row.getString("keyspace_name"); - String functionName = row.getString("aggregate_name"); - FunctionName name = new FunctionName(ksName, functionName); - - List types = row.getList("argument_types", UTF8Type.instance); - - List> argTypes; - if (types == null) - { - argTypes = Collections.emptyList(); - } - else - { - argTypes = new ArrayList<>(types.size()); - for (String type : types) - argTypes.add(parseType(type)); - } - - AbstractType returnType = parseType(row.getString("return_type")); - - FunctionName stateFunc = new FunctionName(ksName, row.getString("state_func")); - FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null; - AbstractType stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null; - ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null; - - try - { - return create(name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond); - } - catch (InvalidRequestException reason) - { - return createBroken(name, argTypes, returnType, initcond, reason); - } - } - - private static UDAggregate createBroken(FunctionName name, List> argTypes, AbstractType returnType, - ByteBuffer initcond, final InvalidRequestException reason) - { - return new UDAggregate(name, argTypes, returnType, null, null, initcond) { - public Aggregate newAggregate() throws InvalidRequestException - { - throw new InvalidRequestException(String.format("Aggregate '%s' exists but hasn't been loaded successfully for the following reason: %s. " - + "Please see the server log for more details", this, reason.getMessage())); - } - }; - } - - private static UDAggregate create(FunctionName name, List> argTypes, AbstractType returnType, - FunctionName stateFunc, FunctionName finalFunc, AbstractType stateType, ByteBuffer initcond) - throws InvalidRequestException - { - List> stateTypes = new ArrayList<>(argTypes.size() + 1); - stateTypes.add(stateType); - stateTypes.addAll(argTypes); - List> finalTypes = Collections.>singletonList(stateType); - return new UDAggregate(name, argTypes, returnType, - resolveScalar(name, stateFunc, stateTypes), - finalFunc != null ? resolveScalar(name, finalFunc, finalTypes) : null, - initcond); - } - - private static AbstractType parseType(String str) - { - // We only use this when reading the schema where we shouldn't get an error - try - { - return TypeParser.parse(str); - } - catch (SyntaxException | ConfigurationException e) - { - throw new RuntimeException(e); - } - } - @Override public boolean equals(Object o) { @@ -263,13 +189,13 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction return false; UDAggregate that = (UDAggregate) o; - return Objects.equal(this.name, that.name) - && Functions.typeEquals(this.argTypes, that.argTypes) - && Functions.typeEquals(this.returnType, that.returnType) - && Objects.equal(this.stateFunction, that.stateFunction) - && Objects.equal(this.finalFunction, that.finalFunction) - && Objects.equal(this.stateType, that.stateType) - && Objects.equal(this.initcond, that.initcond); + return Objects.equal(name, that.name) + && Functions.typeEquals(argTypes, that.argTypes) + && Functions.typeEquals(returnType, that.returnType) + && Objects.equal(stateFunction, that.stateFunction) + && Objects.equal(finalFunction, that.finalFunction) + && Objects.equal(stateType, that.stateType) + && Objects.equal(initcond, that.initcond); } @Override