Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionManager.java
test/unit/org/apache/cassandra/db/ScrubTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4adf29d4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4adf29d4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4adf29d4
Branch: refs/heads/trunk
Commit: 4adf29d4e96eb411b97c2a939f67da0aa25f64d0
Parents: 9bba842 df01403
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Fri Apr 17 16:34:04 2015 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Fri Apr 17 16:34:04 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 32 ++++++++++++--------
.../unit/org/apache/cassandra/db/ScrubTest.java | 2 ++
3 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adf29d4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7f0170f,74ec921..1c55fa3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,100 -1,5 +1,101 @@@
+3.0
+ * Add user/role permissions for user-defined functions (CASSANDRA-7557)
+ * Allow cassandra config to be updated to restart daemon without unloading classes (CASSANDRA-9046)
+ * Don't initialize compaction writer before checking if iter is empty (CASSANDRA-9117)
+ * Remove line number generation from default logback.xml
+ * Don't execute any functions at prepare-time (CASSANDRA-9037)
+ * Share file handles between all instances of a SegmentedFile (CASSANDRA-8893)
+ * Make it possible to major compact LCS (CASSANDRA-7272)
+ * Make FunctionExecutionException extend RequestExecutionException
+ (CASSANDRA-9055)
+ * Add support for SELECT JSON, INSERT JSON syntax and new toJson(), fromJson()
+ functions (CASSANDRA-7970)
+ * Optimise max purgeable timestamp calculation in compaction (CASSANDRA-8920)
+ * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670)
+ * New tool added to validate all sstables in a node (CASSANDRA-5791)
+ * Push notification when tracing completes for an operation (CASSANDRA-7807)
+ * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236)
+ * Compressed Commit Log (CASSANDRA-6809)
+ * Optimise IntervalTree (CASSANDRA-8988)
+ * Add a key-value payload for third party usage (CASSANDRA-8553)
+ * Bump metrics-reporter-config dependency for metrics 3.0 (CASSANDRA-8149)
+ * Partition intra-cluster message streams by size, not type (CASSANDRA-8789)
+ * Add WriteFailureException to native protocol, notify coordinator of
+ write failures (CASSANDRA-8592)
+ * Convert SequentialWriter to nio (CASSANDRA-8709)
+ * Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849, 8761, 8850)
+ * Record client ip address in tracing sessions (CASSANDRA-8162)
+ * Indicate partition key columns in response metadata for prepared
+ statements (CASSANDRA-7660)
+ * Merge UUIDType and TimeUUIDType parse logic (CASSANDRA-8759)
+ * Avoid memory allocation when searching index summary (CASSANDRA-8793)
+ * Optimise (Time)?UUIDType Comparisons (CASSANDRA-8730)
+ * Make CRC32Ex into a separate maven dependency (CASSANDRA-8836)
+ * Use preloaded jemalloc w/ Unsafe (CASSANDRA-8714)
+ * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268)
+ * Upgrade Metrics library and remove depricated metrics (CASSANDRA-5657)
+ * Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
+ * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
+ * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
+ * Support direct buffer decompression for reads (CASSANDRA-8464)
+ * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
+ * Group sstables for anticompaction correctly (CASSANDRA-8578)
+ * Add ReadFailureException to native protocol, respond
+ immediately when replicas encounter errors while handling
+ a read request (CASSANDRA-7886)
+ * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
+ * Allow mixing token and partition key restrictions (CASSANDRA-7016)
+ * Support index key/value entries on map collections (CASSANDRA-8473)
+ * Modernize schema tables (CASSANDRA-8261)
+ * Support for user-defined aggregation functions (CASSANDRA-8053)
+ * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
+ * Refactor SelectStatement, return IN results in natural order instead
+ of IN value list order and ignore duplicate values in partition key IN restrictions (CASSANDRA-7981)
+ * Support UDTs, tuples, and collections in user-defined
+ functions (CASSANDRA-7563)
+ * Fix aggregate fn results on empty selection, result column name,
+ and cqlsh parsing (CASSANDRA-8229)
+ * Mark sstables as repaired after full repair (CASSANDRA-7586)
+ * Extend Descriptor to include a format value and refactor reader/writer
+ APIs (CASSANDRA-7443)
+ * Integrate JMH for microbenchmarks (CASSANDRA-8151)
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
+ * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
+ * Support for aggregation functions (CASSANDRA-4914)
+ * Remove cassandra-cli (CASSANDRA-7920)
+ * Accept dollar quoted strings in CQL (CASSANDRA-7769)
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any partition key column (CASSANDRA-7855)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+ 7924, 7812, 8063, 7813, 7708)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+ * Fail on very large batch sizes (CASSANDRA-8011)
+ * Improve concurrency of repair (CASSANDRA-6455, 8208)
+ * Select optimal CRC32 implementation at runtime (CASSANDRA-8614)
+ * Evaluate MurmurHash of Token once per query (CASSANDRA-7096)
+ * Generalize progress reporting (CASSANDRA-8901)
+ * Resumable bootstrap streaming (CASSANDRA-8838, CASSANDRA-8942)
+ * Allow scrub for secondary index (CASSANDRA-5174)
+ * Save repair data to system table (CASSANDRA-5839)
+
2.1.5
+ * Make anticompaction visible in compactionstats (CASSANDRA-9098)
* Improve nodetool getendpoints documentation about the partition
key parameter (CASSANDRA-6458)
* Don't check other keyspaces for schema changes when an user-defined
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adf29d4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 655f0e3,72deb21..e319631
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1159,78 -1059,68 +1159,86 @@@ public class CompactionManager implemen
if (!new File(sstable.getFilename()).exists())
{
logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
+ i.remove();
continue;
}
+ if (groupMaxDataAge < sstable.maxDataAge)
+ groupMaxDataAge = sstable.maxDataAge;
+ }
- logger.info("Anticompacting {}", sstable);
- Set<SSTableReader> sstableAsSet = new HashSet<>();
- sstableAsSet.add(sstable);
+
+ if (anticompactionGroup.size() == 0)
+ {
+ logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
+ return 0;
+ }
- File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
+ logger.info("Anticompacting {}", anticompactionGroup);
+ Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(new HashSet<>(Collections.singleton(sstable)));
- CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
- {
- int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)sstable.estimatedKeys());
- repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
- unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
+ File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
+ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- metrics.beginCompaction(ci);
- try
+ long repairedKeyCount = 0;
+ long unrepairedKeyCount = 0;
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
+ CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+ {
+ int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
+
+ repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
+ unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
+
+ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat());
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
- while(iter.hasNext())
++ metrics.beginCompaction(ci);
++ try
+ {
- AbstractCompactedRow row = iter.next();
- // if current range from sstable is repaired, save it into the new repaired sstable
- if (Range.isInRanges(row.key.getToken(), ranges))
- {
- repairedSSTableWriter.append(row);
- repairedKeyCount++;
- }
- // otherwise save into the new 'non-repaired' table
- else
++ while (iter.hasNext())
{
- unRepairedSSTableWriter.append(row);
- unrepairedKeyCount++;
- while (iter.hasNext())
++ AbstractCompactedRow row = iter.next();
++ // if current range from sstable is repaired, save it into the new repaired sstable
++ if (Range.isInRanges(row.key.getToken(), ranges))
+ {
- AbstractCompactedRow row = iter.next();
- // if current range from sstable is repaired, save it into the new repaired sstable
- if (Range.isInRanges(row.key.getToken(), ranges))
- {
- repairedSSTableWriter.append(row);
- repairedKeyCount++;
- }
- // otherwise save into the new 'non-repaired' table
- else
- {
- unRepairedSSTableWriter.append(row);
- unrepairedKeyCount++;
- }
++ repairedSSTableWriter.append(row);
++ repairedKeyCount++;
++ }
++ // otherwise save into the new 'non-repaired' table
++ else
++ {
++ unRepairedSSTableWriter.append(row);
++ unrepairedKeyCount++;
+ }
}
- finally
- {
- metrics.finishCompaction(ci);
- }
- anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
- anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
- cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
}
- catch (Throwable e)
++ finally
+ {
- JVMStabilityInspector.inspectThrowable(e);
- logger.error("Error anticompacting " + sstable, e);
- repairedSSTableWriter.abort();
- unRepairedSSTableWriter.abort();
++ metrics.finishCompaction(ci);
+ }
+ // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
+ // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
+ List<SSTableReader> anticompactedSSTables = new ArrayList<>();
+ anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
+ anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
+
+ logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
+ repairedKeyCount + unrepairedKeyCount,
+ cfs.keyspace.getName(),
+ cfs.getColumnFamilyName(),
+ anticompactionGroup);
+ return anticompactedSSTables.size();
}
- String format = "Repaired {} keys of {} for {}/{}";
- logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
- String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
- logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
-
- return anticompactedSSTables;
+ catch (Throwable e)
+ {
+ JVMStabilityInspector.inspectThrowable(e);
+ logger.error("Error anticompacting " + anticompactionGroup, e);
+ repairedSSTableWriter.abort();
+ unRepairedSSTableWriter.abort();
+ }
+ return 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adf29d4/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------