Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 597311804F for ; Tue, 22 Sep 2015 20:13:14 +0000 (UTC) Received: (qmail 79608 invoked by uid 500); 22 Sep 2015 20:13:13 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 79487 invoked by uid 500); 22 Sep 2015 20:13:13 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 79231 invoked by uid 99); 22 Sep 2015 20:13:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Sep 2015 20:13:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 60F74E0440; Tue, 22 Sep 2015 20:13:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: blerer@apache.org To: commits@cassandra.apache.org Date: Tue, 22 Sep 2015 20:13:18 -0000 Message-Id: In-Reply-To: <8ad7bcc28f3a4a1abeb1c86171e9eae1@git.apache.org> References: <8ad7bcc28f3a4a1abeb1c86171e9eae1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [7/9] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0 http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Keyspace.java index 0676e45,92a0950..cf34e9a --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@@ -302,16 -264,14 +302,16 @@@ public class Keyspac createReplicationStrategy(metadata); this.metric = new KeyspaceMetrics(this); - for (CFMetaData cfm : new ArrayList<>(metadata.cfMetaData().values())) + this.viewManager = new ViewManager(this); + for (CFMetaData cfm : metadata.tablesAndViews()) { - logger.debug("Initializing {}.{}", getName(), cfm.cfName); + logger.trace("Initializing {}.{}", getName(), cfm.cfName); initCf(cfm.cfId, cfm.cfName, loadSSTables); } + this.viewManager.reload(); } - private Keyspace(KSMetaData metadata) + private Keyspace(KeyspaceMetadata metadata) { this.metadata = metadata; createReplicationStrategy(metadata); @@@ -415,37 -379,6 +415,37 @@@ if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS)) throw new RuntimeException("Testing write failures"); + Lock lock = null; + boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false); + + if (requiresViewUpdate) + { + lock = 
ViewManager.acquireLockFor(mutation.key().getKey()); + + if (lock == null) + { + if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) + { - logger.debug("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey())); ++ logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey())); + Tracing.trace("Could not acquire MV lock"); + throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); + } + else + { + //This view update can't happen right now. so rather than keep this thread busy + // we will re-apply ourself to the queue and try again later + StageManager.getStage(Stage.MUTATION).execute(() -> { + if (writeCommitLog) + mutation.apply(); + else + mutation.applyUnsafe(); + }); + + return; + } + } + } + int nowInSec = FBUtilities.nowInSeconds(); try (OpOrder.Group opGroup = writeOrder.start()) { // write the mutation to the commitlog and memtables @@@ -502,22 -415,30 +502,22 @@@ /** * @param key row to index - * @param cfs ColumnFamily to index row in - * @param idxNames columns to index, in comparator order + * @param cfs ColumnFamily to index partition in + * @param indexes the indexes to submit the row to */ - public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set idxNames) + public static void indexPartition(DecoratedKey key, ColumnFamilyStore cfs, Set indexes) { - if (logger.isDebugEnabled()) - logger.debug("Indexing partition {} ", cfs.metadata.getKeyValidator().getString(key.getKey())); + if (logger.isTraceEnabled()) - logger.trace("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.getKey())); ++ logger.trace("Indexing partition {} ", cfs.metadata.getKeyValidator().getString(key.getKey())); - try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start()) - { - Set indexes = cfs.indexManager.getIndexesByNames(idxNames); + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, + FBUtilities.nowInSeconds(), + key); - Iterator pager = QueryPagers.pageRowLocally(cfs, key.getKey(), DEFAULT_PAGE_SIZE); - while (pager.hasNext()) - { - ColumnFamily cf = pager.next(); - ColumnFamily cf2 = cf.cloneMeShallow(); - for (Cell cell : cf) - { - if (cfs.indexManager.indexes(cell.name(), indexes)) - cf2.addColumn(cell); - } - cfs.indexManager.indexRow(key.getKey(), cf2, opGroup); - } + try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start(); + UnfilteredRowIterator partition = cmd.queryMemtableAndDisk(cfs, opGroup)) + { + cfs.indexManager.indexPartition(partition, opGroup, indexes, cmd.nowInSec()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Memtable.java index 7af65d1,e96a71e..81decde --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@@ -354,17 -355,18 +354,17 @@@ public class Memtable implements Compar protected Directories getDirectories() { - return cfs.directories; + return cfs.getDirectories(); } - private SSTableReader writeSortedContents(ReplayPosition context, File sstableDirectory) + private Collection writeSortedContents(ReplayPosition context, File sstableDirectory) { - logger.info("Writing {}", Memtable.this.toString()); + logger.debug("Writing {}", Memtable.this.toString()); - SSTableReader ssTable; - // errors when creating the writer that may leave empty 
temp files. - try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory))) + Collection ssTables; + try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get())) { - boolean trackContention = logger.isDebugEnabled(); + boolean trackContention = logger.isTraceEnabled(); int heavilyContendedRowCount = 0; // (we can't clear out the map as-we-go to free up memory, // since the memtable is being used for queries in the "pending flush" category) @@@ -392,26 -394,26 +392,26 @@@ if (writer.getFilePointer() > 0) { - logger.info(String.format("Completed flushing %s (%s) for commitlog position %s", - writer.getFilename(), - FBUtilities.prettyPrintMemory(writer.getFilePointer()), - context)); + logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s", - writer.getFilename(), - FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()), - context)); ++ writer.getFilename(), ++ FBUtilities.prettyPrintMemory(writer.getFilePointer()), ++ context)); - // temp sstables should contain non-repaired data. - ssTable = writer.finish(true); + // sstables should contain non-repaired data. + ssTables = writer.finish(true); } else { - logger.info("Completed flushing {}; nothing needed to be retained. Commitlog position was {}", + logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}", writer.getFilename(), context); writer.abort(); - ssTable = null; + ssTables = null; } if (heavilyContendedRowCount > 0) - logger.debug(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString())); - logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, rows.size(), Memtable.this.toString())); ++ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString())); - return ssTable; + return ssTables; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java index 6502ed3,ab934c6..3666b27 --- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java @@@ -39,9 -39,9 +39,9 @@@ public class MigrationRequestVerbHandle public void doVerb(MessageIn message, int id) { - logger.debug("Received migration request from {}.", message.from); + logger.trace("Received migration request from {}.", message.from); MessageOut> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE, - LegacySchemaTables.convertSchemaToMutations(), + SchemaKeyspace.convertSchemaToMutations(), MigrationManager.MigrationsSerializer.instance); MessagingService.instance().sendReply(response, id, message.from); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java index 6b2585e,308edcd..5c47e1e --- 
a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@@ -1320,36 -1122,6 +1320,36 @@@ public final class SystemKeyspac return result.one().getString("release_version"); } + /** + * Check data directories for old files that can be removed when migrating from 2.1 or 2.2 to 3.0, + * these checks can be removed in 4.0, see CASSANDRA-7066 + */ + public static void migrateDataDirs() + { + Iterable dirs = Arrays.asList(DatabaseDescriptor.getAllDataFileLocations()); + for (String dataDir : dirs) + { - logger.debug("Checking directory {} for old files", dataDir); ++ logger.trace("Checking directory {} for old files", dataDir); + File dir = new File(dataDir); + assert dir.exists() : dir + " should have been created by startup checks"; + + for (File ksdir : dir.listFiles((d, n) -> d.isDirectory())) + { + for (File cfdir : ksdir.listFiles((d, n) -> d.isDirectory())) + { + if (Descriptor.isLegacyFile(cfdir)) + { + FileUtils.deleteRecursive(cfdir); + } + else + { + FileUtils.delete(cfdir.listFiles((d, n) -> Descriptor.isLegacyFile(new File(d, n)))); + } + } + } + } + } + private static ByteBuffer rangeToBytes(Range range) { try (DataOutputBuffer out = new DataOutputBuffer()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 7049191,cb02a8c..2668bba --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@@ -404,7 -388,11 +404,7 @@@ public class CommitLogReplaye if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection)) break; } - logger.info("Finished reading {}", file); - } - finally - { - FileUtils.closeQuietly(reader); + logger.debug("Finished reading {}", file); } } @@@ -562,8 -551,8 +562,8 @@@ return; } - if (logger.isDebugEnabled()) - logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}"); + if (logger.isTraceEnabled()) - logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}"); ++ logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}"); Runnable runnable = new WrappedRunnable() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- 
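[Editor's note] A recurring change in the hunks above (Keyspace, Memtable, CommitLogReplayer, SystemKeyspace, and the compaction classes below) is demoting log statements from debug to trace (and info to debug), keeping any statement with expensive-to-build arguments behind an isTraceEnabled() guard. The following standalone sketch only illustrates that slf4j idiom; the class name and the hex-encoding helper are illustrative stand-ins, not Cassandra code.

    import java.nio.ByteBuffer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative only: the guarded, parameterized slf4j logging idiom
    // that this merge applies throughout the codebase.
    public class GuardedTraceExample
    {
        private static final Logger logger = LoggerFactory.getLogger(GuardedTraceExample.class);

        public void onLockTimeout(ByteBuffer key)
        {
            // Cheap arguments: parameterized logging alone is enough, no guard needed.
            logger.trace("Could not acquire lock for partition of {} bytes", key.remaining());

            // Expensive arguments (e.g. hex-encoding the key, as Keyspace.apply does):
            // build them only when trace logging is actually enabled.
            if (logger.isTraceEnabled())
                logger.trace("Could not acquire lock for {}", toHex(key.duplicate()));
        }

        private static String toHex(ByteBuffer bytes)
        {
            StringBuilder sb = new StringBuilder(bytes.remaining() * 2);
            while (bytes.hasRemaining())
                sb.append(String.format("%02x", bytes.get()));
            return sb.toString();
        }
    }

The guard matters because the argument expressions are evaluated before the logger decides whether the level is enabled; parameterized messages avoid string concatenation, but not the cost of computing the arguments themselves.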
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 75d50e7,ea20a1f..77c0cbb --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -157,10 -165,10 +157,10 @@@ public class CompactionManager implemen return Collections.emptyList(); } - logger.debug("Scheduling a background task check for {}.{} with {}", + logger.trace("Scheduling a background task check for {}.{} with {}", cfs.keyspace.getName(), cfs.name, - cfs.getCompactionStrategy().getName()); + cfs.getCompactionStrategyManager().getName()); List> futures = new ArrayList<>(); // we must schedule it at least once, otherwise compaction will stop for a CF until next flush if (executor.isShutdown()) @@@ -210,11 -218,11 +210,11 @@@ return; } - AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); - AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs)); + CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); + AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds())); if (task == null) { - logger.debug("No tasks available"); + logger.trace("No tasks available"); return; } task.execute(metrics); @@@ -452,8 -460,8 +452,8 @@@ LifecycleTransaction txn, long repairedAt) throws InterruptedException, IOException { - logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size()); + logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getLiveSSTables()); - logger.debug("Starting anticompaction for ranges {}", ranges); + logger.trace("Starting anticompaction for ranges {}", ranges); Set sstables = new HashSet<>(validatedForRepair); Set mutatedRepairStatuses = new HashSet<>(); Set nonAnticompacting = new HashSet<>(); @@@ -781,10 -788,10 +781,10 @@@ long totalkeysWritten = 0; - long expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), + long expectedBloomFilterSize = Math.max(cfs.metadata.params.minIndexInterval, SSTableReader.getApproximateKeyCount(txn.originals())); - if (logger.isDebugEnabled()) - logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize); + if (logger.isTraceEnabled()) + logger.trace("Expected bloom filter size : {}", expectedBloomFilterSize); logger.info("Cleaning up {}", sstable); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 47c8de8,0000000..bd72c64 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@@ -1,513 -1,0 +1,513 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + + +import java.util.*; +import java.util.concurrent.Callable; + +import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.notifications.*; +import org.apache.cassandra.schema.CompactionParams; +import org.apache.cassandra.service.ActiveRepairService; + +/** + * Manages the compaction strategies. + * + * Currently has two instances of actual compaction strategies - one for repaired data and one for + * unrepaired data. This is done to be able to totally separate the different sets of sstables. + */ +public class CompactionStrategyManager implements INotificationConsumer +{ + private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class); + private final ColumnFamilyStore cfs; + private volatile AbstractCompactionStrategy repaired; + private volatile AbstractCompactionStrategy unrepaired; + private volatile boolean enabled = true; + public boolean isActive = true; + private volatile CompactionParams params; + /* + We keep a copy of the schema compaction parameters here to be able to decide if we + should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER. + + If a user changes the local compaction strategy and then later ALTERs a compaction parameter, + we will use the new compaction parameters. 
+ */ + private CompactionParams schemaCompactionParams; + + public CompactionStrategyManager(ColumnFamilyStore cfs) + { + cfs.getTracker().subscribe(this); - logger.debug("{} subscribed to the data tracker.", this); ++ logger.trace("{} subscribed to the data tracker.", this); + this.cfs = cfs; + reload(cfs.metadata); + params = cfs.metadata.params.compaction; + enabled = params.isEnabled(); + } + + /** + * Return the next background task + * + * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks) + * + */ + public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) + { + if (!isEnabled()) + return null; + + maybeReload(cfs.metadata); + + if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks()) + { + AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore); + if (repairedTask != null) + return repairedTask; + return unrepaired.getNextBackgroundTask(gcBefore); + } + else + { + AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore); + if (unrepairedTask != null) + return unrepairedTask; + return repaired.getNextBackgroundTask(gcBefore); + } + } + + public boolean isEnabled() + { + return enabled && isActive; + } + + public synchronized void resume() + { + isActive = true; + } + + /** + * pause compaction while we cancel all ongoing compactions + * + * Separate call from enable/disable to not have to save the enabled-state externally + */ + public synchronized void pause() + { + isActive = false; + } + + + private void startup() + { + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) + { + if (sstable.openReason != SSTableReader.OpenReason.EARLY) + getCompactionStrategyFor(sstable).addSSTable(sstable); + } + repaired.startup(); + unrepaired.startup(); + } + + /** + * return the compaction strategy for the given sstable + * + * returns differently based on the repaired status + * @param sstable + * @return + */ + private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable) + { + if (sstable.isRepaired()) + return repaired; + else + return unrepaired; + } + + public void shutdown() + { + isActive = false; + repaired.shutdown(); + unrepaired.shutdown(); + } + + public synchronized void maybeReload(CFMetaData metadata) + { + // compare the old schema configuration to the new one, ignore any locally set changes. + if (metadata.params.compaction.equals(schemaCompactionParams)) + return; + reload(metadata); + } + + /** + * Reload the compaction strategies + * + * Called after changing configuration and at startup. 
+ * @param metadata + */ + public synchronized void reload(CFMetaData metadata) + { + boolean disabledWithJMX = !enabled && shouldBeEnabled(); + setStrategy(metadata.params.compaction); + schemaCompactionParams = metadata.params.compaction; + + if (disabledWithJMX || !shouldBeEnabled()) + disable(); + else + enable(); + startup(); + } + + public void replaceFlushed(Memtable memtable, Collection sstables) + { + cfs.getTracker().replaceFlushed(memtable, sstables); + if (sstables != null && !sstables.isEmpty()) + CompactionManager.instance.submitBackground(cfs); + } + + public int getUnleveledSSTables() + { + if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy) + { + int count = 0; + count += ((LeveledCompactionStrategy)repaired).getLevelSize(0); + count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0); + return count; + } + return 0; + } + + public synchronized int[] getSSTableCountPerLevel() + { + if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy) + { + int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT]; + int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize(); + res = sumArrays(res, repairedCountPerLevel); + int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize(); + res = sumArrays(res, unrepairedCountPerLevel); + return res; + } + return null; + } + + private static int[] sumArrays(int[] a, int[] b) + { + int[] res = new int[Math.max(a.length, b.length)]; + for (int i = 0; i < res.length; i++) + { + if (i < a.length && i < b.length) + res[i] = a[i] + b[i]; + else if (i < a.length) + res[i] = a[i]; + else + res[i] = b[i]; + } + return res; + } + + public boolean shouldDefragment() + { + assert repaired.getClass().equals(unrepaired.getClass()); + return repaired.shouldDefragment(); + } + + public Directories getDirectories() + { + assert repaired.getClass().equals(unrepaired.getClass()); + return repaired.getDirectories(); + } + + public synchronized void handleNotification(INotification notification, Object sender) + { + if (notification instanceof SSTableAddedNotification) + { + SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification; + for (SSTableReader sstable : flushedNotification.added) + { + if (sstable.isRepaired()) + repaired.addSSTable(sstable); + else + unrepaired.addSSTable(sstable); + } + } + else if (notification instanceof SSTableListChangedNotification) + { + SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; + Set repairedRemoved = new HashSet<>(); + Set repairedAdded = new HashSet<>(); + Set unrepairedRemoved = new HashSet<>(); + Set unrepairedAdded = new HashSet<>(); + + for (SSTableReader sstable : listChangedNotification.removed) + { + if (sstable.isRepaired()) + repairedRemoved.add(sstable); + else + unrepairedRemoved.add(sstable); + } + for (SSTableReader sstable : listChangedNotification.added) + { + if (sstable.isRepaired()) + repairedAdded.add(sstable); + else + unrepairedAdded.add(sstable); + } + if (!repairedRemoved.isEmpty()) + { + repaired.replaceSSTables(repairedRemoved, repairedAdded); + } + else + { + for (SSTableReader sstable : repairedAdded) + repaired.addSSTable(sstable); + } + + if (!unrepairedRemoved.isEmpty()) + { + unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded); + } + else + { + for (SSTableReader sstable : unrepairedAdded) + unrepaired.addSSTable(sstable); + } + } + else if 
(notification instanceof SSTableRepairStatusChanged) + { + for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable) + { + if (sstable.isRepaired()) + { + unrepaired.removeSSTable(sstable); + repaired.addSSTable(sstable); + } + else + { + repaired.removeSSTable(sstable); + unrepaired.addSSTable(sstable); + } + } + } + else if (notification instanceof SSTableDeletingNotification) + { + SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting; + if (sstable.isRepaired()) + repaired.removeSSTable(sstable); + else + unrepaired.removeSSTable(sstable); + } + } + + public void enable() + { + if (repaired != null) + repaired.enable(); + if (unrepaired != null) + unrepaired.enable(); + // enable this last to make sure the strategies are ready to get calls. + enabled = true; + } + + public void disable() + { + // disable this first avoid asking disabled strategies for compaction tasks + enabled = false; + if (repaired != null) + repaired.disable(); + if (unrepaired != null) + unrepaired.disable(); + } + + /** + * Create ISSTableScanner from the given sstables + * + * Delegates the call to the compaction strategies to allow LCS to create a scanner + * @param sstables + * @param range + * @return + */ + @SuppressWarnings("resource") + public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection sstables, Collection> ranges) + { + List repairedSSTables = new ArrayList<>(); + List unrepairedSSTables = new ArrayList<>(); + for (SSTableReader sstable : sstables) + { + if (sstable.isRepaired()) + repairedSSTables.add(sstable); + else + unrepairedSSTables.add(sstable); + } + + Set scanners = new HashSet<>(sstables.size()); + + for (Range range : ranges) + { + AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range); + AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range); + + for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners)) + { + if (!scanners.add(scanner)) + scanner.close(); + } + } + + return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners)); + } + + public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection sstables) + { + return getScanners(sstables, Collections.singleton(null)); + } + + public Collection> groupSSTablesForAntiCompaction(Collection sstablesToGroup) + { + return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup); + } + + public long getMaxSSTableBytes() + { + return unrepaired.getMaxSSTableBytes(); + } + + public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes) + { + return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes); + } + + public Collection getMaximalTasks(final int gcBefore, final boolean splitOutput) + { + // runWithCompactionsDisabled cancels active compactions and disables them, then we are able + // to make the repaired/unrepaired strategies mark their own sstables as compacting. 
Once the + // sstables are marked the compactions are re-enabled + return cfs.runWithCompactionsDisabled(new Callable>() + { + @Override + public Collection call() throws Exception + { + synchronized (CompactionStrategyManager.this) + { + Collection repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput); + Collection unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput); + + if (repairedTasks == null && unrepairedTasks == null) + return null; + + if (repairedTasks == null) + return unrepairedTasks; + if (unrepairedTasks == null) + return repairedTasks; + + List tasks = new ArrayList<>(); + tasks.addAll(repairedTasks); + tasks.addAll(unrepairedTasks); + return tasks; + } + } + }, false, false); + } + + public AbstractCompactionTask getUserDefinedTask(Collection sstables, int gcBefore) + { + return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore); + } + + public int getEstimatedRemainingTasks() + { + int tasks = 0; + tasks += repaired.getEstimatedRemainingTasks(); + tasks += unrepaired.getEstimatedRemainingTasks(); + + return tasks; + } + + public boolean shouldBeEnabled() + { + return params.isEnabled(); + } + + public String getName() + { + return unrepaired.getName(); + } + + public List getStrategies() + { + return Arrays.asList(repaired, unrepaired); + } + + public synchronized void setNewLocalCompactionStrategy(CompactionParams params) + { + logger.info("Switching local compaction strategy from {} to {}}", this.params, params); + setStrategy(params); + if (shouldBeEnabled()) + enable(); + else + disable(); + startup(); + } + + private void setStrategy(CompactionParams params) + { + if (repaired != null) + repaired.shutdown(); + if (unrepaired != null) + unrepaired.shutdown(); + repaired = CFMetaData.createCompactionStrategyInstance(cfs, params); + unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params); + this.params = params; + } + + public CompactionParams getCompactionParams() + { + return params; + } + + public boolean onlyPurgeRepairedTombstones() + { + return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES)); + } + + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn) + { + if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) + { + return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn); + } + else + { + return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 1d96324,575c326..be81c80 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@@ -142,11 -138,13 +142,11 @@@ public class CompactionTask extends Abs ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel())); } ssTableLoggerMsg.append("]"); - String taskIdLoggerMsg = taskId == null ? 
UUIDGen.getTimeUUID().toString() : taskId.toString(); - logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg); - logger.info("Compacting ({}) {}", taskId, ssTableLoggerMsg); - long start = System.nanoTime(); ++ logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg); + long start = System.nanoTime(); long totalKeysWritten = 0; - long estimatedKeys = 0; try (CompactionController controller = getCompactionController(transaction.originals())) { @@@ -213,11 -220,11 +213,11 @@@ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; long totalSourceRows = 0; - String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize); + String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize); - logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", + logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", - taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); + taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); - logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); - logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten)); + logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); + logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten)); if (offline) Refs.release(Refs.selfRefs(newSStables)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 7fd5717,d90318f..b7bf83f --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@@ -338,9 -338,9 +338,9 @@@ public class LeveledManifes { int nextLevel = getNextLevel(candidates); candidates = getOverlappingStarvedSSTables(nextLevel, candidates); - if (logger.isDebugEnabled()) - logger.debug("Compaction candidates for L{} are {}", i, toString(candidates)); + if (logger.isTraceEnabled()) + logger.trace("Compaction candidates for L{} are {}", i, toString(candidates)); - return new 
CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategy().getMaxSSTableBytes()); + return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategyManager().getMaxSSTableBytes()); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 05f446c,b4125bb..f8a8240 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@@ -80,10 -79,10 +80,10 @@@ public class SizeTieredCompactionStrate int minThreshold = cfs.getMinimumCompactionThreshold(); int maxThreshold = cfs.getMaximumCompactionThreshold(); - Iterable candidates = filterSuspectSSTables(Sets.intersection(cfs.getUncompactingSSTables(), sstables)); + Iterable candidates = filterSuspectSSTables(filter(cfs.getUncompactingSSTables(), sstables::contains)); List> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize); - logger.debug("Compaction buckets are {}", buckets); + logger.trace("Compaction buckets are {}", buckets); updateEstimatedCompactionsByTasks(buckets); List mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold); if (!mostInteresting.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 07ca3d0,ed07df9..796391c --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@@ -82,10 -81,19 +82,10 @@@ public class SplittingSizeTieredCompact } } ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex); - File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex]))); long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]); currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]); - @SuppressWarnings("resource") - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)), - currentPartitionsToWrite, - minRepairedAt, - cfs.metadata, - cfs.partitioner, - new MetadataCollector(allSSTables, cfs.metadata.comparator, 0)); - - sstableWriter.switchWriter(writer); + switchCompactionLocation(getWriteDirectory(currentBytesToWrite)); - logger.debug("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite); + logger.trace("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite); } @Override @@@ -100,20 -118,4 +100,20 @@@ } return rie != null; } -} + + public void switchCompactionLocation(Directories.DataDirectory location) + { + long currentPartitionsToWrite = 
Math.round(ratios[currentRatioIndex] * estimatedTotalKeys); + @SuppressWarnings("resource") + SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))), + currentPartitionsToWrite, + minRepairedAt, + cfs.metadata, + new MetadataCollector(allSSTables, cfs.metadata.comparator, 0), + SerializationHeader.make(cfs.metadata, nonExpiredSSTables), + txn); - logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite); ++ logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite); + sstableWriter.switchWriter(writer); + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index 59bbc7d,9b52269..83d0f82 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@@ -222,14 -149,9 +222,14 @@@ public class LifecycleTransaction exten public Throwable doCommit(Throwable accumulate) { assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit"; - logger.debug("Committing update:{}, obsolete:{}", staged.update, staged.obsolete); - + logger.trace("Committing update:{}, obsolete:{}", staged.update, staged.obsolete); + // accumulate must be null if we have been used correctly, so fail immediately if it is not + maybeFail(accumulate); + + // transaction log commit failure means we must abort; safe commit is not possible + maybeFail(log.commit(null)); + // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size // and notification status for the obsolete and new files @@@ -247,23 -167,18 +247,23 @@@ */ public Throwable doAbort(Throwable accumulate) { - if (logger.isDebugEnabled()) - logger.debug("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete); + if (logger.isTraceEnabled()) + logger.trace("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete); + accumulate = abortObsoletion(obsoletions, accumulate); + if (logged.isEmpty() && staged.isEmpty()) - return accumulate; + return log.abort(accumulate); // mark obsolete all readers that are not versions of those present in the original set Iterable obsolete = filterOut(concatUniq(staged.update, logged.update), originals); - logger.debug("Obsoleting {}", obsolete); + logger.trace("Obsoleting {}", obsolete); - // we don't pass the tracker in for the obsoletion, since these readers have never been notified externally - // nor had their size accounting affected - accumulate = markObsolete(null, obsolete, accumulate); + + accumulate = prepareForObsoletion(obsolete, log, obsoletions = new ArrayList<>(), accumulate); + // it's safe to abort even if committed, see maybeFail in doCommit() above, in this case it will just report + // a failure to abort, which is useful information to have for debug + accumulate = log.abort(accumulate); + accumulate = markObsolete(obsoletions, accumulate); // replace all updated readers with a version restored to 
its original state accumulate = tracker.apply(updateLiveSet(logged.update, restoreUpdatedOriginals()), accumulate); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java index da6d78d,0000000..78ea0f1 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java @@@ -1,419 -1,0 +1,419 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Runnables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LogRecord.Type; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SnapshotDeletingTask; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.Ref; +import org.apache.cassandra.utils.concurrent.RefCounted; +import org.apache.cassandra.utils.concurrent.Transactional; + +/** + * IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction, + * for correct behaviour its commit MUST occur before any others, since it may legitimately fail. This is consistent + * with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also + * *requires* that the prepareToCommit() phase only take actions that can be rolled back. + * + * IMPORTANT: The transaction must complete (commit or abort) before any temporary files are deleted, even though the + * txn log file itself will not be deleted until all tracked files are deleted. This is required by FileLister to ensure + * a consistent disk state. LifecycleTransaction ensures this requirement, so this class should really never be used + * outside of LT. 
@see FileLister.classifyFiles(TransactionData txn) + * + * A class that tracks sstable files involved in a transaction across sstables: + * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails. + * + * The transaction log file contains new and old sstables as follows: + * + * add:[sstable-2][CRC] + * remove:[sstable-1,max_update_time,num files][CRC] + * + * where sstable-2 is a new sstable to be retained if the transaction succeeds and sstable-1 is an old sstable to be + * removed. CRC is an incremental CRC of the file content up to this point. For old sstable files we also log the + * last update time of all files for the sstable descriptor and a checksum of vital properties such as update times + * and file sizes. + * + * Upon commit we add a final line to the log file: + * + * commit:[commit_time][CRC] + * + * When the transaction log is cleaned-up by the TransactionTidier, which happens only after any old sstables have been + * osoleted, then any sstable files for old sstables are removed before deleting the transaction log if the transaction + * was committed, vice-versa if the transaction was aborted. + * + * On start-up we look for any transaction log files and repeat the cleanup process described above. + * + * See CASSANDRA-7066 for full details. + */ +class LogTransaction extends Transactional.AbstractTransactional implements Transactional +{ + private static final Logger logger = LoggerFactory.getLogger(LogTransaction.class); + + /** + * If the format of the lines in the transaction log is wrong or the checksum + * does not match, then we throw this exception. + */ + public static final class CorruptTransactionLogException extends RuntimeException + { + public final LogFile file; + + public CorruptTransactionLogException(String message, LogFile file) + { + super(message); + this.file = file; + } + } + + private final Tracker tracker; + private final LogFile data; + private final Ref selfRef; + // Deleting sstables is tricky because the mmapping might not have been finalized yet, + // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs). + // Additionally, we need to make sure to delete the data file first, so on restart the others + // will be recognized as GCable. + private static final Queue failedDeletions = new ConcurrentLinkedQueue<>(); + + LogTransaction(OperationType opType, CFMetaData metadata) + { + this(opType, metadata, null); + } + + LogTransaction(OperationType opType, CFMetaData metadata, Tracker tracker) + { + this(opType, new Directories(metadata), tracker); + } + + LogTransaction(OperationType opType, Directories directories, Tracker tracker) + { + this(opType, directories.getDirectoryForNewSSTables(), tracker); + } + + LogTransaction(OperationType opType, File folder, Tracker tracker) + { + this.tracker = tracker; + int folderDescriptor = CLibrary.tryOpenDirectory(folder.getPath()); + this.data = new LogFile(opType, folder, folderDescriptor, UUIDGen.getTimeUUID()); + this.selfRef = new Ref<>(this, new TransactionTidier(data, folderDescriptor)); + - if (logger.isDebugEnabled()) - logger.debug("Created transaction logs with id {}", data.id); ++ if (logger.isTraceEnabled()) ++ logger.trace("Created transaction logs with id {}", data.id); + } + + /** + * Track a reader as new. + **/ + void trackNew(SSTable table) + { + data.add(Type.ADD, table); + } + + /** + * Stop tracking a reader as new. 
+ */ + void untrackNew(SSTable table) + { + data.remove(Type.ADD, table); + } + + /** + * Schedule a reader for deletion as soon as it is fully unreferenced. + */ + SSTableTidier obsoleted(SSTableReader reader) + { + if (data.contains(Type.ADD, reader)) + { + if (data.contains(Type.REMOVE, reader)) + throw new IllegalArgumentException(); + + return new SSTableTidier(reader, true, this); + } + + data.add(Type.REMOVE, reader); + + if (tracker != null) + tracker.notifyDeleting(reader); + + return new SSTableTidier(reader, false, this); + } + + OperationType getType() + { + return data.getType(); + } + + UUID getId() + { + return data.getId(); + } + + @VisibleForTesting + String getDataFolder() + { + return data.folder.getPath(); + } + + @VisibleForTesting + LogFile getLogFile() + { + return data; + } + + static void delete(File file) + { + try + { - if (logger.isDebugEnabled()) - logger.debug("Deleting {}", file); ++ if (logger.isTraceEnabled()) ++ logger.trace("Deleting {}", file); + + Files.delete(file.toPath()); + } + catch (NoSuchFileException e) + { + logger.error("Unable to delete {} as it does not exist", file); + } + catch (IOException e) + { + logger.error("Unable to delete {}", file, e); + throw new RuntimeException(e); + } + } + + /** + * The transaction tidier. + * + * When the transaction reference is fully released we try to delete all the obsolete files + * depending on the transaction result, as well as the transaction log file. + */ + private static class TransactionTidier implements RefCounted.Tidy, Runnable + { + private final LogFile data; + private final int folderDescriptor; + + TransactionTidier(LogFile data, int folderDescriptor) + { + this.data = data; + this.folderDescriptor = folderDescriptor; + } + + public void tidy() throws Exception + { + run(); + } + + public String name() + { + return data.toString(); + } + + public void run() + { - if (logger.isDebugEnabled()) - logger.debug("Removing files for transaction {}", name()); ++ if (logger.isTraceEnabled()) ++ logger.trace("Removing files for transaction {}", name()); + + assert data.completed() : "Expected a completed transaction: " + data; + + Throwable err = data.removeUnfinishedLeftovers(null); + + if (err != null) + { + logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err); + failedDeletions.add(this); + } + else + { - if (logger.isDebugEnabled()) - logger.debug("Closing file transaction {}", name()); ++ if (logger.isTraceEnabled()) ++ logger.trace("Closing file transaction {}", name()); + CLibrary.tryCloseFD(folderDescriptor); + } + } + } + + static class Obsoletion + { + final SSTableReader reader; + final SSTableTidier tidier; + + Obsoletion(SSTableReader reader, SSTableTidier tidier) + { + this.reader = reader; + this.tidier = tidier; + } + } + + /** + * The SSTableReader tidier. When a reader is fully released and no longer referenced + * by any one, we run this. It keeps a reference to the parent transaction and releases + * it when done, so that the final transaction cleanup can run when all obsolete readers + * are released. 
+ */ + public static class SSTableTidier implements Runnable + { + // must not retain a reference to the SSTableReader, else leak detection cannot kick in + private final Descriptor desc; + private final long sizeOnDisk; + private final Tracker tracker; + private final boolean wasNew; + private final Ref parentRef; + + public SSTableTidier(SSTableReader referent, boolean wasNew, LogTransaction parent) + { + this.desc = referent.descriptor; + this.sizeOnDisk = referent.bytesOnDisk(); + this.tracker = parent.tracker; + this.wasNew = wasNew; + this.parentRef = parent.selfRef.tryRef(); + } + + public void run() + { + SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation); + + try + { + // If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier + File datafile = new File(desc.filenameFor(Component.DATA)); + + delete(datafile); + // let the remainder be cleaned up by delete + SSTable.delete(desc, SSTable.discoverComponentsFor(desc)); + } + catch (Throwable t) + { + logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc); + failedDeletions.add(this); + return; + } + + if (tracker != null && tracker.cfstore != null && !wasNew) + tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk); + + // release the referent to the parent so that the all transaction files can be released + parentRef.release(); + } + + public void abort() + { + parentRef.release(); + } + } + + + static void rescheduleFailedDeletions() + { + Runnable task; + while ( null != (task = failedDeletions.poll())) + ScheduledExecutors.nonPeriodicTasks.submit(task); + + // On Windows, snapshots cannot be deleted so long as a segment of the root element is memory-mapped in NTFS. + SnapshotDeletingTask.rescheduleFailedTasks(); + } + + static void waitForDeletions() + { + FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(Runnables.doNothing(), 0, TimeUnit.MILLISECONDS)); + } + + @VisibleForTesting + Throwable complete(Throwable accumulate) + { + try + { + accumulate = selfRef.ensureReleased(accumulate); + return accumulate; + } + catch (Throwable t) + { + logger.error("Failed to complete file transaction {}", getId(), t); + return Throwables.merge(accumulate, t); + } + } + + protected Throwable doCommit(Throwable accumulate) + { + data.commit(); + return complete(accumulate); + } + + protected Throwable doAbort(Throwable accumulate) + { + data.abort(); + return complete(accumulate); + } + + protected void doPrepare() { } + + /** + * Called on startup to scan existing folders for any unfinished leftovers of + * operations that were ongoing when the process exited. Also called by the standalone + * sstableutil tool when the cleanup option is specified, @see StandaloneSSTableUtil. 
+ * + */ + static void removeUnfinishedLeftovers(CFMetaData metadata) + { + for (File dir : new Directories(metadata).getCFDirectories()) + { + int folderDescriptor = CLibrary.tryOpenDirectory(dir.getPath()); + try + { + File[] logs = dir.listFiles(LogFile::isLogFile); + + for (File log : logs) + { + LogFile data = LogFile.make(log, folderDescriptor); + data.readRecords(); + if (data.verify()) + { + Throwable failure = data.removeUnfinishedLeftovers(null); + if (failure != null) + logger.error("Failed to remove unfinished transaction leftovers for log {}", log, failure); + } + else + { + logger.error("Unexpected disk state: failed to read transaction log {}", log); + } + } + } + finally + { + CLibrary.tryCloseFD(folderDescriptor); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/dht/BootStrapper.java index 31fda34,a6b1ad7..c0f0402 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@@ -178,34 -181,6 +178,34 @@@ public class BootStrapper extends Progr return getRandomTokens(metadata, numTokens); } + private static Collection getSpecifiedTokens(final TokenMetadata metadata, + Collection initialTokens) + { - logger.debug("tokens manually specified as {}", initialTokens); ++ logger.trace("tokens manually specified as {}", initialTokens); + List tokens = new ArrayList<>(initialTokens.size()); + for (String tokenString : initialTokens) + { + Token token = metadata.partitioner.getTokenFactory().fromString(tokenString); + if (metadata.getEndpoint(token) != null) + throw new ConfigurationException("Bootstrapping to existing token " + tokenString + " is not allowed (decommission/removenode the old node first)."); + tokens.add(token); + } + return tokens; + } + + static Collection allocateTokens(final TokenMetadata metadata, + InetAddress address, + String allocationKeyspace, + int numTokens) + { + Keyspace ks = Keyspace.open(allocationKeyspace); + if (ks == null) + throw new ConfigurationException("Problem opening token allocation keyspace " + allocationKeyspace); + AbstractReplicationStrategy rs = ks.getReplicationStrategy(); + + return TokenAllocation.allocateTokens(metadata, rs, address, numTokens); + } + public static Collection getRandomTokens(TokenMetadata metadata, int numTokens) { Set tokens = new HashSet<>(numTokens); http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java index 3f74c33,36da92d..ec5167b --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java @@@ -98,238 -78,4 +98,238 @@@ public class CqlInputFormat extends org return new CqlRecordReader(); } + protected void validateConfiguration(Configuration conf) + { + if (ConfigHelper.getInputKeyspace(conf) == null || 
ConfigHelper.getInputColumnFamily(conf) == null) + { + throw new UnsupportedOperationException("You must set the keyspace and table with setInputColumnFamily()"); + } + if (ConfigHelper.getInputInitialAddress(conf) == null) + throw new UnsupportedOperationException("You must set the initial input address to a Cassandra node with setInputInitialAddress"); + if (ConfigHelper.getInputPartitioner(conf) == null) + throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner"); + } + + public List getSplits(JobContext context) throws IOException + { + Configuration conf = HadoopCompat.getConfiguration(context); + + validateConfiguration(conf); + + keyspace = ConfigHelper.getInputKeyspace(conf); + cfName = ConfigHelper.getInputColumnFamily(conf); + partitioner = ConfigHelper.getInputPartitioner(conf); - logger.debug("partitioner is {}", partitioner); ++ logger.trace("partitioner is {}", partitioner); + + // canonical ranges and nodes holding replicas + Map> masterRangeNodes = getRangeMap(conf, keyspace); + + // canonical ranges, split into pieces, fetching the splits in parallel + ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue()); + List splits = new ArrayList<>(); + + try + { + List>> splitfutures = new ArrayList<>(); + KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf); + Range jobRange = null; + if (jobKeyRange != null) + { + if (jobKeyRange.start_key != null) + { + if (!partitioner.preservesOrder()) + throw new UnsupportedOperationException("KeyRange based on keys can only be used with an order preserving partitioner"); + if (jobKeyRange.start_token != null) + throw new IllegalArgumentException("only start_key supported"); + if (jobKeyRange.end_token != null) + throw new IllegalArgumentException("only start_key supported"); + jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key), + partitioner.getToken(jobKeyRange.end_key)); + } + else if (jobKeyRange.start_token != null) + { + jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token), + partitioner.getTokenFactory().fromString(jobKeyRange.end_token)); + } + else + { + logger.warn("ignoring jobKeyRange specified without start_key or start_token"); + } + } + + session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect(); + Metadata metadata = session.getCluster().getMetadata(); + + for (TokenRange range : masterRangeNodes.keySet()) + { + if (jobRange == null) + { + // for each tokenRange, pick a live owner and ask it to compute bite-sized splits + splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf))); + } + else + { + TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange); + if (range.intersects(jobTokenRange)) + { + for (TokenRange intersection: range.intersectWith(jobTokenRange)) + { + // for each tokenRange, pick a live owner and ask it to compute bite-sized splits + splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf))); + } + } + } + } + + // wait until we have all the results back + for (Future> futureInputSplits : splitfutures) + { + try + { + splits.addAll(futureInputSplits.get()); + } + catch (Exception e) + { + throw new IOException("Could not get input splits", e); + } + } + } + finally + { + executor.shutdownNow(); + } + + assert splits.size() > 0; + Collections.shuffle(splits, new Random(System.nanoTime())); + return splits; +
} + + private TokenRange rangeToTokenRange(Metadata metadata, Range range) + { + return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)), + metadata.newToken(partitioner.getTokenFactory().toString(range.right))); + } + + private Map getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException + { + int splitSize = ConfigHelper.getInputSplitSize(conf); + try + { + return describeSplits(keyspace, cfName, range, splitSize); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + private Map> getRangeMap(Configuration conf, String keyspace) + { + try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect()) + { + Map> map = new HashMap<>(); + Metadata metadata = session.getCluster().getMetadata(); + for (TokenRange tokenRange : metadata.getTokenRanges()) + map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange)); + return map; + } + } + + private Map describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize) + { + String query = String.format("SELECT mean_partition_size, partitions_count " + + "FROM %s.%s " + + "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?", + SystemKeyspace.NAME, + SystemKeyspace.SIZE_ESTIMATES); + + ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()); + + Row row = resultSet.one(); + // If we have no data on this split, return the full split, i.e. do not sub-split + // Assume smallest granularity of partition count available from CASSANDRA-7688 + if (row == null) + { + Map wrappedTokenRange = new HashMap<>(); + wrappedTokenRange.put(tokenRange, (long) 128); + return wrappedTokenRange; + } + + long meanPartitionSize = row.getLong("mean_partition_size"); + long partitionCount = row.getLong("partitions_count"); + + // never let the split count drop to zero: that would break splitEvenly() and the division below + int splitCount = Math.max(1, (int)((meanPartitionSize * partitionCount) / splitSize)); + List splitRanges = tokenRange.splitEvenly(splitCount); + Map rangesWithLength = new HashMap<>(); + for (TokenRange range : splitRanges) + rangesWithLength.put(range, partitionCount/splitCount); + + return rangesWithLength; + } + + // Old Hadoop API + public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException + { + TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID()); + List newInputSplits = this.getSplits(tac); + InputSplit[] oldInputSplits = new InputSplit[newInputSplits.size()]; + for (int i = 0; i < newInputSplits.size(); i++) + oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i); + return oldInputSplits; + } + + /** + * Gets a token range and splits it up according to the suggested + * size into input splits that Hadoop can use.
+ */ + class SplitCallable implements Callable> + { + + private final TokenRange tokenRange; + private final Set hosts; + private final Configuration conf; + + public SplitCallable(TokenRange tr, Set hosts, Configuration conf) + { + this.tokenRange = tr; + this.hosts = hosts; + this.conf = conf; + } + + public List call() throws Exception + { + ArrayList splits = new ArrayList<>(); + Map subSplits; + subSplits = getSubSplits(keyspace, cfName, tokenRange, conf); + // turn the sub-ranges into InputSplits + String[] endpoints = new String[hosts.size()]; + + // hadoop needs hostname, not ip + int endpointIndex = 0; + for (Host endpoint : hosts) + endpoints[endpointIndex++] = endpoint.getAddress().getHostName(); + + boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner; + + for (TokenRange subSplit : subSplits.keySet()) + { + List ranges = subSplit.unwrap(); + for (TokenRange subrange : ranges) + { + ColumnFamilySplit split = + new ColumnFamilySplit( + partitionerIsOpp ? + subrange.getStart().toString().substring(2) : subrange.getStart().toString(), + partitionerIsOpp ? + subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(), + subSplits.get(subSplit), + endpoints); + - logger.debug("adding {}", split); ++ logger.trace("adding {}", split); + splits.add(split); + } + } + return splits; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/hints/HintVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hints/HintVerbHandler.java index 458d01f,0000000..b2c7b6a mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java +++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java @@@ -1,89 -1,0 +1,89 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.hints; + +import java.net.InetAddress; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.service.StorageService; + +/** + * Verb handler used both for hint dispatch and streaming. + * + * With the non-sstable format, we cannot just stream hint sstables on node decommission.
So sometimes, at decommission + time, we might have to stream hints to a non-owning host (say, if the owning host B is down during decommission of host A). + In that case the handler just stores the received hint in its local hint store. + */ +public final class HintVerbHandler implements IVerbHandler +{ + private static final Logger logger = LoggerFactory.getLogger(HintVerbHandler.class); + + public void doVerb(MessageIn message, int id) + { + UUID hostId = message.payload.hostId; + Hint hint = message.payload.hint; + + // If we see an unknown table id, it means the table, or one of the tables in the mutation, had been dropped. + // In that case there is nothing we can really do, or should do, other than log it and go on. + // This will *not* happen due to a not-yet-seen table, because we don't transfer hints unless there + // is schema agreement between the sender and the receiver. + if (hint == null) + { - logger.debug("Failed to decode and apply a hint for {} - table with id {} is unknown", ++ logger.trace("Failed to decode and apply a hint for {} - table with id {} is unknown", + hostId, + message.payload.unknownTableID); + reply(id, message.from); + return; + } + + // We must perform validation before applying the hint, and there is no other place to do it than here. + try + { + hint.mutation.getPartitionUpdates().forEach(PartitionUpdate::validate); + } + catch (MarshalException e) + { + logger.warn("Failed to validate a hint for {} - skipped", hostId); + reply(id, message.from); + return; + } + + // Apply the hint if this node is the destination, store for future dispatch if this node isn't (must have gotten + // it from a decommissioned node that had streamed it before going out). + if (hostId.equals(StorageService.instance.getLocalHostUUID())) + hint.apply(); + else + HintsService.instance.write(hostId, hint); + + reply(id, message.from); + } + + private static void reply(int id, InetAddress to) + { + MessagingService.instance().sendReply(HintResponse.message, id, to); + } +}
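
A note on the hint routing shown above: the decision in doVerb() boils down to "apply the hint locally if this node is its destination, otherwise keep it for later dispatch to the real owner", and both branches end with the same reply to the sender. The following self-contained Java sketch illustrates only that decision; it is not part of the patch, and the class name, method name and the LOCAL_HOST_ID constant (a stand-in for StorageService.instance.getLocalHostUUID()) are illustrative assumptions.

    import java.util.UUID;

    // Illustrative sketch only: mirrors the apply-or-store decision in HintVerbHandler.doVerb(),
    // with println calls standing in for hint.apply() and HintsService.instance.write().
    public class HintRoutingSketch
    {
        // Stand-in for StorageService.instance.getLocalHostUUID().
        private static final UUID LOCAL_HOST_ID = UUID.randomUUID();

        static void onHintReceived(UUID destinationHostId)
        {
            if (LOCAL_HOST_ID.equals(destinationHostId))
                System.out.println("this node is the destination -> apply the hint's mutation locally");
            else
                System.out.println("another node is the destination -> store the hint for future dispatch");
        }

        public static void main(String[] args)
        {
            onHintReceived(LOCAL_HOST_ID);     // hint addressed to this node: applied locally
            onHintReceived(UUID.randomUUID()); // hint addressed elsewhere: stored on the owner's behalf
        }
    }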