cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [06/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Date Thu, 26 May 2016 00:14:02 GMT
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/907c8263
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/907c8263
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/907c8263

Branch: refs/heads/trunk
Commit: 907c8263b7c814de31f4b4e17610e56e68b3a4aa
Parents: 148f369 6100eb2
Author: Yuki Morishita <yukim@apache.org>
Authored: Wed May 25 18:49:20 2016 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Wed May 25 18:49:20 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/dht/BootStrapper.java  |  1 +
 .../org/apache/cassandra/dht/RangeStreamer.java | 48 +++++++++++++++-----
 .../cassandra/service/StorageService.java       |  2 +-
 4 files changed, 40 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index acdf2e9,d914420..d6750ab
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,24 -1,5 +1,25 @@@
 -2.1.15
 +2.2.7
 + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches and
 +   report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
+  * Do not consider local node a valid source during replace (CASSANDRA-11848)
   * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739)
   * Add message dropped tasks to nodetool netstats (CASSANDRA-11855)
   * Don't compute expensive MaxPurgeableTimestamp until we've verified there's an 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/BootStrapper.java
index a6b1ad7,dfefbe9..26fa6b3
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@@ -60,21 -55,17 +60,22 @@@ public class BootStrapper extends Progr
  
          this.address = address;
          this.tokens = tokens;
 -        tokenMetadata = tmd;
 +        this.tokenMetadata = tmd;
      }
  
 -    public void bootstrap()
 +    public ListenableFuture<StreamState> bootstrap(StreamStateStore stateStore, boolean useStrictConsistency)
      {
 -        if (logger.isDebugEnabled())
 -            logger.debug("Beginning bootstrap process");
 -
 -        RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, address, "Bootstrap");
 +        logger.trace("Beginning bootstrap process");
 +
 +        RangeStreamer streamer = new RangeStreamer(tokenMetadata,
 +                                                   tokens,
 +                                                   address,
 +                                                   "Bootstrap",
 +                                                   useStrictConsistency,
 +                                                   DatabaseDescriptor.getEndpointSnitch(),
 +                                                   stateStore);
          streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
+         streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
  
          for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/RangeStreamer.java
index 8f2dc12,121a351..aef588e
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@@ -111,13 -105,18 +111,24 @@@ public class RangeStreame
          }
      }
  
+     /**
+      * Source filter which excludes the current node from source calculations
+      */
+     public static class ExcludeLocalNodeFilter implements ISourceFilter
+     {
+         public boolean shouldInclude(InetAddress endpoint)
+         {
+             return !FBUtilities.getBroadcastAddress().equals(endpoint);
+         }
+     }
+ 
 -    public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddress address, String description)
 +    public RangeStreamer(TokenMetadata metadata,
 +                         Collection<Token> tokens,
 +                         InetAddress address,
 +                         String description,
 +                         boolean useStrictConsistency,
 +                         IEndpointSnitch snitch,
 +                         StreamStateStore stateStore)
      {
          this.metadata = metadata;
          this.tokens = tokens;
@@@ -146,18 -144,18 +157,18 @@@
          Multimap<Range<Token>, InetAddress> rangesForKeyspace = useStrictSourcesForRanges(keyspaceName)
                  ? getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName, ranges);
  
 -        if (logger.isDebugEnabled())
 +        if (logger.isTraceEnabled())
          {
              for (Map.Entry<Range<Token>, InetAddress> entry : rangesForKeyspace.entries())
 -                logger.debug(String.format("%s: range %s exists on %s", description, entry.getKey(), entry.getValue()));
 +                logger.trace(String.format("%s: range %s exists on %s", description, entry.getKey(), entry.getValue()));
          }
  
--        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : getRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName).asMap().entrySet())
++        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : getRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName, useStrictConsistency).asMap().entrySet())
          {
 -            if (logger.isDebugEnabled())
 +            if (logger.isTraceEnabled())
              {
                  for (Range<Token> r : entry.getValue())
 -                    logger.debug(String.format("%s: range %s from source %s for keyspace %s", description, r, entry.getKey(), keyspaceName));
 +                    logger.trace(String.format("%s: range %s from source %s for keyspace %s", description, r, entry.getKey(), keyspaceName));
              }
              toFetch.put(keyspaceName, entry);
          }
@@@ -272,11 -265,10 +283,12 @@@
       * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)
       * @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given
       *                      here, we always exclude ourselves.
 -     * @return
 +     * @param keyspace keyspace name
 +     * @return Map of source endpoint to collection of ranges
       */
      private static Multimap<InetAddress, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources,
--                                                                        Collection<ISourceFilter> sourceFilters, String keyspace)
++                                                                        Collection<ISourceFilter> sourceFilters, String keyspace,
++                                                                        boolean useStrictConsistency)
      {
          Multimap<InetAddress, Range<Token>> rangeFetchMapMap = HashMultimap.create();
          for (Range<Token> range : rangesWithSources.keySet())
@@@ -305,15 -297,28 +317,29 @@@
              }
  
              if (!foundSource)
-                 throw new IllegalStateException("unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace);
+             {
+                 AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy();
+                 if (strat != null && strat.getReplicationFactor() == 1)
+                 {
 -                    if (isNotReplacingAndUsesStrictConsistency())
++                    if (useStrictConsistency)
+                         throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace + " with RF=1." +
+                                                         "If you want to ignore this, consider using system property -Dcassandra.consistent.rangemovement=false.");
+                     else
+                         logger.warn("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace + " with RF=1. " +
+                                     "Keyspace might be missing data.");
+                 }
+                 else
+                     throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace);
+             }
          }
  
          return rangeFetchMapMap;
      }
  
-     public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace, IFailureDetector fd)
 -    public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace)
++    public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace,
++                                                                 IFailureDetector fd, boolean useStrictConsistency)
      {
-         return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), keyspace);
 -        return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(FailureDetector.instance)), keyspace);
++        return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), keyspace, useStrictConsistency);
      }
  
      // For testing purposes

http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 82d7c8f,507aedb..83639e0
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -3753,7 -3714,7 +3753,7 @@@ public class StorageService extends Not
                      }
  
                      // stream requests
-                     Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace, FailureDetector.instance);
 -                    Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace);
++                    Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace, FailureDetector.instance, useStrictConsistency);
                      for (InetAddress address : workMap.keySet())
                      {
                          logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address);


Mime
View raw message