Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AD676909C for ; Wed, 21 Sep 2011 13:15:09 +0000 (UTC) Received: (qmail 47374 invoked by uid 500); 21 Sep 2011 13:15:09 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 47353 invoked by uid 500); 21 Sep 2011 13:15:09 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 47345 invoked by uid 99); 21 Sep 2011 13:15:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Sep 2011 13:15:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Sep 2011 13:15:07 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C8569238889B for ; Wed, 21 Sep 2011 13:14:47 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1173615 - in /cassandra/branches/cassandra-1.0.0: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/sstable/ Date: Wed, 21 Sep 2011 13:14:47 -0000 To: commits@cassandra.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20110921131447.C8569238889B@eris.apache.org> Author: jbellis Date: Wed Sep 21 13:14:47 2011 New Revision: 1173615 URL: http://svn.apache.org/viewvc?rev=1173615&view=rev Log: parallelize sstable open at server startup patch by Melvin Wang and jbellis; reviewed by pyaskevich for CASSANDRA-2988 Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1173615&r1=1173614&r2=1173615&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Wed Sep 21 13:14:47 2011 @@ -12,6 +12,7 @@ schema definitions were found (CASSANDRA-3219) * Fixes for LeveledCompactionStrategy score computation, prioritization, and scheduling (CASSANDRA-3224) + * parallelize sstable open at server startup (CASSANDRA-2988) 1.0.0-beta1 Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1173615&r1=1173614&r2=1173615&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Sep 21 13:14:47 2011 @@ -98,6 +98,11 @@ public class DebuggableThreadPoolExecuto this.setRejectedExecutionHandler(blockingExecutionHandler); } + public static DebuggableThreadPoolExecutor createWithPoolSize(String threadPoolName, int size) + { + return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory(threadPoolName)); + } + protected void onInitialRejection(Runnable task) {} protected void onFinalAccept(Runnable task) {} protected void onFinalRejection(Runnable task) {} Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173615&r1=1173614&r2=1173615&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 21 13:14:47 2011 @@ -212,15 +212,8 @@ public class ColumnFamilyStore implement // scan for sstables corresponding to this cf and load them data = new DataTracker(this); Set savedKeys = keyCache.readSaved(); - List sstables = new ArrayList(); - for (Map.Entry> sstableFiles : files(table.name, columnFamilyName, false, false).entrySet()) - { - SSTableReader reader = openSSTableReader(sstableFiles, savedKeys, data, metadata, partitioner); - - if (reader != null) // if == null, logger errors where already fired - sstables.add(reader); - } - data.addSSTables(sstables); + Set>> entries = files(table.name, columnFamilyName, false, false).entrySet(); + data.addSSTables(SSTableReader.batchOpen(entries, savedKeys, data, metadata, this.partitioner)); // compaction strategy should be created after the CFS has been prepared this.compactionStrategy = metadata.createCompactionStrategyInstance(this); @@ -541,10 +534,15 @@ public class ColumnFamilyStore implement descriptor)); logger.info("Initializing new SSTable {}", rawSSTable); - reader = openSSTableReader(rawSSTable, savedKeys, data, metadata, partitioner); - - if (reader == null) - continue; // something wrong with SSTable, skipping + try + { + reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, data, metadata, partitioner); + } + catch (IOException e) + { + SSTableReader.logOpenException(rawSSTable.getKey(), e); + continue; + } sstables.add(reader); @@ -1892,30 +1890,6 @@ public class ColumnFamilyStore implement return indexManager.getBuiltIndexes(); } - private static SSTableReader openSSTableReader(Map.Entry> rawSSTable, - Set savedKeys, - DataTracker tracker, - CFMetaData metadata, - IPartitioner partitioner) - { - SSTableReader reader = null; - - try - { - reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, tracker, metadata, partitioner); - } - catch (FileNotFoundException ex) - { - logger.error("Missing sstable component in " + rawSSTable + "; skipped because of " + ex.getMessage()); - } - catch (IOException ex) - { - logger.error("Corrupt sstable " + rawSSTable + "; skipped", ex); - } - - return reader; - } - public int getUnleveledSSTables() { return this.compactionStrategy instanceof LeveledCompactionStrategy Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1173615&r1=1173614&r2=1173615&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Sep 21 13:14:47 2011 @@ -24,9 +24,12 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.*; import com.google.common.base.Function; import com.google.common.collect.Collections2; + +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.slf4j.Logger; @@ -164,6 +167,59 @@ public class SSTableReader extends SSTab return sstable; } + public static void logOpenException(Descriptor descriptor, IOException e) + { + if (e instanceof FileNotFoundException) + logger.error("Missing sstable component in " + descriptor + "; skipped because of " + e.getMessage()); + else + logger.error("Corrupt sstable " + descriptor + "; skipped", e); + } + + public static Collection batchOpen(Set>> entries, + final Set savedKeys, + final DataTracker tracker, + final CFMetaData metadata, + final IPartitioner partitioner) + { + final Collection sstables = new LinkedBlockingQueue(); + + ExecutorService executor = DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors()); + for (final Map.Entry> entry : entries) + { + Runnable runnable = new Runnable() + { + public void run() + { + SSTableReader sstable; + try + { + sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker, metadata, partitioner); + } + catch (IOException ex) + { + logger.error("Corrupt sstable " + entry + "; skipped", ex); + return; + } + sstables.add(sstable); + } + }; + executor.submit(runnable); + } + + executor.shutdown(); + try + { + executor.awaitTermination(7, TimeUnit.DAYS); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + + return sstables; + + } + /** * Open a RowIndexedReader which already has its state initialized (by SSTableWriter). */