Subject: svn commit: r1152107 - in /cassandra/trunk: ./ contrib/ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ doc/cql/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/gms/ src/java/...
Date: Fri, 29 Jul 2011 05:10:01 -0000
To: commits@cassandra.apache.org
From: jbellis@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20110729051003.251082388897@eris.apache.org>

Author: jbellis
Date: Fri Jul 29 05:09:59 2011
New Revision: 1152107

URL: http://svn.apache.org/viewvc?rev=1152107&view=rev
Log:
merge from 0.8

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/trunk/doc/cql/CQL.textile
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7:1026516-1151306
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Jul 29 05:09:59 2011
@@ -26,6 +26,19 @@
    (CASSANDRA-1951)
+0.8.3
+ * add ability to drop local reads/writes that are going to timeout
+   (CASSANDRA-2943)
+ * revamp token removal process, keep gossip states for 3 days (CASSANDRA-2946)
+ * don't accept extra args for 0-arg nodetool commands (CASSANDRA-2740)
+ * log unavailableexception details at debug level (CASSANDRA-2856)
+ * expose data_dir though jmx (CASSANDRA-2770)
+ * don't include tmp files as sstable when create cfs (CASSANDRA-2929)
+ * log Java classpath on startup (CASSANDRA-2895)
+ * keep gossipped version in sync with actual on migration coordinator
+   (CASSANDRA-2946)
+
+
 0.8.2
  * CQL:
    - include only one row per unique key for IN queries (CASSANDRA-2717)

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Fri Jul 29 05:09:59 2011
@@ -7,6 +7,16 @@ Upgrading
       sstableloader tool instead.
+0.8.3
+=====
+
+Upgrading
+---------
+    - Token removal has been revamped. Removing tokens in a mixed cluster with
+      0.8.3 will not work, so the entire cluster will need to be running 0.8.3
+      first, except for the dead node.
+
+
 0.8.2
 =====

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/contrib:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689

Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Jul 29 05:09:59 2011
@@ -69,8 +69,6 @@ public class CassandraStorage extends Lo
     public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
     public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
-    private static String UDFCONTEXT_SCHEMA_KEY_PREFIX = "cassandra.schema";
-
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -79,6 +77,8 @@ public class CassandraStorage extends Lo
     private boolean slice_reverse = false;
     private String keyspace;
     private String column_family;
+    private String loadSignature;
+    private String storeSignature;
     private Configuration conf;
     private RecordReader reader;
@@ -113,7 +113,7 @@ public class CassandraStorage extends Lo
             if (!reader.nextKeyValue())
                 return null;
-            CfDef cfDef = getCfDef();
+            CfDef cfDef = getCfDef(loadSignature);
             ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
             SortedMap cf = (SortedMap)reader.getCurrentValue();
             assert key != null && cf != null;
@@ -166,11 +166,11 @@ public class CassandraStorage extends Lo
         return pair;
     }
-    private CfDef getCfDef()
+    private CfDef getCfDef(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
-        return cfdefFromString(property.getProperty(getSchemaContextKey()));
+        return cfdefFromString(property.getProperty(signature));
     }
     private List getDefaultMarshallers(CfDef cfDef) throws IOException
@@ -290,7 +290,7 @@ public class CassandraStorage extends Lo
         }
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
-        initSchema();
+        initSchema(loadSignature);
     }
     @Override
@@ -299,9 +299,16 @@ public class CassandraStorage extends Lo
         return location;
     }
+    @Override
+    public void setUDFContextSignature(String signature)
+    {
+        this.loadSignature = signature;
+    }
+
     /* StoreFunc methods */
     public void setStoreFuncUDFContextSignature(String signature)
     {
+        this.storeSignature = signature;
     }
     public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
@@ -315,7 +322,7 @@ public class CassandraStorage extends Lo
         setLocationFromUri(location);
         ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
-        initSchema();
+        initSchema(storeSignature);
     }
     public OutputFormat getOutputFormat()
@@ -347,7 +354,7 @@ public class CassandraStorage extends Lo
         ByteBuffer key = objToBB(t.get(0));
         DefaultDataBag pairs = (DefaultDataBag) t.get(1);
         ArrayList mutationList = new ArrayList();
-        CfDef cfDef = getCfDef();
+        CfDef cfDef = getCfDef(storeSignature);
         List marshallers = getDefaultMarshallers(cfDef);
         Map validators = getValidatorMap(cfDef);
         try
@@ -412,7 +419,7 @@ public class CassandraStorage extends Lo
                 }
                 catch (ClassCastException e)
                 {
-                    throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily");
+                    throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e);
                 }
                 try
                 {
@@ -430,14 +437,13 @@ public class CassandraStorage extends Lo
     /* Methods to get the column family schema from Cassandra */
-    private void initSchema()
+    private void initSchema(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
-        String schemaContextKey = getSchemaContextKey();
         // Only get the schema if we haven't already gotten it
-        if (!property.containsKey(schemaContextKey))
+        if (!property.containsKey(signature))
         {
             Cassandra.Client client = null;
             try
@@ -455,7 +461,7 @@ public class CassandraStorage extends Lo
                         break;
                     }
                 }
-                property.setProperty(schemaContextKey, cfdefToString(cfDef));
+                property.setProperty(signature, cfdefToString(cfDef));
             }
             catch (TException e)
             {
@@ -521,14 +527,4 @@ public class CassandraStorage extends Lo
         }
         return cfDef;
     }
-
-    private String getSchemaContextKey()
-    {
-        StringBuilder sb = new StringBuilder(UDFCONTEXT_SCHEMA_KEY_PREFIX);
-        sb.append('.');
-        sb.append(keyspace);
-        sb.append('.');
-        sb.append(column_family);
-        return sb.toString();
-    }
 }

Modified: cassandra/trunk/doc/cql/CQL.textile
URL: http://svn.apache.org/viewvc/cassandra/trunk/doc/cql/CQL.textile?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/doc/cql/CQL.textile (original)
+++ cassandra/trunk/doc/cql/CQL.textile Fri Jul 29 05:09:59 2011
@@ -1,4 +1,4 @@
-h1. Cassandra Query Language (CQL) v1.0.0
+h1. Cassandra Query Language (CQL) v1.1.0
 h2. Table of Contents
@@ -364,5 +364,8 @@ Versioning of the CQL language adheres t
 h1. Changes
 pre.
+Sat, 01 Jun 2011 15:58:00 -0600 - Pavel Yaskevich
+ * Updated to support ALTER (CASSANDRA-1709)
+
 Tue, 22 Mar 2011 18:10:28 -0700 - Eric Evans
  * Initial version, 1.0.0

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jul 29 05:09:59 2011
@@ -204,6 +204,9 @@ public class HintedHandOffManager implem
         }
         waited = 0;
         // then wait for the correct schema version.
+        // usually we use DD.getDefsVersion, which checks the local schema uuid as stored in the system table.
+        // here we check the one in gossip instead; this serves as a canary to warn us if we introduce a bug that
+        // causes the two to diverge (see CASSANDRA-2946)
         while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
                 gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java Fri Jul 29 05:09:59 2011
@@ -29,6 +29,7 @@ public enum ApplicationState
     DC,
     RACK,
     RELEASE_VERSION,
+    REMOVAL_COORDINATOR,
     INTERNAL_IP,
     // pad to allow adding new states to existing cluster
     X1,

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Fri Jul 29 05:09:59 2011
@@ -27,6 +27,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.config.ConfigurationException;
@@ -58,6 +59,8 @@ public class Gossiper implements IFailur
     private static final RetryingScheduledThreadPoolExecutor executor = new RetryingScheduledThreadPoolExecutor("GossipTasks");
     static final ApplicationState[] STATES = ApplicationState.values();
+    static final List DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, VersionedValue.STATUS_LEFT);
+
     private ScheduledFuture scheduledGossipTask;
     public final static int intervalInMillis = 1000;
     public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
@@ -264,17 +267,21 @@ public class Gossiper implements IFailur
     }
     /**
-     * Removes the endpoint from unreachable endpoint set
+     * Removes the endpoint from gossip completely
      *
      * @param endpoint endpoint to be removed from the current membership.
      */
     private void evictFromMembership(InetAddress endpoint)
     {
         unreachableEndpoints.remove(endpoint);
+        endpointStateMap.remove(endpoint);
+        justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+        if (logger.isDebugEnabled())
+            logger.debug("evicting " + endpoint + " from gossip");
     }
     /**
-     * Removes the endpoint completely from Gossip
+     * Removes the endpoint from Gossip but retains endpoint state
      */
     public void removeEndpoint(InetAddress endpoint)
     {
@@ -288,6 +295,8 @@ public class Gossiper implements IFailur
         FailureDetector.instance.remove(endpoint);
         versions.remove(endpoint);
         justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+        if (logger.isDebugEnabled())
+            logger.debug("removing endpoint " + endpoint);
     }
     /**
@@ -328,6 +337,67 @@ public class Gossiper implements IFailur
         }
     }
+    /**
+     * This method will begin removing an existing endpoint from the cluster by spoofing its state
+     * This should never be called unless this coordinator has had 'removetoken' invoked
+     *
+     * @param endpoint - the endpoint being removed
+     * @param token - the token being removed
+     * @param mytoken - my own token for replication coordination
+     */
+    public void advertiseRemoving(InetAddress endpoint, Token token, Token mytoken)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        // remember this node's generation
+        int generation = epState.getHeartBeatState().getGeneration();
+        logger.info("Removing token: " + token);
+        logger.info("Sleeping for " + StorageService.RING_DELAY + "ms to ensure " + endpoint + " does not change");
+        try
+        {
+            Thread.sleep(StorageService.RING_DELAY);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        // make sure it did not change
+        epState = endpointStateMap.get(endpoint);
+        if (epState.getHeartBeatState().getGeneration() != generation)
+            throw new RuntimeException("Endpoint " + endpoint + " generation changed while trying to remove it");
+        // update the other node's generation to mimic it as if it had changed it itself
+        logger.info("Advertising removal for " + endpoint);
+        epState.updateTimestamp(); // make sure we don't evict it too soon
+        epState.getHeartBeatState().forceNewerGenerationUnsafe();
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(token));
+        epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(mytoken));
+        endpointStateMap.put(endpoint, epState);
+    }
+
+    /**
+     * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN
+     * This should only be called after advertiseRemoving
+     * @param endpoint
+     * @param token
+     */
+    public void advertiseTokenRemoved(InetAddress endpoint, Token token)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        epState.updateTimestamp(); // make sure we don't evict it too soon
+        epState.getHeartBeatState().forceNewerGenerationUnsafe();
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(token));
+        logger.info("Completing removal of " + endpoint);
+        endpointStateMap.put(endpoint, epState);
+        // ensure at least one gossip round occurs before returning
+        try
+        {
+            Thread.sleep(intervalInMillis * 2);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
     public boolean isKnownEndpoint(InetAddress endpoint)
     {
         return endpointStateMap.containsKey(endpoint);
@@ -456,23 +526,18 @@ public class Gossiper implements IFailur
             {
                 long duration = now - epState.getUpdateTimestamp();
+                if (StorageService.instance.getTokenMetadata().isMember(endpoint))
+                    epState.setHasToken(true);
                 // check if this is a fat client. fat clients are removed automatically from
                 // gosip after FatClientTimeout
-                if (!epState.hasToken() && !epState.isAlive() && (duration > FatClientTimeout))
+                if (!epState.hasToken() && !epState.isAlive() && !justRemovedEndpoints.containsKey(endpoint) && (duration > FatClientTimeout))
                 {
-                    if (StorageService.instance.getTokenMetadata().isMember(endpoint))
-                        epState.setHasToken(true);
-                    else
-                    {
-                        if (!justRemovedEndpoints.containsKey(endpoint)) // if the node was decommissioned, it will have been removed but still appear as a fat client
-                        {
-                            logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
-                            removeEndpoint(endpoint); // after quarantine justRemoveEndpoints will remove the state
-                        }
-                    }
+                    logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
+                    removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay
+                    evictFromMembership(endpoint); // can get rid of the state immediately
                 }
-                if ( !epState.isAlive() && (duration > aVeryLongTime) )
+                if ( !epState.isAlive() && (duration > aVeryLongTime) && (!StorageService.instance.getTokenMetadata().isMember(endpoint)))
                 {
                     evictFromMembership(endpoint);
                 }
@@ -488,7 +553,6 @@ public class Gossiper implements IFailur
                     if (logger.isDebugEnabled())
                         logger.debug(QUARANTINE_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
                     justRemovedEndpoints.remove(entry.getKey());
-                    endpointStateMap.remove(entry.getKey());
                 }
             }
         }
@@ -585,6 +649,7 @@ public class Gossiper implements IFailur
             int remoteGeneration = remoteEndpointState.getHeartBeatState().getGeneration();
             if ( remoteGeneration > localGeneration )
             {
+                localEndpointState.updateTimestamp();
                 fd.report(endpoint);
                 return;
             }
@@ -595,6 +660,7 @@ public class Gossiper implements IFailur
                 int remoteVersion = remoteEndpointState.getHeartBeatState().getHeartBeatVersion();
                 if ( remoteVersion > localVersion )
                 {
+                    localEndpointState.updateTimestamp();
                     fd.report(endpoint);
                 }
             }
@@ -607,6 +673,7 @@ public class Gossiper implements IFailur
         if (logger.isTraceEnabled())
             logger.trace("marking as alive {}", addr);
         localState.markAlive();
+        localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
         liveEndpoints.add(addr);
         unreachableEndpoints.remove(addr);
         logger.info("InetAddress {} is now UP", addr);
@@ -638,10 +705,13 @@ public class Gossiper implements IFailur
      */
     private void handleMajorStateChange(InetAddress ep, EndpointState epState)
     {
-        if (endpointStateMap.get(ep) != null)
-            logger.info("Node {} has restarted, now UP again", ep);
-        else
-            logger.info("Node {} is now part of the cluster", ep);
+        if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+        {
+            if (endpointStateMap.get(ep) != null)
+                logger.info("Node {} has restarted, now UP again", ep);
+            else
+                logger.info("Node {} is now part of the cluster", ep);
+        }
         if (logger.isTraceEnabled())
             logger.trace("Adding endpoint state for " + ep);
         endpointStateMap.put(ep, epState);
@@ -651,11 +721,31 @@ public class Gossiper implements IFailur
             for (IEndpointStateChangeSubscriber subscriber : subscribers)
                 subscriber.onDead(ep, epState);
         }
-        markAlive(ep, epState);
+        if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+            markAlive(ep, epState);
+        else
+        {
+            logger.debug("Not marking " + ep + " alive due to dead state");
+            epState.markDead();
+            epState.setHasToken(true); // fat clients won't have a dead state
+        }
         for (IEndpointStateChangeSubscriber subscriber : subscribers)
             subscriber.onJoin(ep, epState);
     }
+    private Boolean isDeadState(String value)
+    {
+        String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
+        assert (pieces.length > 0);
+        String state = pieces[0];
+        for (String deadstate : DEAD_STATES)
+        {
+            if (state.equals(deadstate))
+                return true;
+        }
+        return false;
+    }
+
     void applyStateLocally(Map epStateMap)
     {
         for (Entry entry : epStateMap.entrySet())

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java Fri Jul 29 05:09:59 2011
@@ -71,6 +71,11 @@ class HeartBeatState
     {
         return version;
     }
+
+    void forceNewerGenerationUnsafe()
+    {
+        generation += 1;
+    }
 }
 class HeartBeatStateSerializer implements ICompactSerializer

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Fri Jul 29 05:09:59 2011
@@ -49,7 +49,7 @@ public class VersionedValue implements C
     public final static char DELIMITER = ',';
     public final static String DELIMITER_STR = new String(new char[] { DELIMITER });
-    // values for State.STATUS
+    // values for ApplicationState.STATUS
     public final static String STATUS_BOOTSTRAPPING = "BOOT";
     public final static String STATUS_NORMAL = "NORMAL";
     public final static String STATUS_LEAVING = "LEAVING";
@@ -59,6 +59,9 @@ public class VersionedValue implements C
     public final static String REMOVING_TOKEN = "removing";
     public final static String REMOVED_TOKEN = "removed";
+    // values for ApplicationState.REMOVAL_COORDINATOR
+    public final static String REMOVAL_COORDINATOR = "REMOVER";
+
     public final int version;
     public final String value;
@@ -129,20 +132,19 @@ public class VersionedValue implements C
             return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
-        public VersionedValue removingNonlocal(Token localToken, Token token)
+        public VersionedValue removingNonlocal(Token token)
+        {
+            return new VersionedValue(VersionedValue.REMOVING_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+        }
+
+        public VersionedValue removedNonlocal(Token token)
         {
-            return new VersionedValue(VersionedValue.STATUS_NORMAL
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken)
-                                        + VersionedValue.DELIMITER + VersionedValue.REMOVING_TOKEN
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+            return new VersionedValue(VersionedValue.REMOVED_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
-        public VersionedValue removedNonlocal(Token localToken, Token token)
+        public VersionedValue removalCoordinator(Token token)
         {
-            return new VersionedValue(VersionedValue.STATUS_NORMAL
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken)
-                                        + VersionedValue.DELIMITER + VersionedValue.REMOVED_TOKEN
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+            return new VersionedValue(VersionedValue.REMOVAL_COORDINATOR + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
         public VersionedValue datacenter(String dcId)

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jul 29 05:09:59 2011
@@ -472,6 +472,10 @@ public final class MessagingService impl
     public void receive(Message message, String id)
     {
+        if (logger_.isTraceEnabled())
+            logger_.trace(FBUtilities.getLocalAddress() + " received " + message.getVerb()
+                          + " from " + id + "@" + message.getFrom());
+
         message = SinkManager.processServerMessage(message, id);
         if (message == null)
             return;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Fri Jul 29 05:09:59 2011
@@ -119,6 +119,7 @@ public abstract class AbstractCassandraD
         {
             logger.info("JVM vendor/version: {}/{}", System.getProperty("java.vm.name"), System.getProperty("java.version") );
             logger.info("Heap size: {}/{}", Runtime.getRuntime().totalMemory(), Runtime.getRuntime().maxMemory());
+            logger.info("Classpath: {}", System.getProperty("java.class.path"));
             CLibrary.tryMlockall();
             listenPort = DatabaseDescriptor.getRpcPort();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1152107&r1=1152106&r2=1152107&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Jul 29 05:09:59 2011 @@ -349,7 +349,7 @@ public class StorageProxy implements Sto { if (logger.isDebugEnabled()) logger.debug("insert writing local " + rm.toString(true)); - Runnable runnable = new WrappedRunnable() + Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION) { public void runMayThrow() throws IOException { @@ -431,7 +431,7 @@ public class StorageProxy implements Sto if (logger.isDebugEnabled()) logger.debug("insert writing local & replicate " + mutation.toString(true)); - Runnable runnable = new WrappedRunnable() + Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION) { public void runMayThrow() throws IOException { @@ -447,7 +447,7 @@ public class StorageProxy implements Sto { // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation) // and we want to avoid blocking too much the MUTATION stage - StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new WrappedRunnable() + StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new DroppableRunnable(StorageService.Verb.READ) { public void runMayThrow() throws IOException { @@ -616,7 +616,7 @@ public class StorageProxy implements Sto return rows; } - static class LocalReadRunnable extends WrappedRunnable + static class LocalReadRunnable extends DroppableRunnable { private final ReadCommand command; private final ReadCallback handler; @@ -624,6 +624,7 @@ public class StorageProxy implements Sto LocalReadRunnable(ReadCommand command, ReadCallback handler) { + super(StorageService.Verb.READ); this.command = command; this.handler = handler; } 
@@ -1078,4 +1079,35 @@ public class StorageProxy implements Sto
     {
         public void apply(IMutation mutation, Multimap hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException;
     }
+
+    private static abstract class DroppableRunnable implements Runnable
+    {
+        private final long constructionTime = System.currentTimeMillis();
+        private final StorageService.Verb verb;
+
+        public DroppableRunnable(StorageService.Verb verb)
+        {
+            this.verb = verb;
+        }
+
+        public final void run()
+        {
+            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
+            {
+                MessagingService.instance().incrementDroppedMessages(verb);
+                return;
+            }
+
+            try
+            {
+                runMayThrow();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        abstract protected void runMayThrow() throws Exception;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Jul 29 05:09:59 2011
@@ -335,6 +335,11 @@ public class StorageService implements I
     public synchronized void initClient() throws IOException, ConfigurationException
     {
+        initClient(RING_DELAY);
+    }
+
+    public synchronized void initClient(int delay) throws IOException, ConfigurationException
+    {
         if (initialized)
         {
             if (!isClientMode)
@@ -352,7 +357,7 @@ public class StorageService implements I
         // sleep a while to allow gossip to warm up (the other nodes need to know about this one before they can reply).
         try
         {
-            Thread.sleep(RING_DELAY);
+            Thread.sleep(delay);
         }
         catch (Exception ex)
         {
@@ -622,29 +627,35 @@ public class StorageService implements I
     }

     /*
-     * onChange only ever sees one ApplicationState piece change at a time, so we perform a kind of state machine here.
-     * We are concerned with two events: knowing the token associated with an endpoint, and knowing its operation mode.
-     * Nodes can start in either bootstrap or normal mode, and from bootstrap mode can change mode to normal.
-     * A node in bootstrap mode needs to have pendingranges set in TokenMetadata; a node in normal mode
-     * should instead be part of the token ring.
+     * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the
+     * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update
+     * from somewhere else.
+     *
+     * onChange only ever sees one ApplicationState piece change at a time (even if many ApplicationState updates were
+     * received at the same time), so we perform a kind of state machine here. We are concerned with two events: knowing
+     * the token associated with an endpoint, and knowing its operation mode. Nodes can start in either bootstrap or
+     * normal mode, and from bootstrap mode can change mode to normal. A node in bootstrap mode needs to have
+     * pendingranges set in TokenMetadata; a node in normal mode should instead be part of the token ring.
      *
-     * Normal MOVE_STATE progression of a node should be like this:
-     * STATE_BOOTSTRAPPING,token
+     * Normal progression of ApplicationState.STATUS values for a node should be like this:
+     * STATUS_BOOTSTRAPPING,token
      *   if bootstrapping. stays this way until all files are received.
-     * STATE_NORMAL,token
+     * STATUS_NORMAL,token
      *   ready to serve reads and writes.
-     * STATE_NORMAL,token,REMOVE_TOKEN,token
-     *   specialized normal state in which this node acts as a proxy to tell the cluster about a dead node whose
-     *   token is being removed. this value becomes the permanent state of this node (unless it coordinates another
-     *   removetoken in the future).
-     * STATE_LEAVING,token
-     *   get ready to leave the cluster as part of a decommission or move
-     * STATE_LEFT,token
-     *   set after decommission or move is completed.
-     * STATE_MOVE,token
-     *   set if node if currently moving to a new token in the ring
-     *
-     * Note: Any time a node state changes from STATE_NORMAL, it will not be visible to new nodes. So it follows that
+     * STATUS_LEAVING,token
+     *   get ready to leave the cluster as part of a decommission
+     * STATUS_LEFT,token
+     *   set after decommission is completed.
+     *
+     * Other STATUS values that may be seen (possibly anywhere in the normal progression):
+     * STATUS_MOVING,newtoken
+     *   set if node is currently moving to a new token in the ring
+     * REMOVING_TOKEN,deadtoken
+     *   set if the node is dead and is being removed by its REMOVAL_COORDINATOR
+     * REMOVED_TOKEN,deadtoken
+     *   set if the node is dead and has been removed by its REMOVAL_COORDINATOR
+     *
+     * Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
      * you should never bootstrap a new node during a removetoken, decommission or move.
      */
     public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
@@ -665,6 +676,8 @@ public class StorageService implements I
             handleStateBootstrap(endpoint, pieces);
         else if (moveName.equals(VersionedValue.STATUS_NORMAL))
             handleStateNormal(endpoint, pieces);
+        else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN))
+            handleStateRemoving(endpoint, pieces);
         else if (moveName.equals(VersionedValue.STATUS_LEAVING))
             handleStateLeaving(endpoint, pieces);
         else if (moveName.equals(VersionedValue.STATUS_LEFT))
@@ -731,7 +744,7 @@ public class StorageService implements I
      * in reads.
      *
      * @param endpoint node
-     * @param pieces STATE_NORMAL,token[,other_state,token]
+     * @param pieces STATE_NORMAL,token
      */
     private void handleStateNormal(InetAddress endpoint, String[] pieces)
     {
@@ -773,12 +786,6 @@ public class StorageService implements I
                                        endpoint, currentOwner, token, endpoint));
         }

-        if (pieces.length > 2)
-        {
-            assert pieces.length == 4;
-            handleStateRemoving(endpoint, getPartitioner().getTokenFactory().fromString(pieces[3]), pieces[2]);
-        }
-
         if (tokenMetadata_.isMoving(endpoint)) // if endpoint was moving to a new token
             tokenMetadata_.removeFromMoving(endpoint);

@@ -860,37 +867,50 @@ public class StorageService implements I
      * Handle notification that a node being actively removed from the ring via 'removetoken'
      *
      * @param endpoint node
-     * @param state either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
+     * @param pieces either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
      */
-    private void handleStateRemoving(InetAddress endpoint, Token removeToken, String state)
+    private void handleStateRemoving(InetAddress endpoint, String[] pieces)
     {
-        InetAddress removeEndpoint = tokenMetadata_.getEndpoint(removeToken);
-
-        if (removeEndpoint == null)
-            return;
-
-        if (removeEndpoint.equals(FBUtilities.getBroadcastAddress()))
-        {
-            logger_.info("Received removeToken gossip about myself. Is this node a replacement for a removed one?");
-            return;
-        }
+        String state = pieces[0];
+        assert (pieces.length > 0);

-        if (VersionedValue.REMOVED_TOKEN.equals(state))
+        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
         {
-            excise(removeToken, removeEndpoint);
+            logger_.info("Received removeToken gossip about myself. Is this node rejoining after an explicit removetoken?");
+            try
+            {
+                drain();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+            return;
         }
-        else if (VersionedValue.REMOVING_TOKEN.equals(state))
+        if (tokenMetadata_.isMember(endpoint))
         {
-            if (logger_.isDebugEnabled())
-                logger_.debug("Token " + removeToken + " removed manually (endpoint was " + removeEndpoint + ")");
+            Token removeToken = tokenMetadata_.getToken(endpoint);

-            // Note that the endpoint is being removed
-            tokenMetadata_.addLeavingEndpoint(removeEndpoint);
-            calculatePendingRanges();
+            if (VersionedValue.REMOVED_TOKEN.equals(state))
+            {
+                excise(removeToken, endpoint);
+            }
+            else if (VersionedValue.REMOVING_TOKEN.equals(state))
+            {
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Token " + removeToken + " removed manually (endpoint was " + endpoint + ")");

-            // grab any data we are now responsible for and notify responsible node
-            restoreReplicaCount(removeEndpoint, endpoint);
-        }
+                // Note that the endpoint is being removed
+                tokenMetadata_.addLeavingEndpoint(endpoint);
+                calculatePendingRanges();
+
+                // find the endpoint coordinating this removal that we need to notify when we're done
+                String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
+                Token coordtoken = getPartitioner().getTokenFactory().fromString(coordinator[1]);
+                // grab any data we are now responsible for and notify responsible node
+                restoreReplicaCount(endpoint, tokenMetadata_.getEndpoint(coordtoken));
+            }
+        } // not a member, nothing to do
     }

     private void excise(Token token, InetAddress endpoint)
@@ -1059,6 +1079,8 @@ public class StorageService implements I
         // notify the remote token
         Message msg = new Message(local, StorageService.Verb.REPLICATION_FINISHED, new byte[0], Gossiper.instance.getVersion(remote));
         IFailureDetector failureDetector = FailureDetector.instance;
+        if (logger_.isDebugEnabled())
+            logger_.debug("Notifying " + remote.toString() + " of replication completion\n");
         while (failureDetector.isAlive(remote))
         {
             IAsyncResult iar = MessagingService.instance().sendRR(msg, remote);
@@ -1993,9 +2015,14 @@ public class StorageService implements I
      */
     public void forceRemoveCompletion()
     {
-        if (!replicatingNodes.isEmpty())
+        if (!replicatingNodes.isEmpty() || !tokenMetadata_.getLeavingEndpoints().isEmpty())
         {
             logger_.warn("Removal not confirmed for for " + StringUtils.join(this.replicatingNodes, ","));
+            for (InetAddress endpoint : tokenMetadata_.getLeavingEndpoints())
+            {
+                Gossiper.instance.advertiseTokenRemoved(endpoint, tokenMetadata_.getToken(endpoint));
+                tokenMetadata_.removeEndpoint(endpoint);
+            }
             replicatingNodes.clear();
         }
         else
@@ -2059,9 +2086,9 @@ public class StorageService implements I
         tokenMetadata_.addLeavingEndpoint(endpoint);
         calculatePendingRanges();
-        // bundle two states together. include this nodes state to keep the status quo,
-        // but indicate the leaving token so that it can be dealt with.
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removingNonlocal(localToken, token));
+        // the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
+        // we add our own token so other nodes to let us know when they're done
+        Gossiper.instance.advertiseRemoving(endpoint, token, localToken);

         // kick off streaming commands
         restoreReplicaCount(endpoint, myAddress);
@@ -2081,8 +2108,8 @@ public class StorageService implements I
         excise(token, endpoint);

-        // indicate the token has left
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removedNonlocal(localToken, token));
+        // gossiper will indicate the token has left
+        Gossiper.instance.advertiseTokenRemoved(endpoint, token);

         replicatingNodes.clear();
         removingNode = null;
@@ -2090,8 +2117,18 @@ public class StorageService implements I
     public void confirmReplication(InetAddress node)
     {
-        assert !replicatingNodes.isEmpty();
-        replicatingNodes.remove(node);
+        // replicatingNodes can be empty in the case where this node used to be a removal coordinator,
+        // but restarted before all 'replication finished' messages arrived. In that case, we'll
+        // still go ahead and acknowledge it.
+        if (!replicatingNodes.isEmpty())
+        {
+            replicatingNodes.remove(node);
+        }
+        else
+        {
+            logger_.info("Received unexpected REPLICATION_FINISHED message from " + node
+                         + ". Was this node recently a removal coordinator?");
+        }
     }

     public boolean isClientMode()

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java Fri Jul 29 05:09:59 2011
@@ -30,6 +30,6 @@ public class InitClientTest // extends C
     @Test
     public void testInitClientStartup() throws IOException, ConfigurationException
     {
-        StorageService.instance.initClient();
+        StorageService.instance.initClient(0);
     }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java Fri Jul 29 05:09:59 2011
@@ -36,7 +36,7 @@ public class StorageServiceClientTest
     {
         CleanupHelper.mkdirs();
         CleanupHelper.cleanup();
-        StorageService.instance.initClient();
+        StorageService.instance.initClient(0);

         // verify that no storage directories were created.
         for (String path : DatabaseDescriptor.getAllDataFileLocations())
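
The drop-if-stale behaviour of the DroppableRunnable added in the StorageProxy hunk can be tried in isolation with a sketch along the following lines. This is not the committed class: DroppableSketch, DroppableTask, the injected clock, the explicit timeoutMillis parameter and the runAfter helper are all hypothetical names introduced here so the example runs without Cassandra on the classpath. The commit itself reads System.currentTimeMillis() and DatabaseDescriptor.getRpcTimeout(), and reports drops through MessagingService.instance().incrementDroppedMessages(verb).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

class DroppableSketch
{
    // Assumption: stands in for MessagingService.instance().incrementDroppedMessages(verb).
    static final AtomicInteger dropped = new AtomicInteger();

    // Mirrors the shape of DroppableRunnable, with the clock and timeout injected (hypothetical).
    static abstract class DroppableTask implements Runnable
    {
        private final LongSupplier clock;
        private final long timeoutMillis;
        private final long constructionTime;

        DroppableTask(LongSupplier clock, long timeoutMillis)
        {
            this.clock = clock;
            this.timeoutMillis = timeoutMillis;
            this.constructionTime = clock.getAsLong();
        }

        public final void run()
        {
            // Same comparison as the diff: drop only when strictly past the timeout.
            if (clock.getAsLong() > constructionTime + timeoutMillis)
            {
                dropped.incrementAndGet();
                return;
            }

            try
            {
                runMayThrow();
            }
            catch (Exception e)
            {
                throw new RuntimeException(e);
            }
        }

        protected abstract void runMayThrow() throws Exception;
    }

    // Hypothetical helper: builds a task, advances a fake clock, runs the task,
    // and reports whether the task body was executed.
    static boolean runAfter(long elapsedMillis, long timeoutMillis)
    {
        final long[] now = { 1000L };
        final boolean[] ran = { false };
        Runnable task = new DroppableTask(() -> now[0], timeoutMillis)
        {
            protected void runMayThrow()
            {
                ran[0] = true;
            }
        };
        now[0] += elapsedMillis; // simulate time spent waiting in a stage queue
        task.run();
        return ran[0];
    }

    public static void main(String[] args)
    {
        System.out.println(runAfter(5, 10));            // true: still within the timeout
        System.out.println(runAfter(11, 10));           // false: dropped as stale
        System.out.println("dropped=" + dropped.get()); // dropped=1
    }
}
```

Injecting the clock is only a testing convenience for this sketch. The point it illustrates is the one the diff relies on: the age check happens when the task is finally run, not when it is queued, so work that waited longer than the timeout is counted as dropped instead of executed.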