cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [5/6] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Date Fri, 18 Oct 2013 02:06:24 GMT
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	NEWS.txt
	src/java/org/apache/cassandra/gms/Gossiper.java
	src/java/org/apache/cassandra/service/StorageService.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f0841195
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f0841195
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f0841195

Branch: refs/heads/cassandra-2.0
Commit: f0841195410d5cb5c8d5c4f0750ae5dc5a39b3ec
Parents: f19610c 351d43e
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Thu Oct 17 21:04:33 2013 -0500
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Thu Oct 17 21:04:33 2013 -0500

----------------------------------------------------------------------
 NEWS.txt                                        |   5 +-
 .../cassandra/config/DatabaseDescriptor.java    |  17 ++-
 .../org/apache/cassandra/db/SystemKeyspace.java |  11 +-
 .../gms/GossipDigestAckVerbHandler.java         |  10 +-
 .../gms/GossipDigestSynVerbHandler.java         |   2 +-
 src/java/org/apache/cassandra/gms/Gossiper.java |  71 +++++++++++-
 .../cassandra/service/StorageService.java       | 115 ++++++++++++-------
 7 files changed, 183 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0841195/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 3fa0c8d,a676aa1..4bffa77
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -14,34 -14,24 +14,35 @@@ restore snapshots created with the prev
  using the provided 'sstableupgrade' tool.
  
  
 -1.2.11
 -======
 -Features
 ---------
 +2.0.2
 +=====
 +
 +New features
 +------------
 +    - Speculative retry defaults to 99th percentile
 +      (See blog post at http://www.datastax.com/dev/blog/rapid-read-protection-in-cassandra-2-0-2)
 +    - Configurable metrics reporting
 +      (see conf/metrics-reporter-config-sample.yaml)
 +    - Compaction history and stats are now saved to system keyspace
 +      (system.compaction_history table). You can access history via
 +      new 'nodetool compactionhistory' command or CQL.
      - Added a new consistency level, LOCAL_ONE, that forces all CL.ONE operations to
        execute only in the local datacenter.
- 
+     - New replace_address to supplant the (now removed) replace_token and
+       replace_node workflows to replace a dead node in place.  Works like the
+       old options, but takes the IP address of the node to be replaced.
  
 +2.0.1
 +=====
- 
  Upgrading
  ---------
 -    - ColumnFamilyMetrics#sstablesPerReadHistogram switched from uniform sampling
 -      to biased-to-last-five-minutes sampling.
 +    - The default memtable allocation has changed from 1/3 of heap to 1/4
 +      of heap.  Also, default (single-partition) read and write timeouts
 +      have been reduced from 10s to 5s and 2s, respectively.
  
  
 -1.2.10
 -======
 +2.0.0
 +=====
  
  Upgrading
  ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0841195/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0841195/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 50af82d,0000000..e5f2025
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -1,946 -1,0 +1,953 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.DataInputStream;
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import javax.management.openmbean.*;
 +
 +import com.google.common.base.Function;
 +import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.SetMultimap;
 +import com.google.common.collect.Sets;
 +
 +import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 +import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.transport.Server;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.apache.cassandra.db.filter.QueryFilter;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.locator.IEndpointSnitch;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.paxos.Commit;
 +import org.apache.cassandra.service.paxos.PaxosState;
 +import org.apache.cassandra.thrift.cassandraConstants;
 +import org.apache.cassandra.utils.*;
 +
 +import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
 +
 +public class SystemKeyspace
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SystemKeyspace.class);
 +
 +    // see CFMetaData for schema definitions
 +    public static final String PEERS_CF = "peers";
 +    public static final String PEER_EVENTS_CF = "peer_events";
 +    public static final String LOCAL_CF = "local";
 +    public static final String INDEX_CF = "IndexInfo";
 +    public static final String COUNTER_ID_CF = "NodeIdInfo";
 +    public static final String HINTS_CF = "hints";
 +    public static final String RANGE_XFERS_CF = "range_xfers";
 +    public static final String BATCHLOG_CF = "batchlog";
 +    // see layout description in the DefsTables class header
 +    public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";
 +    public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies";
 +    public static final String SCHEMA_COLUMNS_CF = "schema_columns";
 +    public static final String SCHEMA_TRIGGERS_CF = "schema_triggers";
 +    public static final String COMPACTION_LOG = "compactions_in_progress";
 +    public static final String PAXOS_CF = "paxos";
 +    public static final String SSTABLE_ACTIVITY_CF = "sstable_activity";
 +    public static final String COMPACTION_HISTORY_CF = "compaction_history";
 +
 +    private static final String LOCAL_KEY = "local";
 +    private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
 +
 +    public enum BootstrapState
 +    {
 +        NEEDS_BOOTSTRAP,
 +        COMPLETED,
 +        IN_PROGRESS
 +    }
 +
 +    private static DecoratedKey decorate(ByteBuffer key)
 +    {
 +        return StorageService.getPartitioner().decorateKey(key);
 +    }
 +
 +    public static void finishStartup()
 +    {
 +        setupVersion();
 +
 +        copyAllAliasesToColumnsProper();
 +
 +        // add entries to system schema columnfamilies for the hardcoded system definitions
 +        for (String ksname : Schema.systemKeyspaceNames)
 +        {
 +            KSMetaData ksmd = Schema.instance.getKSMetaData(ksname);
 +
 +            // delete old, possibly obsolete entries in schema columnfamilies
 +            for (String cfname : Arrays.asList(SystemKeyspace.SCHEMA_KEYSPACES_CF, SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, SystemKeyspace.SCHEMA_COLUMNS_CF))
 +            {
 +                String req = String.format("DELETE FROM system.%s WHERE keyspace_name = '%s'", cfname, ksmd.name);
 +                processInternal(req);
 +            }
 +
 +            // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
 +            ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply();
 +        }
 +    }
 +
 +    // Starting with 2.0 (CASSANDRA-5125) we keep all the 'aliases' in system.schema_columns together with the regular columns,
 +    // but only for the newly-created tables. This migration is for the pre-2.0 created tables.
 +    private static void copyAllAliasesToColumnsProper()
 +    {
 +        for (UntypedResultSet.Row row : processInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF)))
 +        {
 +            CFMetaData table = CFMetaData.fromSchema(row);
 +            String query = String.format("SELECT writetime(type) "
 +                                         + "FROM system.%s "
 +                                         + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
 +                                         SCHEMA_COLUMNFAMILIES_CF,
 +                                         table.ksName,
 +                                         table.cfName);
 +            long timestamp = processInternal(query).one().getLong("writetime(type)");
 +            try
 +            {
 +                table.toSchema(timestamp).apply();
 +            }
 +            catch (ConfigurationException e)
 +            {
 +                // shouldn't happen
 +            }
 +        }
 +    }
 +
 +    private static void setupVersion()
 +    {
 +        String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')";
 +        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 +        processInternal(String.format(req, LOCAL_CF,
 +                                         LOCAL_KEY,
 +                                         FBUtilities.getReleaseVersionString(),
 +                                         QueryProcessor.CQL_VERSION.toString(),
 +                                         cassandraConstants.VERSION,
 +                                         Server.CURRENT_VERSION,
 +                                         snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
 +                                         snitch.getRack(FBUtilities.getBroadcastAddress()),
 +                                         DatabaseDescriptor.getPartitioner().getClass().getName()));
 +    }
 +
 +    /**
 +     * Write compaction log, except for column families under the system keyspace.
 +     *
 +     * @param cfs
 +     * @param toCompact sstables to compact
 +     * @return compaction task id or null if cfs is under system keyspace
 +     */
 +    public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
 +    {
 +        if (Keyspace.SYSTEM_KS.equals(cfs.keyspace.getName()))
 +            return null;
 +
 +        UUID compactionId = UUIDGen.getTimeUUID();
 +        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
 +        Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
 +        {
 +            public Integer apply(SSTableReader sstable)
 +            {
 +                return sstable.descriptor.generation;
 +            }
 +        });
 +        processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.keyspace.getName(), cfs.name, StringUtils.join(Sets.newHashSet(generations), ',')));
 +        forceBlockingFlush(COMPACTION_LOG);
 +        return compactionId;
 +    }
 +
 +    public static void finishCompaction(UUID taskId)
 +    {
 +        assert taskId != null;
 +
 +        String req = "DELETE FROM system.%s WHERE id = %s";
 +        processInternal(String.format(req, COMPACTION_LOG, taskId));
 +        forceBlockingFlush(COMPACTION_LOG);
 +    }
 +
 +    /**
 +     * @return unfinished compactions, grouped by keyspace/columnfamily pair.
 +     */
 +    public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
 +    {
 +        String req = "SELECT * FROM system.%s";
 +        UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
 +
 +        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
 +        for (UntypedResultSet.Row row : resultSet)
 +        {
 +            String keyspace = row.getString("keyspace_name");
 +            String columnfamily = row.getString("columnfamily_name");
 +            Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
 +
 +            unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
 +        }
 +        return unfinishedCompactions;
 +    }
 +
 +    public static void discardCompactionsInProgress()
 +    {
 +        ColumnFamilyStore compactionLog = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
 +        compactionLog.truncateBlocking();
 +    }
 +
 +    public static void updateCompactionHistory(String ksname,
 +                                               String cfname,
 +                                               long compactedAt,
 +                                               long bytesIn,
 +                                               long bytesOut,
 +                                               Map<Integer, Long> rowsMerged)
 +    {
 +        // don't write anything when the history table itself is compacted, since that would in turn cause new compactions
 +        if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY_CF))
 +            return;
 +        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) "
 +                     + "VALUES (%s, '%s', '%s', %d, %d, %d, {%s})";
 +        processInternal(String.format(req, COMPACTION_HISTORY_CF, UUIDGen.getTimeUUID().toString(), ksname, cfname, compactedAt, bytesIn, bytesOut, FBUtilities.toString(rowsMerged)));
 +            forceBlockingFlush(COMPACTION_HISTORY_CF);
 +    }
 +
 +    public static TabularData getCompactionHistory() throws OpenDataException
 +    {
 +        UntypedResultSet queryResultSet = processInternal("SELECT * from system.compaction_history");
 +        return CompactionHistoryTabularData.from(queryResultSet);
 +    }
 +
 +    public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
 +    {
 +        String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
 +        processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
 +        forceBlockingFlush(LOCAL_CF);
 +    }
 +
 +    /**
 +     * This method is used to remove information about truncation time for specified column family
 +     */
 +    public static void removeTruncationRecord(UUID cfId)
 +    {
 +        String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'";
 +        processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
 +        forceBlockingFlush(LOCAL_CF);
 +    }
 +
 +    private static String truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
 +    {
 +        DataOutputBuffer out = new DataOutputBuffer();
 +        try
 +        {
 +            ReplayPosition.serializer.serialize(position, out);
 +            out.writeLong(truncatedAt);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +        return String.format("{%s: 0x%s}",
 +                             cfs.metadata.cfId,
 +                             ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
 +    }
 +
 +    public static Map<UUID, Pair<ReplayPosition, Long>> getTruncationRecords()
 +    {
 +        String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
 +        UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +        if (rows.isEmpty())
 +            return Collections.emptyMap();
 +
 +        UntypedResultSet.Row row = rows.one();
 +        Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", UUIDType.instance, BytesType.instance);
 +        if (rawMap == null)
 +            return Collections.emptyMap();
 +
 +        Map<UUID, Pair<ReplayPosition, Long>> positions = new HashMap<UUID, Pair<ReplayPosition, Long>>();
 +        for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
 +            positions.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
 +        return positions;
 +    }
 +
 +    private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
 +    {
 +        try
 +        {
 +            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
 +            return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    /**
 +     * Record tokens being used by another node
 +     */
 +    public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
 +    {
 +        if (ep.equals(FBUtilities.getBroadcastAddress()))
 +        {
 +            removeEndpoint(ep);
 +            return;
 +        }
 +
 +        String req = "INSERT INTO system.%s (peer, tokens) VALUES ('%s', %s)";
 +        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), tokensAsSet(tokens)));
 +        forceBlockingFlush(PEERS_CF);
 +    }
 +
 +    public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
 +    {
 +        String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES ('%s', '%s')";
 +        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), preferred_ip.getHostAddress()));
 +        forceBlockingFlush(PEERS_CF);
 +    }
 +
 +    public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value)
 +    {
 +        if (ep.equals(FBUtilities.getBroadcastAddress()))
 +            return;
 +
 +        String req = "INSERT INTO system.%s (peer, %s) VALUES ('%s', %s)";
 +        processInternal(String.format(req, PEERS_CF, columnName, ep.getHostAddress(), value));
 +    }
 +
 +    public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
 +    {
 +        // with 30 day TTL
 +        String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ %s ] = %s WHERE peer = '%s'";
 +        processInternal(String.format(req, PEER_EVENTS_CF, timePeriod.toString(), value, ep.getHostAddress()));
 +    }
 +
 +    public static synchronized void updateSchemaVersion(UUID version)
 +    {
 +        String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', %s)";
 +        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, version.toString()));
 +    }
 +
 +    private static String tokensAsSet(Collection<Token> tokens)
 +    {
 +        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
 +        StringBuilder sb = new StringBuilder();
 +        sb.append("{");
 +        Iterator<Token> iter = tokens.iterator();
 +        while (iter.hasNext())
 +        {
 +            sb.append("'").append(factory.toString(iter.next())).append("'");
 +            if (iter.hasNext())
 +                sb.append(",");
 +        }
 +        sb.append("}");
 +        return sb.toString();
 +    }
 +
 +    private static Collection<Token> deserializeTokens(Collection<String> tokensStrings)
 +    {
 +        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
 +        List<Token> tokens = new ArrayList<Token>(tokensStrings.size());
 +        for (String tk : tokensStrings)
 +            tokens.add(factory.fromString(tk));
 +        return tokens;
 +    }
 +
 +    /**
 +     * Remove stored tokens being used by another node
 +     */
 +    public static synchronized void removeEndpoint(InetAddress ep)
 +    {
 +        String req = "DELETE FROM system.%s WHERE peer = '%s'";
 +        processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
 +        forceBlockingFlush(PEERS_CF);
 +    }
 +
 +    /**
 +     * This method is used to update the System Keyspace with the new tokens for this node
 +    */
 +    public static synchronized void updateTokens(Collection<Token> tokens)
 +    {
 +        assert !tokens.isEmpty() : "removeEndpoint should be used instead";
 +        String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', %s)";
 +        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokensAsSet(tokens)));
 +        forceBlockingFlush(LOCAL_CF);
 +    }
 +
 +    /**
 +     * Convenience method to update the list of tokens in the local system keyspace.
 +     *
 +     * @param addTokens tokens to add
 +     * @param rmTokens tokens to remove
 +     * @return the collection of persisted tokens
 +     */
 +    public static synchronized Collection<Token> updateLocalTokens(Collection<Token> addTokens, Collection<Token> rmTokens)
 +    {
 +        Collection<Token> tokens = getSavedTokens();
 +        tokens.removeAll(rmTokens);
 +        tokens.addAll(addTokens);
 +        updateTokens(tokens);
 +        return tokens;
 +    }
 +
 +    public static void forceBlockingFlush(String cfname)
 +    {
 +        if (!Boolean.getBoolean("cassandra.unsafesystem"))
 +            FBUtilities.waitOnFuture(Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(cfname).forceFlush());
 +    }
 +
 +    /**
 +     * Return a multimap of peer IP addresses to the tokens stored for each peer
 +     *
 +     */
 +    public static SetMultimap<InetAddress, Token> loadTokens()
 +    {
 +        SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
 +        for (UntypedResultSet.Row row : processInternal("SELECT peer, tokens FROM system." + PEERS_CF))
 +        {
 +            InetAddress peer = row.getInetAddress("peer");
 +            if (row.has("tokens"))
 +                tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance)));
 +        }
 +
 +        return tokenMap;
 +    }
 +
 +    /**
 +     * Return a map of peer IP addresses to the host IDs stored for each peer
 +     *
 +     */
 +    public static Map<InetAddress, UUID> loadHostIds()
 +    {
 +        Map<InetAddress, UUID> hostIdMap = new HashMap<InetAddress, UUID>();
 +        for (UntypedResultSet.Row row : processInternal("SELECT peer, host_id FROM system." + PEERS_CF))
 +        {
 +            InetAddress peer = row.getInetAddress("peer");
 +            if (row.has("host_id"))
 +            {
 +                hostIdMap.put(peer, row.getUUID("host_id"));
 +            }
 +        }
 +        return hostIdMap;
 +    }
 +
 +    public static InetAddress getPreferredIP(InetAddress ep)
 +    {
 +        String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
 +        if (!result.isEmpty() && result.one().has("preferred_ip"))
 +            return result.one().getInetAddress("preferred_ip");
 +        return null;
 +    }
 +
 +    /**
 +     * Return a map of IP addresses containing a map of dc and rack info
 +     */
 +    public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
 +    {
 +        Map<InetAddress, Map<String, String>> result = new HashMap<InetAddress, Map<String, String>>();
 +        for (UntypedResultSet.Row row : processInternal("SELECT peer, data_center, rack from system." + PEERS_CF))
 +        {
 +            InetAddress peer = row.getInetAddress("peer");
 +            if (row.has("data_center") && row.has("rack"))
 +            {
 +                Map<String, String> dcRack = new HashMap<String, String>();
 +                dcRack.put("data_center", row.getString("data_center"));
 +                dcRack.put("rack", row.getString("rack"));
 +                result.put(peer, dcRack);
 +            }
 +        }
 +        return result;
 +    }
 +
 +    /**
 +     * One of three things will happen if you try to read the system keyspace:
 +     * 1. files are present and you can read them: great
 +     * 2. no files are there: great (new node is assumed)
 +     * 3. files are present but you can't read them: bad
 +     * @throws ConfigurationException
 +     */
 +    public static void checkHealth() throws ConfigurationException
 +    {
 +        Keyspace keyspace;
 +        try
 +        {
 +            keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
 +        }
 +        catch (AssertionError err)
 +        {
 +            // this happens when a user switches from OPP to RP.
 +            ConfigurationException ex = new ConfigurationException("Could not read system keyspace!");
 +            ex.initCause(err);
 +            throw ex;
 +        }
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL_CF);
 +
 +        String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +
 +        if (result.isEmpty() || !result.one().has("cluster_name"))
 +        {
 +            // this is a brand new node
 +            if (!cfs.getSSTables().isEmpty())
 +                throw new ConfigurationException("Found system keyspace files, but they couldn't be loaded!");
 +
 +            // no system files.  this is a new node.
 +            req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', '%s')";
 +            processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, DatabaseDescriptor.getClusterName()));
 +            return;
 +        }
 +
 +        String savedClusterName = result.one().getString("cluster_name");
 +        if (!DatabaseDescriptor.getClusterName().equals(savedClusterName))
 +            throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName());
 +    }
 +
 +    public static Collection<Token> getSavedTokens()
 +    {
 +        String req = "SELECT tokens FROM system.%s WHERE key='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +        return result.isEmpty() || !result.one().has("tokens")
 +             ? Collections.<Token>emptyList()
 +             : deserializeTokens(result.one().<String>getSet("tokens", UTF8Type.instance));
 +    }
 +
 +    public static int incrementAndGetGeneration()
 +    {
 +        String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +
 +        int generation;
 +        if (result.isEmpty() || !result.one().has("gossip_generation"))
 +        {
 +            // seconds-since-epoch isn't a foolproof new generation
 +            // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"),
 +            // but it's as close as sanely possible
 +            generation = (int) (System.currentTimeMillis() / 1000);
 +        }
 +        else
 +        {
 +            // Other nodes will ignore gossip messages about a node that have a lower generation than previously seen.
 +            final int storedGeneration = result.one().getInt("gossip_generation") + 1;
 +            final int now = (int) (System.currentTimeMillis() / 1000);
 +            if (storedGeneration >= now)
 +            {
 +                logger.warn("Using stored Gossip Generation {} as it is greater than current system time {}.  See CASSANDRA-3654 if you experience problems",
 +                            storedGeneration, now);
 +                generation = storedGeneration;
 +            }
 +            else
 +            {
 +                generation = now;
 +            }
 +        }
 +
 +        req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', %d)";
 +        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, generation));
 +        forceBlockingFlush(LOCAL_CF);
 +
 +        return generation;
 +    }
 +
 +    public static BootstrapState getBootstrapState()
 +    {
 +        String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +
 +        if (result.isEmpty() || !result.one().has("bootstrapped"))
 +            return BootstrapState.NEEDS_BOOTSTRAP;
 +
 +        return BootstrapState.valueOf(result.one().getString("bootstrapped"));
 +    }
 +
 +    public static boolean bootstrapComplete()
 +    {
 +        return getBootstrapState() == BootstrapState.COMPLETED;
 +    }
 +
 +    public static boolean bootstrapInProgress()
 +    {
 +        return getBootstrapState() == BootstrapState.IN_PROGRESS;
 +    }
 +
 +    public static void setBootstrapState(BootstrapState state)
 +    {
 +        String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', '%s')";
 +        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, state.name()));
 +        forceBlockingFlush(LOCAL_CF);
 +    }
 +
 +    public static boolean isIndexBuilt(String keyspaceName, String indexName)
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
 +        QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)),
 +                                                        INDEX_CF,
 +                                                        ByteBufferUtil.bytes(indexName),
 +                                                        System.currentTimeMillis());
 +        return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
 +    }
 +
 +    public static void setIndexBuilt(String keyspaceName, String indexName)
 +    {
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, INDEX_CF);
 +        cf.addColumn(new Column(ByteBufferUtil.bytes(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
 +        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName), cf);
 +        rm.apply();
 +        forceBlockingFlush(INDEX_CF);
 +    }
 +
 +    public static void setIndexRemoved(String keyspaceName, String indexName)
 +    {
 +        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName));
 +        rm.delete(INDEX_CF, ByteBufferUtil.bytes(indexName), FBUtilities.timestampMicros());
 +        rm.apply();
 +        forceBlockingFlush(INDEX_CF);
 +    }
 +
 +    /**
 +     * Read the host ID from the system keyspace, creating (and storing) one if
 +     * none exists.
 +     */
 +    public static UUID getLocalHostId()
 +    {
 +        UUID hostId = null;
 +
 +        String req = "SELECT host_id FROM system.%s WHERE key='%s'";
 +        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
 +
 +        // Look up the Host UUID (return it if found)
 +        if (!result.isEmpty() && result.one().has("host_id"))
 +        {
 +            return result.one().getUUID("host_id");
 +        }
 +
 +        // ID not found, generate a new one, persist, and then return it.
 +        hostId = UUID.randomUUID();
 +        logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId);
++        return setLocalHostId(hostId);
++    }
 +
-         req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
++    /**
++     * Sets the local host ID explicitly.  Should only be called outside of SystemKeyspace when replacing a node.
++     */
++    public static UUID setLocalHostId(UUID hostId)
++    {
++        String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
 +        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, hostId));
 +        return hostId;
 +    }
 +
 +    /**
 +     * Read the current local node id from the system keyspace or null if no
 +     * such node id is recorded.
 +     */
 +    public static CounterId getCurrentLocalCounterId()
 +    {
 +        Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
 +
 +        // Get the last CounterId (since CounterIds are timeuuids, they are ordered from oldest to newest)
 +        QueryFilter filter = QueryFilter.getSliceFilter(decorate(ALL_LOCAL_NODE_ID_KEY),
 +                                                        COUNTER_ID_CF,
 +                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
 +                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
 +                                                        true,
 +                                                        1,
 +                                                        System.currentTimeMillis());
 +        ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
 +        if (cf != null && cf.getColumnCount() != 0)
 +            return CounterId.wrap(cf.iterator().next().name());
 +        else
 +            return null;
 +    }
 +
 +    /**
 +     * Write a new current local node id to the system keyspace.
 +     *
 +     * @param newCounterId the new current local node id to record
 +     * @param now microsecond time stamp.
 +     */
 +    public static void writeCurrentLocalCounterId(CounterId newCounterId, long now)
 +    {
 +        ByteBuffer ip = ByteBuffer.wrap(FBUtilities.getBroadcastAddress().getAddress());
 +
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, COUNTER_ID_CF);
 +        cf.addColumn(new Column(newCounterId.bytes(), ip, now));
 +        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ALL_LOCAL_NODE_ID_KEY, cf);
 +        rm.apply();
 +        forceBlockingFlush(COUNTER_ID_CF);
 +    }
 +
 +    public static List<CounterId.CounterIdRecord> getOldLocalCounterIds()
 +    {
 +        List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
 +
 +        Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
 +        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
 +        ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
 +
 +        CounterId previous = null;
 +        for (Column c : cf)
 +        {
 +            if (previous != null)
 +                l.add(new CounterId.CounterIdRecord(previous, c.timestamp()));
 +
 +            // this will ignore the last column on purpose since it is the
 +            // current local node id
 +            previous = CounterId.wrap(c.name());
 +        }
 +        return l;
 +    }
 +
 +    /**
 +     * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
 +     * @return CFS responsible for holding the low-level serialized schema
 +     */
 +    public static ColumnFamilyStore schemaCFS(String cfName)
 +    {
 +        return Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(cfName);
 +    }
 +
 +    public static List<Row> serializedSchema()
 +    {
 +        List<Row> schema = new ArrayList<>();
 +
 +        schema.addAll(serializedSchema(SCHEMA_KEYSPACES_CF));
 +        schema.addAll(serializedSchema(SCHEMA_COLUMNFAMILIES_CF));
 +        schema.addAll(serializedSchema(SCHEMA_COLUMNS_CF));
 +        schema.addAll(serializedSchema(SCHEMA_TRIGGERS_CF));
 +
 +        return schema;
 +    }
 +
 +    /**
 +     * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
 +     * @return low-level schema representation (each row represents individual Keyspace or ColumnFamily)
 +     */
 +    public static List<Row> serializedSchema(String schemaCfName)
 +    {
 +        Token minToken = StorageService.getPartitioner().getMinimumToken();
 +
 +        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
 +                                                     null,
 +                                                     new IdentityQueryFilter(),
 +                                                     Integer.MAX_VALUE,
 +                                                     System.currentTimeMillis());
 +    }
 +
 +    public static Collection<RowMutation> serializeSchema()
 +    {
 +        Map<DecoratedKey, RowMutation> mutationMap = new HashMap<>();
 +
 +        serializeSchema(mutationMap, SCHEMA_KEYSPACES_CF);
 +        serializeSchema(mutationMap, SCHEMA_COLUMNFAMILIES_CF);
 +        serializeSchema(mutationMap, SCHEMA_COLUMNS_CF);
 +        serializeSchema(mutationMap, SCHEMA_TRIGGERS_CF);
 +
 +        return mutationMap.values();
 +    }
 +
 +    private static void serializeSchema(Map<DecoratedKey, RowMutation> mutationMap, String schemaCfName)
 +    {
 +        for (Row schemaRow : serializedSchema(schemaCfName))
 +        {
 +            if (Schema.ignoredSchemaRow(schemaRow))
 +                continue;
 +
 +            RowMutation mutation = mutationMap.get(schemaRow.key);
 +            if (mutation == null)
 +            {
 +                mutation = new RowMutation(Keyspace.SYSTEM_KS, schemaRow.key.key);
 +                mutationMap.put(schemaRow.key, mutation);
 +            }
 +
 +            mutation.add(schemaRow.cf);
 +        }
 +    }
 +
 +    public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName)
 +    {
 +        Map<DecoratedKey, ColumnFamily> schema = new HashMap<DecoratedKey, ColumnFamily>();
 +
 +        for (Row schemaEntity : SystemKeyspace.serializedSchema(cfName))
 +            schema.put(schemaEntity.key, schemaEntity.cf);
 +
 +        return schema;
 +    }
 +
 +    public static ByteBuffer getSchemaKSKey(String ksName)
 +    {
 +        return AsciiType.instance.fromString(ksName);
 +    }
 +
 +    public static Row readSchemaRow(String ksName)
 +    {
 +        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
 +
 +        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SCHEMA_KEYSPACES_CF);
 +        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF, System.currentTimeMillis()));
 +
 +        return new Row(key, result);
 +    }
 +
 +    /**
 +     * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace+table pair.
 +     *
 +     * @param schemaCfName the schema table to get the data from (schema_columnfamilies, schema_columns or schema_triggers)
 +     * @param ksName the keyspace of the table we are interested in
 +     * @param cfName the table we are interested in
 +     * @return a Row containing the schema data of a particular type for the table
 +     */
 +    public static Row readSchemaRow(String schemaCfName, String ksName, String cfName)
 +    {
 +        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
 +        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName);
 +        ColumnFamily cf = schemaCFS.getColumnFamily(key,
 +                                                    DefsTables.searchComposite(cfName, true),
 +                                                    DefsTables.searchComposite(cfName, false),
 +                                                    false,
 +                                                    Integer.MAX_VALUE,
 +                                                    System.currentTimeMillis());
 +        return new Row(key, cf);
 +    }
 +
 +    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
 +    {
 +        String req = "SELECT * FROM system.%s WHERE row_key = 0x%s AND cf_id = %s";
 +        UntypedResultSet results = processInternal(String.format(req, PAXOS_CF, ByteBufferUtil.bytesToHex(key), metadata.cfId));
 +        if (results.isEmpty())
 +            return new PaxosState(key, metadata);
 +        UntypedResultSet.Row row = results.one();
 +        Commit promised = new Commit(key, row.getUUID("in_progress_ballot"), EmptyColumns.factory.create(metadata));
 +        // either we have both a recently accepted ballot and update or we have neither
 +        Commit accepted = row.has("proposal")
 +                        ? new Commit(key, row.getUUID("proposal_ballot"), ColumnFamily.fromBytes(row.getBytes("proposal")))
 +                        : Commit.emptyCommit(key, metadata);
 +        // either most_recent_commit and most_recent_commit_at will both be set, or neither
 +        Commit mostRecent = row.has("most_recent_commit")
 +                          ? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit")))
 +                          : Commit.emptyCommit(key, metadata);
 +        return new PaxosState(promised, accepted, mostRecent);
 +    }
 +
 +    public static void savePaxosPromise(Commit promise)
 +    {
 +        String req = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s WHERE row_key = 0x%s AND cf_id = %s";
 +        processInternal(String.format(req,
 +                                      PAXOS_CF,
 +                                      UUIDGen.microsTimestamp(promise.ballot),
 +                                      paxosTtl(promise.update.metadata),
 +                                      promise.ballot,
 +                                      ByteBufferUtil.bytesToHex(promise.key),
 +                                      promise.update.id()));
 +    }
 +
 +    public static void savePaxosProposal(Commit proposal)
 +    {
 +        processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = %s, proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
 +                                      PAXOS_CF,
 +                                      UUIDGen.microsTimestamp(proposal.ballot),
 +                                      paxosTtl(proposal.update.metadata),
 +                                      proposal.ballot,
 +                                      ByteBufferUtil.bytesToHex(proposal.update.toBytes()),
 +                                      ByteBufferUtil.bytesToHex(proposal.key),
 +                                      proposal.update.id()));
 +    }
 +
 +    private static int paxosTtl(CFMetaData metadata)
 +    {
 +        // keep paxos state around for at least 3h
 +        return Math.max(3 * 3600, metadata.getGcGraceSeconds());
 +    }
 +
 +    public static void savePaxosCommit(Commit commit)
 +    {
 +        // We always erase the last proposal (using the commit timestamp, so as not to erase a more recent proposal in case the commit is old)
 +        // even though that's really just an optimization, since SP.beginAndRepairPaxos will exclude accepted proposals older than the mrc.
 +        String cql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = null, proposal = null, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
 +        processInternal(String.format(cql,
 +                                      PAXOS_CF,
 +                                      UUIDGen.microsTimestamp(commit.ballot),
 +                                      paxosTtl(commit.update.metadata),
 +                                      commit.ballot,
 +                                      ByteBufferUtil.bytesToHex(commit.update.toBytes()),
 +                                      ByteBufferUtil.bytesToHex(commit.key),
 +                                      commit.update.id()));
 +    }
 +
 +    /**
 +     * Returns a RestorableMeter tracking the average read rate of a particular SSTable, restoring the last-seen rate
 +     * from values in system.sstable_activity if present.
 +     * @param keyspace the keyspace the sstable belongs to
 +     * @param table the table the sstable belongs to
 +     * @param generation the generation number for the sstable
 +     */
 +    public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation)
 +    {
 +        String cql = "SELECT * FROM %s WHERE keyspace_name='%s' and columnfamily_name='%s' and generation=%d";
 +        UntypedResultSet results = processInternal(String.format(cql,
 +                                                                 SSTABLE_ACTIVITY_CF,
 +                                                                 keyspace,
 +                                                                 table,
 +                                                                 generation));
 +
 +        if (results.isEmpty())
 +            return new RestorableMeter();
 +
 +        UntypedResultSet.Row row = results.one();
 +        double m15rate = row.getDouble("rate_15m");
 +        double m120rate = row.getDouble("rate_120m");
 +        return new RestorableMeter(m15rate, m120rate);
 +    }
 +
 +    /**
 +     * Writes the current read rates for a given SSTable to system.sstable_activity
 +     */
 +    public static void persistSSTableReadMeter(String keyspace, String table, int generation, RestorableMeter meter)
 +    {
 +        // Store values with a ten-day TTL (864000 seconds) to handle corner cases where cleanup might not occur
 +        String cql = "INSERT INTO %s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES ('%s', '%s', %d, %f, %f) USING TTL 864000";
 +        processInternal(String.format(cql,
 +                                      SSTABLE_ACTIVITY_CF,
 +                                      keyspace,
 +                                      table,
 +                                      generation,
 +                                      meter.fifteenMinuteRate(),
 +                                      meter.twoHourRate()));
 +    }
 +
 +    /**
 +     * Clears persisted read rates from system.sstable_activity for SSTables that have been deleted.
 +     */
 +    public static void clearSSTableReadMeter(String keyspace, String table, int generation)
 +    {
 +        String cql = "DELETE FROM %s WHERE keyspace_name='%s' AND columnfamily_name='%s' and generation=%d";
 +        processInternal(String.format(cql, SSTABLE_ACTIVITY_CF, keyspace, table, generation));
 +    }
- }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0841195/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 57863ae,b2af3a2..3d42f0e
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@@ -49,8 -49,9 +49,9 @@@ public class GossipDigestAckVerbHandle
          GossipDigestAck gDigestAckMessage = message.payload;
          List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
          Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
+         logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
  
 -        if ( epStateMap.size() > 0 )
 +        if (epStateMap.size() > 0)
          {
              /* Notify the Failure Detector */
              Gossiper.instance.notifyFailureDetector(epStateMap);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0841195/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index 4b1564d,476cb72..337c115
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@@ -76,10 -76,10 +76,10 @@@ public class GossipDigestSynVerbHandle
          List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
          Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
          Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
- 
+         logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size());
          MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
 -                                                                                                      new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
 -                                                                                                      GossipDigestAck.serializer);
 +                                                                                        new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
 +                                                                                        GossipDigestAck.serializer);
          if (logger.isTraceEnabled())
              logger.trace("Sending a GossipDigestAckMessage to {}", from);
          Gossiper.instance.checkSeedContact(from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0841195/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0841195/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 33fa25d,d2c69d0..4489f59
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -378,7 -387,37 +378,37 @@@ public class StorageService extends Not
          return initialized;
      }
  
+     public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException
+     {
+         logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
+         MessagingService.instance().listen(FBUtilities.getLocalAddress());
+ 
+         // make magic happen
+         Gossiper.instance.doShadowRound();
+ 
+         Collection<Token> tokens = new ArrayList<Token>();
+         UUID hostId = null;
+         // now that we've gossiped at least once, we should be able to find the node we're replacing
+         if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+             throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
+         hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+         try
+         {
+             if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
+                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
+             tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+         SystemTable.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
+         MessagingService.instance().shutdown();
+         Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
+         return tokens;
+     }
+ 
 -    public synchronized void initClient() throws IOException, ConfigurationException
 +    public synchronized void initClient() throws ConfigurationException
      {
          // We don't wait, because we're going to actually try to work on
          initClient(0);
@@@ -545,16 -609,16 +584,16 @@@
          // have to start the gossip service before we can see any info on other nodes.  this is necessary
          // for bootstrap to get the load info it needs.
          // (we won't be part of the storage ring though until we add a counterId to our state, below.)
-         Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>();
+         // Seed the host ID-to-endpoint map with our own ID.
+         getTokenMetadata().updateHostId(SystemTable.getLocalHostId(), FBUtilities.getBroadcastAddress());
          appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
 -        appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(SystemTable.getLocalHostId()));
 +        appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(SystemKeyspace.getLocalHostId()));
          appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
-         if (DatabaseDescriptor.isReplacing())
-             appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
          appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
+         logger.info("Starting up server gossip");
          Gossiper.instance.register(this);
          Gossiper.instance.register(migrationManager);
 -        Gossiper.instance.start(SystemTable.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
 +        Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
          // gossip snitch infos (local DC and rack)
          gossipSnitchInfo();
          // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
@@@ -577,20 -641,19 +616,19 @@@
          // We attempted to replace this with a schema-presence check, but you need a meaningful sleep
          // to get schema info from gossip which defeats the purpose.  See CASSANDRA-4427 for the gory details.
          Set<InetAddress> current = new HashSet<InetAddress>();
-         Collection<Token> tokens;
          logger.debug("Bootstrap variables: {} {} {} {}",
 -                      new Object[]{ DatabaseDescriptor.isAutoBootstrap(),
 -                                    SystemTable.bootstrapInProgress(),
 -                                    SystemTable.bootstrapComplete(),
 -                                    DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())});
 +                     DatabaseDescriptor.isAutoBootstrap(),
 +                     SystemKeyspace.bootstrapInProgress(),
 +                     SystemKeyspace.bootstrapComplete(),
 +                     DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()));
          if (DatabaseDescriptor.isAutoBootstrap()
 -            && !SystemTable.bootstrapComplete()
 +            && !SystemKeyspace.bootstrapComplete()
              && !DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
          {
 -            if (SystemTable.bootstrapInProgress())
 +            if (SystemKeyspace.bootstrapInProgress())
                  logger.warn("Detected previous bootstrap failure; retrying");
              else
 -                SystemTable.setBootstrapState(SystemTable.BootstrapState.IN_PROGRESS);
 +                SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.IN_PROGRESS);
              setMode(Mode.JOINING, "waiting for ring information", true);
              // first sleep the delay to make sure we see all our peers
              for (int i = 0; i < delay; i += 1000)
@@@ -1306,10 -1379,16 +1337,16 @@@
  
          // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
          if (Gossiper.instance.usesHostId(endpoint))
-             tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
+         {
+             UUID hostId = Gossiper.instance.getHostId(endpoint);
+             if (DatabaseDescriptor.isReplacing() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
+                 logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
+             else
+                 tokenMetadata.updateHostId(hostId, endpoint);
+         }
  
          Set<Token> tokensToUpdateInMetadata = new HashSet<Token>();
 -        Set<Token> tokensToUpdateInSystemTable = new HashSet<Token>();
 +        Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<Token>();
          Set<Token> localTokensToRemove = new HashSet<Token>();
          Set<InetAddress> endpointsToRemove = new HashSet<InetAddress>();
          Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();


Mime
View raw message