Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8A3556F0A for ; Thu, 21 Jul 2011 09:12:53 +0000 (UTC) Received: (qmail 99601 invoked by uid 500); 21 Jul 2011 09:12:53 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 99407 invoked by uid 500); 21 Jul 2011 09:12:50 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 99393 invoked by uid 99); 21 Jul 2011 09:12:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Jul 2011 09:12:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Jul 2011 09:12:42 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C790C238885D for ; Thu, 21 Jul 2011 09:12:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1149085 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/io/util/ src/jav... 
Date: Thu, 21 Jul 2011 09:12:16 -0000 To: commits@cassandra.apache.org From: slebresne@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110721091221.C790C238885D@eris.apache.org> Author: slebresne Date: Thu Jul 21 09:12:06 2011 New Revision: 1149085 URL: http://svn.apache.org/viewvc?rev=1149085&view=rev Log: Use reference counting to delete sstables instead of relying on the GC patch by slebresne; reviewed by jbellis for CASSANDRA-2521 Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java Removed: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java cassandra/trunk/src/java/org/apache/cassandra/db/Table.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java 
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Thu Jul 21 09:12:06 2011 @@ -13,6 +13,8 @@ * reset CF and SC deletion times after gc_grace (CASSANDRA-2317) * optimize away seek when compacting wide rows (CASSANDRA-2879) * single-pass streaming (CASSANDRA-2677) + * use reference counting for deleting sstables instead of relying on the GC + (CASSANDRA-2521) 0.8.2 Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Jul 21 09:12:06 2011 @@ -38,13 +38,13 @@ import org.apache.cassandra.db.ColumnFam import org.apache.cassandra.db.DefsTable; import org.apache.cassandra.db.Table; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.db.migration.Migration; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.MmappedSegmentedFile; import 
org.apache.cassandra.locator.*; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.scheduler.NoScheduler; @@ -186,6 +186,9 @@ public class DatabaseDescriptor indexAccessMode = conf.disk_access_mode; logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode ); } + // We could enable cleaner for index only mmap but it probably doesn't matter much + if (conf.disk_access_mode == Config.DiskAccessMode.mmap) + MmappedSegmentedFile.initCleaner(); /* Authentication and authorization backend, implementing IAuthenticator and IAuthority */ if (conf.authenticator != null) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jul 21 09:12:06 2011 @@ -343,7 +343,15 @@ public class ColumnFamilyStore implement { throw new AssertionError(e); } - buildSecondaryIndexes(getSSTables(), FBUtilities.singleton(info.name)); + Collection sstables = markCurrentViewReferenced().sstables; + try + { + buildSecondaryIndexes(sstables, FBUtilities.singleton(info.name)); + } + finally + { + SSTableReader.releaseReferences(sstables); + } SystemTable.setIndexBuilt(table.name, indexedCfMetadata.cfName); } }; @@ -356,6 +364,7 @@ public class ColumnFamilyStore implement { logger.info(String.format("Submitting index build of %s for data in %s", metadata.comparator.getString(columns), StringUtils.join(sstables, ", "))); + Table.IndexBuilder builder = table.createIndexBuilder(this, columns, new ReducingKeyIterator(sstables)); Future future = CompactionManager.instance.submitIndexBuild(this, builder); try @@ -372,6 
+381,7 @@ public class ColumnFamilyStore implement { throw new RuntimeException(e); } + logger.info("Index build of " + metadata.comparator.getString(columns) + " complete"); } @@ -1234,16 +1244,56 @@ public class ColumnFamilyStore implement return cf.isSuper() ? removeDeleted(cf, gcBefore) : removeDeletedCF(cf, gcBefore); } + /** + * Get the current view and acquires references on all its sstables. + * This is a bit tricky because we must ensure that between the time we + * get the current view and the time we acquire the references the set of + * sstables hasn't changed. Otherwise we could get a view for which an + * sstable have been deleted in the meantime. + * + * At the end of this method, a reference on all the sstables of the + * returned view will have been acquired and must thus be released when + * appropriate. + */ + private DataTracker.View markCurrentViewReferenced() + { + while (true) + { + DataTracker.View currentView = data.getView(); + SSTableReader.acquireReferences(currentView.sstables); + if (currentView.sstables == data.getView().sstables) // reference equality is fine + { + return currentView; + } + else + { + // the set of sstables has changed, let's release the acquired references and try again + SSTableReader.releaseReferences(currentView.sstables); + } + } + } + + /** + * Get the current sstables, acquiring references on all of them. + * The caller is in charge of releasing the references on the sstables. + * + * See markCurrentViewReferenced() above. + */ + public Collection markCurrentSSTablesReferenced() + { + return markCurrentViewReferenced().sstables; + } + private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore) { // we are querying top-level columns, do a merging fetch with indexes. 
List iterators = new ArrayList(); final ColumnFamily returnCF = ColumnFamily.create(metadata); + DataTracker.View currentView = markCurrentViewReferenced(); try { IColumnIterator iter; int sstablesToIterate = 0; - DataTracker.View currentView = data.getView(); /* add the current memtable */ iter = filter.getMemtableColumnIterator(currentView.memtable, getComparator()); @@ -1303,6 +1353,7 @@ public class ColumnFamilyStore implement logger.error("error closing " + ci, th); } } + SSTableReader.releaseReferences(currentView.sstables); } } @@ -1328,58 +1379,66 @@ public class ColumnFamilyStore implement QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, superColumn, null), columnFilter); int gcBefore = (int)(System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds(); - DataTracker.View currentView = data.getView(); - Collection memtables = new ArrayList(); - memtables.add(currentView.memtable); - memtables.addAll(currentView.memtablesPendingFlush); - // It is fine to aliases the View.sstables since it's an unmodifiable collection - Collection sstables = currentView.sstables; - - CloseableIterator iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this); - List rows = new ArrayList(); - + DataTracker.View currentView = markCurrentViewReferenced(); try { - // pull rows out of the iterator - boolean first = true; - while (iterator.hasNext()) - { - Row current = iterator.next(); - DecoratedKey key = current.key; - - if (!stopAt.isEmpty() && stopAt.compareTo(key) < 0) - return rows; - - // skip first one - if(range instanceof Bounds || !first || !key.equals(startWith)) - { - // TODO this is necessary because when we collate supercolumns together, we don't check - // their subcolumns for relevance, so we need to do a second prune post facto here. - rows.add(current.cf != null && current.cf.isSuper() - ? 
new Row(current.key, ColumnFamilyStore.removeDeleted(current.cf, gcBefore)) - : current); - if (logger.isDebugEnabled()) - logger.debug("scanned " + key); - } - first = false; + Collection memtables = new ArrayList(); + memtables.add(currentView.memtable); + memtables.addAll(currentView.memtablesPendingFlush); + // It is fine to aliases the View.sstables since it's an unmodifiable collection + Collection sstables = currentView.sstables; + + CloseableIterator iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this); + List rows = new ArrayList(); - if (rows.size() >= maxResults) - return rows; - } - } - finally - { try { - iterator.close(); + // pull rows out of the iterator + boolean first = true; + while (iterator.hasNext()) + { + Row current = iterator.next(); + DecoratedKey key = current.key; + + if (!stopAt.isEmpty() && stopAt.compareTo(key) < 0) + return rows; + + // skip first one + if(range instanceof Bounds || !first || !key.equals(startWith)) + { + // TODO this is necessary because when we collate supercolumns together, we don't check + // their subcolumns for relevance, so we need to do a second prune post facto here. + rows.add(current.cf != null && current.cf.isSuper() + ? 
new Row(current.key, ColumnFamilyStore.removeDeleted(current.cf, gcBefore)) + : current); + if (logger.isDebugEnabled()) + logger.debug("scanned " + key); + } + first = false; + + if (rows.size() >= maxResults) + return rows; + } } - catch (IOException e) + finally { - throw new IOError(e); + try + { + iterator.close(); + } + catch (IOException e) + { + throw new IOError(e); + } } + + return rows; + } + finally + { + SSTableReader.releaseReferences(currentView.sstables); } - return rows; } private NamesQueryFilter getExtraFilter(IndexClause clause) @@ -1633,26 +1692,34 @@ public class ColumnFamilyStore implement { for (ColumnFamilyStore cfs : concatWithIndexes()) { - for (SSTableReader ssTable : cfs.data.getSSTables()) + DataTracker.View currentView = cfs.markCurrentViewReferenced(); + try { - try + for (SSTableReader ssTable : currentView.sstables) { - // mkdir - File dataDirectory = ssTable.descriptor.directory.getParentFile(); - String snapshotDirectoryPath = Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table.name, snapshotName); - FileUtils.createDirectory(snapshotDirectoryPath); - - // hard links - ssTable.createLinks(snapshotDirectoryPath); - if (logger.isDebugEnabled()) - logger.debug("Snapshot for " + table + " keyspace data file " + ssTable.getFilename() + - " created in " + snapshotDirectoryPath); - } - catch (IOException e) - { - throw new IOError(e); + try + { + // mkdir + File dataDirectory = ssTable.descriptor.directory.getParentFile(); + String snapshotDirectoryPath = Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table.name, snapshotName); + FileUtils.createDirectory(snapshotDirectoryPath); + + // hard links + ssTable.createLinks(snapshotDirectoryPath); + if (logger.isDebugEnabled()) + logger.debug("Snapshot for " + table + " keyspace data file " + ssTable.getFilename() + + " created in " + snapshotDirectoryPath); + } + catch (IOException e) + { + throw new IOError(e); + } } } + finally + { + 
SSTableReader.releaseReferences(currentView.sstables); + } } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Thu Jul 21 09:12:06 2011 @@ -37,7 +37,7 @@ import org.apache.cassandra.io.sstable.D import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.Pair; -public class DataTracker +public class DataTracker { private static final Logger logger = LoggerFactory.getLogger(DataTracker.class); @@ -157,6 +157,10 @@ public class DataTracker * @return A subset of the given active sstables that have been marked compacting, * or null if the thresholds cannot be met: files that are marked compacting must * later be unmarked using unmarkCompacting. + * + * Note that we could acquire references on the marked sstables and release them in + * unmarkCompacting, but since we will never call markCompacted on a sstable marked + * as compacting (unless there is a serious bug), we can skip this. 
*/ public Set markCompacting(Collection tomark, int min, int max) { @@ -280,7 +284,16 @@ public class DataTracker if (logger.isDebugEnabled()) logger.debug(String.format("removing %s from list of files tracked for %s.%s", sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName())); - sstable.markCompacted(); + // A reference must be acquire before any call to markCompacted, see SSTableReader for details + sstable.acquireReference(); + try + { + sstable.markCompacted(); + } + finally + { + sstable.releaseReference(); + } liveSize.addAndGet(-sstable.bytesOnDisk()); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Jul 21 09:12:06 2011 @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; @@ -42,9 +43,10 @@ import org.apache.cassandra.db.filter.Qu import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.dht.LocalToken; import org.apache.cassandra.io.sstable.ReducingKeyIterator; -import org.apache.cassandra.io.sstable.SSTableDeletingReference; +import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.MmappedSegmentedFile; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.StorageService; import 
org.apache.cassandra.utils.ByteBufferUtil; @@ -683,13 +685,18 @@ public class Table public String getDataFileLocation(long expectedSize) { String path = DatabaseDescriptor.getDataFileLocationForTable(name, expectedSize); - if (path == null) + // Requesting GC has a chance to free space only if we're using mmap and a non SUN jvm + if (path == null + && (DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap || DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap) + && !MmappedSegmentedFile.isCleanerAvailable()) { - // retry after GCing to force unmap of compacted SSTables so they can be deleted StorageService.instance.requestGC(); + // retry after GCing has forced unmap of compacted SSTables so they can be deleted + // Note: GCInspector will do this already, but only sun JVM supports GCInspector so far + SSTableDeletingTask.rescheduleFailedTasks(); try { - Thread.sleep(SSTableDeletingReference.RETRY_DELAY * 2); + Thread.sleep(10000); } catch (InterruptedException e) { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Jul 21 09:12:06 2011 @@ -342,28 +342,36 @@ public class CompactionManager implement } } - if (sstables.isEmpty()) - { - logger.error("No file to compact for user defined compaction"); - } - // attempt to schedule the set - else if ((sstables = cfs.getDataTracker().markCompacting(sstables, 1, Integer.MAX_VALUE)) != null) + Collection toCompact; + try { - // success: perform the compaction - try + if (sstables.isEmpty()) { - AbstractCompactionStrategy strategy = 
cfs.getCompactionStrategy(); - AbstractCompactionTask task = strategy.getUserDefinedTask(sstables, gcBefore); - task.execute(executor); + logger.error("No file to compact for user defined compaction"); + } + // attempt to schedule the set + else if ((toCompact = cfs.getDataTracker().markCompacting(sstables, 1, Integer.MAX_VALUE)) != null) + { + // success: perform the compaction + try + { + AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); + AbstractCompactionTask task = strategy.getUserDefinedTask(toCompact, gcBefore); + task.execute(executor); + } + finally + { + cfs.getDataTracker().unmarkCompacting(toCompact); + } } - finally + else { - cfs.getDataTracker().unmarkCompacting(sstables); + logger.error("SSTables for user defined compaction are already being compacted."); } } - else + finally { - logger.error("SSTables for user defined compaction are already being compacted."); + SSTableReader.releaseReferences(sstables); } return this; @@ -377,18 +385,23 @@ public class CompactionManager implement return executor.submit(callable); } + // This acquire a reference on the sstable + // This is not efficent, do not use in any critical path private SSTableReader lookupSSTable(final ColumnFamilyStore cfs, Descriptor descriptor) { - for (SSTableReader sstable : cfs.getSSTables()) + SSTableReader found = null; + for (SSTableReader sstable : cfs.markCurrentSSTablesReferenced()) { // .equals() with no other changes won't work because in sstable.descriptor, the directory is an absolute path. // We could construct descriptor with an absolute path too but I haven't found any satisfying way to do that // (DB.getDataFileLocationForTable() may not return the right path if you have multiple volumes). Hence the // endsWith. 
if (sstable.descriptor.toString().endsWith(descriptor.toString())) - return sstable; + found = sstable; + else + sstable.releaseReference(); } - return null; + return found; } /** @@ -779,7 +792,8 @@ public class CompactionManager implement throw new AssertionError(e); } - CompactionIterator ci = new ValidationCompactionIterator(cfs, validator.request.range); + Collection sstables = cfs.markCurrentSSTablesReferenced(); + CompactionIterator ci = new ValidationCompactionIterator(cfs, sstables, validator.request.range); executor.beginCompaction(ci); try { @@ -796,6 +810,7 @@ public class CompactionManager implement } finally { + SSTableReader.releaseReferences(sstables); ci.close(); executor.finishCompaction(ci); } @@ -940,11 +955,11 @@ public class CompactionManager implement private static class ValidationCompactionIterator extends CompactionIterator { - public ValidationCompactionIterator(ColumnFamilyStore cfs, Range range) throws IOException + public ValidationCompactionIterator(ColumnFamilyStore cfs, Collection sstables, Range range) throws IOException { super(CompactionType.VALIDATION, - getScanners(cfs.getSSTables(), range), - new CompactionController(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true)); + getScanners(sstables, range), + new CompactionController(cfs, sstables, getDefaultGcBefore(cfs), true)); } protected static List getScanners(Iterable sstables, Range range) throws IOException Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java?rev=1149085&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java Thu Jul 21 09:12:06 2011 @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.io.sstable; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.DataTracker; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.WrappedRunnable; + +public class SSTableDeletingTask extends WrappedRunnable +{ + private static final Logger logger = LoggerFactory.getLogger(SSTableDeletingTask.class); + + // Deleting sstables is tricky because the mmapping might not have been finalized yet, + // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs). + // Additionally, we need to make sure to delete the data file first, so on restart the others + // will be recognized as GCable. 
+ private static final Set failedTasks = new CopyOnWriteArraySet(); + + public final Descriptor desc; + public final Set components; + private DataTracker tracker; + private final long size; + + public SSTableDeletingTask(SSTableReader referent) + { + this.desc = referent.descriptor; + this.components = referent.components; + this.size = referent.bytesOnDisk(); + } + + public void setTracker(DataTracker tracker) + { + this.tracker = tracker; + } + + public void schedule() + { + StorageService.tasks.submit(this); + } + + protected void runMayThrow() throws IOException + { + // If we can't successfully delete the DATA component, set the task to be retried later: see above + File datafile = new File(desc.filenameFor(Component.DATA)); + if (!datafile.delete()) + { + logger.error("Unable to delete " + datafile + " (it will be removed on server restart; we'll also retry after GC)"); + failedTasks.add(this); + return; + } + // let the remainder be cleaned up by delete + SSTable.delete(desc, Sets.difference(components, Collections.singleton(Component.DATA))); + if (tracker != null) + tracker.spaceReclaimed(size); + } + + /** + * Retry all deletions that failed the first time around (presumably b/c the sstable was still mmap'd.) + * Useful because there are times when we know GC has been invoked; also exposed as an mbean. 
+ */ + public static void rescheduleFailedTasks() + { + for (SSTableDeletingTask task : failedTasks) + { + failedTasks.remove(task); + task.schedule(); + } + } +} + Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Jul 21 09:12:06 2011 @@ -20,10 +20,10 @@ package org.apache.cassandra.io.sstable; import java.io.*; -import java.lang.ref.Reference; -import java.lang.ref.ReferenceQueue; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Function; import com.google.common.collect.Collections2; @@ -36,11 +36,9 @@ import org.apache.cassandra.config.Datab import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; -import org.apache.cassandra.io.IColumnSerializer; import org.apache.cassandra.io.util.BufferedRandomAccessFile; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileUtils; @@ -59,41 +57,6 @@ public class SSTableReader extends SSTab // guesstimated size of INDEX_INTERVAL index entries private static final int INDEX_FILE_BUFFER_BYTES = 16 * DatabaseDescriptor.getIndexInterval(); - // `finalizers` is required to keep the PhantomReferences alive after the enclosing SSTR is itself - // unreferenced. 
otherwise they will never get enqueued. - private static final Set> finalizers = new HashSet>(); - private static final ReferenceQueue finalizerQueue = new ReferenceQueue() - {{ - Runnable runnable = new Runnable() - { - public void run() - { - while (true) - { - SSTableDeletingReference r; - try - { - r = (SSTableDeletingReference) finalizerQueue.remove(); - finalizers.remove(r); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - try - { - r.cleanup(); - } - catch (IOException e) - { - logger.error("Error deleting " + r.desc, e); - } - } - } - }; - new Thread(runnable, "SSTABLE-DELETER").start(); - }}; - /** * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an uppper bound * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created @@ -119,7 +82,10 @@ public class SSTableReader extends SSTab private BloomFilterTracker bloomFilterTracker = new BloomFilterTracker(); - private volatile SSTableDeletingReference phantomReference; + private final AtomicInteger holdReferences = new AtomicInteger(0); + private final AtomicBoolean isCompacted = new AtomicBoolean(false); + private final AtomicBoolean isScheduledForDeletion = new AtomicBoolean(false); + private final SSTableDeletingTask deletingTask; private final SSTableMetadata sstableMetadata; @@ -240,15 +206,15 @@ public class SSTableReader extends SSTab this.dfile = dfile; this.indexSummary = indexSummary; this.bf = bloomFilter; + this.deletingTask = new SSTableDeletingTask(this); } public void setTrackedBy(DataTracker tracker) { if (tracker != null) { - phantomReference = new SSTableDeletingReference(tracker, this, finalizerQueue); - finalizers.add(phantomReference); keyCache = tracker.getKeyCache(); + deletingTask.setTracker(tracker); } } @@ -639,6 +605,35 @@ public class SSTableReader extends SSTab return dfile.length; } + public void acquireReference() + { + 
holdReferences.incrementAndGet(); + } + + public void releaseReference() + { + if (holdReferences.decrementAndGet() == 0 && isCompacted.get()) + { + // Force finalizing mmapping if necessary + ifile.cleanup(); + dfile.cleanup(); + + deletingTask.schedule(); + } + assert holdReferences.get() >= 0 : "Reference counter " + holdReferences.get() + " for " + dfile.path; + } + + /** + * Mark the sstable as compacted. + * When calling this function, the caller must ensure two things: + * - He must have acquired a reference with acquireReference() + * - He must ensure that the SSTableReader is not referenced anywhere except for threads holding a reference. + * + * The reason we ask caller to acquire a reference is because this greatly simplify the logic here. + * If that wasn't the case, markCompacted would have to deal with both the case where some thread still + * have references and the case where no thread have any reference. Making this logic thread-safe is a + * bit hard, so we make sure that at least the caller thread has a reference and delegate the rest to releaseRefence() + */ public void markCompacted() { if (logger.isDebugEnabled()) @@ -652,7 +647,9 @@ public class SSTableReader extends SSTab { throw new IOError(e); } - phantomReference.deleteOnCleanup(); + + boolean alreadyCompacted = isCompacted.getAndSet(true); + assert !alreadyCompacted : this + " was already marked compacted"; } /** @@ -808,4 +805,29 @@ public class SSTableReader extends SSTab { return sstableMetadata.getMaxTimestamp(); } + + public static void acquireReferences(Iterable sstables) + { + for (SSTableReader sstable : sstables) + { + if (sstable != null) + sstable.acquireReference(); + } + } + + public static void releaseReferences(Iterable sstables) + { + for (SSTableReader sstable : sstables) + { + try + { + if (sstable != null) + sstable.releaseReference(); + } + catch (Throwable ex) + { + logger.error("Failed releasing reference on " + sstable, ex); + } + } + } } Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java Thu Jul 21 09:12:06 2011 @@ -68,4 +68,9 @@ public class BufferedSegmentedFile exten throw new IOError(e); } } + + public void cleanup() + { + // nothing to do + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java Thu Jul 21 09:12:06 2011 @@ -25,17 +25,25 @@ import java.io.File; import java.io.IOError; import java.io.IOException; import java.io.RandomAccessFile; +import java.lang.reflect.Method; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class MmappedSegmentedFile extends SegmentedFile { + private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class); + // in a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size to stay sane. 
public static long MAX_SEGMENT_SIZE = Integer.MAX_VALUE; + private static Method cleanerMethod = null; + /** * Sorted array of segment offsets and MappedByteBuffers for segments. If mmap is completely disabled, or if the * segment would be too long to mmap, the value for an offset will be null, indicating that we need to fall back @@ -90,6 +98,53 @@ public class MmappedSegmentedFile extend } } + public static void initCleaner() + { + try + { + cleanerMethod = Class.forName("sun.nio.ch.DirectBuffer").getMethod("cleaner"); + } + catch (Exception e) + { + // Perhaps a non-sun-derived JVM - contributions welcome + logger.info("Cannot initialize un-mmaper. (Are you using a non-SUN JVM?) Compacted data files will not be removed promptly. Consider using a SUN JVM or using standard disk access mode"); + } + } + + public static boolean isCleanerAvailable() + { + return cleanerMethod != null; + } + + public void cleanup() + { + if (cleanerMethod == null) + return; + + /* + * Try forcing the unmapping of segments using undocumented unsafe sun APIs. + * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping. + * If this works and a thread tries to access any segment, hell will unleash on earth. + */ + try + { + for (Segment segment : segments) + { + if (segment.right == null) + continue; + + Object cleaner = cleanerMethod.invoke(segment.right); + cleaner.getClass().getMethod("clean").invoke(cleaner); + } + logger.debug("All segments have been unmapped successfully"); + } + catch (Exception e) + { + // This is not supposed to happen + logger.error("Error while unmapping segments", e); + } + } + /** * Overrides the default behaviour to create segments of a maximum size. 
*/ Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java Thu Jul 21 09:12:06 2011 @@ -72,6 +72,11 @@ public abstract class SegmentedFile } /** + * Do whatever action is needed to reclaim resources used by this SegmentedFile. + */ + public abstract void cleanup(); + + /** * Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it. */ public static abstract class Builder Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Jul 21 09:12:06 2011 @@ -494,7 +494,8 @@ public class AntiEntropyService ColumnFamilyStore cfstore = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right); try { - Collection sstables = cfstore.getSSTables(); + // We acquire references for transferSSTables + Collection sstables = cfstore.markCurrentSSTablesReferenced(); Callback callback = new Callback(); // send ranges to the remote node StreamOutSession outsession = StreamOutSession.create(request.cf.left, request.endpoint, callback); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Thu Jul 21 09:12:06 2011 @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.utils.StatusLogger; public class GCInspector @@ -135,6 +136,8 @@ public class GCInspector // if we just finished a full collection and we're still using a lot of memory, try to reduce the pressure if (gcw.getName().equals("ConcurrentMarkSweep")) { + SSTableDeletingTask.rescheduleFailedTasks(); + double usage = (double) memoryUsed / memoryMax; if (memoryUsed > DatabaseDescriptor.getReduceCacheSizesAt() * memoryMax && !cacheSizesReduced) Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Jul 21 09:12:06 2011 @@ -49,6 +49,7 @@ import org.apache.cassandra.db.commitlog import org.apache.cassandra.dht.*; import org.apache.cassandra.gms.*; import org.apache.cassandra.io.DeletionService; +import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.AbstractReplicationStrategy; @@ -2517,4 +2518,9 
@@ public class StorageService implements I { return AbstractCassandraDaemon.exceptions.get(); } + + public void rescheduleFailedDeletions() + { + SSTableDeletingTask.rescheduleFailedTasks(); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Thu Jul 21 09:12:06 2011 @@ -321,4 +321,6 @@ public interface StorageServiceMBean public void setCompactionThroughputMbPerSec(int value); public void bulkLoad(String directory); + + public void rescheduleFailedDeletions(); } Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Thu Jul 21 09:12:06 2011 @@ -29,7 +29,7 @@ import java.util.List; import org.apache.cassandra.io.ICompactSerializer; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.Pair; @@ -45,8 +45,8 @@ public class PendingFile return serializer_; } - // NB: this reference prevents garbage collection of the sstable on the source node - private final SSTable sstable; + // NB: this reference is used to be able to 
release the acquired reference upon completion + public final SSTableReader sstable; public final Descriptor desc; public final String component; @@ -61,12 +61,12 @@ public class PendingFile this(null, desc, pf.component, pf.sections, pf.type, pf.estimatedKeys); } - public PendingFile(SSTable sstable, Descriptor desc, String component, List> sections, OperationType type) + public PendingFile(SSTableReader sstable, Descriptor desc, String component, List> sections, OperationType type) { this(sstable, desc, component, sections, type, 0); } - public PendingFile(SSTable sstable, Descriptor desc, String component, List> sections, OperationType type, long estimatedKeys) + public PendingFile(SSTableReader sstable, Descriptor desc, String component, List> sections, OperationType type, long estimatedKeys) { this.sstable = sstable; this.desc = desc; Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Thu Jul 21 09:12:06 2011 @@ -140,43 +140,61 @@ public class StreamInSession { // wait for bloom filters and row indexes to finish building HashMap > cfstores = new HashMap>(); - for (Future future : buildFutures) + List referenced = new LinkedList(); + try { - try + for (Future future : buildFutures) + { + try + { + SSTableReader sstable = future.get(); + assert sstable.getTableName().equals(table); + + // Acquiring the reference (for secondary index building) before adding it makes sure we don't have to care about races + sstable.acquireReference(); + referenced.add(sstable); + + ColumnFamilyStore cfs = 
Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName()); + cfs.addSSTable(sstable); + if (!cfstores.containsKey(cfs)) + cfstores.put(cfs, new ArrayList()); + cfstores.get(cfs).add(sstable); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + } + + for (SSTableReader sstable : readers) { - SSTableReader sstable = future.get(); assert sstable.getTableName().equals(table); + + // Acquiring the reference (for secondary index building) before adding it makes sure we don't have to care about races + sstable.acquireReference(); + referenced.add(sstable); + ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName()); cfs.addSSTable(sstable); if (!cfstores.containsKey(cfs)) cfstores.put(cfs, new ArrayList()); cfstores.get(cfs).add(sstable); } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - catch (ExecutionException e) + + // build secondary indexes + for (Map.Entry> entry : cfstores.entrySet()) { - throw new RuntimeException(e); + if (entry.getKey() != null && !entry.getKey().getIndexedColumns().isEmpty()) + entry.getKey().buildSecondaryIndexes(entry.getValue(), entry.getKey().getIndexedColumns()); } } - - for (SSTableReader sstable : readers) - { - assert sstable.getTableName().equals(table); - ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName()); - cfs.addSSTable(sstable); - if (!cfstores.containsKey(cfs)) - cfstores.put(cfs, new ArrayList()); - cfstores.get(cfs).add(sstable); - } - - // build secondary indexes - for (Map.Entry> entry : cfstores.entrySet()) + finally { - if (entry.getKey() != null && !entry.getKey().getIndexedColumns().isEmpty()) - entry.getKey().buildSecondaryIndexes(entry.getValue(), entry.getKey().getIndexedColumns()); + SSTableReader.releaseReferences(referenced); } // send reply to source 
that we're done Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Thu Jul 21 09:12:06 2011 @@ -118,7 +118,7 @@ public class StreamOut flushSSTables(cfses); Iterable sstables = Collections.emptyList(); for (ColumnFamilyStore cfStore : cfses) - sstables = Iterables.concat(sstables, cfStore.getSSTables()); + sstables = Iterables.concat(sstables, cfStore.markCurrentSSTablesReferenced()); transferSSTables(session, sstables, ranges, type); } catch (IOException e) @@ -129,7 +129,7 @@ public class StreamOut /** * Low-level transfer of matching portions of a group of sstables from a single table to the target endpoint. - * You should probably call transferRanges instead. + * You should probably call transferRanges instead. This moreover assumes that references have been acquired on the sstables. 
*/ public static void transferSSTables(StreamOutSession session, Iterable sstables, Collection ranges, OperationType type) throws IOException { @@ -150,7 +150,11 @@ public class StreamOut Descriptor desc = sstable.descriptor; List> sections = sstable.getPositionsForRanges(ranges); if (sections.isEmpty()) + { + // A reference was acquired on the sstable and we won't stream it + sstable.releaseReference(); continue; + } pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type, sstable.estimatedKeysForRanges(ranges))); } logger.info("Stream context metadata {}, {} sstables.", pending, Iterables.size(sstables)); Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java Thu Jul 21 09:12:06 2011 @@ -114,6 +114,7 @@ public class StreamOutSession public void startNext() throws IOException { assert files.containsKey(currentFile); + files.get(currentFile).sstable.releaseReference(); files.remove(currentFile); Iterator iter = files.values().iterator(); if (iter.hasNext()) @@ -122,6 +123,9 @@ public class StreamOutSession public void close() { + // Release reference on last file + for (PendingFile file : files.values()) + file.sstable.releaseReference(); streams.remove(context); if (callback != null) callback.run(); Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1149085&r1=1149084&r2=1149085&view=diff 
============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Thu Jul 21 09:12:06 2011 @@ -24,6 +24,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.Column; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.DecoratedKey; @@ -35,6 +38,8 @@ import org.apache.cassandra.Util; public class SSTableUtils { + private static Logger logger = LoggerFactory.getLogger(SSTableUtils.class); + // first configured table and cf public static String TABLENAME = "Keyspace1"; public static String CFNAME = "Standard1"; Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Thu Jul 21 09:12:06 2011 @@ -30,6 +30,7 @@ import org.apache.cassandra.dht.BytesTok import org.apache.cassandra.dht.Range; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.net.Message; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -176,7 +177,7 @@ public class SerializationsTest extends in.close(); } - private static SSTable makeSSTable() + private static SSTableReader makeSSTable() { Table t = Table.open("Keyspace1"); for (int i = 0; i < 100; i++) 
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1149085&r1=1149084&r2=1149085&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Thu Jul 21 09:12:06 2011 @@ -77,6 +77,8 @@ public class StreamingTransferTest exten cfs.forceBlockingFlush(); assert cfs.getSSTables().size() == 1; SSTableReader sstable = cfs.getSSTables().iterator().next(); + // We acquire a reference now, because removeAllSSTables will mark the sstable compacted, and we have work to do with it + sstable.acquireReference(); cfs.removeAllSSTables(); // transfer the first and last key @@ -134,6 +136,9 @@ public class StreamingTransferTest exten List ranges = new ArrayList(); ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test")))); ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken())); + // Acquiring references, transferSSTables needs it + sstable.acquireReference(); + sstable2.acquireReference(); StreamOutSession session = StreamOutSession.create(tablename, LOCAL, null); StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges, OperationType.BOOTSTRAP); session.await(); @@ -186,6 +191,9 @@ public class StreamingTransferTest exten // the left hand side of the range is exclusive, so we transfer from the second-to-last token ranges.add(new Range(secondtolast.getKey().token, p.getMinimumToken())); + // Acquiring references, transferSSTables needs it + SSTableReader.acquireReferences(ssTableReaders); + StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, null); StreamOut.transferSSTables(session, ssTableReaders, ranges, 
OperationType.BOOTSTRAP);