cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject cassandra git commit: Resumable bootstrap streaming
Date Wed, 18 Mar 2015 17:22:33 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk f36fe9fb1 -> 690fbf3ba


Resumable bootstrap streaming

patch by yukim; reviewed by Stefania Alborghetti for CASSANDRA-8838


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/690fbf3b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/690fbf3b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/690fbf3b

Branch: refs/heads/trunk
Commit: 690fbf3ba90cee726eb58ed1f69700d178993f75
Parents: f36fe9f
Author: Yuki Morishita <yukim@apache.org>
Authored: Wed Mar 18 12:18:28 2015 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Wed Mar 18 12:18:28 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java | 83 ++++++++++++++++-
 .../org/apache/cassandra/dht/BootStrapper.java  | 23 +++--
 .../org/apache/cassandra/dht/RangeStreamer.java | 95 +++++++++++++-------
 .../apache/cassandra/dht/StreamStateStore.java  | 82 +++++++++++++++++
 .../cassandra/service/StorageService.java       | 53 +++++++----
 .../apache/cassandra/streaming/StreamEvent.java |  5 ++
 .../cassandra/streaming/StreamSession.java      |  4 +-
 .../apache/cassandra/db/SystemKeyspaceTest.java | 24 ++---
 .../apache/cassandra/dht/BootStrapperTest.java  |  5 +-
 .../cassandra/dht/StreamStateStoreTest.java     | 76 ++++++++++++++++
 11 files changed, 372 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c07599a..955d8e3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -71,6 +71,7 @@
  * Select optimal CRC32 implementation at runtime (CASSANDRA-8614)
  * Evaluate MurmurHash of Token once per query (CASSANDRA-7096)
  * Generalize progress reporting (CASSANDRA-8901)
+ * Resumable bootstrap streaming (CASSANDRA-8838)
 
 2.1.4
  * Use correct bounds for page cache eviction of compressed files (CASSANDRA-8746)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index dcd0e55..9fa3c6b 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInputStream;
+import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
@@ -27,6 +28,7 @@ import javax.management.openmbean.*;
 
 import com.google.common.base.Function;
 import com.google.common.collect.*;
+import com.google.common.io.ByteStreams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,12 +37,13 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -49,6 +52,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.LegacySchemaTables;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.Commit;
@@ -78,6 +82,7 @@ public final class SystemKeyspace
     public static final String COMPACTION_HISTORY = "compaction_history";
     public static final String SSTABLE_ACTIVITY = "sstable_activity";
     public static final String SIZE_ESTIMATES = "size_estimates";
+    public static final String AVAILABLE_RANGES = "available_ranges";
 
     public static final CFMetaData Hints =
         compile(HINTS,
@@ -218,7 +223,7 @@ public final class SystemKeyspace
     private static final CFMetaData SizeEstimates =
         compile(SIZE_ESTIMATES,
                 "per-table primary range size estimates",
-                "CREATE TABLE %S ("
+                "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "table_name text,"
                 + "range_start text,"
@@ -228,6 +233,14 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))")
                 .gcGraceSeconds(0);
 
+    private static final CFMetaData AvailableRanges =
+        compile(AVAILABLE_RANGES,
+                "Available keyspace/ranges during bootstrap/replace that are ready to be served",
+                "CREATE TABLE %s ("
+                        + "keyspace_name text PRIMARY KEY,"
+                        + "ranges set<blob>"
+                        + ")");
+
     private static CFMetaData compile(String name, String description, String schema)
     {
         return CFMetaData.compile(String.format(schema, name), NAME)
@@ -249,7 +262,8 @@ public final class SystemKeyspace
                                            CompactionsInProgress,
                                            CompactionHistory,
                                            SSTableActivity,
-                                           SizeEstimates));
+                                           SizeEstimates,
+                                           AvailableRanges));
         return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables);
     }
 
@@ -954,4 +968,67 @@ public final class SystemKeyspace
         String cql = String.format("DELETE FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, SIZE_ESTIMATES);
         executeInternal(cql, keyspace, table);
     }
+
+    public static synchronized void updateAvailableRanges(String keyspace, Collection<Range<Token>> completedRanges)
+    {
+        String cql = "UPDATE system.%s SET ranges = ranges + ? WHERE keyspace_name = ?"; // set append: previously recorded ranges are kept
+        Set<ByteBuffer> rangesToUpdate = new HashSet<>(completedRanges.size());
+        for (Range<Token> range : completedRanges)
+        {
+            rangesToUpdate.add(rangeToBytes(range)); // stored as serialized blobs; decoded by byteBufferToRange
+        }
+        executeInternal(String.format(cql, AVAILABLE_RANGES), rangesToUpdate, keyspace);
+    }
+
+    public static synchronized Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner)
+    {
+        Set<Range<Token>> result = new HashSet<>(); // decoded ranges recorded for this keyspace
+        String query = "SELECT * FROM system.%s WHERE keyspace_name=?";
+        UntypedResultSet rs = executeInternal(String.format(query, AVAILABLE_RANGES), keyspace);
+        for (UntypedResultSet.Row row : rs)
+        {
+            Set<ByteBuffer> rawRanges = row.getSet("ranges", BytesType.instance); // blobs written by updateAvailableRanges
+            for (ByteBuffer rawRange : rawRanges)
+            {
+                result.add(byteBufferToRange(rawRange, partitioner));
+            }
+        }
+        return ImmutableSet.copyOf(result); // callers receive an unmodifiable snapshot
+    }
+
+    public static void resetAvailableRanges()
+    {
+        ColumnFamilyStore availableRanges = Keyspace.open(NAME).getColumnFamilyStore(AVAILABLE_RANGES);
+        availableRanges.truncateBlocking(); // truncates the whole table, i.e. clears ranges for every keyspace
+    }
+
+    private static ByteBuffer rangeToBytes(Range<Token> range)
+    {
+        try
+        {
+            DataOutputBuffer out = new DataOutputBuffer();
+            Range.tokenSerializer.serialize(range, out, MessagingService.VERSION_30); // version must match byteBufferToRange
+            return out.asByteBuffer();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked") // for the (Range<Token>) cast of the deserialized value below
+    private static Range<Token> byteBufferToRange(ByteBuffer rawRange, IPartitioner partitioner)
+    {
+        try
+        {
+            return (Range<Token>) Range.tokenSerializer.deserialize(ByteStreams.newDataInput(ByteBufferUtil.getArray(rawRange)),
+                                                                    partitioner,
+                                                                    MessagingService.VERSION_30); // must match the version used in rangeToBytes
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index cbbd100..8f52f7e 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -55,15 +55,20 @@ public class BootStrapper
 
         this.address = address;
         this.tokens = tokens;
-        tokenMetadata = tmd;
+        this.tokenMetadata = tmd;
     }
 
-    public void bootstrap()
+    public void bootstrap(StreamStateStore stateStore, boolean useStrictConsistency)
     {
-        if (logger.isDebugEnabled())
-            logger.debug("Beginning bootstrap process");
-
-        RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, address, "Bootstrap");
+        logger.debug("Beginning bootstrap process");
+
+        RangeStreamer streamer = new RangeStreamer(tokenMetadata,
+                                                   tokens,
+                                                   address,
+                                                   "Bootstrap",
+                                                   useStrictConsistency,
+                                                   DatabaseDescriptor.getEndpointSnitch(),
+                                                   stateStore);
         streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
 
         for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
@@ -83,7 +88,7 @@ public class BootStrapper
         }
         catch (ExecutionException e)
         {
-            throw new RuntimeException("Error during boostrap: " + e.getCause().getMessage(), e.getCause());
+            throw new RuntimeException("Error during bootstrap: " + e.getCause().getMessage(), e.getCause());
         }
     }
 
@@ -99,7 +104,7 @@ public class BootStrapper
         if (initialTokens.size() > 0)
         {
             logger.debug("tokens manually specified as {}",  initialTokens);
-            List<Token> tokens = new ArrayList<Token>(initialTokens.size());
+            List<Token> tokens = new ArrayList<>(initialTokens.size());
             for (String tokenString : initialTokens)
             {
                 Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString);
@@ -122,7 +127,7 @@ public class BootStrapper
 
     public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
     {
-        Set<Token> tokens = new HashSet<Token>(numTokens);
+        Set<Token> tokens = new HashSet<>(numTokens);
         while (tokens.size() < numTokens)
         {
             Token token = StorageService.getPartitioner().getRandomToken();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 72679cc..fecb308 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -20,24 +20,24 @@ package org.apache.cassandra.dht;
 import java.net.InetAddress;
 import java.util.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-import org.apache.cassandra.gms.EndpointState;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.utils.FBUtilities;
@@ -48,14 +48,21 @@ import org.apache.cassandra.utils.FBUtilities;
 public class RangeStreamer
 {
     private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
-    public static final boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement","true"));
+
+    /* bootstrap tokens. can be null if replacing the node. */
     private final Collection<Token> tokens;
+    /* current token ring */
     private final TokenMetadata metadata;
+    /* address of this node */
     private final InetAddress address;
+    /* streaming description */
     private final String description;
     private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create();
-    private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>();
+    private final Set<ISourceFilter> sourceFilters = new HashSet<>();
     private final StreamPlan streamPlan;
+    private final boolean useStrictConsistency;
+    private final IEndpointSnitch snitch;
+    private final StreamStateStore stateStore;
 
     /**
      * A filter applied to sources to stream from when constructing a fetch map.
@@ -104,18 +111,23 @@ public class RangeStreamer
         }
     }
 
-    public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddress address, String description)
+    public RangeStreamer(TokenMetadata metadata,
+                         Collection<Token> tokens,
+                         InetAddress address,
+                         String description,
+                         boolean useStrictConsistency,
+                         IEndpointSnitch snitch,
+                         StreamStateStore stateStore)
     {
         this.metadata = metadata;
         this.tokens = tokens;
         this.address = address;
         this.description = description;
         this.streamPlan = new StreamPlan(description, true);
-    }
-
-    public RangeStreamer(TokenMetadata metadata, InetAddress address, String description)
-    {
-        this(metadata, null, address, description);
+        this.useStrictConsistency = useStrictConsistency;
+        this.snitch = snitch;
+        this.stateStore = stateStore;
+        streamPlan.listeners(this.stateStore);
     }
 
     public void addSourceFilter(ISourceFilter filter)
@@ -123,6 +135,12 @@ public class RangeStreamer
         sourceFilters.add(filter);
     }
 
+    /**
+     * Add ranges to be streamed for given keyspace.
+     *
+     * @param keyspaceName keyspace name
+     * @param ranges ranges to be streamed
+     */
     public void addRanges(String keyspaceName, Collection<Range<Token>> ranges)
     {
         Multimap<Range<Token>, InetAddress> rangesForKeyspace = useStrictSourcesForRanges(keyspaceName)
@@ -145,11 +163,14 @@ public class RangeStreamer
         }
     }
 
+    /**
+     * @param keyspaceName keyspace name to check
+     * @return true when the node is bootstrapping, useStrictConsistency is true and # of nodes in the cluster is more than # of replica
+     */
     private boolean useStrictSourcesForRanges(String keyspaceName)
     {
         AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy();
-        return !DatabaseDescriptor.isReplacing()
-                && useStrictConsistency
+        return useStrictConsistency
                 && tokens != null
                 && metadata.getAllEndpoints().size() != strat.getReplicationFactor();
     }
@@ -157,6 +178,8 @@ public class RangeStreamer
     /**
      * Get a map of all ranges and their respective sources that are candidates for streaming the given ranges
      * to us. For each range, the list of sources is sorted by proximity relative to the given destAddress.
+     *
+     * @throws java.lang.IllegalStateException when there is no source to get data streamed
      */
     private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String keyspaceName, Collection<Range<Token>> desiredRanges)
     {
@@ -170,7 +193,7 @@ public class RangeStreamer
             {
                 if (range.contains(desiredRange))
                 {
-                    List<InetAddress> preferred = DatabaseDescriptor.getEndpointSnitch().getSortedListByProximity(address, rangeAddresses.get(range));
+                    List<InetAddress> preferred = snitch.getSortedListByProximity(address, rangeAddresses.get(range));
                     rangeSources.putAll(desiredRange, preferred);
                     break;
                 }
@@ -187,22 +210,23 @@ public class RangeStreamer
      * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
      * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
      * consistency.
+     *
+     * @throws java.lang.IllegalStateException when there is no source to get data streamed, or more than 1 source found.
      */
-    private Multimap<Range<Token>, InetAddress> getAllRangesWithStrictSourcesFor(String table, Collection<Range<Token>> desiredRanges)
+    private Multimap<Range<Token>, InetAddress> getAllRangesWithStrictSourcesFor(String keyspace, Collection<Range<Token>> desiredRanges)
     {
-
         assert tokens != null;
-        AbstractReplicationStrategy strat = Keyspace.open(table).getReplicationStrategy();
+        AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy();
 
-        //Active ranges
+        // Active ranges
         TokenMetadata metadataClone = metadata.cloneOnlyTokenMap();
-        Multimap<Range<Token>,InetAddress> addressRanges = strat.getRangeAddresses(metadataClone);
+        Multimap<Range<Token>, InetAddress> addressRanges = strat.getRangeAddresses(metadataClone);
 
-        //Pending ranges
+        // Pending ranges
         metadataClone.updateNormalTokens(tokens, address);
-        Multimap<Range<Token>,InetAddress> pendingRangeAddresses = strat.getRangeAddresses(metadataClone);
+        Multimap<Range<Token>, InetAddress> pendingRangeAddresses = strat.getRangeAddresses(metadataClone);
 
-        //Collects the source that will have its range moved to the new node
+        // Collects the source that will have its range moved to the new node
         Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create();
 
         for (Range<Token> desiredRange : desiredRanges)
@@ -214,8 +238,8 @@ public class RangeStreamer
                     Set<InetAddress> oldEndpoints = Sets.newHashSet(preEntry.getValue());
                     Set<InetAddress> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange));
 
-                    //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
-                    //So we need to be careful to only be strict when endpoints == RF
+                    // Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
+                    // So we need to be careful to only be strict when endpoints == RF
                     if (oldEndpoints.size() == strat.getReplicationFactor())
                     {
                         oldEndpoints.removeAll(newEndpoints);
@@ -226,7 +250,7 @@ public class RangeStreamer
                 }
             }
 
-            //Validate
+            // Validate
             Collection<InetAddress> addressList = rangeSources.get(desiredRange);
             if (addressList == null || addressList.isEmpty())
                 throw new IllegalStateException("No sources found for " + desiredRange);
@@ -237,7 +261,8 @@ public class RangeStreamer
             InetAddress sourceIp = addressList.iterator().next();
             EndpointState sourceState = Gossiper.instance.getEndpointStateForEndpoint(sourceIp);
             if (Gossiper.instance.isEnabled() && (sourceState == null || !sourceState.isAlive()))
-                throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+").  If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false");
+                throw new RuntimeException("A node required to move the data consistently is down (" + sourceIp + "). " +
+                                           "If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false");
         }
 
         return rangeSources;
@@ -247,7 +272,8 @@ public class RangeStreamer
      * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)
      * @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given
      *                      here, we always exclude ourselves.
-     * @return
+     * @param keyspace keyspace name
+     * @return Map of source endpoint to collection of ranges
      */
     private static Multimap<InetAddress, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources,
                                                                         Collection<ISourceFilter> sourceFilters, String keyspace)
@@ -285,12 +311,13 @@ public class RangeStreamer
         return rangeFetchMapMap;
     }
 
-    public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace)
+    public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace, IFailureDetector fd)
     {
-        return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(FailureDetector.instance)), keyspace);
+        return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), keyspace);
     }
 
     // For testing purposes
+    @VisibleForTesting
     Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch()
     {
         return toFetch;
@@ -304,9 +331,17 @@ public class RangeStreamer
             InetAddress source = entry.getValue().getKey();
             InetAddress preferred = SystemKeyspace.getPreferredIP(source);
             Collection<Range<Token>> ranges = entry.getValue().getValue();
-            /* Send messages to respective folks to stream data over to me */
+
+            // filter out already streamed ranges
+            Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.getPartitioner());
+            if (ranges.removeAll(availableRanges))
+            {
+                logger.info(availableRanges + " already available. Skipping streaming.");
+            }
+
             if (logger.isDebugEnabled())
                 logger.debug("{}ing from {} ranges {}", description, source, StringUtils.join(ranges, ", "));
+            /* Send messages to respective folks to stream data over to me */
             streamPlan.requestRanges(source, preferred, keyspace, ranges);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/dht/StreamStateStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java
new file mode 100644
index 0000000..f6046aa
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.util.Set;
+
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamRequest;
+import org.apache.cassandra.streaming.StreamState;
+
+/**
+ * Stream event handler that records received (available) ranges in the system keyspace and answers availability queries.
+ */
+public class StreamStateStore implements StreamEventHandler
+{
+    public Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner) // delegates to SystemKeyspace (system.available_ranges)
+    {
+        return SystemKeyspace.getAvailableRanges(keyspace, partitioner);
+    }
+
+    /**
+     * Checks whether the given token falls inside a range already recorded as available for the keyspace.
+     *
+     * @param keyspace keyspace name
+     * @param token token to check
+     * @return true if some recorded available range of the keyspace contains the token.
+     */
+    public boolean isDataAvailable(String keyspace, Token token)
+    {
+        Set<Range<Token>> availableRanges = getAvailableRanges(keyspace, token.getPartitioner());
+        for (Range<Token> range : availableRanges)
+        {
+            if (range.contains(token))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * When a StreamSession completes successfully, records every keyspace/range requested in that session as available.
+     *
+     * @param event stream event; only successful STREAM_COMPLETE events update the stored ranges.
+     */
+    @Override
+    public void handleStreamEvent(StreamEvent event)
+    {
+        if (event.eventType == StreamEvent.Type.STREAM_COMPLETE)
+        {
+            StreamEvent.SessionCompleteEvent se = (StreamEvent.SessionCompleteEvent) event; // STREAM_COMPLETE events carry session results
+            if (se.success)
+            {
+                for (StreamRequest request : se.requests)
+                {
+                    SystemKeyspace.updateAvailableRanges(request.keyspace, request.ranges);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onSuccess(StreamState streamState) {} // no-op: ranges are recorded per session in handleStreamEvent
+
+    @Override
+    public void onFailure(Throwable throwable) {} // no-op: unsuccessful sessions record nothing in handleStreamEvent
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 3e8ad9d..6a39945 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -162,6 +162,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private Collection<Token> bootstrapTokens = null;
 
+    // true when keeping strict consistency while bootstrapping
+    private boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
+    private boolean replacing;
+
+    private final StreamStateStore streamStateStore = new StreamStateStore();
+
     public void finishBootstrapping()
     {
         isBootstrapMode = false;
@@ -425,13 +431,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                                      "Use cassandra.replace_address if you want to replace this node.",
                                                      FBUtilities.getBroadcastAddress()));
         }
-        if (RangeStreamer.useStrictConsistency)
+        if (useStrictConsistency)
         {
             for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
             {
-
-                if (entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
-                        continue;
+                // ignore local node or empty status
+                if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
+                    continue;
                 String[] pieces = entry.getValue().getApplicationState(ApplicationState.STATUS).value.split(VersionedValue.DELIMITER_STR, -1);
                 assert (pieces.length > 0);
                 String state = pieces[0];
@@ -556,6 +562,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }, "StorageServiceShutdownHook");
         Runtime.getRuntime().addShutdownHook(drainOnShutdown);
 
+        replacing = DatabaseDescriptor.isReplacing();
+
         prepareToJoin();
 
         // Has to be called after the host id has potentially changed in prepareToJoin().
@@ -603,11 +611,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             Map<ApplicationState, VersionedValue> appStates = new HashMap<>();
 
-            if (DatabaseDescriptor.isReplacing() && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
+            if (replacing && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
                 throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
             if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null)
                 throw new RuntimeException("Replace method removed; use cassandra.replace_address instead");
-            if (DatabaseDescriptor.isReplacing())
+            if (replacing)
             {
                 if (SystemKeyspace.bootstrapComplete())
                     throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
@@ -712,7 +720,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     ))
                 throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
 
-            if (!DatabaseDescriptor.isReplacing())
+            if (!replacing)
             {
                 if (tokenMetadata.isMember(FBUtilities.getBroadcastAddress()))
                 {
@@ -925,7 +933,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
 
-        RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), "Rebuild");
+        RangeStreamer streamer = new RangeStreamer(tokenMetadata,
+                                                   null,
+                                                   FBUtilities.getBroadcastAddress(),
+                                                   "Rebuild",
+                                                   !replacing && useStrictConsistency,
+                                                   DatabaseDescriptor.getEndpointSnitch(),
+                                                   streamStateStore);
         streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
         if (sourceDc != null)
             streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
@@ -999,10 +1013,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         isBootstrapMode = true;
         SystemKeyspace.updateTokens(tokens); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
-        if (!DatabaseDescriptor.isReplacing())
+        if (!replacing)
         {
             // if not an existing token then bootstrap
-            List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
+            List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<>();
             states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
             states.add(Pair.create(ApplicationState.STATUS, valueFactory.bootstrapping(tokens)));
             Gossiper.instance.addLocalApplicationStates(states);
@@ -1017,8 +1031,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         if (!Gossiper.instance.seenAnySeed())
             throw new IllegalStateException("Unable to contact any seeds!");
+
+        if (Boolean.getBoolean("cassandra.reset_bootstrap_progress"))
+        {
+            logger.info("Resetting bootstrap progress to start fresh");
+            SystemKeyspace.resetAvailableRanges();
+        }
+
         setMode(Mode.JOINING, "Starting to bootstrap...", true);
-        new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata).bootstrap(); // handles token update
+        new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata).bootstrap(streamStateStore, !replacing && useStrictConsistency); // handles token update
         logger.info("Bootstrap completed! for the tokens {}", tokens);
     }
 
@@ -1544,7 +1565,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             UUID hostId = Gossiper.instance.getHostId(endpoint);
             InetAddress existing = tokenMetadata.getEndpointForHostId(hostId);
-            if (DatabaseDescriptor.isReplacing() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
+            if (replacing && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
                 logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
             else
             {
@@ -1624,7 +1645,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         for (InetAddress ep : endpointsToRemove)
         {
             removeEndpoint(ep);
-            if (DatabaseDescriptor.isReplacing() && DatabaseDescriptor.getReplaceAddress().equals(ep))
+            if (replacing && DatabaseDescriptor.getReplaceAddress().equals(ep))
                 Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
         }
         if (!tokensToUpdateInSystemKeyspace.isEmpty())
@@ -3208,7 +3229,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                             {
                                 List<InetAddress> endpoints = null;
 
-                                if (RangeStreamer.useStrictConsistency)
+                                if (useStrictConsistency)
                                 {
                                     Set<InetAddress> oldEndpoints = Sets.newHashSet(rangeAddresses.get(range));
                                     Set<InetAddress> newEndpoints = Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, tokenMetaCloneAllSettled));
@@ -3242,7 +3263,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                         if (addressList == null || addressList.isEmpty())
                             continue;
 
-                        if (RangeStreamer.useStrictConsistency)
+                        if (useStrictConsistency)
                         {
                             if (addressList.size() > 1)
                                 throw new IllegalStateException("Multiple strict sources found for " + toFetch);
@@ -3277,7 +3298,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     }
 
                     // stream requests
-                    Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace);
+                    Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace, FailureDetector.instance);
                     for (InetAddress address : workMap.keySet())
                     {
                         logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/streaming/StreamEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java
index e3cdce5..de3db9c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamEvent.java
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.streaming;
 
 import java.net.InetAddress;
+import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.collect.ImmutableSet;
+
 public abstract class StreamEvent
 {
     public static enum Type
@@ -43,6 +46,7 @@ public abstract class StreamEvent
         public final InetAddress peer;
         public final boolean success;
         public final int sessionIndex;
+        public final Set<StreamRequest> requests;
 
         public SessionCompleteEvent(StreamSession session)
         {
@@ -50,6 +54,7 @@ public abstract class StreamEvent
             this.peer = session.peer;
             this.success = session.isSuccess();
             this.sessionIndex = session.sessionIndex();
+            this.requests = ImmutableSet.copyOf(session.requests);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 1b529ed..5a056c4 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -45,8 +45,6 @@ import org.apache.cassandra.streaming.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.RefCounted;
-
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -132,7 +130,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     private StreamResultFuture streamResult;
 
     // stream requests to send to the peer
-    private final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
+    protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
     // streaming tasks are created and managed per ColumnFamily ID
     private final Map<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
     // data receivers, filled after receiving prepare message

http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
index b66a0bd..25bb584 100644
--- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
@@ -1,6 +1,4 @@
-package org.apache.cassandra.db;
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -9,25 +7,19 @@ package org.apache.cassandra.db;
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-
+package org.apache.cassandra.db;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 5d1d8c6..ababd99 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -31,6 +31,7 @@ import org.junit.runner.RunWith;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -75,7 +76,7 @@ public class BootStrapperTest
 
         TokenMetadata tmd = ss.getTokenMetadata();
         assertEquals(numOldNodes, tmd.sortedTokens().size());
-        RangeStreamer s = new RangeStreamer(tmd, myEndpoint, "Bootstrap");
+        RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, "Bootstrap", true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore());
         IFailureDetector mockFailureDetector = new IFailureDetector()
         {
             public boolean isAlive(InetAddress ep)
@@ -96,7 +97,7 @@ public class BootStrapperTest
         Collection<Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = s.toFetch().get(keyspaceName);
 
         // Check we get get RF new ranges in total
-        Set<Range<Token>> ranges = new HashSet<Range<Token>>();
+        Set<Range<Token>> ranges = new HashSet<>();
         for (Map.Entry<InetAddress, Collection<Range<Token>>> e : toFetch)
             ranges.addAll(e.getValue());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
new file mode 100644
index 0000000..c8b9f05
--- /dev/null
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.net.InetAddress;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class StreamStateStoreTest
+{
+
+    @Test
+    public void testUpdateAndQueryAvailableRanges()
+    {
+        // let range (0, 100] of keyspace1 be bootstrapped.
+        IPartitioner p = new Murmur3Partitioner();
+        Token.TokenFactory factory = p.getTokenFactory();
+        Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100"));
+
+        InetAddress local = FBUtilities.getBroadcastAddress();
+        StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true);
+        session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"), 0);
+
+        StreamStateStore store = new StreamStateStore();
+        // session complete event that is not completed makes data not available for keyspace/ranges
+        store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));
+        assertFalse(store.isDataAvailable("keyspace1", factory.fromString("50")));
+
+        // successfully completed session adds available keyspace/ranges
+        session.state(StreamSession.State.COMPLETE);
+        store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));
+        // check if token in range (0, 100] appears available.
+        assertTrue(store.isDataAvailable("keyspace1", factory.fromString("50")));
+        // check if token out of range returns false
+        assertFalse(store.isDataAvailable("keyspace1", factory.fromString("0")));
+        assertFalse(store.isDataAvailable("keyspace1", factory.fromString("101")));
+        // check if different keyspace returns false
+        assertFalse(store.isDataAvailable("keyspace2", factory.fromString("50")));
+
+        // add different range within the same keyspace
+        Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200"));
+        session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true);
+        session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"), 0);
+        session.state(StreamSession.State.COMPLETE);
+        store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));
+
+        // newly added range should be available
+        assertTrue(store.isDataAvailable("keyspace1", factory.fromString("101")));
+        // as well as the old one
+        assertTrue(store.isDataAvailable("keyspace1", factory.fromString("50")));
+    }
+}
\ No newline at end of file


Mime
View raw message