cassandra-commits mailing list archives

From: jbel...@apache.org
Subject: [3/3] git commit: merge from 1.2
Date: Thu, 18 Jul 2013 16:45:33 GMT
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cdc57b3c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cdc57b3c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cdc57b3c

Branch: refs/heads/trunk
Commit: cdc57b3c36a3418ca2a97f1b9c7547a03b6ce57e
Parents: d881024 b010784
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Thu Jul 18 09:45:28 2013 -0700
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Thu Jul 18 09:45:28 2013 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 .../org/apache/cassandra/db/DefsTables.java     | 20 +++++++++-----------
 .../org/apache/cassandra/db/SystemKeyspace.java |  9 +++++----
 3 files changed, 15 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
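
(A note on reading the combined diff below: "git diff --cc" output carries two marker columns, one per merge parent. A line prefixed " +" is present in the merge result but absent from the second parent, "++" marks a line new relative to both parents, and "- " marks a line from the first parent that did not survive the merge.)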


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cdc57b3c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 45da623,253c649..98a0cdb
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,91 -1,5 +1,91 @@@
 +2.0.0-beta2
 + * Allow nodetool with no args, and with help to run without a server (CASSANDRA-5734)
 + * Cleanup AbstractType/TypeSerializer classes (CASSANDRA-5744)
 + * Remove unimplemented cli option schema-mwt (CASSANDRA-5754)
 + * Support range tombstones in thrift (CASSANDRA-5435)
 + * Normalize table-manipulating CQL3 statements' class names (CASSANDRA-5759)
 + * cqlsh: add missing table options to DESCRIBE output (CASSANDRA-5749)
 + * Fix assertion error during repair (CASSANDRA-5757)
 +
 +2.0.0-beta1
 + * Removed on-heap row cache (CASSANDRA-5348)
 + * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
 + * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
 + * Experimental triggers (CASSANDRA-1311)
 + * JEMalloc support for off-heap allocation (CASSANDRA-3997)
 + * Single-pass compaction (CASSANDRA-4180)
 + * Removed token range bisection (CASSANDRA-5518)
 + * Removed compatibility with pre-1.2.5 sstables and network messages
 +   (CASSANDRA-5511)
 + * removed PBSPredictor (CASSANDRA-5455)
 + * CAS support (CASSANDRA-5062, 5441, 5442, 5443, 5619, 5667)
 + * Leveled compaction performs size-tiered compactions in L0 
 +   (CASSANDRA-5371, 5439)
 + * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
 + * Log when a node is down longer than the hint window (CASSANDRA-4554)
 + * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
 + * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
 + * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
 + * Change Message IDs to ints (CASSANDRA-5307)
 + * Move sstable level information into the Stats component, removing the
 +   need for a separate Manifest file (CASSANDRA-4872)
 + * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
 + * make index_interval configurable per columnfamily (CASSANDRA-3961, CASSANDRA-5650)
 + * add default_time_to_live (CASSANDRA-3974)
 + * add memtable_flush_period_in_ms (CASSANDRA-4237)
 + * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
 + * upgrade thrift to 0.9.0 (CASSANDRA-3719)
 + * drop unnecessary keyspace parameter from user-defined compaction API 
 +   (CASSANDRA-5139)
 + * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
 + * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
 + * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
 + * Allow custom configuration loader (CASSANDRA-5045)
 + * Remove memory emergency pressure valve logic (CASSANDRA-3534)
 + * Reduce request latency with eager retry (CASSANDRA-4705)
 + * cqlsh: Remove ASSUME command (CASSANDRA-5331)
 + * Rebuild BF when loading sstables if bloom_filter_fp_chance
 +   has changed since compaction (CASSANDRA-5015)
 + * remove row-level bloom filters (CASSANDRA-4885)
 + * Change Kernel Page Cache skipping into row preheating (disabled by default)
 +   (CASSANDRA-4937)
 + * Improve repair by deciding on a gcBefore before sending
 +   out TreeRequests (CASSANDRA-4932)
 + * Add an official way to disable compactions (CASSANDRA-5074)
 + * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
 + * Add binary protocol versioning (CASSANDRA-5436)
 + * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
 + * Add alias support to SELECT statement (CASSANDRA-5075)
 + * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
 + * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
 + * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
 + * Track max/min column names in sstables to be able to optimize slice
 +   queries (CASSANDRA-5514, CASSANDRA-5595, CASSANDRA-5600)
 + * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
 + * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
 + * Support native link w/o JNA in Java7 (CASSANDRA-3734)
 + * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
 + * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
 + * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
 + * Include a timestamp with all read commands to determine column expiration
 +   (CASSANDRA-5149)
 + * Streaming 2.0 (CASSANDRA-5286, 5699)
 + * Conditional create/drop ks/table/index statements in CQL3 (CASSANDRA-2737)
 + * more pre-table creation property validation (CASSANDRA-5693)
 + * Redesign repair messages (CASSANDRA-5426)
 + * Fix ALTER RENAME post-5125 (CASSANDRA-5702)
 + * Disallow renaming a 2ndary indexed column (CASSANDRA-5705)
 + * Rename Table to Keyspace (CASSANDRA-5613)
 + * Ensure changing column_index_size_in_kb on different nodes doesn't corrupt the
 +   sstable (CASSANDRA-5454)
 + * Move resultset type information into prepare, not execute (CASSANDRA-5649)
 + * Auto paging in binary protocol (CASSANDRA-4415, 5714)
 + * Don't tie client side use of AbstractType to JDBC (CASSANDRA-4495)
 + * Adds new TimestampType to replace DateType (CASSANDRA-5723, CASSANDRA-5729)
 +
 +
  1.2.7
-  * add cassandra.unsafetruncate property (CASSANDRA-5704)
+  * add cassandra.unsafesystem property (CASSANDRA-5704)
   * (Hadoop) quote identifiers in CqlPagingRecordReader (CASSANDRA-5763)
   * Add replace_node functionality for vnodes (CASSANDRA-5337)
   * Add timeout events to query traces (CASSANDRA-5520)

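The one-line CHANGES.txt correction above tracks the substance of this merge: the narrow cassandra.unsafetruncate property, which only skipped the blocking flush after a truncation, is superseded by cassandra.unsafesystem, which gates every blocking flush of system columnfamilies (see SystemKeyspace.forceBlockingFlush() below). A standalone sketch of the Boolean.getBoolean() pattern the new property relies on; this demo class is not part of the commit:

public class UnsafeSystemDemo
{
    public static void main(String[] args)
    {
        // Unset (or any value other than "true") reads as false, so system flushes block by default.
        System.out.println(Boolean.getBoolean("cassandra.unsafesystem")); // false
        // Operators would normally set this at JVM startup, e.g. -Dcassandra.unsafesystem=true.
        System.setProperty("cassandra.unsafesystem", "true");
        System.out.println(Boolean.getBoolean("cassandra.unsafesystem")); // true
    }
}
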
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cdc57b3c/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/DefsTables.java
index 6f35ed8,0000000..22a74c6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@@ -1,470 -1,0 +1,468 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.MapDifference;
 +import com.google.common.collect.Maps;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import org.apache.cassandra.config.*;
++import org.apache.cassandra.config.CFMetaData;
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.config.KSMetaData;
++import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.QueryFilter;
 +import org.apache.cassandra.db.marshal.AsciiType;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.MigrationManager;
++import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.ByteBufferUtil;
- import org.apache.cassandra.utils.FBUtilities;
 +
 +/**
 + * SCHEMA_{KEYSPACES, COLUMNFAMILIES, COLUMNS}_CF are used to store Keyspace/ColumnFamily attributes to make schema
 + * load/distribution easy; this replaces the old mechanism, in which local migrations were serialized, stored in
 + * system.Migrations, and used for schema distribution.
 + *
 + * SCHEMA_KEYSPACES_CF layout:
 + *
 + * <key (AsciiType)>
 + *   ascii => json_serialized_value
 + *   ...
 + * </key>
 + *
 + * Where <key> is the name of a keyspace, e.g. "ks".
 + *
 + * SCHEMA_COLUMNFAMILIES_CF layout:
 + *
 + * <key (AsciiType)>
 + *     composite(ascii, ascii) => json_serialized_value
 + * </key>
 + *
 + * Where <key> is the name of a keyspace, e.g. "ks"; the first component of the column name is the name of the
 + * ColumnFamily, and the last component is the name of the ColumnFamily attribute.
 + *
 + * SCHEMA_COLUMNS_CF layout:
 + *
 + * <key (AsciiType)>
 + *     composite(ascii, ascii, ascii) => json_serialized value
 + * </key>
 + *
 + * Where <key> is the name of a keyspace, e.g. "ks".
 + *
 + * Column names were made composite to support 3-level nesting, which represents the following structure:
 + * "ColumnFamily name":"column name":"column attribute" => "value"
 + *
 + * Example of schema (using CLI):
 + *
 + * schema_keyspaces
 + * ----------------
 + * RowKey: ks
 + *  => (column=durable_writes, value=true, timestamp=1327061028312185000)
 + *  => (column=name, value="ks", timestamp=1327061028312185000)
 + *  => (column=replication_factor, value=0, timestamp=1327061028312185000)
 + *  => (column=strategy_class, value="org.apache.cassandra.locator.NetworkTopologyStrategy", timestamp=1327061028312185000)
 + *  => (column=strategy_options, value={"datacenter1":"1"}, timestamp=1327061028312185000)
 + *
 + * schema_columnfamilies
 + * ---------------------
 + * RowKey: ks
 + *  => (column=cf:bloom_filter_fp_chance, value=0.0, timestamp=1327061105833119000)
 + *  => (column=cf:caching, value="NONE", timestamp=1327061105833119000)
 + *  => (column=cf:column_type, value="Standard", timestamp=1327061105833119000)
 + *  => (column=cf:comment, value="ColumnFamily", timestamp=1327061105833119000)
 + *  => (column=cf:default_validation_class, value="org.apache.cassandra.db.marshal.BytesType", timestamp=1327061105833119000)
 + *  => (column=cf:gc_grace_seconds, value=864000, timestamp=1327061105833119000)
 + *  => (column=cf:id, value=1000, timestamp=1327061105833119000)
 + *  => (column=cf:key_alias, value="S0VZ", timestamp=1327061105833119000)
 + *  ... part of the output omitted.
 + *
 + * schema_columns
 + * --------------
 + * RowKey: ks
 + *  => (column=cf:c:index_name, value=null, timestamp=1327061105833119000)
 + *  => (column=cf:c:index_options, value=null, timestamp=1327061105833119000)
 + *  => (column=cf:c:index_type, value=null, timestamp=1327061105833119000)
 + *  => (column=cf:c:name, value="aGVsbG8=", timestamp=1327061105833119000)
 + *  => (column=cf:c:validation_class, value="org.apache.cassandra.db.marshal.AsciiType", timestamp=1327061105833119000)
 + */
 +public class DefsTables
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(DefsTables.class);
 +
 +    /* saves keyspace definitions to system schema columnfamilies */
 +    public static synchronized void save(Collection<KSMetaData> keyspaces)
 +    {
 +        long timestamp = System.currentTimeMillis();
 +
 +        for (KSMetaData ksMetaData : keyspaces)
 +            ksMetaData.toSchema(timestamp).apply();
 +    }
 +
 +    /**
 +     * Load keyspace definitions for the system keyspace (system.SCHEMA_KEYSPACES_CF)
 +     *
 +     * @return Collection of found keyspace definitions
 +     */
 +    public static Collection<KSMetaData> loadFromKeyspace()
 +    {
 +        List<Row> serializedSchema = SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
 +
 +        List<KSMetaData> keyspaces = new ArrayList<KSMetaData>(serializedSchema.size());
 +
 +        for (Row row : serializedSchema)
 +        {
 +            if (Schema.invalidSchemaRow(row) || Schema.ignoredSchemaRow(row))
 +                continue;
 +
 +            keyspaces.add(KSMetaData.fromSchema(row, serializedColumnFamilies(row.key)));
 +        }
 +
 +        return keyspaces;
 +    }
 +
 +    public static ByteBuffer searchComposite(String name, boolean start)
 +    {
 +        assert name != null;
 +        ByteBuffer nameBytes = UTF8Type.instance.decompose(name);
 +        int length = nameBytes.remaining();
 +        byte[] bytes = new byte[2 + length + 1];
 +        bytes[0] = (byte)((length >> 8) & 0xFF);
 +        bytes[1] = (byte)(length & 0xFF);
 +        ByteBufferUtil.arrayCopy(nameBytes, 0, bytes, 2, length);
 +        bytes[bytes.length - 1] = (byte)(start ? 0 : 1);
 +        return ByteBuffer.wrap(bytes);
 +    }
 +
 +    private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
 +    {
 +        ColumnFamilyStore cfsStore = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
 +        return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey,
 +                                                                                         SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF,
 +                                                                                         System.currentTimeMillis())));
 +    }
 +
 +    /**
 +     * Merge remote schema, in the form of row mutations, with the local schema, mutating ks/cf metadata objects
 +     * (which also involves fs operations on ks/cf add/drop)
 +     *
 +     * @param mutations the schema changes to apply
 +     *
 +     * @throws ConfigurationException If one of the metadata attributes has an invalid value
 +     * @throws IOException If data was corrupted in transit or fs operations failed to apply
 +     */
 +    public static synchronized void mergeSchema(Collection<RowMutation> mutations) throws ConfigurationException, IOException
 +    {
 +        // current state of the schema
 +        Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
 +        Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
 +
 +        for (RowMutation mutation : mutations)
 +            mutation.apply();
 +
 +        if (!StorageService.instance.isClientMode())
 +            flushSchemaCFs();
 +
 +        Schema.instance.updateVersionAndAnnounce();
 +
 +        // with new data applied
 +        Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
 +        Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
 +
 +        Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
 +        mergeColumnFamilies(oldColumnFamilies, newColumnFamilies);
 +
 +        // it is safe to drop a keyspace only when all nested ColumnFamilies were deleted
 +        for (String keyspaceToDrop : keyspacesToDrop)
 +            dropKeyspace(keyspaceToDrop);
 +
 +    }
 +
 +    private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
 +    {
 +        // calculate the difference between old and new states (note that entriesOnlyOnLeft() will always be empty)
 +        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated);
 +
 +        /**
 +         * First, check whether any new keyspaces were added.
 +         */
 +        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
 +        {
 +            ColumnFamily ksAttrs = entry.getValue();
 +
 +            // we don't care about nested ColumnFamilies here because those are going to be processed separately
 +            if (ksAttrs.getColumnCount() > 0)
 +                addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), entry.getValue()), Collections.<CFMetaData>emptyList()));
 +        }
 +
 +        /**
 +         * Second, check whether any keyspaces were re-created; in this context, re-created
 +         * means previously deleted but still present in the low-level schema as empty keys.
 +         */
 +
 +        Map<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntries = diff.entriesDiffering();
 +
 +        // instead of looping over all modified entries and repeatedly skipping processed keys,
 +        // we store the "left to process" items and iterate over them, removing keys already handled
 +        List<DecoratedKey> leftToProcess = new ArrayList<DecoratedKey>(modifiedEntries.size());
 +
 +        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : modifiedEntries.entrySet())
 +        {
 +            ColumnFamily prevValue = entry.getValue().leftValue();
 +            ColumnFamily newValue = entry.getValue().rightValue();
 +
 +            if (prevValue.getColumnCount() == 0)
 +            {
 +                addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), newValue), Collections.<CFMetaData>emptyList()));
 +                continue;
 +            }
 +
 +            leftToProcess.add(entry.getKey());
 +        }
 +
 +        if (leftToProcess.isEmpty())
 +            return Collections.emptySet();
 +
 +        /**
 +         * Finally, update the modified keyspaces and record which keyspaces to drop later.
 +         */
 +
 +        Set<String> keyspacesToDrop = new HashSet<String>();
 +
 +        for (DecoratedKey key : leftToProcess)
 +        {
 +            MapDifference.ValueDifference<ColumnFamily> valueDiff = modifiedEntries.get(key);
 +
 +            ColumnFamily newState = valueDiff.rightValue();
 +
 +            if (newState.getColumnCount() == 0)
 +                keyspacesToDrop.add(AsciiType.instance.getString(key.key));
 +            else
 +                updateKeyspace(KSMetaData.fromSchema(new Row(key, newState), Collections.<CFMetaData>emptyList()));
 +        }
 +
 +        return keyspacesToDrop;
 +    }
 +
 +    private static void mergeColumnFamilies(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
 +    {
 +        // calculate the difference between old and new states (note that entriesOnlyOnLeft() will always be empty)
 +        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated);
 +
 +        // check if any new Keyspaces with ColumnFamilies were added.
 +        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
 +        {
 +            ColumnFamily cfAttrs = entry.getValue();
 +
 +            if (cfAttrs.getColumnCount() > 0)
 +            {
 +                Map<String, CFMetaData> cfDefs = KSMetaData.deserializeColumnFamilies(new Row(entry.getKey(), cfAttrs));
 +
 +                for (CFMetaData cfDef : cfDefs.values())
 +                    addColumnFamily(cfDef);
 +            }
 +        }
 +
 +        // deal with modified ColumnFamilies (remember that all of a keyspace's nested ColumnFamilies are stored in a single row)
 +        Map<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntries = diff.entriesDiffering();
 +
 +        for (DecoratedKey keyspace : modifiedEntries.keySet())
 +        {
 +            MapDifference.ValueDifference<ColumnFamily> valueDiff = modifiedEntries.get(keyspace);
 +
 +            ColumnFamily prevValue = valueDiff.leftValue(); // state before external modification
 +            ColumnFamily newValue = valueDiff.rightValue(); // updated state
 +
 +            Row newRow = new Row(keyspace, newValue);
 +
 +            if (prevValue.getColumnCount() == 0) // whole keyspace was deleted and now it's re-created
 +            {
 +                for (CFMetaData cfm : KSMetaData.deserializeColumnFamilies(newRow).values())
 +                    addColumnFamily(cfm);
 +            }
 +            else if (newValue.getColumnCount() == 0) // whole keyspace is deleted
 +            {
 +                for (CFMetaData cfm : KSMetaData.deserializeColumnFamilies(new Row(keyspace, prevValue)).values())
 +                    dropColumnFamily(cfm.ksName, cfm.cfName);
 +            }
 +            else // the nested ColumnFamilies were modified; perform a nested diff to determine what actually changed
 +            {
 +                String ksName = AsciiType.instance.getString(keyspace.key);
 +
 +                Map<String, CFMetaData> oldCfDefs = new HashMap<String, CFMetaData>();
 +                for (CFMetaData cfm : Schema.instance.getKSMetaData(ksName).cfMetaData().values())
 +                    oldCfDefs.put(cfm.cfName, cfm);
 +
 +                Map<String, CFMetaData> newCfDefs = KSMetaData.deserializeColumnFamilies(newRow);
 +
 +                MapDifference<String, CFMetaData> cfDefDiff = Maps.difference(oldCfDefs, newCfDefs);
 +
 +                for (CFMetaData cfDef : cfDefDiff.entriesOnlyOnRight().values())
 +                    addColumnFamily(cfDef);
 +
 +                for (CFMetaData cfDef : cfDefDiff.entriesOnlyOnLeft().values())
 +                    dropColumnFamily(cfDef.ksName, cfDef.cfName);
 +
 +                for (MapDifference.ValueDifference<CFMetaData> cfDef : cfDefDiff.entriesDiffering().values())
 +                    updateColumnFamily(cfDef.rightValue());
 +            }
 +        }
 +    }
 +
 +    private static void addKeyspace(KSMetaData ksm)
 +    {
 +        assert Schema.instance.getKSMetaData(ksm.name) == null;
 +        Schema.instance.load(ksm);
 +
 +        if (!StorageService.instance.isClientMode())
 +        {
 +            Keyspace.open(ksm.name);
 +            MigrationManager.instance.notifyCreateKeyspace(ksm);
 +        }
 +    }
 +
 +    private static void addColumnFamily(CFMetaData cfm)
 +    {
 +        assert Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName) == null;
 +        KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName);
 +        ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm)));
 +
 +        logger.info("Loading " + cfm);
 +
 +        Schema.instance.load(cfm);
 +
 +        // make sure it's init-ed w/ the old definitions first,
 +        // since we're going to call initCf on the new one manually
 +        Keyspace.open(cfm.ksName);
 +
 +        Schema.instance.setKeyspaceDefinition(ksm);
 +
 +        if (!StorageService.instance.isClientMode())
 +        {
 +            Keyspace.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
 +            MigrationManager.instance.notifyCreateColumnFamily(cfm);
 +        }
 +    }
 +
 +    private static void updateKeyspace(KSMetaData newState)
 +    {
 +        KSMetaData oldKsm = Schema.instance.getKSMetaData(newState.name);
 +        assert oldKsm != null;
 +        KSMetaData newKsm = KSMetaData.cloneWith(oldKsm.reloadAttributes(), oldKsm.cfMetaData().values());
 +
 +        Schema.instance.setKeyspaceDefinition(newKsm);
 +
 +        if (!StorageService.instance.isClientMode())
 +        {
 +            Keyspace.open(newState.name).createReplicationStrategy(newKsm);
 +            MigrationManager.instance.notifyUpdateKeyspace(newKsm);
 +        }
 +    }
 +
 +    private static void updateColumnFamily(CFMetaData newState)
 +    {
 +        CFMetaData cfm = Schema.instance.getCFMetaData(newState.ksName, newState.cfName);
 +        assert cfm != null;
 +        cfm.reload();
 +
 +        if (!StorageService.instance.isClientMode())
 +        {
 +            Keyspace keyspace = Keyspace.open(cfm.ksName);
 +            keyspace.getColumnFamilyStore(cfm.cfName).reload();
 +            MigrationManager.instance.notifyUpdateColumnFamily(cfm);
 +        }
 +    }
 +
 +    private static void dropKeyspace(String ksName)
 +    {
 +        KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
 +        String snapshotName = Keyspace.getTimestampedSnapshotName(ksName);
 +
 +        CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true);
 +
 +        // remove all cfs from the keyspace instance.
 +        for (CFMetaData cfm : ksm.cfMetaData().values())
 +        {
 +            ColumnFamilyStore cfs = Keyspace.open(ksm.name).getColumnFamilyStore(cfm.cfName);
 +
 +            Schema.instance.purge(cfm);
 +
 +            if (!StorageService.instance.isClientMode())
 +            {
 +                if (DatabaseDescriptor.isAutoSnapshot())
 +                    cfs.snapshot(snapshotName);
 +                Keyspace.open(ksm.name).dropCf(cfm.cfId);
 +            }
 +        }
 +
 +        // remove the keyspace from the static instances.
 +        Keyspace.clear(ksm.name);
 +        Schema.instance.clearKeyspaceDefinition(ksm);
 +        if (!StorageService.instance.isClientMode())
 +        {
 +            MigrationManager.instance.notifyDropKeyspace(ksm);
 +        }
 +    }
 +
 +    private static void dropColumnFamily(String ksName, String cfName)
 +    {
 +        KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
 +        assert ksm != null;
 +        ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName);
 +        assert cfs != null;
 +
 +        // reinitialize the keyspace.
 +        CFMetaData cfm = ksm.cfMetaData().get(cfName);
 +
 +        Schema.instance.purge(cfm);
 +        Schema.instance.setKeyspaceDefinition(makeNewKeyspaceDefinition(ksm, cfm));
 +
 +        CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
 +
 +        if (!StorageService.instance.isClientMode())
 +        {
 +            if (DatabaseDescriptor.isAutoSnapshot())
 +                cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));
 +            Keyspace.open(ksm.name).dropCf(cfm.cfId);
 +            MigrationManager.instance.notifyDropColumnFamily(cfm);
 +        }
 +    }
 +
 +    private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
 +    {
 +        // clone ksm but do not include the new def
 +        List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
 +        newCfs.remove(toExclude);
 +        assert newCfs.size() == ksm.cfMetaData().size() - 1;
 +        return KSMetaData.cloneWith(ksm, newCfs);
 +    }
 +
 +    private static void flushSchemaCFs()
 +    {
-         flushSchemaCF(SystemKeyspace.SCHEMA_KEYSPACES_CF);
-         flushSchemaCF(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
-         flushSchemaCF(SystemKeyspace.SCHEMA_COLUMNS_CF);
-     }
- 
-     private static void flushSchemaCF(String cfName)
-     {
-         FBUtilities.waitOnFuture(SystemKeyspace.schemaCFS(cfName).forceFlush());
++        SystemKeyspace.forceBlockingFlush(SystemKeyspace.SCHEMA_KEYSPACES_CF);
++        SystemKeyspace.forceBlockingFlush(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
++        SystemKeyspace.forceBlockingFlush(SystemKeyspace.SCHEMA_COLUMNS_CF);
 +    }
 +}
++

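Both mergeKeyspaces() and mergeColumnFamilies() above hinge on Guava's Maps.difference(), which partitions the before/after schema maps into added, removed, and modified entries. A self-contained sketch of that partitioning, with plain Strings standing in for the DecoratedKey/ColumnFamily types; this demo class is not part of the commit:

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;

public class SchemaDiffDemo
{
    public static void main(String[] args)
    {
        // "Before" and "after" snapshots of the schema rows.
        ImmutableMap<String, String> before = ImmutableMap.of("ks1", "v1", "ks2", "v1");
        ImmutableMap<String, String> after = ImmutableMap.of("ks2", "v2", "ks3", "v1");

        MapDifference<String, String> diff = Maps.difference(before, after);

        System.out.println(diff.entriesOnlyOnRight()); // {ks3=v1} -> the addKeyspace() path
        System.out.println(diff.entriesDiffering());   // {ks2=(v1, v2)} -> update, or drop if emptied
        System.out.println(diff.entriesOnlyOnLeft());  // {ks1=v1}; always empty in the real merge,
                                                       // since dropped keyspaces persist as empty rows
    }
}
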
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cdc57b3c/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 135df26,0000000..2c232b1
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -1,833 -1,0 +1,834 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.DataInputStream;
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.nio.ByteBuffer;
 +import java.util.*;
++import java.util.concurrent.ExecutionException;
 +
 +import com.google.common.base.Function;
 +import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.SetMultimap;
 +import com.google.common.collect.Sets;
 +import org.apache.commons.lang.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.apache.cassandra.db.filter.QueryFilter;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.locator.IEndpointSnitch;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.paxos.Commit;
 +import org.apache.cassandra.service.paxos.PaxosState;
 +import org.apache.cassandra.thrift.cassandraConstants;
 +import org.apache.cassandra.utils.*;
 +
 +import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
 +
 +public class SystemKeyspace
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SystemKeyspace.class);
 +
 +    // see CFMetaData for schema definitions
 +    public static final String PEERS_CF = "peers";
 +    public static final String PEER_EVENTS_CF = "peer_events";
 +    public static final String LOCAL_CF = "local";
 +    public static final String INDEX_CF = "IndexInfo";
 +    public static final String COUNTER_ID_CF = "NodeIdInfo";
 +    public static final String HINTS_CF = "hints";
 +    public static final String RANGE_XFERS_CF = "range_xfers";
 +    public static final String BATCHLOG_CF = "batchlog";
 +    // see layout description in the DefsTables class header
 +    public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";
 +    public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies";
 +    public static final String SCHEMA_COLUMNS_CF = "schema_columns";
 +    public static final String SCHEMA_TRIGGERS_CF = "schema_triggers";
 +    public static final String COMPACTION_LOG = "compactions_in_progress";
 +    public static final String PAXOS_CF = "paxos";
 +
 +    private static final String LOCAL_KEY = "local";
 +    private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
 +
 +    public enum BootstrapState
 +    {
 +        NEEDS_BOOTSTRAP,
 +        COMPLETED,
 +        IN_PROGRESS
 +    }
 +
 +    private static DecoratedKey decorate(ByteBuffer key)
 +    {
 +        return StorageService.getPartitioner().decorateKey(key);
 +    }
 +
 +    public static void finishStartup()
 +    {
 +        setupVersion();
 +
 +        // add entries to system schema columnfamilies for the hardcoded system definitions
 +        for (String ksname : Schema.systemKeyspaceNames)
 +        {
 +            KSMetaData ksmd = Schema.instance.getKSMetaData(ksname);
 +
 +            // delete old, possibly obsolete entries in schema columnfamilies
 +            for (String cfname : Arrays.asList(SystemKeyspace.SCHEMA_KEYSPACES_CF, SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, SystemKeyspace.SCHEMA_COLUMNS_CF))
 +            {
 +                String req = String.format("DELETE FROM system.%s WHERE keyspace_name = '%s'", cfname, ksmd.name);
 +                processInternal(req);
 +            }
 +
 +            // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
 +            ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply();
 +        }
 +    }
 +
 +    private static void setupVersion()
 +    {
 +        String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, data_center, rack, partitioner) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s')";
 +        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 +        processInternal(String.format(req, LOCAL_CF,
 +                                         LOCAL_KEY,
 +                                         FBUtilities.getReleaseVersionString(),
 +                                         QueryProcessor.CQL_VERSION.toString(),
 +                                         cassandraConstants.VERSION,
 +                                         snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
 +                                         snitch.getRack(FBUtilities.getBroadcastAddress()),
 +                                         DatabaseDescriptor.getPartitioner().getClass().getName()));
 +    }
 +
 +    /**
 +     * Write to the compaction log, except for columnfamilies in the system keyspace.
 +     *
 +     * @param cfs the ColumnFamilyStore being compacted
 +     * @param toCompact sstables to compact
 +     * @return compaction task id, or null if cfs is in the system keyspace
 +     */
 +    public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
 +    {
 +        if (Keyspace.SYSTEM_KS.equals(cfs.keyspace.getName()))
 +            return null;
 +
 +        UUID compactionId = UUIDGen.getTimeUUID();
 +        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
 +        Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
 +        {
 +            public Integer apply(SSTableReader sstable)
 +            {
 +                return sstable.descriptor.generation;
 +            }
 +        });
 +        processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.keyspace.getName(), cfs.name, StringUtils.join(Sets.newHashSet(generations), ',')));
 +        forceBlockingFlush(COMPACTION_LOG);
 +        return compactionId;
 +    }
 +
 +    public static void finishCompaction(UUID taskId)
 +    {
 +        assert taskId != null;
 +
 +        String req = "DELETE FROM system.%s WHERE id = %s";
 +        processInternal(String.format(req, COMPACTION_LOG, taskId));
 +        forceBlockingFlush(COMPACTION_LOG);
 +    }
 +
 +    /**
 +     * @return unfinished compactions, grouped by keyspace/columnfamily pair.
 +     */
 +    public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
 +    {
 +        String req = "SELECT * FROM system.%s";
 +        UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
 +
 +        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
 +        for (UntypedResultSet.Row row : resultSet)
 +        {
 +            String keyspace = row.getString("keyspace_name");
 +            String columnfamily = row.getString("columnfamily_name");
 +            Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
 +
 +            unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
 +        }
 +        return unfinishedCompactions;
 +    }
 +
 +    public static void discardCompactionsInProgress()
 +    {
 +        ColumnFamilyStore compactionLog = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
 +        compactionLog.truncateBlocking();
 +    }
 +
 +    public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
 +    {
 +        String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
 +        processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
-         if (!Boolean.getBoolean("cassandra.unsafetruncate"))
-             forceBlockingFlush(LOCAL_CF);
++        forceBlockingFlush(LOCAL_CF);
 +    }
 +
 +    /**
 +     * Remove the stored truncation time for the specified column family.
 +     */
 +    public static void removeTruncationRecord(UUID cfId)
 +    {
 +        String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'";
 +        processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
 +        forceBlockingFlush(LOCAL_CF);
 +    }
 +
 +    private static String truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
 +    {
 +        DataOutputBuffer out = new DataOutputBuffer();
 +        try
 +        {
 +            ReplayPosition.serializer.serialize(position, out);
 +            out.writeLong(truncatedAt);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +        return String.format("{%s: 0x%s}",
 +                             cfs.metadata.cfId,
 +                             ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
 +    }
 +
 +    public static Map<UUID, Pair<ReplayPosition, Long>> getTruncationRecords()
 +    {
 +        String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
 +        UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +        if (rows.isEmpty())
 +            return Collections.emptyMap();
 +
 +        UntypedResultSet.Row row = rows.one();
 +        Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", UUIDType.instance, BytesType.instance);
 +        if (rawMap == null)
 +            return Collections.emptyMap();
 +
 +        Map<UUID, Pair<ReplayPosition, Long>> positions = new HashMap<UUID, Pair<ReplayPosition, Long>>();
 +        for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
 +            positions.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
 +        return positions;
 +    }
 +
 +    private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
 +    {
 +        try
 +        {
 +            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
 +            return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    /**
 +     * Record tokens being used by another node
 +     */
 +    public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
 +    {
 +        if (ep.equals(FBUtilities.getBroadcastAddress()))
 +        {
 +            removeEndpoint(ep);
 +            return;
 +        }
 +
 +        String req = "INSERT INTO system.%s (peer, tokens) VALUES ('%s', %s)";
 +        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), tokensAsSet(tokens)));
 +        forceBlockingFlush(PEERS_CF);
 +    }
 +
 +    public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
 +    {
 +        String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES ('%s', '%s')";
 +        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), preferred_ip.getHostAddress()));
 +        forceBlockingFlush(PEERS_CF);
 +    }
 +
 +    public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value)
 +    {
 +        if (ep.equals(FBUtilities.getBroadcastAddress()))
 +            return;
 +
 +        String req = "INSERT INTO system.%s (peer, %s) VALUES ('%s', %s)";
 +        processInternal(String.format(req, PEERS_CF, columnName, ep.getHostAddress(), value));
 +    }
 +
 +    public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
 +    {
 +        // with 30 day TTL
 +        String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ %s ] = %s WHERE peer = '%s'";
 +        processInternal(String.format(req, PEER_EVENTS_CF, timePeriod.toString(), value, ep.getHostAddress()));
 +    }
 +
 +    public static synchronized void updateSchemaVersion(UUID version)
 +    {
 +        String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', %s)";
 +        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, version.toString()));
 +    }
 +
 +    private static String tokensAsSet(Collection<Token> tokens)
 +    {
 +        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
 +        StringBuilder sb = new StringBuilder();
 +        sb.append("{");
 +        Iterator<Token> iter = tokens.iterator();
 +        while (iter.hasNext())
 +        {
 +            sb.append("'").append(factory.toString(iter.next())).append("'");
 +            if (iter.hasNext())
 +                sb.append(",");
 +        }
 +        sb.append("}");
 +        return sb.toString();
 +    }
 +
 +    private static Collection<Token> deserializeTokens(Collection<String> tokensStrings)
 +    {
 +        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
 +        List<Token> tokens = new ArrayList<Token>(tokensStrings.size());
 +        for (String tk : tokensStrings)
 +            tokens.add(factory.fromString(tk));
 +        return tokens;
 +    }
 +
 +    /**
 +     * Remove stored tokens being used by another node
 +     */
 +    public static synchronized void removeEndpoint(InetAddress ep)
 +    {
 +        String req = "DELETE FROM system.%s WHERE peer = '%s'";
 +        processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
 +        forceBlockingFlush(PEERS_CF);
 +    }
 +
 +    /**
 +     * Update the system keyspace with the new tokens for this node.
 +     */
 +    public static synchronized void updateTokens(Collection<Token> tokens)
 +    {
 +        assert !tokens.isEmpty() : "removeEndpoint should be used instead";
 +        String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', %s)";
 +        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokensAsSet(tokens)));
 +        forceBlockingFlush(LOCAL_CF);
 +    }
 +
 +    /**
 +     * Convenience method to update the list of tokens in the local system keyspace.
 +     *
 +     * @param addTokens tokens to add
 +     * @param rmTokens tokens to remove
 +     * @return the collection of persisted tokens
 +     */
 +    public static synchronized Collection<Token> updateLocalTokens(Collection<Token> addTokens, Collection<Token> rmTokens)
 +    {
 +        Collection<Token> tokens = getSavedTokens();
 +        tokens.removeAll(rmTokens);
 +        tokens.addAll(addTokens);
 +        updateTokens(tokens);
 +        return tokens;
 +    }
 +
-     private static void forceBlockingFlush(String cfname)
++    public static void forceBlockingFlush(String cfname)
 +    {
-         Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(cfname).forceBlockingFlush();
++        if (!Boolean.getBoolean("cassandra.unsafesystem"))
++            FBUtilities.waitOnFuture(Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(cfname).forceFlush());
 +    }
 +
 +    /**
 +     * Return a map of IP addresses to their stored tokens.
 +     */
 +    public static SetMultimap<InetAddress, Token> loadTokens()
 +    {
 +        SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
 +        for (UntypedResultSet.Row row : processInternal("SELECT peer, tokens FROM system." + PEERS_CF))
 +        {
 +            InetAddress peer = row.getInetAddress("peer");
 +            if (row.has("tokens"))
 +                tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance)));
 +        }
 +
 +        return tokenMap;
 +    }
 +
 +    /**
 +     * Return a map of IP addresses to their stored host_ids.
 +     */
 +    public static Map<InetAddress, UUID> loadHostIds()
 +    {
 +        Map<InetAddress, UUID> hostIdMap = new HashMap<InetAddress, UUID>();
 +        for (UntypedResultSet.Row row : processInternal("SELECT peer, host_id FROM system." + PEERS_CF))
 +        {
 +            InetAddress peer = row.getInetAddress("peer");
 +            if (row.has("host_id"))
 +            {
 +                hostIdMap.put(peer, row.getUUID("host_id"));
 +            }
 +        }
 +        return hostIdMap;
 +    }
 +
 +    public static InetAddress getPreferredIP(InetAddress ep)
 +    {
 +        String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
 +        if (!result.isEmpty() && result.one().has("preferred_ip"))
 +            return result.one().getInetAddress("preferred_ip");
 +        return null;
 +    }
 +
 +    /**
 +     * Return, for each known IP address, a map of its dc and rack info.
 +     */
 +    public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
 +    {
 +        Map<InetAddress, Map<String, String>> result = new HashMap<InetAddress, Map<String, String>>();
 +        for (UntypedResultSet.Row row : processInternal("SELECT peer, data_center, rack from system." + PEERS_CF))
 +        {
 +            InetAddress peer = row.getInetAddress("peer");
 +            if (row.has("data_center") && row.has("rack"))
 +            {
 +                Map<String, String> dcRack = new HashMap<String, String>();
 +                dcRack.put("data_center", row.getString("data_center"));
 +                dcRack.put("rack", row.getString("rack"));
 +                result.put(peer, dcRack);
 +            }
 +        }
 +        return result;
 +    }
 +
 +    /**
 +     * One of three things will happen if you try to read the system keyspace:
 +     * 1. files are present and you can read them: great
 +     * 2. no files are there: great (new node is assumed)
 +     * 3. files are present but you can't read them: bad
 +     * @throws ConfigurationException
 +     */
 +    public static void checkHealth() throws ConfigurationException
 +    {
 +        Keyspace keyspace;
 +        try
 +        {
 +            keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
 +        }
 +        catch (AssertionError err)
 +        {
 +            // this happens when a user switches from OPP to RP.
 +            ConfigurationException ex = new ConfigurationException("Could not read system keyspace!");
 +            ex.initCause(err);
 +            throw ex;
 +        }
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL_CF);
 +
 +        String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +
 +        if (result.isEmpty() || !result.one().has("cluster_name"))
 +        {
 +            // this is a brand new node
 +            if (!cfs.getSSTables().isEmpty())
 +                throw new ConfigurationException("Found system keyspace files, but they couldn't be loaded!");
 +
 +            // no system files.  this is a new node.
 +            req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', '%s')";
 +            processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, DatabaseDescriptor.getClusterName()));
 +            return;
 +        }
 +
 +        String savedClusterName = result.one().getString("cluster_name");
 +        if (!DatabaseDescriptor.getClusterName().equals(savedClusterName))
 +            throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName());
 +    }
 +
 +    public static Collection<Token> getSavedTokens()
 +    {
 +        String req = "SELECT tokens FROM system.%s WHERE key='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +        return result.isEmpty() || !result.one().has("tokens")
 +             ? Collections.<Token>emptyList()
 +             : deserializeTokens(result.one().<String>getSet("tokens", UTF8Type.instance));
 +    }
 +
 +    public static int incrementAndGetGeneration()
 +    {
 +        String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +
 +        int generation;
 +        if (result.isEmpty() || !result.one().has("gossip_generation"))
 +        {
 +            // seconds-since-epoch isn't a foolproof new generation
 +            // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"),
 +            // but it's as close as sanely possible
 +            generation = (int) (System.currentTimeMillis() / 1000);
 +        }
 +        else
 +        {
 +            // Other nodes will ignore gossip messages about a node with a lower generation than previously seen.
 +            final int storedGeneration = result.one().getInt("gossip_generation") + 1;
 +            final int now = (int) (System.currentTimeMillis() / 1000);
 +            if (storedGeneration >= now)
 +            {
 +                logger.warn("Using stored Gossip Generation {} as it is greater than current system time {}.  See CASSANDRA-3654 if you experience problems",
 +                            storedGeneration, now);
 +                generation = storedGeneration;
 +            }
 +            else
 +            {
 +                generation = now;
 +            }
 +        }
 +
 +        req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', %d)";
 +        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, generation));
 +        forceBlockingFlush(LOCAL_CF);
 +
 +        return generation;
 +    }
 +
 +    public static BootstrapState getBootstrapState()
 +    {
 +        String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +
 +        if (result.isEmpty() || !result.one().has("bootstrapped"))
 +            return BootstrapState.NEEDS_BOOTSTRAP;
 +
 +        return BootstrapState.valueOf(result.one().getString("bootstrapped"));
 +    }
 +
 +    public static boolean bootstrapComplete()
 +    {
 +        return getBootstrapState() == BootstrapState.COMPLETED;
 +    }
 +
 +    public static boolean bootstrapInProgress()
 +    {
 +        return getBootstrapState() == BootstrapState.IN_PROGRESS;
 +    }
 +
 +    public static void setBootstrapState(BootstrapState state)
 +    {
 +        String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', '%s')";
 +        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, state.name()));
 +        forceBlockingFlush(LOCAL_CF);
 +    }
 +
 +    public static boolean isIndexBuilt(String keyspaceName, String indexName)
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
 +        QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)),
 +                                                        INDEX_CF,
 +                                                        ByteBufferUtil.bytes(indexName),
 +                                                        System.currentTimeMillis());
 +        return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
 +    }
 +
 +    public static void setIndexBuilt(String keyspaceName, String indexName)
 +    {
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, INDEX_CF);
 +        cf.addColumn(new Column(ByteBufferUtil.bytes(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
 +        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName), cf);
 +        rm.apply();
 +        forceBlockingFlush(INDEX_CF);
 +    }
 +
 +    public static void setIndexRemoved(String keyspaceName, String indexName)
 +    {
 +        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName));
 +        rm.delete(INDEX_CF, ByteBufferUtil.bytes(indexName), FBUtilities.timestampMicros());
 +        rm.apply();
 +        forceBlockingFlush(INDEX_CF);
 +    }
 +
 +    /**
 +     * Read the host ID from the system keyspace, creating (and storing) one if
 +     * none exists.
 +     */
 +    public static UUID getLocalHostId()
 +    {
 +        UUID hostId = null;
 +
 +        String req = "SELECT host_id FROM system.%s WHERE key='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +
 +        // Look up the Host UUID (return it if found)
 +        if (!result.isEmpty() && result.one().has("host_id"))
 +        {
 +            return result.one().getUUID("host_id");
 +        }
 +
 +        // ID not found, generate a new one, persist, and then return it.
 +        hostId = UUID.randomUUID();
 +        logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId);
 +
 +        req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
 +        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, hostId));
 +        return hostId;
 +    }
 +
 +    /**
 +     * Read the current local node id from the system keyspace or null if no
 +     * such node id is recorded.
 +     */
 +    public static CounterId getCurrentLocalCounterId()
 +    {
 +        Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
 +
 +        // Get the last CounterId (CounterIds are timeuuids and thus ordered from oldest to newest)
 +        QueryFilter filter = QueryFilter.getSliceFilter(decorate(ALL_LOCAL_NODE_ID_KEY),
 +                                                        COUNTER_ID_CF,
 +                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
 +                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
 +                                                        true,
 +                                                        1,
 +                                                        System.currentTimeMillis());
 +        ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
 +        if (cf != null && cf.getColumnCount() != 0)
 +            return CounterId.wrap(cf.iterator().next().name());
 +        else
 +            return null;
 +    }
 +
 +    /**
 +     * Write a new current local node id to the system keyspace.
 +     *
 +     * @param oldCounterId the previous local node id (which {@code newCounterId}
 +     * replaces), or null if no such node id exists (new node or removed system
 +     * keyspace)
 +     * @param newCounterId the new current local node id to record
 +     * @param now timestamp, in microseconds
 +     */
 +    public static void writeCurrentLocalCounterId(CounterId oldCounterId, CounterId newCounterId, long now)
 +    {
 +        ByteBuffer ip = ByteBuffer.wrap(FBUtilities.getBroadcastAddress().getAddress());
 +
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, COUNTER_ID_CF);
 +        cf.addColumn(new Column(newCounterId.bytes(), ip, now));
 +        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ALL_LOCAL_NODE_ID_KEY, cf);
 +        rm.apply();
 +        forceBlockingFlush(COUNTER_ID_CF);
 +    }
 +
 +    public static List<CounterId.CounterIdRecord> getOldLocalCounterIds()
 +    {
 +        List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
 +
 +        Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
 +        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
 +        ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
 +
 +        CounterId previous = null;
 +        for (Column c : cf)
 +        {
 +            if (previous != null)
 +                l.add(new CounterId.CounterIdRecord(previous, c.timestamp()));
 +
 +            // this deliberately skips the last (newest) column, since it is
 +            // the current local node id
 +            previous = CounterId.wrap(c.name());
 +        }
 +        return l;
 +    }
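 +
 +    // Worked example: for columns c1 < c2 < c3 (oldest to newest), the loop above
 +    // emits records for c1 and c2 only; c3, the current id, is still held in
 +    // `previous` when the loop ends and is never added to the list.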
 +
 +    /**
 +     * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
 +     * @return the CFS responsible for holding the low-level serialized schema
 +     */
 +    public static ColumnFamilyStore schemaCFS(String cfName)
 +    {
 +        return Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(cfName);
 +    }
 +
 +    public static List<Row> serializedSchema()
 +    {
 +        List<Row> schema = new ArrayList<Row>(3);
 +
 +        schema.addAll(serializedSchema(SCHEMA_KEYSPACES_CF));
 +        schema.addAll(serializedSchema(SCHEMA_COLUMNFAMILIES_CF));
 +        schema.addAll(serializedSchema(SCHEMA_COLUMNS_CF));
 +
 +        return schema;
 +    }
 +
 +    /**
 +     * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
 +     * @return low-level schema representation (each row represents an individual Keyspace or ColumnFamily)
 +     */
 +    public static List<Row> serializedSchema(String schemaCfName)
 +    {
 +        Token minToken = StorageService.getPartitioner().getMinimumToken();
 +
 +        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
 +                                                     null,
 +                                                     new IdentityQueryFilter(),
 +                                                     Integer.MAX_VALUE,
 +                                                     System.currentTimeMillis());
 +    }
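 +
 +    // The range (minToken.minKeyBound(), minToken.maxKeyBound()] effectively covers
 +    // the entire ring, so this slice returns every row in the given schema column family.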
 +
 +    public static Collection<RowMutation> serializeSchema()
 +    {
 +        Map<DecoratedKey, RowMutation> mutationMap = new HashMap<DecoratedKey, RowMutation>();
 +
 +        serializeSchema(mutationMap, SCHEMA_KEYSPACES_CF);
 +        serializeSchema(mutationMap, SCHEMA_COLUMNFAMILIES_CF);
 +        serializeSchema(mutationMap, SCHEMA_COLUMNS_CF);
 +
 +        return mutationMap.values();
 +    }
 +
 +    private static void serializeSchema(Map<DecoratedKey, RowMutation> mutationMap, String schemaCfName)
 +    {
 +        for (Row schemaRow : serializedSchema(schemaCfName))
 +        {
 +            if (Schema.ignoredSchemaRow(schemaRow))
 +                continue;
 +
 +            RowMutation mutation = mutationMap.get(schemaRow.key);
 +            if (mutation == null)
 +            {
 +                mutation = new RowMutation(Keyspace.SYSTEM_KS, schemaRow.key.key);
 +                mutationMap.put(schemaRow.key, mutation);
 +            }
 +
 +            mutation.add(schemaRow.cf);
 +        }
 +    }
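 +
 +    // Rows from the three schema column families that share a key (i.e. describe
 +    // the same keyspace) are merged into a single RowMutation above, so each
 +    // keyspace's definitions travel together in one mutation.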
 +
 +    public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName)
 +    {
 +        Map<DecoratedKey, ColumnFamily> schema = new HashMap<DecoratedKey, ColumnFamily>();
 +
 +        for (Row schemaEntity : SystemKeyspace.serializedSchema(cfName))
 +            schema.put(schemaEntity.key, schemaEntity.cf);
 +
 +        return schema;
 +    }
 +
 +    public static ByteBuffer getSchemaKSKey(String ksName)
 +    {
 +        return AsciiType.instance.fromString(ksName);
 +    }
 +
 +    public static Row readSchemaRow(String ksName)
 +    {
 +        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
 +
 +        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SCHEMA_KEYSPACES_CF);
 +        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF, System.currentTimeMillis()));
 +
 +        return new Row(key, result);
 +    }
 +
 +    public static Row readSchemaRow(String ksName, String cfName)
 +    {
 +        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
 +
 +        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SCHEMA_COLUMNFAMILIES_CF);
 +        ColumnFamily result = schemaCFS.getColumnFamily(key,
 +                                                        DefsTables.searchComposite(cfName, true),
 +                                                        DefsTables.searchComposite(cfName, false),
 +                                                        false,
 +                                                        Integer.MAX_VALUE,
 +                                                        System.currentTimeMillis());
 +
 +        return new Row(key, result);
 +    }
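 +
 +    // searchComposite(cfName, true) and searchComposite(cfName, false) build the
 +    // start and end bounds of the composite slice, so the result contains only
 +    // the columns describing this particular column family.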
 +
 +    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
 +    {
 +        String req = "SELECT * FROM system.%s WHERE row_key = 0x%s AND cf_id = %s";
 +        UntypedResultSet results = processInternal(String.format(req, PAXOS_CF, ByteBufferUtil.bytesToHex(key), metadata.cfId));
 +        if (results.isEmpty())
 +            return new PaxosState(key, metadata);
 +        UntypedResultSet.Row row = results.one();
 +        Commit inProgress = new Commit(key,
 +                                       row.getUUID("in_progress_ballot"),
 +                                       row.has("proposal") ? ColumnFamily.fromBytes(row.getBytes("proposal")) : EmptyColumns.factory.create(metadata));
 +        // most_recent_commit and most_recent_commit_at are either both set or both unset
 +        Commit mostRecent = row.has("most_recent_commit")
 +                          ? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit")))
 +                          : Commit.emptyCommit(key, metadata);
 +        return new PaxosState(inProgress, mostRecent);
 +    }
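 +
 +    // Illustrative expansion (assuming PAXOS_CF = "paxos"):
 +    //     SELECT * FROM system.paxos WHERE row_key = 0x<hex key> AND cf_id = <table id>
 +    // An empty result simply means no Paxos round has been recorded for this
 +    // partition yet, hence the fresh PaxosState.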
 +
 +    public static void savePaxosPromise(Commit promise)
 +    {
 +        String req = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s WHERE row_key = 0x%s AND cf_id = %s";
 +        processInternal(String.format(req,
 +                                      PAXOS_CF,
 +                                      UUIDGen.microsTimestamp(promise.ballot),
 +                                      paxosTtl(promise.update.metadata),
 +                                      promise.ballot,
 +                                      ByteBufferUtil.bytesToHex(promise.key),
 +                                      promise.update.id()));
 +    }
 +
 +    public static void savePaxosProposal(Commit commit)
 +    {
 +        processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
 +                                      PAXOS_CF,
 +                                      UUIDGen.microsTimestamp(commit.ballot),
 +                                      paxosTtl(commit.update.metadata),
 +                                      ByteBufferUtil.bytesToHex(commit.update.toBytes()),
 +                                      ByteBufferUtil.bytesToHex(commit.key),
 +                                      commit.update.id()));
 +    }
 +
 +    private static int paxosTtl(CFMetaData metadata)
 +    {
 +        // keep paxos state around for at least 3h
 +        return Math.max(3 * 3600, metadata.getGcGraceSeconds());
 +    }
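 +
 +    // Worked example: 3 * 3600 = 10800 seconds; with the default gc_grace_seconds
 +    // of 864000 (10 days) this returns 864000, so paxos state normally lives as
 +    // long as tombstones do, and only a gc_grace under 3 hours is clamped to 10800.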
 +
 +    public static void savePaxosCommit(Commit commit, UUID inProgressBallot)
 +    {
 +        String preserveCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
 +        // identical to preserveCql, except that it also sets proposal = null
 +        String eraseCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = null, in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
 +        boolean proposalAfterCommit = inProgressBallot.timestamp() > commit.ballot.timestamp();
 +        processInternal(String.format(proposalAfterCommit ? preserveCql : eraseCql,
 +                                      PAXOS_CF,
 +                                      UUIDGen.microsTimestamp(commit.ballot),
 +                                      paxosTtl(commit.update.metadata),
 +                                      proposalAfterCommit ? inProgressBallot : commit.ballot,
 +                                      commit.ballot,
 +                                      ByteBufferUtil.bytesToHex(commit.update.toBytes()),
 +                                      ByteBufferUtil.bytesToHex(commit.key),
 +                                      commit.update.id()));
 +    }
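 +
 +    // The ballot comparison above decides which CQL to run: if a newer proposal
 +    // arrived after this commit, its in_progress_ballot (and the proposal column)
 +    // must survive, so preserveCql leaves proposal untouched; otherwise eraseCql
 +    // records the commit's own ballot and nulls out the now-stale proposal.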
 +}

