cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [8/9] git commit: merge from 2.0
Date Sat, 22 Mar 2014 02:52:22 GMT
merge from 2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fdae99d7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fdae99d7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fdae99d7

Branch: refs/heads/trunk
Commit: fdae99d76745263af6432b75627f46daf33f4f8f
Parents: 21a1d52 4347982
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Fri Mar 21 21:51:59 2014 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Fri Mar 21 21:51:59 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/service/ReadCallback.java  | 20 ++++++--------------
 .../apache/cassandra/service/StorageProxy.java  |  3 ++-
 3 files changed, 9 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fdae99d7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1b39e30,64dc248..535f894
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,5 +1,36 @@@
 -2.0.7
 +2.1.0-beta2
 + * Eliminate possibility of CL segment appearing twice in active list 
 +   (CASSANDRA-6557)
 + * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
 + * Switch CRC component to Adler and include it for compressed sstables 
 +   (CASSANDRA-4165)
 + * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
 + * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
 + * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
 + * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
 + * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
 + * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
 + * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
 + * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
 + * Scrub should not always clear out repaired status (CASSANDRA-5351)
 + * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
 + * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
 + * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
 + * Change caching option syntax (CASSANDRA-6745)
 + * Fix stress to do proper counter reads (CASSANDRA-6835)
 + * Fix help message for stress counter_write (CASSANDRA-6824)
 + * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
 + * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
 + * Fix race condition in Batch CLE (CASSANDRA-6860)
 + * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
 + * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
 + * Proper compare function for CollectionType (CASSANDRA-6783)
 + * Update native server to Netty 4 (CASSANDRA-6236)
 + * Fix off-by-one error in stress (CASSANDRA-6883)
 + * Make OpOrder AutoCloseable (CASSANDRA-6901)
 + * Remove sync repair JMX interface (CASSANDRA-6900)
 +Merged from 2.0:
+  * Fix race processing range scan responses (CASSANDRA-6820)
   * Allow deleting snapshots from dropped keyspaces (CASSANDRA-6821)
   * Add uuid() function (CASSANDRA-6473)
   * Omit tombstones from schema digests (CASSANDRA-6862)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fdae99d7/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fdae99d7/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index c8a161c,12f9ece..f31e092
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1460,137 -1420,109 +1460,138 @@@ public class StorageProxy implements St
              List<InetAddress> nextFilteredEndpoints = null;
              while (i < ranges.size())
              {
 -                AbstractBounds<RowPosition> range = nextRange == null
 -                                                  ? ranges.get(i)
 -                                                  : nextRange;
 -                List<InetAddress> liveEndpoints = nextEndpoints == null
 -                                                ? getLiveSortedEndpoints(keyspace, range.right)
 -                                                : nextEndpoints;
 -                List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
 -                                                    ? consistency_level.filterForQuery(keyspace, liveEndpoints)
 -                                                    : nextFilteredEndpoints;
 -                ++i;
 -
 -                // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
 -                // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
 -                // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
 -                while (i < ranges.size())
 +                List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
 +                int concurrentFetchStartingIndex = i;
 +                while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
                  {
 -                    nextRange = ranges.get(i);
 -                    nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
 -                    nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
 -
 -                    /*
 -                     * If the current range right is the min token, we should stop merging because CFS.getRangeSlice
 -                     * don't know how to deal with a wrapping range.
 -                     * Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
 -                     * the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
 -                     * wire compatibility, so It's likely easier not to bother;
 -                     */
 -                    if (range.right.isMinimum())
 -                        break;
 -
 -                    List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
 -
 -                    // Check if there is enough endpoint for the merge to be possible.
 -                    if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
 -                        break;
 -
 -                    List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
 -
 -                    // Estimate whether merging will be a win or not
 -                    if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
 -                        break;
 -
 -                    // If we get there, merge this range and the next one
 -                    range = range.withNewRight(nextRange.right);
 -                    liveEndpoints = merged;
 -                    filteredEndpoints = filteredMerged;
 +                    AbstractBounds<RowPosition> range = nextRange == null
 +                                                      ? ranges.get(i)
 +                                                      : nextRange;
 +                    List<InetAddress> liveEndpoints = nextEndpoints == null
 +                                                    ? getLiveSortedEndpoints(keyspace, range.right)
 +                                                    : nextEndpoints;
 +                    List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
 +                                                        ? consistency_level.filterForQuery(keyspace, liveEndpoints)
 +                                                        : nextFilteredEndpoints;
                      ++i;
 -                }
  
 -                AbstractRangeCommand nodeCmd = command.forSubRange(range);
 -
 -                // collect replies and resolve according to consistency level
 -                RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
 -                List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
 -                ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
 -                handler.assureSufficientLiveNodes();
 -                resolver.setSources(filteredEndpoints);
 -                if (filteredEndpoints.size() == 1
 -                    && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
 -                    && OPTIMIZE_LOCAL_REQUESTS)
 -                {
 -                    StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
 -                }
 -                else
 -                {
 -                    MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
 -                    for (InetAddress endpoint : filteredEndpoints)
 +                    // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
 +                    // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
 +                    // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
 +                    while (i < ranges.size())
                      {
 -                        Tracing.trace("Enqueuing request to {}", endpoint);
 -                        MessagingService.instance().sendRR(message, endpoint, handler);
 +                        nextRange = ranges.get(i);
 +                        nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
 +                        nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
 +
 +                        // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
 +                        // don't know how to deal with a wrapping range.
 +                        // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
 +                        // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
 +                        // wire compatibility, so It's likely easier not to bother;
 +                        if (range.right.isMinimum())
 +                            break;
 +
 +                        List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
 +
 +                        // Check if there is enough endpoint for the merge to be possible.
 +                        if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
 +                            break;
 +
 +                        List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
 +
 +                        // Estimate whether merging will be a win or not
 +                        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
 +                            break;
 +
 +                        // If we get there, merge this range and the next one
 +                        range = range.withNewRight(nextRange.right);
 +                        liveEndpoints = merged;
 +                        filteredEndpoints = filteredMerged;
 +                        ++i;
                      }
 -                }
  
 -                try
 -                {
 -                    for (Row row : handler.get())
 +                    AbstractRangeCommand nodeCmd = command.forSubRange(range);
 +
 +                    // collect replies and resolve according to consistency level
 +                    RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
-                     ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, filteredEndpoints);
++                    List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
++                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
 +                    handler.assureSufficientLiveNodes();
 +                    resolver.setSources(filteredEndpoints);
 +                    if (filteredEndpoints.size() == 1
 +                        && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
 +                        && OPTIMIZE_LOCAL_REQUESTS)
                      {
 -                        rows.add(row);
 -                        if (nodeCmd.countCQL3Rows())
 -                            cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
 +                        StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
                      }
 -                    FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
 +                    else
 +                    {
 +                        MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
 +                        for (InetAddress endpoint : filteredEndpoints)
 +                        {
 +                            Tracing.trace("Enqueuing request to {}", endpoint);
 +                            MessagingService.instance().sendRR(message, endpoint, handler);
 +                        }
 +                    }
 +                    scanHandlers.add(Pair.create(nodeCmd, handler));
                  }
 -                catch (ReadTimeoutException ex)
 +
 +                List<AsyncOneResponse> repairResponses = new ArrayList<>();
 +                for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
                  {
 -                    // we timed out waiting for responses
 -                    int blockFor = consistency_level.blockFor(keyspace);
 -                    int responseCount = resolver.responses.size();
 -                    String gotData = responseCount > 0
 -                                     ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
 -                                     : "";
 +                    AbstractRangeCommand nodeCmd = cmdPairHandler.left;
 +                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
 +                    RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
  
 -                    if (Tracing.isTracing())
 +                    try
                      {
 -                        Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}",
 -                                new Object[]{ responseCount, blockFor, gotData, i, ranges.size() });
 +                        for (Row row : handler.get())
 +                        {
 +                            rows.add(row);
 +                            if (nodeCmd.countCQL3Rows())
 +                                cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
 +                        }
 +                        repairResponses.addAll(resolver.repairResults);
                      }
 -                    else if (logger.isDebugEnabled())
 +                    catch (ReadTimeoutException ex)
 +                    {
 +                        // we timed out waiting for responses
 +                        int blockFor = consistency_level.blockFor(keyspace);
 +                        int responseCount = resolver.responses.size();
 +                        String gotData = responseCount > 0
 +                                         ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
 +                                         : "";
 +
 +                        if (Tracing.isTracing())
 +                        {
 +                            Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}",
 +                                          new Object[]{ responseCount, blockFor, gotData, i, ranges.size() });
 +                        }
 +                        else if (logger.isDebugEnabled())
 +                        {
 +                            logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}",
 +                                         responseCount, blockFor, gotData, i, ranges.size());
 +                        }
 +                        throw ex;
 +                    }
 +                    catch (DigestMismatchException e)
                      {
 -                        logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}",
 -                                responseCount, blockFor, gotData, i, ranges.size());
 +                        throw new AssertionError(e); // no digests in range slices yet
 +                    }
 +
 +                    // if we're done, great, otherwise, move to the next range
 +                    int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size();
 +                    if (count >= nodeCmd.limit())
 +                    {
 +                        haveSufficientRows = true;
 +                        break;
                      }
 -                    throw ex;
 +                }
 +
 +                try
 +                {
 +                    FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
                  }
                  catch (TimeoutException ex)
                  {


Mime
View raw message