Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 384F8200B35 for ; Mon, 20 Jun 2016 15:23:16 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 36F92160A24; Mon, 20 Jun 2016 13:23:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2F969160A55 for ; Mon, 20 Jun 2016 15:23:15 +0200 (CEST) Received: (qmail 86556 invoked by uid 500); 20 Jun 2016 13:23:14 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 86526 invoked by uid 99); 20 Jun 2016 13:23:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jun 2016 13:23:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F1F86DFE65; Mon, 20 Jun 2016 13:23:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: samt@apache.org To: commits@cassandra.apache.org Date: Mon, 20 Jun 2016 13:23:13 -0000 Message-Id: <14da22f0d6084989b22a2c8bc1e7a6c4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/6] cassandra git commit: Don't send new node notification on restart archived-at: Mon, 20 Jun 2016 13:23:16 -0000 Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 fd91e1593 -> 142f358f6 refs/heads/cassandra-3.0 863dbc787 -> 0a0e97df5 refs/heads/trunk 0e9d6bfe1 -> 88229a47a Don't send new node notification on restart Patch by Sam Tunnicliffe; reviewed by Joel Knighton for CASSANDRA-11038 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/142f358f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/142f358f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/142f358f Branch: refs/heads/cassandra-2.2 Commit: 142f358f6958695c4248acb94b89b64e95ccc609 Parents: fd91e15 Author: Sam Tunnicliffe Authored: Fri May 20 17:00:42 2016 +0100 Committer: Sam Tunnicliffe Committed: Mon Jun 20 12:55:27 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/StorageService.java | 142 +++---------------- .../org/apache/cassandra/transport/Server.java | 21 ++- 3 files changed, 38 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/142f358f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2c66ef9..76e601c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.7 + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038) * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) * Persist local metadata earlier in startup sequence (CASSANDRA-11742) * Run CommitLog tests with different compression settings (CASSANDRA-9039) http://git-wip-us.apache.org/repos/asf/cassandra/blob/142f358f/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 6b64664..a877074 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -17,62 +17,24 @@ */ package org.apache.cassandra.service; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.File; -import java.io.IOException; +import java.io.*; import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import javax.management.JMX; -import javax.management.MBeanServer; -import javax.management.NotificationBroadcasterSupport; -import javax.management.ObjectName; +import javax.management.*; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Collections2; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.Uninterruptibles; +import com.google.common.collect.*; +import com.google.common.util.concurrent.*; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,88 +48,31 @@ import org.apache.cassandra.auth.AuthMigrationListener; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.BatchlogManager; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.CounterMutationVerbHandler; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.DefinitionsUpdateVerbHandler; -import org.apache.cassandra.db.HintedHandOffManager; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.MigrationRequestVerbHandler; -import org.apache.cassandra.db.MutationVerbHandler; -import org.apache.cassandra.db.ReadRepairVerbHandler; -import org.apache.cassandra.db.ReadVerbHandler; -import org.apache.cassandra.db.SchemaCheckVerbHandler; -import org.apache.cassandra.db.SizeEstimatesRecorder; -import org.apache.cassandra.db.SnapshotDetailsTabularData; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.TruncateVerbHandler; +import org.apache.cassandra.config.*; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.dht.BootStrapper; -import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.RangeStreamer; -import org.apache.cassandra.dht.RingPosition; -import org.apache.cassandra.dht.StreamStateStore; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.AlreadyExistsException; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.exceptions.UnavailableException; -import org.apache.cassandra.gms.ApplicationState; -import org.apache.cassandra.gms.EndpointState; -import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.gms.GossipDigestAck2VerbHandler; -import org.apache.cassandra.gms.GossipDigestAckVerbHandler; -import org.apache.cassandra.gms.GossipDigestSynVerbHandler; -import org.apache.cassandra.gms.GossipShutdownVerbHandler; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; -import org.apache.cassandra.gms.IFailureDetector; -import org.apache.cassandra.gms.TokenSerializer; -import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.gms.*; import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.locator.AbstractReplicationStrategy; -import org.apache.cassandra.locator.DynamicEndpointSnitch; -import org.apache.cassandra.locator.IEndpointSnitch; -import org.apache.cassandra.locator.LocalStrategy; -import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.locator.*; import org.apache.cassandra.metrics.StorageMetrics; -import org.apache.cassandra.net.AsyncOneResponse; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.ResponseVerbHandler; -import org.apache.cassandra.repair.RepairMessageVerbHandler; -import org.apache.cassandra.repair.RepairParallelism; -import org.apache.cassandra.repair.RepairRunnable; -import org.apache.cassandra.repair.SystemDistributedKeyspace; +import org.apache.cassandra.net.*; +import org.apache.cassandra.repair.*; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.service.paxos.CommitVerbHandler; import org.apache.cassandra.service.paxos.PrepareVerbHandler; import org.apache.cassandra.service.paxos.ProposeVerbHandler; -import org.apache.cassandra.streaming.ReplicationFinishedVerbHandler; -import org.apache.cassandra.streaming.StreamManager; -import org.apache.cassandra.streaming.StreamPlan; -import org.apache.cassandra.streaming.StreamResultFuture; -import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.streaming.*; import org.apache.cassandra.thrift.EndpointDetails; import org.apache.cassandra.thrift.TokenRange; import org.apache.cassandra.thrift.cassandraConstants; import org.apache.cassandra.tracing.TraceKeyspace; -import org.apache.cassandra.utils.BackgroundActivityMonitor; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.OutputHandler; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.WindowsTimer; -import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventType; import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport; @@ -1800,14 +1705,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void notifyRpcChange(InetAddress endpoint, boolean ready) { if (ready) - { notifyUp(endpoint); - notifyJoined(endpoint); - } else - { notifyDown(endpoint); - } } private void notifyUp(InetAddress endpoint) @@ -1827,7 +1727,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void notifyJoined(InetAddress endpoint) { - if (!isRpcReady(endpoint) || !isStatus(endpoint, VersionedValue.STATUS_NORMAL)) + if (!isStatus(endpoint, VersionedValue.STATUS_NORMAL)) return; for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) @@ -1851,7 +1751,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getStatus().equals(status); } - private boolean isRpcReady(InetAddress endpoint) + public boolean isRpcReady(InetAddress endpoint) { return MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_22 || Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady(); @@ -2019,7 +1919,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } - boolean isMoving = tokenMetadata.isMoving(endpoint); // capture because updateNormalTokens clears moving status + // capture because updateNormalTokens clears moving and member status + boolean isMember = tokenMetadata.isMember(endpoint); + boolean isMoving = tokenMetadata.isMoving(endpoint); tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint); for (InetAddress ep : endpointsToRemove) { @@ -2035,7 +1937,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE tokenMetadata.removeFromMoving(endpoint); notifyMoved(endpoint); } - else + else if (!isMember) // prior to this, the node was not a member { notifyJoined(endpoint); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/142f358f/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 43d07fc..5c0d9d2 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -21,9 +21,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.EnumMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -431,9 +429,14 @@ public class Server implements CassandraDaemon.Server { private final Server server; - // We keep track of the latest events we have sent to avoid sending duplicates - // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236) + // We keep track of the latest status change events we have sent to avoid sending duplicates + // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156) private final Map latestEvents = new ConcurrentHashMap<>(); + // We also want to delay delivering a NEW_NODE notification until the new node has set its RPC ready + // state. This tracks the endpoints which have joined, but not yet signalled they're ready for clients + private final Set endpointsPendingJoinedNotification = + Collections.newSetFromMap(new ConcurrentHashMap()); + private static final InetAddress bindAll; static { @@ -496,7 +499,10 @@ public class Server implements CassandraDaemon.Server public void onJoinCluster(InetAddress endpoint) { - onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort())); + if (!StorageService.instance.isRpcReady(endpoint)) + endpointsPendingJoinedNotification.add(endpoint); + else + onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort())); } public void onLeaveCluster(InetAddress endpoint) @@ -511,6 +517,9 @@ public class Server implements CassandraDaemon.Server public void onUp(InetAddress endpoint) { + if (endpointsPendingJoinedNotification.remove(endpoint)) + onJoinCluster(endpoint); + onStatusChange(endpoint, Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort())); }