cassandra-commits mailing list archives

From: tylerho...@apache.org
Subject: [2/3] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Date: Fri, 09 Jan 2015 17:22:46 GMT
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/service/StorageProxy.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f62e292
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f62e292
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f62e292

Branch: refs/heads/trunk
Commit: 7f62e292867bb6159592bfc8b0423f89f518a2b5
Parents: 14b2d7a dd62f7b
Author: Tyler Hobbs <tyler@datastax.com>
Authored: Fri Jan 9 11:19:37 2015 -0600
Committer: Tyler Hobbs <tyler@datastax.com>
Committed: Fri Jan 9 11:19:37 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                          |  2 ++
 .../cassandra/cql3/statements/SelectStatement.java   |  6 +++++-
 .../apache/cassandra/db/AbstractRangeCommand.java    | 13 +++++++++++++
 .../org/apache/cassandra/db/ColumnFamilyStore.java   |  4 +++-
 src/java/org/apache/cassandra/db/DataRange.java      | 12 ++++++++++++
 .../apache/cassandra/db/filter/ExtendedFilter.java   |  6 ++++++
 .../apache/cassandra/db/filter/SliceQueryFilter.java |  6 ++++++
 .../org/apache/cassandra/service/StorageProxy.java   | 15 ++++++++-------
 8 files changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2028633,0c7e9a2..abe3fce
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,56 -1,6 +1,58 @@@
 -2.0.12:
 +2.1.3
 + * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
 + * Fix case-sensitivity of index name on CREATE and DROP INDEX
 +   statements (CASSANDRA-8365)
 + * Better detection/logging for corruption in compressed sstables (CASSANDRA-8192)
 + * Use the correct repairedAt value when closing writer (CASSANDRA-8570)
 + * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
 + * Properly calculate expected write size during compaction (CASSANDRA-8532)
 + * Invalidate affected prepared statements when a table's columns
 +   are altered (CASSANDRA-7910)
 + * Stress - user defined writes should populate sequentally (CASSANDRA-8524)
 + * Fix regression in SSTableRewriter causing some rows to become unreadable 
 +   during compaction (CASSANDRA-8429)
 + * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
 + * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
 +   is disabled (CASSANDRA-8288)
 + * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
 + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
 + * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
 + * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
 + * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
 + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
 + * Disable mmap on Windows (CASSANDRA-6993)
 + * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
 + * Add auth support to cassandra-stress (CASSANDRA-7985)
 + * Fix ArrayIndexOutOfBoundsException when generating error message
 +   for some CQL syntax errors (CASSANDRA-8455)
 + * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
 + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
 + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
 + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
 + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
 + * Remove tmplink files for offline compactions (CASSANDRA-8321)
 + * Reduce maxHintsInProgress (CASSANDRA-8415)
 + * BTree updates may call provided update function twice (CASSANDRA-8018)
 + * Release sstable references after anticompaction (CASSANDRA-8386)
 + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 + * Log failed host when preparing incremental repair (CASSANDRA-8228)
 + * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
 +Merged from 2.0:
+  * Fix DISTINCT queries with LIMITs or paging when some partitions
+    contain only tombstones (CASSANDRA-8490)
   * Introduce background cache refreshing to permissions cache
     (CASSANDRA-8194)
   * Fix race condition in StreamTransferTask that could lead to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 487c6f0,858578f..7508c4e
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@@ -44,7 -40,14 +44,13 @@@ import org.apache.cassandra.tracing.Tra
  public class SliceQueryFilter implements IDiskAtomFilter
  {
      private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class);
 -    public static final Serializer serializer = new Serializer();
  
+     /**
+      * A special value for compositesToGroup that indicates that partitioned tombstones should not be included in results
+      * or count towards the limit.  See CASSANDRA-8490 for more details on why this is needed (and done this way).
+      **/
+     public static final int IGNORE_TOMBSTONED_PARTITIONS = -2;
+ 
      public final ColumnSlice[] slices;
      public final boolean reversed;
      public volatile int count;

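For context (not part of the commit): IGNORE_TOMBSTONED_PARTITIONS is a sentinel value for compositesToGroup meaning that a partition containing nothing but tombstones should neither appear in the result nor count against the query limit (the CASSANDRA-8490 DISTINCT/LIMIT case). The standalone Java sketch below illustrates only that counting idea; the class and method names are invented for illustration and are not the actual SliceQueryFilter or SelectStatement code.

// Standalone illustration of the sentinel-value idea; not Cassandra code.
import java.util.List;

public class TombstoneCountSketch
{
    // mirrors the value of SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS
    static final int IGNORE_TOMBSTONED_PARTITIONS = -2;

    // counts how many partitions contribute toward a LIMIT, given the number
    // of live (non-tombstoned) cells observed in each partition
    static int countedPartitions(List<Integer> liveCellsPerPartition, int compositesToGroup)
    {
        int counted = 0;
        for (int liveCells : liveCellsPerPartition)
        {
            // with the sentinel, a partition holding only tombstones is skipped
            // instead of silently consuming part of the LIMIT
            if (liveCells == 0 && compositesToGroup == IGNORE_TOMBSTONED_PARTITIONS)
                continue;
            counted++;
        }
        return counted;
    }

    public static void main(String[] args)
    {
        List<Integer> partitions = List.of(3, 0, 2, 0); // two partitions contain only tombstones
        System.out.println(countedPartitions(partitions, -1));                           // 4
        System.out.println(countedPartitions(partitions, IGNORE_TOMBSTONED_PARTITIONS)); // 2
    }
}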
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f62e292/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 780df21,45af1c8..72577a6
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1631,141 -1517,109 +1632,141 @@@ public class StorageProxy implements St
              List<InetAddress> nextFilteredEndpoints = null;
              while (i < ranges.size())
              {
 -                AbstractBounds<RowPosition> range = nextRange == null
 -                                                  ? ranges.get(i)
 -                                                  : nextRange;
 -                List<InetAddress> liveEndpoints = nextEndpoints == null
 -                                                ? getLiveSortedEndpoints(keyspace, range.right)
 -                                                : nextEndpoints;
 -                List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
 -                                                    ? consistency_level.filterForQuery(keyspace, liveEndpoints)
 -                                                    : nextFilteredEndpoints;
 -                ++i;
 -
 -                // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
 -                // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
 -                // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
 -                while (i < ranges.size())
 +                List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
 +                int concurrentFetchStartingIndex = i;
 +                int concurrentRequests = 0;
 +                while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
                  {
 -                    nextRange = ranges.get(i);
 -                    nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
 -                    nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
 -
 -                    /*
 -                     * If the current range right is the min token, we should stop merging because CFS.getRangeSlice
 -                     * don't know how to deal with a wrapping range.
 -                     * Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
 -                     * the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
 -                     * wire compatibility, so It's likely easier not to bother;
 -                     */
 -                    if (range.right.isMinimum())
 -                        break;
 -
 -                    List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
 -
 -                    // Check if there is enough endpoint for the merge to be possible.
 -                    if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
 -                        break;
 -
 -                    List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
 -
 -                    // Estimate whether merging will be a win or not
 -                    if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
 -                        break;
 -
 -                    // If we get there, merge this range and the next one
 -                    range = range.withNewRight(nextRange.right);
 -                    liveEndpoints = merged;
 -                    filteredEndpoints = filteredMerged;
 +                    AbstractBounds<RowPosition> range = nextRange == null
 +                                                      ? ranges.get(i)
 +                                                      : nextRange;
 +                    List<InetAddress> liveEndpoints = nextEndpoints == null
 +                                                    ? getLiveSortedEndpoints(keyspace, range.right)
 +                                                    : nextEndpoints;
 +                    List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
 +                                                        ? consistency_level.filterForQuery(keyspace, liveEndpoints)
 +                                                        : nextFilteredEndpoints;
                      ++i;
 -                }
 +                    ++concurrentRequests;
  
 -                AbstractRangeCommand nodeCmd = command.forSubRange(range);
 -
 -                // collect replies and resolve according to consistency level
 -                RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
 -                List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
 -                ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
 -                handler.assureSufficientLiveNodes();
 -                resolver.setSources(filteredEndpoints);
 -                if (filteredEndpoints.size() == 1
 -                    && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
 -                    && OPTIMIZE_LOCAL_REQUESTS)
 -                {
 -                    StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
 -                }
 -                else
 -                {
 -                    MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
 -                    for (InetAddress endpoint : filteredEndpoints)
 +                    // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
 +                    // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
 +                    // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
 +                    while (i < ranges.size())
                      {
 -                        Tracing.trace("Enqueuing request to {}", endpoint);
 -                        MessagingService.instance().sendRR(message, endpoint, handler);
 +                        nextRange = ranges.get(i);
 +                        nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
 +                        nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
 +
 +                        // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
 +                        // don't know how to deal with a wrapping range.
 +                        // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
 +                        // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
 +                        // wire compatibility, so It's likely easier not to bother;
 +                        if (range.right.isMinimum())
 +                            break;
 +
 +                        List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
 +
 +                        // Check if there is enough endpoint for the merge to be possible.
 +                        if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
 +                            break;
 +
 +                        List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
 +
 +                        // Estimate whether merging will be a win or not
 +                        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
 +                            break;
 +
 +                        // If we get there, merge this range and the next one
 +                        range = range.withNewRight(nextRange.right);
 +                        liveEndpoints = merged;
 +                        filteredEndpoints = filteredMerged;
 +                        ++i;
                      }
 -                }
  
 -                try
 -                {
 -                    for (Row row : handler.get())
 +                    AbstractRangeCommand nodeCmd = command.forSubRange(range);
 +
 +                    // collect replies and resolve according to consistency level
 +                    RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
 +                    List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
 +                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
 +                    handler.assureSufficientLiveNodes();
 +                    resolver.setSources(filteredEndpoints);
 +                    if (filteredEndpoints.size() == 1
 +                        && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
 +                        && OPTIMIZE_LOCAL_REQUESTS)
                      {
 -                        rows.add(row);
 -                        if (countLiveRows)
 -                            liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
 +                        StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get());
                      }
 -                    FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
 +                    else
 +                    {
 +                        MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
 +                        for (InetAddress endpoint : filteredEndpoints)
 +                        {
 +                            Tracing.trace("Enqueuing request to {}", endpoint);
 +                            MessagingService.instance().sendRR(message, endpoint, handler);
 +                        }
 +                    }
 +                    scanHandlers.add(Pair.create(nodeCmd, handler));
                  }
 -                catch (ReadTimeoutException ex)
 +                Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
 +
 +                List<AsyncOneResponse> repairResponses = new ArrayList<>();
 +                for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
                  {
 -                    // we timed out waiting for responses
 -                    int blockFor = consistency_level.blockFor(keyspace);
 -                    int responseCount = resolver.responses.size();
 -                    String gotData = responseCount > 0
 -                                     ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
 -                                     : "";
 +                    AbstractRangeCommand nodeCmd = cmdPairHandler.left;
 +                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
 +                    RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
  
 -                    if (Tracing.isTracing())
 +                    try
                      {
 -                        Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}",
 -                                new Object[]{ responseCount, blockFor, gotData, i, ranges.size() });
 +                        for (Row row : handler.get())
 +                        {
 +                            rows.add(row);
-                             if (nodeCmd.countCQL3Rows())
-                                 cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
++                            if (countLiveRows)
++                                liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
 +                        }
 +                        repairResponses.addAll(resolver.repairResults);
                      }
 -                    else if (logger.isDebugEnabled())
 +                    catch (ReadTimeoutException ex)
                      {
 -                        logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}",
 -                                responseCount, blockFor, gotData, i, ranges.size());
 +                        // we timed out waiting for responses
 +                        int blockFor = consistency_level.blockFor(keyspace);
 +                        int responseCount = resolver.responses.size();
 +                        String gotData = responseCount > 0
 +                                         ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
 +                                         : "";
 +
 +                        if (Tracing.isTracing())
 +                        {
 +                            Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}",
 +                                          new Object[]{ responseCount, blockFor, gotData, i, ranges.size() });
 +                        }
 +                        else if (logger.isDebugEnabled())
 +                        {
 +                            logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}",
 +                                         responseCount, blockFor, gotData, i, ranges.size());
 +                        }
 +                        throw ex;
                      }
 -                    throw ex;
 +                    catch (DigestMismatchException e)
 +                    {
 +                        throw new AssertionError(e); // no digests in range slices yet
 +                    }
 +
 +                    // if we're done, great, otherwise, move to the next range
-                     int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size();
++                    int count = countLiveRows ? liveRowCount : rows.size();
 +                    if (count >= nodeCmd.limit())
 +                    {
 +                        haveSufficientRows = true;
 +                        break;
 +                    }
 +                }
 +
 +                try
 +                {
 +                    FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
                  }
                  catch (TimeoutException ex)
                  {
@@@ -1777,31 -1631,15 +1778,31 @@@
                         logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
                     throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
                  }
 -                catch (DigestMismatchException e)
 +
 +                if (haveSufficientRows)
 +                    return trim(command, rows);
 +
 +                // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
 +                // based on the results we've seen so far (as long as we still have ranges left to query)
 +                if (i < ranges.size())
                  {
-                     float fetchedRows = command.countCQL3Rows() ? cql3RowCount : rows.size();
 -                    throw new AssertionError(e); // no digests in range slices yet
++                    float fetchedRows = countLiveRows ? liveRowCount : rows.size();
 +                    float remainingRows = command.limit() - fetchedRows;
 +                    float actualRowsPerRange;
 +                    if (fetchedRows == 0.0)
 +                    {
 +                        // we haven't actually gotten any results, so query all remaining ranges at once
 +                        actualRowsPerRange = 0.0f;
 +                        concurrencyFactor = ranges.size() - i;
 +                    }
 +                    else
 +                    {
 +                        actualRowsPerRange = i / fetchedRows;
 +                        concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
 +                    }
 +                    logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
 +                                 actualRowsPerRange, (int) remainingRows, concurrencyFactor);
                  }
 -
 -                // if we're done, great, otherwise, move to the next range
 -                int count = countLiveRows ? liveRowCount : rows.size();
 -                if (count >= nodeCmd.limit())
 -                    break;
              }
          }
          finally

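For context (not part of the commit): the loop shown above merges consecutive vnode sub-ranges as long as the intersection of their live endpoints can still satisfy the consistency level (the real code additionally consults the snitch via isWorthMergingForRangeQuery and stops at the minimum token). The standalone sketch below shows only the intersection test; names are illustrative and this is not the StorageProxy code.

// Standalone sketch of merging consecutive sub-ranges while the shared live
// endpoints still satisfy the consistency level; not StorageProxy code.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RangeMergeSketch
{
    static List<Set<String>> mergeWhileSufficient(List<Set<String>> endpointsPerRange, int blockFor)
    {
        List<Set<String>> mergedGroups = new ArrayList<>();
        int i = 0;
        while (i < endpointsPerRange.size())
        {
            Set<String> current = new HashSet<>(endpointsPerRange.get(i));
            i++;
            // keep absorbing the next range while the shared endpoints can still meet blockFor
            while (i < endpointsPerRange.size())
            {
                Set<String> intersection = new HashSet<>(current);
                intersection.retainAll(endpointsPerRange.get(i));
                if (intersection.size() < blockFor)
                    break;
                current = intersection;
                i++;
            }
            mergedGroups.add(current);
        }
        return mergedGroups;
    }

    public static void main(String[] args)
    {
        List<Set<String>> ranges = List.of(
                Set.of("A", "B", "C"),
                Set.of("B", "C", "D"),
                Set.of("D", "E", "F"));
        // With blockFor = 2 (e.g. QUORUM at RF=3), the first two ranges merge
        // (shared endpoints B and C); the third range stands alone.
        System.out.println(mergeWhileSufficient(ranges, 2));
    }
}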

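Also for context (not part of the commit): the 2.1 side of this merge submits up to concurrencyFactor sub-range reads at once and, when they do not return enough rows, recomputes the concurrency factor from what has been fetched so far before issuing the next batch. The standalone sketch below shows that idea only, under the assumption that rows-per-range is estimated as rows fetched divided by ranges queried; the names are illustrative and this is not the actual StorageProxy code.

// Simplified, standalone sketch of adaptive concurrency for range reads;
// not the StorageProxy implementation.
public class ConcurrencyFactorSketch
{
    static int nextConcurrencyFactor(int totalRanges, int rangesQueried, int limit, int rowsFetched)
    {
        int rangesLeft = totalRanges - rangesQueried;
        if (rowsFetched == 0)
            return rangesLeft;                 // nothing fetched yet: query all remaining ranges at once

        float rowsPerRange = (float) rowsFetched / rangesQueried;  // observed density so far (assumed estimate)
        float rowsRemaining = limit - rowsFetched;
        int rangesNeeded = Math.round(rowsRemaining / rowsPerRange);
        return Math.max(1, Math.min(rangesLeft, rangesNeeded));    // clamp to [1, ranges left]
    }

    public static void main(String[] args)
    {
        // 100 vnode ranges, LIMIT 50, first 4 ranges returned 10 rows in total -> try 16 ranges next
        System.out.println(nextConcurrencyFactor(100, 4, 50, 10));
    }
}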