Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5A0C59E96 for ; Wed, 21 Sep 2011 13:02:43 +0000 (UTC) Received: (qmail 14820 invoked by uid 500); 21 Sep 2011 13:02:40 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 14804 invoked by uid 500); 21 Sep 2011 13:02:40 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 14795 invoked by uid 99); 21 Sep 2011 13:02:40 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Sep 2011 13:02:40 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Sep 2011 13:02:38 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 8202123888E4 for ; Wed, 21 Sep 2011 13:02:17 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1173611 - in /cassandra/trunk: CHANGES.txt src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java src/java/org/apache/cassandra/db/ColumnFamilyStore.java src/java/org/apache/cassandra/io/sstable/SSTableReader.java Date: Wed, 21 Sep 2011 13:02:17 -0000 To: commits@cassandra.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20110921130217.8202123888E4@eris.apache.org> Author: jbellis Date: Wed Sep 21 13:02:16 2011 New Revision: 1173611 URL: http://svn.apache.org/viewvc?rev=1173611&view=rev Log: Revert "parallelize sstable open at server startup" Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1173611&r1=1173610&r2=1173611&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Wed Sep 21 13:02:16 2011 @@ -16,7 +16,6 @@ schema definitions were found (CASSANDRA-3219) * Fixes for LeveledCompactionStrategy score computation, prioritization, and scheduling (CASSANDRA-3224) - * parallelize sstable open at server startup (CASSANDRA-2988) 1.0.0-beta1 Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1173611&r1=1173610&r2=1173611&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Sep 21 13:02:16 2011 @@ -98,11 +98,6 @@ public class DebuggableThreadPoolExecuto this.setRejectedExecutionHandler(blockingExecutionHandler); } - public static DebuggableThreadPoolExecutor createWithPoolSize(String threadPoolName, int size) - { - return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory(threadPoolName)); - } - protected void onInitialRejection(Runnable task) {} protected void onFinalAccept(Runnable task) {} protected void onFinalRejection(Runnable task) {} Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173611&r1=1173610&r2=1173611&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 21 13:02:16 2011 @@ -212,8 +212,15 @@ public class ColumnFamilyStore implement // scan for sstables corresponding to this cf and load them data = new DataTracker(this); Set savedKeys = keyCache.readSaved(); - Set>> entries = files(table.name, columnFamilyName, false, false).entrySet(); - data.addSSTables(SSTableReader.batchOpen(entries, savedKeys, data, metadata, this.partitioner)); + List sstables = new ArrayList(); + for (Map.Entry> sstableFiles : files(table.name, columnFamilyName, false, false).entrySet()) + { + SSTableReader reader = openSSTableReader(sstableFiles, savedKeys, data, metadata, partitioner); + + if (reader != null) // if == null, logger errors where already fired + sstables.add(reader); + } + data.addSSTables(sstables); // compaction strategy should be created after the CFS has been prepared this.compactionStrategy = metadata.createCompactionStrategyInstance(this); @@ -534,15 +541,10 @@ public class ColumnFamilyStore implement descriptor)); logger.info("Initializing new SSTable {}", rawSSTable); - try - { - reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, data, metadata, partitioner); - } - catch (IOException e) - { - SSTableReader.logOpenException(rawSSTable.getKey(), e); - continue; - } + reader = openSSTableReader(rawSSTable, savedKeys, data, metadata, partitioner); + + if (reader == null) + continue; // something wrong with SSTable, skipping sstables.add(reader); @@ -1890,6 +1892,30 @@ public class ColumnFamilyStore implement return indexManager.getBuiltIndexes(); } + private static SSTableReader openSSTableReader(Map.Entry> rawSSTable, + Set savedKeys, + DataTracker tracker, + CFMetaData metadata, + IPartitioner partitioner) + { + SSTableReader reader = null; + + try + { + reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, tracker, metadata, partitioner); + } + catch (FileNotFoundException ex) + { + logger.error("Missing sstable component in " + rawSSTable + "; skipped because of " + ex.getMessage()); + } + catch (IOException ex) + { + logger.error("Corrupt sstable " + rawSSTable + "; skipped", ex); + } + + return reader; + } + public int getUnleveledSSTables() { return this.compactionStrategy instanceof LeveledCompactionStrategy Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1173611&r1=1173610&r2=1173611&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Sep 21 13:02:16 2011 @@ -24,12 +24,9 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.*; import com.google.common.base.Function; import com.google.common.collect.Collections2; - -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.slf4j.Logger; @@ -167,59 +164,6 @@ public class SSTableReader extends SSTab return sstable; } - public static void logOpenException(Descriptor descriptor, IOException e) - { - if (e instanceof FileNotFoundException) - logger.error("Missing sstable component in " + descriptor + "; skipped because of " + e.getMessage()); - else - logger.error("Corrupt sstable " + descriptor + "; skipped", e); - } - - public static Collection batchOpen(Set>> entries, - final Set savedKeys, - final DataTracker tracker, - final CFMetaData metadata, - final IPartitioner partitioner) - { - final Collection sstables = new LinkedBlockingQueue(); - - ExecutorService executor = DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors()); - for (final Map.Entry> entry : entries) - { - Runnable runnable = new Runnable() - { - public void run() - { - SSTableReader sstable; - try - { - sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker, metadata, partitioner); - } - catch (IOException ex) - { - logger.error("Corrupt sstable " + entry + "; skipped", ex); - return; - } - sstables.add(sstable); - } - }; - executor.submit(runnable); - } - - executor.shutdown(); - try - { - executor.awaitTermination(7, TimeUnit.DAYS); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - - return sstables; - - } - /** * Open a RowIndexedReader which already has its state initialized (by SSTableWriter). */