Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9AAAACAB9 for ; Thu, 31 May 2012 21:21:13 +0000 (UTC) Received: (qmail 80764 invoked by uid 500); 31 May 2012 21:21:13 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 80716 invoked by uid 500); 31 May 2012 21:21:13 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 80706 invoked by uid 99); 31 May 2012 21:21:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 31 May 2012 21:21:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 31 May 2012 21:21:03 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4ADA12388980 for ; Thu, 31 May 2012 21:20:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1344915 - in /accumulo/branches/ACCUMULO-578: bin/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/file/ core/src/main/java/org/... Date: Thu, 31 May 2012 21:20:40 -0000 To: commits@accumulo.apache.org From: ecn@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120531212041.4ADA12388980@eris.apache.org> Author: ecn Date: Thu May 31 21:20:39 2012 New Revision: 1344915 URL: http://svn.apache.org/viewvc?rev=1344915&view=rev Log: ACCUMULO-578 checkpoint after testing on the small cluster Added: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java (contents, props changed) - copied, changed from r1344409, accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/NamingThreadFactory.java Removed: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/NamingThreadFactory.java Modified: accumulo/branches/ACCUMULO-578/bin/tdown.sh accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java accumulo/branches/ACCUMULO-578/test/system/continuous/agitator.pl Modified: accumulo/branches/ACCUMULO-578/bin/tdown.sh URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/bin/tdown.sh?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/bin/tdown.sh (original) +++ accumulo/branches/ACCUMULO-578/bin/tdown.sh Thu May 31 21:20:39 2012 @@ -39,5 +39,5 @@ for server in `cat $SLAVES | grep -v '^# $ACCUMULO_HOME/bin/stop-server.sh $server "$ACCUMULO_HOME/.*/accumulo-start.*.jar" tserver KILL & done -echo 'Cleaning tablet server and logger entries from zookeeper' +echo 'Cleaning tablet server entries from zookeeper' $ACCUMULO_HOME/bin/accumulo org.apache.accumulo.server.util.ZooZap -tservers Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java (original) +++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java Thu May 31 21:20:39 2012 @@ -77,6 +77,7 @@ import org.apache.accumulo.core.util.Arg import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.MetadataTable; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.OpTimer; import org.apache.accumulo.core.util.StringUtil; import org.apache.accumulo.core.util.TextUtil; @@ -386,7 +387,7 @@ public class TableOperationsImpl extends CountDownLatch latch = new CountDownLatch(splits.size()); AtomicReference exception = new AtomicReference(null); - ExecutorService executor = Executors.newFixedThreadPool(16); + ExecutorService executor = Executors.newFixedThreadPool(16, new NamingThreadFactory("addSplits")); try { executor.submit(new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits)); Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java (original) +++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java Thu May 31 21:20:39 2012 @@ -23,10 +23,8 @@ import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -42,6 +40,7 @@ import org.apache.accumulo.core.data.Ran import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.thrift.AuthInfo; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; @@ -70,19 +69,8 @@ public class ScannerIterator implements private static final List EMPTY_LIST = Collections.emptyList(); - private static AtomicInteger threadCounter = new AtomicInteger(1); - private static ThreadPoolExecutor readaheadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3l, TimeUnit.SECONDS, new SynchronousQueue(), - new ThreadFactory() { - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setDaemon(true); - t.setName("Accumulo scanner read ahead thread " + threadCounter.getAndIncrement()); - return t; - } - }); + new NamingThreadFactory("Accumulo scanner read ahead thread")); private class Reader implements Runnable { Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java (original) +++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java Thu May 31 21:20:39 2012 @@ -21,11 +21,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.BatchScanner; @@ -36,6 +31,7 @@ import org.apache.accumulo.core.data.Val import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.util.ArgumentChecker; +import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.log4j.Logger; public class TabletServerBatchReader extends ScannerOptions implements BatchScanner { @@ -59,25 +55,6 @@ public class TabletServerBatchReader ext private final int batchReaderInstance = getNextBatchReaderInstance(); - private static class BatchReaderThreadFactory implements ThreadFactory { - - private ThreadFactory dtf = Executors.defaultThreadFactory(); - private int threadNum = 1; - private final int batchReaderInstance; - - BatchReaderThreadFactory(int batchReaderInstance) { - this.batchReaderInstance = batchReaderInstance; - } - - public Thread newThread(Runnable r) { - Thread thread = dtf.newThread(r); - thread.setName("batch scanner " + batchReaderInstance + "-" + threadNum++); - thread.setDaemon(true); - return thread; - } - - } - public TabletServerBatchReader(Instance instance, AuthInfo credentials, String table, Authorizations authorizations, int numQueryThreads) { ArgumentChecker.notNull(instance, credentials, table, authorizations); this.instance = instance; @@ -86,8 +63,7 @@ public class TabletServerBatchReader ext this.table = table; this.numThreads = numQueryThreads; - queryThreadPool = new ThreadPoolExecutor(numQueryThreads, numQueryThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), - new BatchReaderThreadFactory(batchReaderInstance)); + queryThreadPool = new SimpleThreadPool(numQueryThreads, "batch scanner " + batchReaderInstance + "-"); ranges = null; } Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java (original) +++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java Thu May 31 21:20:39 2012 @@ -60,6 +60,7 @@ import org.apache.accumulo.core.tabletse import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; @@ -545,7 +546,7 @@ public class TabletServerBatchWriter { public MutationWriter(int numSendThreads) { serversMutations = new HashMap(); queued = new HashSet(); - sendThreadPool = Executors.newFixedThreadPool(numSendThreads); + sendThreadPool = Executors.newFixedThreadPool(numSendThreads, new NamingThreadFactory(this.getClass().getName())); } private void binMutations(MutationSet mutationsToProcess, Map binnedMutations) { Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java (original) +++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java Thu May 31 21:20:39 2012 @@ -29,10 +29,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,6 +50,7 @@ import org.apache.accumulo.core.iterator import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.start.classloader.AccumuloClassLoader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -68,19 +68,6 @@ public class BloomFilterLayer { public static final String BLOOM_FILE_NAME = "acu_bloom"; public static final int HASH_COUNT = 5; - private static class BloomLoaderThreadFactory implements ThreadFactory { - - private ThreadFactory dtf = Executors.defaultThreadFactory(); - private int threadNum = 1; - - public Thread newThread(Runnable r) { - Thread thread = dtf.newThread(r); - thread.setName("bloom-loader-" + threadNum++); - thread.setDaemon(true); - return thread; - } - } - private static ExecutorService loadThreadPool = null; private static synchronized ExecutorService getLoadThreadPool(int maxLoadThreads) { @@ -89,7 +76,8 @@ public class BloomFilterLayer { } if (maxLoadThreads > 0) { - loadThreadPool = new ThreadPoolExecutor(0, maxLoadThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), new BloomLoaderThreadFactory()); + BlockingQueue q = new LinkedBlockingQueue(); + loadThreadPool = new ThreadPoolExecutor(0, maxLoadThreads, 60, TimeUnit.SECONDS, q, new NamingThreadFactory("bloom-loader")); } return loadThreadPool; Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java (original) +++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java Thu May 31 21:20:39 2012 @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -96,7 +97,7 @@ public class LruBlockCache implements Bl private final EvictionThread evictionThread; /** Statistics thread schedule pool (for heavy debugging, could remove) */ - private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1); + private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats")); /** Current size of cache */ private final AtomicLong size; Copied: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java (from r1344409, accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/NamingThreadFactory.java) URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java?p2=accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java&p1=accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/NamingThreadFactory.java&r1=1344409&r2=1344915&rev=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/NamingThreadFactory.java (original) +++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java Thu May 31 21:20:39 2012 @@ -14,15 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.server.util; +package org.apache.accumulo.core.util; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.cloudtrace.instrument.TraceRunnable; +import org.apache.log4j.Logger; public class NamingThreadFactory implements ThreadFactory { + private static final Logger log = Logger.getLogger(NamingThreadFactory.class); - private ThreadFactory dtf = Executors.defaultThreadFactory(); - private int threadNum = 1; + private AtomicInteger threadNum = new AtomicInteger(1); private String name; public NamingThreadFactory(String name) { @@ -30,9 +33,7 @@ public class NamingThreadFactory impleme } public Thread newThread(Runnable r) { - Thread thread = dtf.newThread(r); - thread.setName(name + " " + threadNum++); - return thread; + return new Daemon(new LoggingRunnable(log, new TraceRunnable(r)), name + " " + threadNum.getAndIncrement()); } } Propchange: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Thu May 31 21:20:39 2012 @@ -57,6 +57,7 @@ import org.apache.accumulo.core.security import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.StopWatch; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; @@ -138,7 +139,7 @@ public class BulkImporter { final Map> assignments = Collections.synchronizedSortedMap(new TreeMap>()); timer.start(Timers.EXAMINE_MAP_FILES); - ExecutorService threadPool = Executors.newFixedThreadPool(numThreads); + ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping")); for (Path path : paths) { final Path mapFile = path; @@ -376,7 +377,7 @@ public class BulkImporter { final Map> ais = Collections.synchronizedMap(new TreeMap>()); - ExecutorService threadPool = Executors.newFixedThreadPool(numThreads); + ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes")); for (final Entry> entry : assignments.entrySet()) { if (entry.getValue().size() == 1) { @@ -586,12 +587,11 @@ public class BulkImporter { apt.put(entry.getKey(), entry.getValue()); } - ExecutorService threadPool = Executors.newFixedThreadPool(numThreads); + ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit")); for (Entry>> entry : assignmentsPerTabletServer.entrySet()) { String location = entry.getKey(); - threadPool - .submit(new TraceRunnable(new LoggingRunnable(log, new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue())))); + threadPool.submit(new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue())); } threadPool.shutdown(); Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Thu May 31 21:20:39 2012 @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; @@ -28,6 +29,7 @@ import java.util.UUID; import org.apache.accumulo.cloudtrace.instrument.Span; import org.apache.accumulo.cloudtrace.instrument.Trace; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.gc.thrift.GCStatus; @@ -35,10 +37,12 @@ import org.apache.accumulo.core.gc.thrif import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.security.SecurityConstants; import org.apache.accumulo.server.util.AddressUtil; import org.apache.accumulo.server.util.MetadataTable; import org.apache.accumulo.server.util.MetadataTable.LogEntry; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -50,12 +54,12 @@ import org.apache.zookeeper.KeeperExcept public class GarbageCollectWriteAheadLogs { private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class); - private final AccumuloConfiguration conf; + private final Instance instance; private final FileSystem fs; - GarbageCollectWriteAheadLogs(FileSystem fs, AccumuloConfiguration conf) { + GarbageCollectWriteAheadLogs(Instance instance, FileSystem fs) { + this.instance = instance; this.fs = fs; - this.conf = conf; } public void collect(GCStatus status) { @@ -103,7 +107,21 @@ public class GarbageCollectWriteAheadLog } } + boolean holdsLock(InetSocketAddress addr) { + try { + String zpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + org.apache.accumulo.core.util.AddressUtil.toString(addr); + List children = ZooReaderWriter.getInstance().getChildren(zpath); + return !(children == null || children.isEmpty()); + } catch (KeeperException.NoNodeException ex) { + return false; + } catch (Exception ex) { + log.debug(ex, ex); + return true; + } + } + private int removeFiles(Map> serverToFileMap, final GCStatus status) { + AccumuloConfiguration conf = instance.getConfiguration(); for (Entry> entry : serverToFileMap.entrySet()) { if (entry.getKey().length() == 0) { // old-style log entry, just remove it @@ -117,6 +135,8 @@ public class GarbageCollectWriteAheadLog } } else { InetSocketAddress address = AddressUtil.parseAddress(entry.getKey(), Property.TSERV_CLIENTPORT); + if (!holdsLock(address)) + continue; Iface tserver = null; try { tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); @@ -162,6 +182,7 @@ public class GarbageCollectWriteAheadLog } private int scanServers(Map fileToServerMap) throws Exception { + AccumuloConfiguration conf = instance.getConfiguration(); Path walRoot = new Path(Constants.getWalDirectory(conf)); for (FileStatus status : fs.listStatus(walRoot)) { String name = status.getPath().getName(); Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Thu May 31 21:20:39 2012 @@ -64,6 +64,7 @@ import org.apache.accumulo.core.master.s import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.UtilWaitThread; @@ -297,7 +298,7 @@ public class SimpleGarbageCollector impl // Clean up any unused write-ahead logs Span waLogs = Trace.start("walogs"); - GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(fs, instance.getConfiguration()); + GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs); try { log.info("Beginning garbage collection of write-ahead logs"); walogCollector.collect(status); @@ -585,7 +586,7 @@ public class SimpleGarbageCollector impl final BatchWriter finalWriter = writer; - ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads); + ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads, new NamingThreadFactory("deleting")); for (final String delete : confirmedDeletes) { Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Thu May 31 21:20:39 2012 @@ -27,10 +27,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.cloudtrace.instrument.TraceExecutorService; import org.apache.accumulo.core.Constants; @@ -49,8 +46,8 @@ import org.apache.accumulo.core.file.Fil import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; @@ -367,17 +364,8 @@ class LoadFiles extends MasterRepo { synchronized void initializeThreadPool(Master master) { if (threadPool == null) { - int THREAD_POOL_SIZE = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE); - ThreadFactory threadFactory = new ThreadFactory() { - int count = 0; - - @Override - public Thread newThread(Runnable r) { - return new Daemon(r, "bulk loader " + count++); - } - }; - ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 1l, TimeUnit.SECONDS, new LinkedBlockingQueue(), - threadFactory); + int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE); + ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import"); pool.allowCoreThreadTimeOut(true); threadPool = new TraceExecutorService(pool); } Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java Thu May 31 21:20:39 2012 @@ -40,11 +40,11 @@ import org.apache.accumulo.core.data.Ran import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.SortedKeyIterator; import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.security.SecurityConstants; import org.apache.accumulo.server.util.MetadataTable; -import org.apache.accumulo.server.util.NamingThreadFactory; import org.apache.accumulo.server.zookeeper.IZooReaderWriter; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.commons.collections.map.LRUMap; Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Thu May 31 21:20:39 2012 @@ -3644,10 +3644,12 @@ public class Tablet { private Set currentLogs = new HashSet(); - synchronized public Set getCurrentLogs() { + public Set getCurrentLogs() { Set result = new HashSet(); - for (DfsLogger log : currentLogs) { - result.add(log.toString()); + synchronized (currentLogs) { + for (DfsLogger log : currentLogs) { + result.add(log.toString()); + } } return result; } Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Thu May 31 21:20:39 2012 @@ -1853,7 +1853,7 @@ public class TabletServer extends Abstra final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent)); // Root tablet assignment must take place immediately if (extent.isRootTablet()) { - new Thread("Root Tablet Assignment") { + new Daemon("Root Tablet Assignment") { public void run() { ah.run(); if (onlineTablets.containsKey(extent)) { @@ -2060,14 +2060,16 @@ public class TabletServer extends Abstra continue nextFile; } // this check is not strictly necessary + List onlineTabletsCopy = new ArrayList(); synchronized (onlineTablets) { - for (Entry entry : onlineTablets.entrySet()) { - for (String current : entry.getValue().getCurrentLogs()) { - if (current.contains(filename)) { - log.error("Attempting to remove a write-ahead log that is in use. This should never happen!"); - log.info("Attempted to delete " + filename + " from tablet " + entry.getKey()); - continue nextFile; - } + onlineTabletsCopy.addAll(onlineTablets.values()); + } + for (Tablet tablet : onlineTabletsCopy) { + for (String current : tablet.getCurrentLogs()) { + if (current.contains(filename)) { + log.error("Attempting to remove a write-ahead log that is in use. This should never happen!"); + log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent()); + continue nextFile; } } } @@ -2521,7 +2523,7 @@ public class TabletServer extends Abstra AssignmentHandler handler = new AssignmentHandler(extentToOpen, retryAttempt + 1); if (extent.isMeta()) { if (extent.isRootTablet()) { - new Thread(new LoggingRunnable(log, handler), "Root tablet assignment retry").start(); + new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry").start(); } else { resourceManager.addMetaDataAssignment(handler); } @@ -3112,7 +3114,7 @@ public class TabletServer extends Abstra Instance instance = HdfsZooInstance.getInstance(); ServerConfiguration conf = new ServerConfiguration(instance); Accumulo.init(fs, conf, "tserver"); - recoverLocalWriteAheadLogs(fs, conf); + // recoverLocalWriteAheadLogs(fs, conf); TabletServer server = new TabletServer(conf, fs); server.config(hostname); Accumulo.enableTracing(hostname, "tserver"); Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Thu May 31 21:20:39 2012 @@ -45,12 +45,12 @@ import org.apache.accumulo.core.data.Key import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.MetadataTable.DataFileValue; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager; import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason; -import org.apache.accumulo.server.util.NamingThreadFactory; import org.apache.accumulo.server.util.time.SimpleTimer; import org.apache.accumulo.start.classloader.AccumuloClassLoader; import org.apache.hadoop.fs.FileSystem; Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Thu May 31 21:20:39 2012 @@ -55,6 +55,14 @@ import org.apache.log4j.Logger; public class DfsLogger { private static Logger log = Logger.getLogger(DfsLogger.class); + public static class LogClosedException extends IOException { + private static final long serialVersionUID = 1L; + + public LogClosedException() { + super("LogClosed"); + } + } + public interface ServerResources { AccumuloConfiguration getConfiguration(); @@ -73,7 +81,7 @@ public class DfsLogger { private boolean closed = false; - private class LogWriterTask implements Runnable { + private class LogSyncingTask implements Runnable { @Override public void run() { @@ -91,7 +99,6 @@ public class DfsLogger { synchronized (closeLock) { if (!closed) { try { - logFile.flush(); logFile.sync(); } catch (IOException ex) { log.warn("Exception syncing " + ex); @@ -101,7 +108,7 @@ public class DfsLogger { } } else { for (DfsLogger.LogWork logWork : work) { - logWork.exception = new IOException("logger closed"); + logWork.exception = new LogClosedException(); } } } @@ -226,7 +233,7 @@ public class DfsLogger { throw ex; } - Thread t = new Daemon(new LogWriterTask()); + Thread t = new Daemon(new LogSyncingTask()); t.setName("Accumulo WALog thread " + toString()); t.start(); } @@ -272,7 +279,7 @@ public class DfsLogger { logFile.close(); } catch (IOException ex) { log.error(ex); - throw new IOException("Log file closed"); + throw new LogClosedException(); } } @@ -334,7 +341,7 @@ public class DfsLogger { // to wait on walog I/O operations if (closed) - throw new IOException("logger closed"); + throw new LogClosedException(); workQueue.add(work); } Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Thu May 31 21:20:39 2012 @@ -27,9 +27,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.TimerTask; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; @@ -37,6 +35,7 @@ import org.apache.accumulo.core.conf.Acc import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.master.thrift.RecoveryStatus; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.server.logger.LogFileKey; @@ -188,9 +187,7 @@ public class LogSorter { this.fs = fs; this.conf = conf; int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT); - this.threadPool = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue()); + this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName()); } public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException { @@ -260,6 +257,7 @@ public class LogSorter { // Great... we got the lock, but maybe we're too busy if (threadPool.getQueue().size() > 1) { lock.unlock(); + log.debug("got the lock, but thread pool is busy; released the lock on " + child); continue; } byte[] contents = zoo.getData(childPath, null); @@ -275,6 +273,8 @@ public class LogSorter { } } }); + } else { + log.info("failed to get the lock " + child); } } } catch (Throwable t) { Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Thu May 31 21:20:39 2012 @@ -208,6 +208,8 @@ public class TabletServerLogger { for (DfsLogger logger : loggers) { try { logger.close(); + } catch (DfsLogger.LogClosedException ex) { + // ignore } catch (Throwable ex) { log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex); } @@ -281,13 +283,11 @@ public class TabletServerLogger { // double-check: did the log set change? success = (currentLogSet == logSetId.get()); } + } catch (DfsLogger.LogClosedException ex) { + log.debug("Logs closed while writing, retrying " + (attempt + 1)); } catch (Exception t) { - if (attempt == 0) { - log.info("Log write failed: another thread probably closed the log", t); - } else { - log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t); - UtilWaitThread.sleep(100); - } + log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t); + UtilWaitThread.sleep(100); } finally { attempt++; } Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java Thu May 31 21:20:39 2012 @@ -19,11 +19,7 @@ package org.apache.accumulo.server.test. import java.net.InetAddress; import java.util.Properties; import java.util.Random; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableExistsException; @@ -31,16 +27,14 @@ import org.apache.accumulo.core.client.a import org.apache.accumulo.core.iterators.LongCombiner; import org.apache.accumulo.core.iterators.user.SummingCombiner; import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.core.util.Daemon; -import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.server.test.randomwalk.State; import org.apache.accumulo.server.test.randomwalk.Test; import org.apache.hadoop.fs.FileSystem; public class Setup extends Test { - private static final int CORE_POOL_SIZE = 8; - private static final int MAX_POOL_SIZE = CORE_POOL_SIZE; + private static final int MAX_POOL_SIZE = 8; static String tableName = null; @Override @@ -67,14 +61,7 @@ public class Setup extends Test { state.set("fs", FileSystem.get(CachedConfiguration.getInstance())); BulkPlusOne.counter.set(0l); - BlockingQueue q = new LinkedBlockingQueue(); - ThreadFactory factory = new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Daemon(new LoggingRunnable(log, r)); - } - }; - ThreadPoolExecutor e = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 1, TimeUnit.SECONDS, q, factory); + ThreadPoolExecutor e = new SimpleThreadPool(MAX_POOL_SIZE, "bulkImportPool"); state.set("pool", e); } Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java Thu May 31 21:20:39 2012 @@ -25,18 +25,14 @@ import java.net.UnknownHostException; import java.nio.channels.ServerSocketChannel; import java.util.Random; import java.util.TimerTask; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.util.TBufferedSocket; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; @@ -205,7 +201,7 @@ public class TServerUtils { } } - public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, int numThreads, + public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads, long timeBetweenThreadChecks) throws TTransportException { TNonblockingServerSocket transport = new TNonblockingServerSocket(port); THsHaServer.Args options = new THsHaServer.Args(transport); @@ -214,21 +210,8 @@ public class TServerUtils { /* * Create our own very special thread pool. */ - // 1. name the threads for client connections - ThreadFactory factory = new ThreadFactory() { - AtomicInteger threadId = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - return new Thread(new LoggingRunnable(log, r), "ClientPool-" + threadId.getAndIncrement()); - } - }; - // 2. allow tasks to queue, potentially forever - final BlockingQueue queue = new LinkedBlockingQueue(); - // 3. keep the number of threads small - final int minimumThreadPoolSize = numThreads; - final ThreadPoolExecutor pool = new ThreadPoolExecutor(minimumThreadPoolSize, minimumThreadPoolSize, 10L, TimeUnit.SECONDS, queue, factory); - // 4. periodically adjust the number of threads we need by checking how busy our threads are + final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool"); + // periodically adjust the number of threads we need by checking how busy our threads are SimpleTimer.getInstance().schedule(new TimerTask() { @Override public void run() { @@ -239,7 +222,7 @@ public class TServerUtils { pool.setCorePoolSize(larger); } else { if (pool.getCorePoolSize() > pool.getActiveCount() + 3) { - int smaller = Math.max(minimumThreadPoolSize, pool.getCorePoolSize() - 1); + int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1); if (smaller != pool.getCorePoolSize()) { // there is a race condition here... the active count could be higher by the time // we decrease the core pool size... so the active count could end up higher than Modified: accumulo/branches/ACCUMULO-578/test/system/continuous/agitator.pl URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/test/system/continuous/agitator.pl?rev=1344915&r1=1344914&r2=1344915&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/test/system/continuous/agitator.pl (original) +++ accumulo/branches/ACCUMULO-578/test/system/continuous/agitator.pl Thu May 31 21:20:39 2012 @@ -74,27 +74,8 @@ while(1){ $t = strftime "%Y%m%d %H:%M:%S", localtime; $rn = rand(1); - $kill_tserver = 0; - $kill_logger = 0; - if($rn <.33){ - $kill_tserver = 1; - $kill_logger = 1; - }elsif($rn < .66){ - $kill_tserver = 1; - $kill_logger = 0; - }else{ - $kill_tserver = 0; - $kill_logger = 1; - } - - print STDERR "$t Killing $server $kill_tserver $kill_logger\n"; - if($kill_tserver) { - system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" tserver KILL"); - } - - if($kill_logger) { - system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" logger KILL"); - } + print STDERR "$t Killing $server\n"; + system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" tserver KILL"); } sleep($sleep2 * 60);