cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasobr...@apache.org
Subject [4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Date Thu, 19 Jan 2017 22:15:04 GMT
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e3d26b6a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e3d26b6a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e3d26b6a

Branch: refs/heads/trunk
Commit: e3d26b6aa18a48a52ec4f5511bdc484a45ec3ce4
Parents: 74559de 7a06df7
Author: Jason Brown <jasedbrown@gmail.com>
Authored: Thu Jan 19 14:08:37 2017 -0800
Committer: Jason Brown <jasedbrown@gmail.com>
Committed: Thu Jan 19 14:13:21 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/index/SecondaryIndexManager.java  |  86 ++++++++++-----
 .../service/pager/AbstractQueryPager.java       |  56 ++++++++--
 .../apache/cassandra/index/CustomIndexTest.java | 106 +++++++++++++++++++
 .../org/apache/cassandra/index/StubIndex.java   |   4 +
 5 files changed, 216 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3d26b6a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8b613e6,a85386b..224ef5d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,121 -1,12 +1,122 @@@
 -3.0.11
 +3.10
 + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
 + * Fixed query monitoring for range queries (CASSANDRA-13050)
 + * Remove outboundBindAny configuration property (CASSANDRA-12673)
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion
enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876
(CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
+  * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
   * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
 - * Stress daemon help is incorrect (CASSANDRA-12563)
 + * Stress daemon help is incorrect(CASSANDRA-12563)
   * Remove ALTER TYPE support (CASSANDRA-12443)
   * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
 - * Set javac encoding to utf-8 (CASSANDRA-11077)
   * Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
 - * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
   * Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
   * Add parent repair session id to anticompaction log message (CASSANDRA-12186)
   * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3d26b6a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 6adefbd,d39b607..08b4f8b
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@@ -558,40 -537,69 +557,69 @@@ public class SecondaryIndexManager impl
              int nowInSec = cmd.nowInSec();
              boolean readStatic = false;
  
 -            SinglePartitionPager pager = new SinglePartitionPager(cmd, null, Server.CURRENT_VERSION);
 +            SinglePartitionPager pager = new SinglePartitionPager(cmd, null, ProtocolVersion.CURRENT);
              while (!pager.isExhausted())
              {
 -                try (ReadOrderGroup readGroup = cmd.startOrderGroup();
 +                try (ReadExecutionController controller = cmd.executionController();
                       OpOrder.Group writeGroup = Keyspace.writeOrder.start();
-                      RowIterator partition =
-                         PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize,
controller),
-                                                           cmd))
 -                     UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata,
pageSize, readGroup))
++                     UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata,
pageSize, controller))
                  {
-                     Set<Index.Indexer> indexers = indexes.stream()
-                                                          .map(index -> index.indexerFor(key,
-                                                                                        
partition.columns(),
-                                                                                        
nowInSec,
-                                                                                        
writeGroup,
-                                                                                        
IndexTransaction.Type.UPDATE))
-                                                          .filter(Objects::nonNull)
-                                                          .collect(Collectors.toSet());
- 
-                     indexers.forEach(Index.Indexer::begin);
- 
-                     // only process the static row once per partition
-                     if (!readStatic && !partition.staticRow().isEmpty())
-                     {
-                         indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
-                         readStatic = true;
+                     if (!page.hasNext())
+                         break;
+ 
+                     try (UnfilteredRowIterator partition = page.next()) {
+                         Set<Index.Indexer> indexers = indexes.stream()
+                                                              .map(index -> index.indexerFor(key,
+                                                                                        
    partition.columns(),
+                                                                                        
    nowInSec,
+                                                                                        
    writeGroup,
+                                                                                        
    IndexTransaction.Type.UPDATE))
+                                                              .filter(Objects::nonNull)
+                                                              .collect(Collectors.toSet());
+ 
+                         // Short-circuit empty partitions if static row is processed or
isn't read
+                         if (!readStatic && partition.isEmpty() && partition.staticRow().isEmpty())
+                             break;
+ 
+                         indexers.forEach(Index.Indexer::begin);
+ 
+                         if (!readStatic)
+                         {
+                             if (!partition.staticRow().isEmpty())
+                                 indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
+                             indexers.forEach((Index.Indexer i) -> i.partitionDelete(partition.partitionLevelDeletion()));
+                             readStatic = true;
+                         }
+ 
+                         MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(partition.partitionLevelDeletion(),
baseCfs.getComparator(), false);
+ 
+                         while (partition.hasNext())
+                         {
+                             Unfiltered unfilteredRow = partition.next();
+ 
+                             if (unfilteredRow.isRow())
+                             {
+                                 Row row = (Row) unfilteredRow;
+                                 indexers.forEach(indexer -> indexer.insertRow(row));
+                             }
+                             else
+                             {
+                                 assert unfilteredRow.isRangeTombstoneMarker();
+                                 RangeTombstoneMarker marker = (RangeTombstoneMarker) unfilteredRow;
+                                 deletionBuilder.add(marker);
+                             }
+                         }
+ 
+                         MutableDeletionInfo deletionInfo = deletionBuilder.build();
+                         if (deletionInfo.hasRanges())
+                         {
+                             Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
+                             while (iter.hasNext())
+                                 indexers.forEach(indexer -> indexer.rangeTombstone(iter.next()));
+                         }
+ 
+                         indexers.forEach(Index.Indexer::finish);
                      }
- 
-                     while (partition.hasNext())
-                     {
-                         Row row = partition.next();
-                         indexers.forEach(indexer -> indexer.insertRow(row));
-                     }
- 
-                     indexers.forEach(Index.Indexer::finish);
                  }
              }
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3d26b6a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 22ddc84,74ec47d..e937a0d
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@@ -62,27 -64,63 +63,64 @@@ abstract class AbstractQueryPager imple
              return EmptyIterators.partition();
  
          pageSize = Math.min(pageSize, remaining);
-         Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
- 
+         Pager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
 -        return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState),
pager);
 +        return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState,
queryStartNanoTime), pager);
      }
  
 -    public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup)
throws RequestValidationException, RequestExecutionException
 +    public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
      {
          if (isExhausted())
              return EmptyIterators.partition();
  
          pageSize = Math.min(pageSize, remaining);
-         Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
- 
+         RowPager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
 -        return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(orderGroup),
pager);
 +        return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController),
pager);
      }
  
-     private class Pager extends Transformation<RowIterator>
 -    public UnfilteredPartitionIterator fetchPageUnfiltered(CFMetaData cfm, int pageSize,
ReadOrderGroup orderGroup)
++    public UnfilteredPartitionIterator fetchPageUnfiltered(CFMetaData cfm, int pageSize,
ReadExecutionController executionController)
+     {
+         if (isExhausted())
+             return EmptyIterators.unfilteredPartition(cfm, false);
+ 
+         pageSize = Math.min(pageSize, remaining);
+         UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec());
+ 
 -        return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(orderGroup),
pager);
++        return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(executionController),
pager);
+     }
+ 
+     private class UnfilteredPager extends Pager<Unfiltered>
+     {
+ 
+         private UnfilteredPager(DataLimits pageLimits, int nowInSec)
+         {
+             super(pageLimits, nowInSec);
+         }
+ 
+         protected BaseRowIterator<Unfiltered> apply(BaseRowIterator<Unfiltered>
partition)
+         {
+             return Transformation.apply(counter.applyTo((UnfilteredRowIterator) partition),
this);
+         }
+     }
+ 
+     private class RowPager extends Pager<Row>
+     {
+ 
+         private RowPager(DataLimits pageLimits, int nowInSec)
+         {
+             super(pageLimits, nowInSec);
+         }
+ 
+         protected BaseRowIterator<Row> apply(BaseRowIterator<Row> partition)
+         {
+             return Transformation.apply(counter.applyTo((RowIterator) partition), this);
+         }
+     }
+ 
+     private abstract class Pager<T extends Unfiltered> extends Transformation<BaseRowIterator<T>>
      {
          private final DataLimits pageLimits;
-         private final DataLimits.Counter counter;
+         protected final DataLimits.Counter counter;
 +        private DecoratedKey currentKey;
          private Row lastRow;
          private boolean isFirstPartition = true;
  
@@@ -93,9 -131,12 +131,9 @@@
          }
  
          @Override
-         public RowIterator applyToPartition(RowIterator partition)
+         public BaseRowIterator<T> applyToPartition(BaseRowIterator<T> partition)
          {
 -            DecoratedKey key = partition.partitionKey();
 -            if (lastKey == null || !lastKey.equals(key))
 -                remainingInPartition = limits.perPartitionCount();
 -            lastKey = key;
 +            currentKey = partition.partitionKey();
  
              // If this is the first partition of this page, this could be the continuation
of a partition we've started
              // on the previous page. In which case, we could have the problem that the partition
has no more "regular"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3d26b6a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3d26b6a/test/unit/org/apache/cassandra/index/StubIndex.java
----------------------------------------------------------------------


Mime
View raw message