accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1344915 - in /accumulo/branches/ACCUMULO-578: bin/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/file/ core/src/main/java/org/...
Date Thu, 31 May 2012 21:20:40 GMT
Author: ecn
Date: Thu May 31 21:20:39 2012
New Revision: 1344915

URL: http://svn.apache.org/viewvc?rev=1344915&view=rev
Log:
ACCUMULO-578 checkpoint after testing on the small cluster

Added:
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java   (contents, props changed)
      - copied, changed from r1344409, accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/NamingThreadFactory.java
Removed:
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/NamingThreadFactory.java
Modified:
    accumulo/branches/ACCUMULO-578/bin/tdown.sh
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
    accumulo/branches/ACCUMULO-578/test/system/continuous/agitator.pl

Modified: accumulo/branches/ACCUMULO-578/bin/tdown.sh
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/bin/tdown.sh?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/bin/tdown.sh (original)
+++ accumulo/branches/ACCUMULO-578/bin/tdown.sh Thu May 31 21:20:39 2012
@@ -39,5 +39,5 @@ for server in `cat $SLAVES | grep -v '^#
         $ACCUMULO_HOME/bin/stop-server.sh $server "$ACCUMULO_HOME/.*/accumulo-start.*.jar" tserver KILL & 
 done
 
-echo 'Cleaning tablet server and logger entries from zookeeper'
+echo 'Cleaning tablet server entries from zookeeper'
 $ACCUMULO_HOME/bin/accumulo org.apache.accumulo.server.util.ZooZap -tservers

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java (original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java Thu May 31 21:20:39 2012
@@ -77,6 +77,7 @@ import org.apache.accumulo.core.util.Arg
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.util.MetadataTable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.OpTimer;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.core.util.TextUtil;
@@ -386,7 +387,7 @@ public class TableOperationsImpl extends
     CountDownLatch latch = new CountDownLatch(splits.size());
     AtomicReference<Exception> exception = new AtomicReference<Exception>(null);
     
-    ExecutorService executor = Executors.newFixedThreadPool(16);
+    ExecutorService executor = Executors.newFixedThreadPool(16, new NamingThreadFactory("addSplits"));
     try {
       executor.submit(new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
 

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java (original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java Thu May 31 21:20:39 2012
@@ -23,10 +23,8 @@ import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -42,6 +40,7 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
@@ -70,19 +69,8 @@ public class ScannerIterator implements 
   
   private static final List<KeyValue> EMPTY_LIST = Collections.emptyList();
   
-  private static AtomicInteger threadCounter = new AtomicInteger(1);
-
   private static ThreadPoolExecutor readaheadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3l, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-      new ThreadFactory() {
-        
-        @Override
-        public Thread newThread(Runnable r) {
-          Thread t = new Thread(r);
-          t.setDaemon(true);
-          t.setName("Accumulo scanner read ahead thread " + threadCounter.getAndIncrement());
-          return t;
-        }
-      });
+      new NamingThreadFactory("Accumulo scanner read ahead thread"));
 
   private class Reader implements Runnable {
     

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java (original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java Thu May 31 21:20:39 2012
@@ -21,11 +21,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchScanner;
@@ -36,6 +31,7 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.log4j.Logger;
 
 public class TabletServerBatchReader extends ScannerOptions implements BatchScanner {
@@ -59,25 +55,6 @@ public class TabletServerBatchReader ext
   
   private final int batchReaderInstance = getNextBatchReaderInstance();
   
-  private static class BatchReaderThreadFactory implements ThreadFactory {
-    
-    private ThreadFactory dtf = Executors.defaultThreadFactory();
-    private int threadNum = 1;
-    private final int batchReaderInstance;
-    
-    BatchReaderThreadFactory(int batchReaderInstance) {
-      this.batchReaderInstance = batchReaderInstance;
-    }
-    
-    public Thread newThread(Runnable r) {
-      Thread thread = dtf.newThread(r);
-      thread.setName("batch scanner " + batchReaderInstance + "-" + threadNum++);
-      thread.setDaemon(true);
-      return thread;
-    }
-    
-  }
-  
   public TabletServerBatchReader(Instance instance, AuthInfo credentials, String table, Authorizations authorizations, int numQueryThreads) {
     ArgumentChecker.notNull(instance, credentials, table, authorizations);
     this.instance = instance;
@@ -86,8 +63,7 @@ public class TabletServerBatchReader ext
     this.table = table;
     this.numThreads = numQueryThreads;
     
-    queryThreadPool = new ThreadPoolExecutor(numQueryThreads, numQueryThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
-        new BatchReaderThreadFactory(batchReaderInstance));
+    queryThreadPool = new SimpleThreadPool(numQueryThreads, "batch scanner " + batchReaderInstance + "-");
     
     ranges = null;
   }

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java (original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java Thu May 31 21:20:39 2012
@@ -60,6 +60,7 @@ import org.apache.accumulo.core.tabletse
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
@@ -545,7 +546,7 @@ public class TabletServerBatchWriter {
     public MutationWriter(int numSendThreads) {
       serversMutations = new HashMap<String,TabletServerMutations>();
       queued = new HashSet<String>();
-      sendThreadPool = Executors.newFixedThreadPool(numSendThreads);
+      sendThreadPool = Executors.newFixedThreadPool(numSendThreads, new NamingThreadFactory(this.getClass().getName()));
     }
     
     private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations> binnedMutations) {

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java (original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java Thu May 31 21:20:39 2012
@@ -29,10 +29,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +50,7 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -68,19 +68,6 @@ public class BloomFilterLayer {
   public static final String BLOOM_FILE_NAME = "acu_bloom";
   public static final int HASH_COUNT = 5;
   
-  private static class BloomLoaderThreadFactory implements ThreadFactory {
-    
-    private ThreadFactory dtf = Executors.defaultThreadFactory();
-    private int threadNum = 1;
-    
-    public Thread newThread(Runnable r) {
-      Thread thread = dtf.newThread(r);
-      thread.setName("bloom-loader-" + threadNum++);
-      thread.setDaemon(true);
-      return thread;
-    }
-  }
-  
   private static ExecutorService loadThreadPool = null;
   
   private static synchronized ExecutorService getLoadThreadPool(int maxLoadThreads) {
@@ -89,7 +76,8 @@ public class BloomFilterLayer {
     }
     
     if (maxLoadThreads > 0) {
-      loadThreadPool = new ThreadPoolExecutor(0, maxLoadThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new BloomLoaderThreadFactory());
+      BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
+      loadThreadPool = new ThreadPoolExecutor(0, maxLoadThreads, 60, TimeUnit.SECONDS, q, new NamingThreadFactory("bloom-loader"));
     }
     
     return loadThreadPool;

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java (original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java Thu May 31 21:20:39 2012
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -96,7 +97,7 @@ public class LruBlockCache implements Bl
   private final EvictionThread evictionThread;
   
   /** Statistics thread schedule pool (for heavy debugging, could remove) */
-  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1);
+  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats"));
   
   /** Current size of cache */
   private final AtomicLong size;

Copied: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java (from r1344409, accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/NamingThreadFactory.java)
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java?p2=accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java&p1=accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/NamingThreadFactory.java&r1=1344409&r2=1344915&rev=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/NamingThreadFactory.java (original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java Thu May 31 21:20:39 2012
@@ -14,15 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.accumulo.server.util;
+package org.apache.accumulo.core.util;
 
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.cloudtrace.instrument.TraceRunnable;
+import org.apache.log4j.Logger;
 
 public class NamingThreadFactory implements ThreadFactory {
+  private static final Logger log = Logger.getLogger(NamingThreadFactory.class);
   
-  private ThreadFactory dtf = Executors.defaultThreadFactory();
-  private int threadNum = 1;
+  private AtomicInteger threadNum = new AtomicInteger(1);
   private String name;
   
   public NamingThreadFactory(String name) {
@@ -30,9 +33,7 @@ public class NamingThreadFactory impleme
   }
   
   public Thread newThread(Runnable r) {
-    Thread thread = dtf.newThread(r);
-    thread.setName(name + " " + threadNum++);
-    return thread;
+    return new Daemon(new LoggingRunnable(log, new TraceRunnable(r)), name + " " + threadNum.getAndIncrement());
   }
   
 }

Propchange: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Thu May 31 21:20:39 2012
@@ -57,6 +57,7 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.StopWatch;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -138,7 +139,7 @@ public class BulkImporter {
       final Map<Path,List<TabletLocation>> assignments = Collections.synchronizedSortedMap(new TreeMap<Path,List<TabletLocation>>());
       
       timer.start(Timers.EXAMINE_MAP_FILES);
-      ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+      ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping"));
       
       for (Path path : paths) {
         final Path mapFile = path;
@@ -376,7 +377,7 @@ public class BulkImporter {
     
     final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<Path,List<AssignmentInfo>>());
     
-    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes"));
     
     for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
       if (entry.getValue().size() == 1) {
@@ -586,12 +587,11 @@ public class BulkImporter {
       apt.put(entry.getKey(), entry.getValue());
     }
     
-    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit"));
     
     for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer.entrySet()) {
       String location = entry.getKey();
-      threadPool
-          .submit(new TraceRunnable(new LoggingRunnable(log, new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()))));
+      threadPool.submit(new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()));
     }
     
     threadPool.shutdown();

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Thu May 31 21:20:39 2012
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
@@ -28,6 +29,7 @@ import java.util.UUID;
 import org.apache.accumulo.cloudtrace.instrument.Span;
 import org.apache.accumulo.cloudtrace.instrument.Trace;
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
@@ -35,10 +37,12 @@ import org.apache.accumulo.core.gc.thrif
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.util.MetadataTable.LogEntry;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,12 +54,12 @@ import org.apache.zookeeper.KeeperExcept
 public class GarbageCollectWriteAheadLogs {
   private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
   
-  private final AccumuloConfiguration conf;
+  private final Instance instance;
   private final FileSystem fs;
   
-  GarbageCollectWriteAheadLogs(FileSystem fs, AccumuloConfiguration conf) {
+  GarbageCollectWriteAheadLogs(Instance instance, FileSystem fs) {
+    this.instance = instance;
     this.fs = fs;
-    this.conf = conf;
   }
 
   public void collect(GCStatus status) {
@@ -103,7 +107,21 @@ public class GarbageCollectWriteAheadLog
     }
   }
   
+  boolean holdsLock(InetSocketAddress addr) {
+    try {
+      String zpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + org.apache.accumulo.core.util.AddressUtil.toString(addr);
+      List<String> children = ZooReaderWriter.getInstance().getChildren(zpath);
+      return !(children == null || children.isEmpty());
+    } catch (KeeperException.NoNodeException ex) {
+      return false;
+    } catch (Exception ex) {
+      log.debug(ex, ex);
+      return true;
+    }
+  }
+
   private int removeFiles(Map<String,ArrayList<String>> serverToFileMap, final GCStatus status) {
+    AccumuloConfiguration conf = instance.getConfiguration();
     for (Entry<String,ArrayList<String>> entry : serverToFileMap.entrySet()) {
       if (entry.getKey().length() == 0) {
         // old-style log entry, just remove it
@@ -117,6 +135,8 @@ public class GarbageCollectWriteAheadLog
         }
       } else {
         InetSocketAddress address = AddressUtil.parseAddress(entry.getKey(), Property.TSERV_CLIENTPORT);
+        if (!holdsLock(address))
+          continue;
         Iface tserver = null;
         try {
           tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
@@ -162,6 +182,7 @@ public class GarbageCollectWriteAheadLog
   }
   
   private int scanServers(Map<String,String> fileToServerMap) throws Exception {
+    AccumuloConfiguration conf = instance.getConfiguration();
     Path walRoot = new Path(Constants.getWalDirectory(conf));
     for (FileStatus status : fs.listStatus(walRoot)) {
       String name = status.getPath().getName();

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Thu May 31 21:20:39 2012
@@ -64,6 +64,7 @@ import org.apache.accumulo.core.master.s
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -297,7 +298,7 @@ public class SimpleGarbageCollector impl
       
       // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
-      GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(fs, instance.getConfiguration());
+      GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs);
       try {
         log.info("Beginning garbage collection of write-ahead logs");
         walogCollector.collect(status);
@@ -585,7 +586,7 @@ public class SimpleGarbageCollector impl
     
     final BatchWriter finalWriter = writer;
     
-    ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads);
+    ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads, new NamingThreadFactory("deleting"));
     
     for (final String delete : confirmedDeletes) {
       

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Thu May 31 21:20:39 2012
@@ -27,10 +27,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.cloudtrace.instrument.TraceExecutorService;
 import org.apache.accumulo.core.Constants;
@@ -49,8 +46,8 @@ import org.apache.accumulo.core.file.Fil
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -367,17 +364,8 @@ class LoadFiles extends MasterRepo {
   
   synchronized void initializeThreadPool(Master master) {
     if (threadPool == null) {
-      int THREAD_POOL_SIZE = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
-      ThreadFactory threadFactory = new ThreadFactory() {
-        int count = 0;
-        
-        @Override
-        public Thread newThread(Runnable r) {
-          return new Daemon(r, "bulk loader " + count++);
-        }
-      };
-      ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 1l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
-          threadFactory);
+      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
+      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
       pool.allowCoreThreadTimeOut(true);
       threadPool = new TraceExecutorService(pool);
     }

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java Thu May 31 21:20:39 2012
@@ -40,11 +40,11 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedKeyIterator;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.accumulo.server.util.NamingThreadFactory;
 import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.commons.collections.map.LRUMap;

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Thu May 31 21:20:39 2012
@@ -3644,10 +3644,12 @@ public class Tablet {
   
   private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>();
   
-  synchronized public Set<String> getCurrentLogs() {
+  public Set<String> getCurrentLogs() {
     Set<String> result = new HashSet<String>();
-    for (DfsLogger log : currentLogs) {
-      result.add(log.toString());
+    synchronized (currentLogs) {
+      for (DfsLogger log : currentLogs) {
+        result.add(log.toString());
+      }
     }
     return result;
   }

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Thu May 31 21:20:39 2012
@@ -1853,7 +1853,7 @@ public class TabletServer extends Abstra
       final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
       // Root tablet assignment must take place immediately
       if (extent.isRootTablet()) {
-        new Thread("Root Tablet Assignment") {
+        new Daemon("Root Tablet Assignment") {
           public void run() {
             ah.run();
             if (onlineTablets.containsKey(extent)) {
@@ -2060,14 +2060,16 @@ public class TabletServer extends Abstra
             continue nextFile;
         }
         // this check is not strictly necessary
+        List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
         synchronized (onlineTablets) {
-          for (Entry<KeyExtent,Tablet> entry : onlineTablets.entrySet()) {
-            for (String current : entry.getValue().getCurrentLogs()) {
-              if (current.contains(filename)) {
-                log.error("Attempting to remove a write-ahead log that is in use.  This should never happen!");
-                log.info("Attempted to delete " + filename + " from tablet " + entry.getKey());
-                continue nextFile;
-              }
+          onlineTabletsCopy.addAll(onlineTablets.values());
+        }
+        for (Tablet tablet : onlineTabletsCopy) {
+          for (String current : tablet.getCurrentLogs()) {
+            if (current.contains(filename)) {
+              log.error("Attempting to remove a write-ahead log that is in use.  This should never happen!");
+              log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
+              continue nextFile;
             }
           }
         }
@@ -2521,7 +2523,7 @@ public class TabletServer extends Abstra
               AssignmentHandler handler = new AssignmentHandler(extentToOpen, retryAttempt + 1);
               if (extent.isMeta()) {
                 if (extent.isRootTablet()) {
-                  new Thread(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
+                  new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
                 } else {
                   resourceManager.addMetaDataAssignment(handler);
                 }
@@ -3112,7 +3114,7 @@ public class TabletServer extends Abstra
       Instance instance = HdfsZooInstance.getInstance();
       ServerConfiguration conf = new ServerConfiguration(instance);
       Accumulo.init(fs, conf, "tserver");
-      recoverLocalWriteAheadLogs(fs, conf);
+      // recoverLocalWriteAheadLogs(fs, conf);
       TabletServer server = new TabletServer(conf, fs);
       server.config(hostname);
       Accumulo.enableTracing(hostname, "tserver");

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Thu May 31 21:20:39 2012
@@ -45,12 +45,12 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
 import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
-import org.apache.accumulo.server.util.NamingThreadFactory;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
 import org.apache.hadoop.fs.FileSystem;

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Thu May 31 21:20:39 2012
@@ -55,6 +55,14 @@ import org.apache.log4j.Logger;
 public class DfsLogger {
   private static Logger log = Logger.getLogger(DfsLogger.class);
   
+  public static class LogClosedException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public LogClosedException() {
+      super("LogClosed");
+    }
+  }
+
   public interface ServerResources {
     AccumuloConfiguration getConfiguration();
     
@@ -73,7 +81,7 @@ public class DfsLogger {
   
   private boolean closed = false;
 
-  private class LogWriterTask implements Runnable {
+  private class LogSyncingTask implements Runnable {
 
     @Override
     public void run() {
@@ -91,7 +99,6 @@ public class DfsLogger {
         synchronized (closeLock) {
           if (!closed) {
             try {
-              logFile.flush();
               logFile.sync();
             } catch (IOException ex) {
               log.warn("Exception syncing " + ex);
@@ -101,7 +108,7 @@ public class DfsLogger {
             }
           } else {
             for (DfsLogger.LogWork logWork : work) {
-              logWork.exception = new IOException("logger closed");
+              logWork.exception = new LogClosedException();
             }
           }
         }
@@ -226,7 +233,7 @@ public class DfsLogger {
       throw ex;
     }
     
-    Thread t = new Daemon(new LogWriterTask());
+    Thread t = new Daemon(new LogSyncingTask());
     t.setName("Accumulo WALog thread " + toString());
     t.start();
   }
@@ -272,7 +279,7 @@ public class DfsLogger {
         logFile.close();
       } catch (IOException ex) {
         log.error(ex);
-        throw new IOException("Log file closed");
+        throw new LogClosedException();
       }
   }
   
@@ -334,7 +341,7 @@ public class DfsLogger {
       // to wait on walog I/O operations
 
       if (closed)
-        throw new IOException("logger closed");
+        throw new LogClosedException();
       workQueue.add(work);
     }
 

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Thu May 31 21:20:39 2012
@@ -27,9 +27,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.TimerTask;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
@@ -37,6 +35,7 @@ import org.apache.accumulo.core.conf.Acc
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.RecoveryStatus;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.logger.LogFileKey;
@@ -188,9 +187,7 @@ public class LogSorter {
     this.fs = fs;
     this.conf = conf;
     int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
-    this.threadPool = new ThreadPoolExecutor(threadPoolSize, threadPoolSize,
-        0L, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>());
+    this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
   }
   
   public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException {
@@ -260,6 +257,7 @@ public class LogSorter {
           // Great... we got the lock, but maybe we're too busy
           if (threadPool.getQueue().size() > 1) {
             lock.unlock();
+            log.debug("got the lock, but thread pool is busy; released the lock on " + child);
             continue;
           }
           byte[] contents = zoo.getData(childPath, null);
@@ -275,6 +273,8 @@ public class LogSorter {
               }
             }
           });
+        } else {
+          log.info("failed to get the lock " + child);
         }
       }
     } catch (Throwable t) {

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Thu May 31 21:20:39 2012
@@ -208,6 +208,8 @@ public class TabletServerLogger {
       for (DfsLogger logger : loggers) {
         try {
           logger.close();
+        } catch (DfsLogger.LogClosedException ex) {
+          // ignore
         } catch (Throwable ex) {
           log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex);
         }
@@ -281,13 +283,11 @@ public class TabletServerLogger {
           // double-check: did the log set change?
           success = (currentLogSet == logSetId.get());
         }
+      } catch (DfsLogger.LogClosedException ex) {
+        log.debug("Logs closed while writing, retrying " + (attempt + 1));
       } catch (Exception t) {
-        if (attempt == 0) {
-          log.info("Log write failed: another thread probably closed the log", t);
-        } else {
-          log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
-          UtilWaitThread.sleep(100);
-        }
+        log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
+        UtilWaitThread.sleep(100);
       } finally {
         attempt++;
       }

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java Thu May 31 21:20:39 2012
@@ -19,11 +19,7 @@ package org.apache.accumulo.server.test.
 import java.net.InetAddress;
 import java.util.Properties;
 import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.TableExistsException;
@@ -31,16 +27,14 @@ import org.apache.accumulo.core.client.a
 import org.apache.accumulo.core.iterators.LongCombiner;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.server.test.randomwalk.State;
 import org.apache.accumulo.server.test.randomwalk.Test;
 import org.apache.hadoop.fs.FileSystem;
 
 public class Setup extends Test {
   
-  private static final int CORE_POOL_SIZE = 8;
-  private static final int MAX_POOL_SIZE = CORE_POOL_SIZE;
+  private static final int MAX_POOL_SIZE = 8;
   static String tableName = null;
   
   @Override
@@ -67,14 +61,7 @@ public class Setup extends Test {
     state.set("fs", FileSystem.get(CachedConfiguration.getInstance()));
     BulkPlusOne.counter.set(0l);
     
-    BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
-    ThreadFactory factory = new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        return new Daemon(new LoggingRunnable(log, r));
-      }
-    };
-    ThreadPoolExecutor e = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 1, TimeUnit.SECONDS, q, factory);
+    ThreadPoolExecutor e = new SimpleThreadPool(MAX_POOL_SIZE, "bulkImportPool");
     state.set("pool", e);
   }
   

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java Thu May 31 21:20:39 2012
@@ -25,18 +25,14 @@ import java.net.UnknownHostException;
 import java.nio.channels.ServerSocketChannel;
 import java.util.Random;
 import java.util.TimerTask;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.TBufferedSocket;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -205,7 +201,7 @@ public class TServerUtils {
     }
   }
   
-  public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, int numThreads,
+  public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads,
       long timeBetweenThreadChecks) throws TTransportException {
     TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
     THsHaServer.Args options = new THsHaServer.Args(transport);
@@ -214,21 +210,8 @@ public class TServerUtils {
     /*
      * Create our own very special thread pool.
      */
-    // 1. name the threads for client connections
-    ThreadFactory factory = new ThreadFactory() {
-      AtomicInteger threadId = new AtomicInteger();
-      
-      @Override
-      public Thread newThread(Runnable r) {
-        return new Thread(new LoggingRunnable(log, r), "ClientPool-" + threadId.getAndIncrement());
-      }
-    };
-    // 2. allow tasks to queue, potentially forever
-    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
-    // 3. keep the number of threads small
-    final int minimumThreadPoolSize = numThreads;
-    final ThreadPoolExecutor pool = new ThreadPoolExecutor(minimumThreadPoolSize, minimumThreadPoolSize, 10L, TimeUnit.SECONDS, queue, factory);
-    // 4. periodically adjust the number of threads we need by checking how busy our threads are
+    final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+    // periodically adjust the number of threads we need by checking how busy our threads are
     SimpleTimer.getInstance().schedule(new TimerTask() {
       @Override
       public void run() {
@@ -239,7 +222,7 @@ public class TServerUtils {
           pool.setCorePoolSize(larger);
         } else {
           if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
-            int smaller = Math.max(minimumThreadPoolSize, pool.getCorePoolSize() - 1);
+            int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
             if (smaller != pool.getCorePoolSize()) {
               // there is a race condition here... the active count could be higher by the time
               // we decrease the core pool size... so the active count could end up higher than

Modified: accumulo/branches/ACCUMULO-578/test/system/continuous/agitator.pl
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/test/system/continuous/agitator.pl?rev=1344915&r1=1344914&r2=1344915&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/test/system/continuous/agitator.pl (original)
+++ accumulo/branches/ACCUMULO-578/test/system/continuous/agitator.pl Thu May 31 21:20:39 2012
@@ -74,27 +74,8 @@ while(1){
 		$t = strftime "%Y%m%d %H:%M:%S", localtime;
 	
 		$rn = rand(1);
-		$kill_tserver = 0;
-		$kill_logger = 0;
-		if($rn <.33){
-			$kill_tserver = 1;
-			$kill_logger = 1;
-		}elsif($rn < .66){
-			$kill_tserver = 1;
-			$kill_logger = 0;
-		}else{
-			$kill_tserver = 0;
-			$kill_logger = 1;
-		}
-	
-		print STDERR "$t Killing $server $kill_tserver $kill_logger\n";
-	        if($kill_tserver) {
-			system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" tserver KILL");
-		}
-
-	        if($kill_logger) {
-			system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" logger KILL");
-		}
+		print STDERR "$t Killing $server\n";
+		system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" tserver KILL");
 	}
 
 	sleep($sleep2 * 60);



Mime
View raw message