accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1346380 [5/5] - in /accumulo/trunk: ./ bin/ core/src/main/java/org/apache/accumulo/core/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accum...
Date Tue, 05 Jun 2012 13:18:25 GMT
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Tue Jun  5 13:18:22 2012
@@ -18,11 +18,11 @@ package org.apache.accumulo.server.table
 
 import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
-import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
@@ -45,6 +45,7 @@ import java.util.SortedSet;
 import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
@@ -97,7 +98,6 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.data.thrift.UpdateErrors;
 import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
-import org.apache.accumulo.core.master.MasterNotRunningException;
 import org.apache.accumulo.core.master.thrift.Compacting;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.master.thrift.TableInfo;
@@ -136,6 +136,8 @@ import org.apache.accumulo.server.client
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
 import org.apache.accumulo.server.master.state.TServerInstance;
@@ -159,10 +161,9 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.tabletserver.Tablet.TabletClosedException;
 import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
 import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
-import org.apache.accumulo.server.tabletserver.log.LoggerStrategy;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
+import org.apache.accumulo.server.tabletserver.log.LogSorter;
 import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
-import org.apache.accumulo.server.tabletserver.log.RoundRobinLoggerStrategy;
 import org.apache.accumulo.server.tabletserver.log.TabletServerLogger;
 import org.apache.accumulo.server.tabletserver.mastermessage.MasterMessage;
 import org.apache.accumulo.server.tabletserver.mastermessage.SplitReportMessage;
@@ -189,9 +190,12 @@ import org.apache.accumulo.server.zookee
 import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.start.Platform;
-import org.apache.accumulo.start.classloader.AccumuloClassLoader;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
@@ -211,22 +215,22 @@ public class TabletServer extends Abstra
   private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
   private static long lastMemorySize = 0;
   private static long gcTimeIncreasedCount;
-  private static final Class<? extends LoggerStrategy> DEFAULT_LOGGER_STRATEGY = RoundRobinLoggerStrategy.class;
   
   private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
   
   private TabletServerLogger logger;
-  private LoggerStrategy loggerStrategy;
   
   protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
   
   private ServerConfiguration serverConfig;
+  private LogSorter logSorter = null;
   
   public TabletServer(ServerConfiguration conf, FileSystem fs) {
     super();
     this.serverConfig = conf;
     this.instance = conf.getInstance();
     this.fs = TraceFileSystem.wrap(fs);
+    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
     SimpleTimer.getInstance().schedule(new TimerTask() {
       @Override
       public void run() {
@@ -897,7 +901,7 @@ public class TabletServer extends Abstra
         
         ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
         String oldThreadName = Thread.currentThread().getName();
-
+        
         try {
           runState.set(ScanRunState.RUNNING);
           Thread.currentThread().setName(
@@ -962,7 +966,7 @@ public class TabletServer extends Abstra
           Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
           if (isCancelled() || session == null)
             return;
-
+          
           TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
           long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
           long bytesAdded = 0;
@@ -1850,7 +1854,7 @@ public class TabletServer extends Abstra
       final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
       // Root tablet assignment must take place immediately
       if (extent.isRootTablet()) {
-        new Thread("Root Tablet Assignment") {
+        new Daemon("Root Tablet Assignment") {
           public void run() {
             ah.run();
             if (onlineTablets.containsKey(extent)) {
@@ -1973,12 +1977,6 @@ public class TabletServer extends Abstra
       return statsKeeper.getTabletStats();
     }
     
-    @Override
-    public void useLoggers(TInfo tinfo, AuthInfo credentials, Set<String> loggers) throws TException {
-      loggerStrategy.preferLoggers(loggers);
-    }
-    
-    @Override
     public List<ActiveScan> getActiveScans(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException {
       
       try {
@@ -2042,6 +2040,60 @@ public class TabletServer extends Abstra
       
     }
     
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.cloudtrace.thrift.TInfo,
+     * org.apache.accumulo.core.security.thrift.AuthInfo, java.util.List)
+     */
+    @Override
+    public void removeLogs(TInfo tinfo, AuthInfo credentials, List<String> filenames) throws TException {
+      String myname = getClientAddressString();
+      myname = myname.replace(':', '+');
+      Path logDir = new Path(Constants.getWalDirectory(acuConf), myname);
+      Set<String> loggers = new HashSet<String>();
+      logger.getLoggers(loggers);
+      nextFile:
+      for (String filename : filenames) {
+        for (String logger : loggers) {
+          if (logger.contains(filename))
+            continue nextFile;
+        }
+        List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
+        synchronized (onlineTablets) {
+          onlineTabletsCopy.addAll(onlineTablets.values());
+        }
+        for (Tablet tablet : onlineTabletsCopy) {
+          for (String current : tablet.getCurrentLogs()) {
+            if (current.contains(filename)) {
+              log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
+              continue nextFile;
+            }
+          }
+        }
+        try {
+          String source = logDir + "/" + filename;
+          if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
+            String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive";
+            fs.mkdirs(new Path(walogArchive));
+            String dest = walogArchive + "/" + filename;
+            log.info("Archiving walog " + source + " to " + dest);
+            if (!fs.rename(new Path(source), new Path(dest)))
+              log.error("rename is unsuccessful");
+          } else {
+            log.info("Deleting walog " + filename);
+            if (!fs.delete(new Path(source), true))
+              log.warn("Failed to delete walog " + source);
+            if (fs.delete(new Path(Constants.getRecoveryDir(acuConf), filename), true))
+              log.info("Deleted any recovery log " + filename);
+
+          }
+        } catch (IOException e) {
+          log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
+        }
+      }
+    }
+    
   }
   
   private class SplitRunner implements Runnable {
@@ -2476,7 +2528,7 @@ public class TabletServer extends Abstra
               AssignmentHandler handler = new AssignmentHandler(extentToOpen, retryAttempt + 1);
               if (extent.isMeta()) {
                 if (extent.isRootTablet()) {
-                  new Thread(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
+                  new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
                 } else {
                   resourceManager.addMetaDataAssignment(handler);
                 }
@@ -2494,7 +2546,6 @@ public class TabletServer extends Abstra
   
   private FileSystem fs;
   private Instance instance;
-  private ZooCache cache;
   
   private SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
   private SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
@@ -2527,48 +2578,23 @@ public class TabletServer extends Abstra
     return statsKeeper;
   }
   
-  public Set<String> getLoggers() throws TException, MasterNotRunningException, ThriftSecurityException {
-    Set<String> allLoggers = new HashSet<String>();
-    String dir = ZooUtil.getRoot(instance) + Constants.ZLOGGERS;
-    for (String child : cache.getChildren(dir)) {
-      allLoggers.add(new String(cache.get(dir + "/" + child)));
-    }
-    if (allLoggers.isEmpty()) {
-      log.warn("there are no loggers registered in zookeeper");
-      return allLoggers;
-    }
-    Set<String> result = loggerStrategy.getLoggers(Collections.unmodifiableSet(allLoggers));
-    Set<String> bogus = new HashSet<String>(result);
-    bogus.removeAll(allLoggers);
-    if (!bogus.isEmpty())
-      log.warn("logger strategy is returning loggers that are not candidates");
-    result.removeAll(bogus);
-    if (result.isEmpty())
-      log.warn("strategy returned no useful loggers");
-    return result;
-  }
-  
-  public void addLoggersToMetadata(List<RemoteLogger> logs, KeyExtent extent, int id) {
+  public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) {
     log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id);
     
-    List<MetadataTable.LogEntry> entries = new ArrayList<MetadataTable.LogEntry>();
     long now = RelativeTime.currentTimeMillis();
     List<String> logSet = new ArrayList<String>();
-    for (RemoteLogger log : logs)
+    for (DfsLogger log : logs)
       logSet.add(log.toString());
-    for (RemoteLogger log : logs) {
-      MetadataTable.LogEntry entry = new MetadataTable.LogEntry();
-      entry.extent = extent;
-      entry.tabletId = id;
-      entry.timestamp = now;
-      entry.server = log.getLogger();
-      entry.filename = log.getFileName();
-      entry.logSet = logSet;
-      entries.add(entry);
-    }
-    MetadataTable.addLogEntries(SecurityConstants.getSystemCredentials(), entries, getLock());
+    MetadataTable.LogEntry entry = new MetadataTable.LogEntry();
+    entry.extent = extent;
+    entry.tabletId = id;
+    entry.timestamp = now;
+    entry.server = logs.get(0).getLogger();
+    entry.filename = logs.get(0).getFileName();
+    entry.logSet = logSet;
+    MetadataTable.addLogEntry(SecurityConstants.getSystemCredentials(), entry, getLock());
   }
-  
+
   private int startServer(AccumuloConfiguration conf, Property portHint, TProcessor processor, String threadName) throws UnknownHostException {
     ServerPort sp = TServerUtils.startServer(conf, portHint, processor, this.getClass().getSimpleName(), threadName, Property.TSERV_PORTSEARCH,
         Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK);
@@ -2681,6 +2707,12 @@ public class TabletServer extends Abstra
     }
     clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
     announceExistence();
+    try {
+      logSorter.startWatchingForRecoveryLogs(getClientAddressString());
+    } catch (Exception ex) {
+      log.error("Error setting watches for recoveries");
+      throw new RuntimeException(ex);
+    }
     
     try {
       OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
@@ -2991,31 +3023,6 @@ public class TabletServer extends Abstra
     majorCompactorThread = new Daemon(new LoggingRunnable(log, new MajorCompactor()));
     majorCompactorThread.setName("Split/MajC initiator");
     majorCompactorThread.start();
-    
-    String className = getSystemConfiguration().get(Property.TSERV_LOGGER_STRATEGY);
-    Class<? extends LoggerStrategy> klass = DEFAULT_LOGGER_STRATEGY;
-    try {
-      klass = AccumuloClassLoader.loadClass(className, LoggerStrategy.class);
-    } catch (Exception ex) {
-      log.warn("Unable to load class " + className + " for logger strategy, using " + klass.getName(), ex);
-    }
-    try {
-      Constructor<? extends LoggerStrategy> constructor = klass.getConstructor(TabletServer.class);
-      loggerStrategy = constructor.newInstance(this);
-      loggerStrategy.init(serverConfig);
-    } catch (Exception ex) {
-      log.warn("Unable to create object of type " + klass.getName() + " using " + DEFAULT_LOGGER_STRATEGY.getName());
-    }
-    if (loggerStrategy == null) {
-      try {
-        loggerStrategy = DEFAULT_LOGGER_STRATEGY.getConstructor(TabletServer.class).newInstance(this);
-      } catch (Exception ex) {
-        log.fatal("Programmer error: cannot create a logger strategy.");
-        throw new RuntimeException(ex);
-      }
-    }
-    cache = new ZooCache();
-    
   }
   
   public TabletServerStatus getStats(Map<String,MapCounter<ScanRunState>> scanCounts) {
@@ -3096,12 +3103,11 @@ public class TabletServer extends Abstra
     result.name = getClientAddressString();
     result.holdTime = resourceManager.holdTime();
     result.lookups = seekCount.get();
-    result.loggers = new HashSet<String>();
     result.indexCacheHits = resourceManager.getIndexCache().getStats().getHitCount();
     result.indexCacheRequest = resourceManager.getIndexCache().getStats().getRequestCount();
     result.dataCacheHits = resourceManager.getDataCache().getStats().getHitCount();
     result.dataCacheRequest = resourceManager.getDataCache().getStats().getRequestCount();
-    logger.getLoggers(result.loggers);
+    result.logSorts = logSorter.getLogSorts();
     return result;
   }
   
@@ -3113,6 +3119,7 @@ public class TabletServer extends Abstra
       Instance instance = HdfsZooInstance.getInstance();
       ServerConfiguration conf = new ServerConfiguration(instance);
       Accumulo.init(fs, conf, "tserver");
+      // recoverLocalWriteAheadLogs(fs, conf);
       TabletServer server = new TabletServer(conf, fs);
       server.config(hostname);
       Accumulo.enableTracing(hostname, "tserver");
@@ -3121,7 +3128,47 @@ public class TabletServer extends Abstra
       log.error("Uncaught exception in TabletServer.main, exiting", ex);
     }
   }
-  
+
+  /**
+   * Copy local walogs into HDFS on an upgrade
+   * 
+   */
+  public static void recoverLocalWriteAheadLogs(FileSystem fs, ServerConfiguration serverConf) throws IOException {
+    FileSystem localfs = FileSystem.getLocal(fs.getConf()).getRawFileSystem();
+    AccumuloConfiguration conf = serverConf.getConfiguration();
+    String localWalDirectories = conf.get(Property.LOGGER_DIR);
+    for (String localWalDirectory : localWalDirectories.split(",")) {
+      for (FileStatus file : localfs.listStatus(new Path(localWalDirectory))) {
+        String name = file.getPath().getName();
+        try {
+          UUID.fromString(name);
+        } catch (IllegalArgumentException ex) {
+          log.info("Ignoring non-log file " + name + " in " + localWalDirectory);
+          continue;
+        }
+        LogFileKey key = new LogFileKey();
+        LogFileValue value = new LogFileValue();
+        log.info("Openning local log " + file.getPath());
+        Reader reader = new SequenceFile.Reader(localfs, file.getPath(), localfs.getConf());
+        Path tmp = new Path(Constants.getWalDirectory(conf) + "/" + name + ".copy");
+        FSDataOutputStream writer = fs.create(tmp);
+        while (reader.next(key, value)) {
+          try {
+            key.write(writer);
+            value.write(writer);
+          } catch (EOFException ex) {
+            break;
+          }
+        }
+        writer.close();
+        reader.close();
+        fs.rename(tmp, new Path(tmp.getParent(), name));
+        log.info("Copied local log " + name);
+        localfs.delete(new Path(localWalDirectory, name), true);
+      }
+    }
+  }
+
   public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
     totalMinorCompactions++;
     logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
@@ -3144,7 +3191,7 @@ public class TabletServer extends Abstra
       String recovery = null;
       for (String log : entry.logSet) {
         String[] parts = log.split("/"); // "host:port/filename"
-        log = ServerConstants.getRecoveryDir() + "/" + parts[1] + ".recovered";
+        log = ServerConstants.getRecoveryDir() + "/" + parts[1];
         Path finished = new Path(log + "/finished");
         TabletServer.log.info("Looking for " + finished);
         if (fs.exists(finished)) {
@@ -3337,11 +3384,27 @@ public class TabletServer extends Abstra
     return METRICS_PREFIX;
   }
   
-  // public AccumuloConfiguration getTableConfiguration(String tableId) {
-  // return ServerConfiguration.getTableConfiguration(instance, tableId);
-  // }
-
   public TableConfiguration getTableConfiguration(KeyExtent extent) {
     return ServerConfiguration.getTableConfiguration(instance, extent.getTableId().toString());
   }
+  public DfsLogger.ServerResources getServerConfig() {
+    return new DfsLogger.ServerResources() {
+      
+      @Override
+      public FileSystem getFileSystem() {
+        return fs;
+      }
+      
+      @Override
+      public Set<TServerInstance> getCurrentTServers() {
+        return null;
+      }
+      
+      @Override
+      public AccumuloConfiguration getConfiguration() {
+        return getSystemConfiguration();
+      }
+    };
+  }
+
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Tue Jun  5 13:18:22 2012
@@ -45,12 +45,12 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
 import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
-import org.apache.accumulo.server.util.NamingThreadFactory;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
 import org.apache.hadoop.fs.FileSystem;

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1346380&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Tue Jun  5 13:18:22 2012
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.log;
+
+import static org.apache.accumulo.server.logger.LogEvents.COMPACTION_FINISH;
+import static org.apache.accumulo.server.logger.LogEvents.COMPACTION_START;
+import static org.apache.accumulo.server.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.server.logger.LogEvents.MANY_MUTATIONS;
+import static org.apache.accumulo.server.logger.LogEvents.OPEN;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Wrap a connection to a logger.
+ * 
+ */
+public class DfsLogger {
+  private static Logger log = Logger.getLogger(DfsLogger.class);
+  
+  public static class LogClosedException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public LogClosedException() {
+      super("LogClosed");
+    }
+  }
+
+  public interface ServerResources {
+    AccumuloConfiguration getConfiguration();
+    
+    FileSystem getFileSystem();
+    
+    Set<TServerInstance> getCurrentTServers();
+  }
+
+  private LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<DfsLogger.LogWork>();
+  
+  private String closeLock = new String("foo");
+  
+  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null, null);
+  
+  private static final LogFileValue EMPTY = new LogFileValue();
+  
+  private boolean closed = false;
+
+  private class LogSyncingTask implements Runnable {
+
+    @Override
+    public void run() {
+      ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
+      while (true) {
+        work.clear();
+        
+        try {
+          work.add(workQueue.take());
+        } catch (InterruptedException ex) {
+          continue;
+        }
+        workQueue.drainTo(work);
+        
+        synchronized (closeLock) {
+          if (!closed) {
+            try {
+              logFile.sync();
+            } catch (IOException ex) {
+              log.warn("Exception syncing " + ex);
+              for (DfsLogger.LogWork logWork : work) {
+                logWork.exception = ex;
+              }
+            }
+          } else {
+            for (DfsLogger.LogWork logWork : work) {
+              logWork.exception = new LogClosedException();
+            }
+          }
+        }
+        
+        boolean sawClosedMarker = false;
+        for (DfsLogger.LogWork logWork : work)
+          if (logWork == CLOSED_MARKER)
+            sawClosedMarker = true;
+          else
+            logWork.latch.countDown();
+        
+        if (sawClosedMarker) {
+          synchronized (closeLock) {
+            closeLock.notifyAll();
+          }
+          break;
+        }
+      }
+    }
+  }
+
+  static class LogWork {
+    List<TabletMutations> mutations;
+    CountDownLatch latch;
+    volatile Exception exception;
+    
+    public LogWork(List<TabletMutations> mutations, CountDownLatch latch) {
+      this.mutations = mutations;
+      this.latch = latch;
+    }
+  }
+
+  public static class LoggerOperation {
+    private LogWork work;
+    
+    public LoggerOperation(LogWork work) {
+      this.work = work;
+    }
+    
+    public void await() throws IOException {
+      try {
+        work.latch.await();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      
+      if (work.exception != null) {
+        if (work.exception instanceof IOException)
+          throw (IOException) work.exception;
+        else if (work.exception instanceof RuntimeException)
+          throw (RuntimeException) work.exception;
+        else
+          throw new RuntimeException(work.exception);
+      }
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#equals(java.lang.Object)
+   */
+  @Override
+  public boolean equals(Object obj) {
+    // filename is unique
+    if (obj == null)
+      return false;
+    if (obj instanceof DfsLogger)
+      return getFileName().equals(((DfsLogger) obj).getFileName());
+    return false;
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#hashCode()
+   */
+  @Override
+  public int hashCode() {
+    // filename is unique
+    return getFileName().hashCode();
+  }
+  
+  private ServerResources conf;
+  private FSDataOutputStream logFile;
+  private Path logPath;
+  private String logger;
+  
+  public DfsLogger(ServerResources conf) throws IOException {
+    this.conf = conf;
+  }
+  
+  public DfsLogger(ServerResources conf, String logger, String filename) throws IOException {
+    this.conf = conf;
+    this.logger = logger;
+    this.logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()), filename);
+  }
+
+  public synchronized void open(String address) throws IOException {
+    String filename = UUID.randomUUID().toString();
+    logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
+
+    logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()) + "/" + logger + "/" + filename);
+    try {
+      FileSystem fs = conf.getFileSystem();
+      short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
+      if (replication == 0)
+        replication = (short) fs.getDefaultReplication();
+      long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
+      if (blockSize == 0)
+        blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
+      int checkSum = fs.getConf().getInt("io.bytes.per.checksum", 512);
+      blockSize -= blockSize % checkSum;
+      blockSize = Math.max(blockSize, checkSum);
+      logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+      LogFileKey key = new LogFileKey();
+      key.event = OPEN;
+      key.tserverSession = filename;
+      key.filename = filename;
+      write(key, EMPTY);
+      log.debug("Got new write-ahead log: " + this);
+    } catch (IOException ex) {
+      if (logFile != null)
+        logFile.close();
+      logFile = null;
+      throw ex;
+    }
+    
+    Thread t = new Daemon(new LogSyncingTask());
+    t.setName("Accumulo WALog thread " + toString());
+    t.start();
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#toString()
+   */
+  @Override
+  public String toString() {
+    return getLogger() + "/" + getFileName();
+  }
+  
+  public String getLogger() {
+    return logger;
+  }
+  
+  public String getFileName() {
+    return logPath.getName();
+  }
+  
+  public void close() throws IOException {
+    
+    synchronized (closeLock) {
+      if (closed)
+        return;
+      // after closed is set to true, nothing else should be added to the queue
+      // CLOSED_MARKER should be the last thing on the queue, therefore when the
+      // background thread sees the marker and exits there should be nothing else
+      // to process... so nothing should be left waiting for the background
+      // thread to do work
+      closed = true;
+      workQueue.add(CLOSED_MARKER);
+      while (!workQueue.isEmpty())
+        try {
+          closeLock.wait();
+        } catch (InterruptedException e) {
+          log.info("Interrupted");
+        }
+    }
+
+    if (logFile != null)
+      try {
+        logFile.close();
+      } catch (IOException ex) {
+        log.error(ex);
+        throw new LogClosedException();
+      }
+  }
+  
+  public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException {
+    // write this log to the METADATA table
+    final LogFileKey key = new LogFileKey();
+    key.event = DEFINE_TABLET;
+    key.seq = seq;
+    key.tid = tid;
+    key.tablet = tablet;
+    try {
+      write(key, EMPTY);
+      logFile.sync();
+    } catch (IOException ex) {
+      log.error(ex);
+      throw ex;
+    }
+  }
+  
+  /**
+   * @param key
+   * @param empty2
+   * @throws IOException
+   */
+  private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
+    key.write(logFile);
+    value.write(logFile);
+  }
+
+  public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
+    return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation.toThrift()))));
+  }
+  
+  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
+    DfsLogger.LogWork work = new DfsLogger.LogWork(mutations, new CountDownLatch(1));
+    
+    synchronized (DfsLogger.this) {
+      try {
+        for (TabletMutations mutation : mutations) {
+          LogFileKey key = new LogFileKey();
+          key.event = MANY_MUTATIONS;
+          key.seq = mutation.seq;
+          key.tid = mutation.tabletID;
+          LogFileValue value = new LogFileValue();
+          Mutation[] m = new Mutation[mutation.mutations.size()];
+          for (int i = 0; i < m.length; i++)
+            m[i] = new Mutation(mutation.mutations.get(i));
+          value.mutations = m;
+          write(key, value);
+        }
+      } catch (Exception e) {
+        log.error(e, e);
+        work.exception = e;
+      }
+    }
+
+    synchronized (closeLock) {
+      // use a different lock for close check so that adding to work queue does not need
+      // to wait on walog I/O operations
+
+      if (closed)
+        throw new LogClosedException();
+      workQueue.add(work);
+    }
+
+    return new LoggerOperation(work);
+  }
+  
+  public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
+    LogFileKey key = new LogFileKey();
+    key.event = COMPACTION_FINISH;
+    key.seq = seq;
+    key.tid = tid;
+    try {
+      write(key, EMPTY);
+    } catch (IOException ex) {
+      log.error(ex);
+      throw ex;
+    }
+  }
+  
+  public synchronized void minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
+    LogFileKey key = new LogFileKey();
+    key.event = COMPACTION_START;
+    key.seq = seq;
+    key.tid = tid;
+    key.filename = fqfn;
+    try {
+      write(key, EMPTY);
+    } catch (IOException ex) {
+      log.error(ex);
+      throw ex;
+    }
+  }
+  
+}

Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1346380&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Tue Jun  5 13:18:22 2012
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.log;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TimerTask;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.RecoveryStatus;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * 
+ */
+public class LogSorter {
+  
+  private static final Logger log = Logger.getLogger(LogSorter.class);
+  FileSystem fs;
+  AccumuloConfiguration conf;
+  
+  private Map<String,Work> currentWork = new HashMap<String,Work>();
+
+  class Work implements Runnable {
+    final String name;
+    FSDataInputStream input;
+    final String destPath;
+    long bytesCopied = -1;
+    long sortStart = 0;
+    long sortStop = -1;
+    private final LogSortNotifier cback;
+    
+    synchronized long getBytesCopied() throws IOException {
+      return input == null ? bytesCopied : input.getPos();
+    }
+    
+    Work(String name, FSDataInputStream input, String destPath, LogSortNotifier cback) {
+      this.name = name;
+      this.input = input;
+      this.destPath = destPath;
+      this.cback = cback;
+    }
+    synchronized boolean finished() {
+      return input == null;
+    }
+    public void run() {
+      sortStart = System.currentTimeMillis();
+      String formerThreadName = Thread.currentThread().getName();
+      int part = 0;
+      try {
+        final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
+        Thread.currentThread().setName("Sorting " + name + " for recovery");
+        while (true) {
+          final ArrayList<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<Pair<LogFileKey, LogFileValue>>();
+          try {
+            long start = input.getPos();
+            while (input.getPos() - start < bufferSize) {
+              LogFileKey key = new LogFileKey();
+              LogFileValue value = new LogFileValue();
+              key.readFields(input);
+              value.readFields(input);
+              buffer.add(new Pair<LogFileKey, LogFileValue>(key, value));
+            }
+            writeBuffer(buffer, part++);
+            buffer.clear();
+          } catch (EOFException ex) {
+            writeBuffer(buffer, part++);
+            break;
+          }
+        }
+        fs.create(new Path(destPath, "finished")).close();
+        log.info("Log copy/sort of " + name + " complete");
+      } catch (Throwable t) {
+        try {
+          fs.create(new Path(destPath, "failed")).close();
+        } catch (IOException e) {
+          log.error("Error creating failed flag file " + name, e);
+        }
+        log.error(t, t);
+        try {
+          cback.notice(name, getBytesCopied(), part, getSortTime(), t.toString());
+        } catch (Exception ex) {
+          log.error("Strange error notifying the master of a logSort problem for file " + name);
+        }
+      } finally {
+        Thread.currentThread().setName(formerThreadName);
+        try {
+          close();
+        } catch (IOException e) {
+          log.error("Error during cleanup sort/copy " + name, e);
+        }
+        sortStop = System.currentTimeMillis();
+        synchronized (currentWork) {
+          currentWork.remove(name);
+        }
+        try {
+          cback.notice(name, getBytesCopied(), part, getSortTime(), "");
+        } catch (Exception ex) {
+          log.error("Strange error reporting successful log sort " + name, ex);
+        }
+      }
+    }
+    
+    private void writeBuffer(ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+      String path = destPath + String.format("/part-r-%05d", part++);
+      MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
+      try {
+        Collections.sort(buffer, new Comparator<Pair<LogFileKey,LogFileValue>>() {
+          @Override
+          public int compare(Pair<LogFileKey,LogFileValue> o1, Pair<LogFileKey,LogFileValue> o2) {
+            return o1.getFirst().compareTo(o2.getFirst());
+          }
+        });
+        for (Pair<LogFileKey,LogFileValue> entry : buffer) {
+          output.append(entry.getFirst(), entry.getSecond());
+        }
+      } finally {
+        output.close();
+      }
+    }
+    
+    synchronized void close() throws IOException {
+      bytesCopied = input.getPos();
+      input.close();
+      input = null;
+    }
+    
+    public synchronized long getSortTime() {
+      if (sortStart > 0) {
+        if (sortStop > 0)
+          return sortStop - sortStart;
+        return System.currentTimeMillis() - sortStart;
+      }
+      return 0;
+    }
+  };
+  
+  final ThreadPoolExecutor threadPool;
+  private Instance instance;
+  
+  public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) {
+    this.instance = instance;
+    this.fs = fs;
+    this.conf = conf;
+    int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
+    this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
+  }
+  
+  public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException {
+    final String path = ZooUtil.getRoot(instance) + Constants.ZRECOVERY;
+    final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    zoo.mkdirs(path);
+    List<String> children = zoo.getChildren(path, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        switch (event.getType()) {
+          case NodeChildrenChanged:
+            if (event.getPath().equals(path))
+              try {
+                attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
+              } catch (KeeperException e) {
+                log.error("Unable to get recovery information", e);
+              } catch (InterruptedException e) {
+                log.info("Interrupted getting recovery information", e);
+              }
+            else
+              log.info("Unexpected path for NodeChildrenChanged event " + event.getPath());
+            break;
+          case NodeCreated:
+          case NodeDataChanged:
+          case NodeDeleted:
+          case None:
+            log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
+            break;
+          
+        }
+      }
+    });
+    attemptRecoveries(zoo, serverName, path, children);
+    Random r = new Random();
+    // Add a little jitter to avoid all the tservers slamming zookeeper at once
+    SimpleTimer.getInstance().schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
+        } catch (KeeperException e) {
+          log.error("Unable to get recovery information", e);
+        } catch (InterruptedException e) {
+          log.info("Interrupted getting recovery information", e);
+        }        
+      }
+    }, r.nextInt(1000), 60 * 1000);
+  }
+  
+  private void attemptRecoveries(final ZooReaderWriter zoo, final String serverName, String path, List<String> children) {
+    if (children.size() == 0)
+      return;
+    log.info("Zookeeper references " + children.size() + " recoveries, attempting locks");
+    Random random = new Random();
+    Collections.shuffle(children, random);
+    try {
+      for (String child : children) {
+        final String childPath = path + "/" + child;
+        log.debug("Attempting to lock " + child);
+        ZooLock lock = new ZooLock(childPath);
+        if (lock.tryLock(new LockWatcher() {
+          @Override
+          public void lostLock(LockLossReason reason) {
+            log.info("Ignoring lost lock event, reason " + reason);
+          }
+        }, serverName.getBytes())) {
+          // Great... we got the lock, but maybe we're too busy
+          if (threadPool.getQueue().size() > 1) {
+            lock.unlock();
+            log.debug("got the lock, but thread pool is busy; released the lock on " + child);
+            continue;
+          }
+          log.info("got lock for " + child);
+          byte[] contents = zoo.getData(childPath, null);
+          String destination = Constants.getRecoveryDir(conf) + "/" + child;
+          startSort(new String(contents), destination, new LogSortNotifier() {
+            @Override
+            public void notice(String name, long bytes, int parts, long milliseconds, String error) {
+              log.info("Finished log sort " + name + " " + bytes + " bytes " + parts + " parts in " + milliseconds + "ms");
+              try {
+                zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
+              } catch (Exception e) {
+                log.error("Error received when trying to delete recovery entry in zookeeper " + childPath);
+              }
+            }
+          });
+        } else {
+          log.info("failed to get the lock " + child);
+        }
+      }
+    } catch (Throwable t) {
+      log.error("Unexpected error", t);
+    }
+  }
+
+  public interface LogSortNotifier {
+    public void notice(String name, long bytes, int parts, long milliseconds, String error);
+  }
+
+  private void startSort(String src, String dest, LogSortNotifier cback) throws IOException {
+    log.info("Copying " + src + " to " + dest);
+    fs.delete(new Path(dest), true);
+    Path srcPath = new Path(src);
+    synchronized (currentWork) {
+      Work work = new Work(srcPath.getName(), fs.open(srcPath), dest, cback);
+      if (!currentWork.containsKey(srcPath.getName())) {
+        threadPool.execute(work);
+        currentWork.put(srcPath.getName(), work);
+      }
+    }
+  }
+  
+  public List<RecoveryStatus> getLogSorts() {
+    List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
+    synchronized (currentWork) {
+      for (Entry<String,Work> entries : currentWork.entrySet()) {
+        RecoveryStatus status = new RecoveryStatus();
+        status.name = entries.getKey();
+        try {
+          status.progress = entries.getValue().getBytesCopied() / (0.0 + conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE));
+        } catch (IOException ex) {
+          log.warn("Error getting bytes read");
+        }
+        status.runtime = (int) entries.getValue().getSortTime();
+        result.add(status);
+      }
+      return result;
+    }
+  }
+}

Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Tue Jun  5 13:18:22 2012
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -35,13 +34,12 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.thrift.TMutation;
-import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.tabletserver.Tablet;
 import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
 import org.apache.accumulo.server.tabletserver.TabletServer;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LoggerOperation;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger.LoggerOperation;
 import org.apache.log4j.Logger;
 
 /**
@@ -61,7 +59,7 @@ public class TabletServerLogger {
   private final TabletServer tserver;
   
   // The current log set: always updated to a new set with every change of loggers
-  private final List<RemoteLogger> loggers = new ArrayList<RemoteLogger>();
+  private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
   
   // The current generation of logSet.
   // Because multiple threads can be using a log set at one time, a log
@@ -134,7 +132,7 @@ public class TabletServerLogger {
     this.maxSize = maxSize;
   }
   
-  private int initializeLoggers(final List<RemoteLogger> copy) throws IOException {
+  private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
     final int[] result = {-1};
     testLockAndRun(logSetLock, new TestCallWithWriteLock() {
       boolean test() {
@@ -165,8 +163,8 @@ public class TabletServerLogger {
   public void getLoggers(Set<String> loggersOut) {
     logSetLock.readLock().lock();
     try {
-      for (RemoteLogger logger : loggers) {
-        loggersOut.add(logger.getLogger());
+      for (DfsLogger logger : loggers) {
+        loggersOut.add(logger.toString());
       }
     } finally {
       logSetLock.readLock().unlock();
@@ -183,35 +181,13 @@ public class TabletServerLogger {
     }
     
     try {
-      while (true) {
-        Set<String> loggerAddresses = tserver.getLoggers();
-        if (!loggerAddresses.isEmpty()) {
-          for (String logger : loggerAddresses) {
-            try {
-              loggers.add(new RemoteLogger(logger, UUID.randomUUID(), tserver.getSystemConfiguration()));
-            } catch (LoggerClosedException t) {
-              close();
-              break;
-            } catch (Exception t) {
-              close();
-              log.warn("Unable to connect to " + logger + ": " + t);
-              break;
-            }
-          }
-          
-          if (loggers.size() == loggerAddresses.size())
-            break;
-          if (loggers.size() > 0) {
-            // something is screwy, loggers.size() should be 0 or loggerAddresses.size()..
-            throw new RuntimeException("Unexpected number of loggers " + loggers.size() + " " + loggerAddresses.size());
-          }
-        }
-        UtilWaitThread.sleep(1000);
-      }
+      DfsLogger alog = new DfsLogger(tserver.getServerConfig());
+      alog.open(tserver.getClientAddressString());
+      loggers.add(alog);
       logSetId.incrementAndGet();
       return;
     } catch (Exception t) {
-      throw new IOException(t);
+      throw new RuntimeException(t);
     }
   }
   
@@ -229,13 +205,13 @@ public class TabletServerLogger {
       throw new IllegalStateException("close should be called with write lock held!");
     }
     try {
-      for (RemoteLogger logger : loggers) {
+      for (DfsLogger logger : loggers) {
         try {
           logger.close();
-        } catch (LoggerClosedException ex) {
-          // expected
+        } catch (DfsLogger.LogClosedException ex) {
+          // ignore
         } catch (Throwable ex) {
-          log.error("Unable to cleanly close logger " + logger.getLogger() + ": " + ex);
+          log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex);
         }
       }
       loggers.clear();
@@ -246,7 +222,7 @@ public class TabletServerLogger {
   }
   
   interface Writer {
-    LoggerOperation write(RemoteLogger logger, int seq) throws Exception;
+    LoggerOperation write(DfsLogger logger, int seq) throws Exception;
   }
   
   private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
@@ -265,7 +241,7 @@ public class TabletServerLogger {
     while (!success) {
       try {
         // get a reference to the loggers that no other thread can touch
-        ArrayList<RemoteLogger> copy = new ArrayList<RemoteLogger>();
+        ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
         currentLogSet = initializeLoggers(copy);
         
         // add the logger to the log set for the memory in the tablet,
@@ -294,7 +270,7 @@ public class TabletServerLogger {
           if (seq < 0)
             throw new RuntimeException("Logger sequence generator wrapped!  Onos!!!11!eleven");
           ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
-          for (RemoteLogger wal : copy) {
+          for (DfsLogger wal : copy) {
             LoggerOperation lop = writer.write(wal, seq);
             if (lop != null)
               queuedOperations.add(lop);
@@ -307,13 +283,11 @@ public class TabletServerLogger {
           // double-check: did the log set change?
           success = (currentLogSet == logSetId.get());
         }
+      } catch (DfsLogger.LogClosedException ex) {
+        log.debug("Logs closed while writing, retrying " + (attempt + 1));
       } catch (Exception t) {
-        if (attempt == 0) {
-          log.info("Log write failed: another thread probably closed the log");
-        } else {
-          log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
-          UtilWaitThread.sleep(100);
-        }
+        log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
+        UtilWaitThread.sleep(100);
       } finally {
         attempt++;
       }
@@ -356,7 +330,7 @@ public class TabletServerLogger {
       return -1;
     return write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
         return null;
       }
@@ -368,7 +342,7 @@ public class TabletServerLogger {
       return -1;
     int seq = write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         return logger.log(tabletSeq, commitSession.getLogId(), m);
       }
     });
@@ -388,7 +362,7 @@ public class TabletServerLogger {
     
     int seq = write(loggables.keySet(), false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
         for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
           CommitSession cs = entry.getKey();
@@ -419,7 +393,7 @@ public class TabletServerLogger {
     
     int seq = write(commitSession, true, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
         return null;
       }
@@ -435,7 +409,7 @@ public class TabletServerLogger {
       return -1;
     write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
         return null;
       }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java Tue Jun  5 13:18:22 2012
@@ -90,9 +90,6 @@ public class GetMasterStats {
         out(2, "Time Difference %.1f", ((now - server.lastContact) / 1000.));
         out(2, "Total Records %d", summary.recs);
         out(2, "Lookups %d", server.lookups);
-        out(2, "Loggers %d", server.loggers.size());
-        for (String logger : server.loggers)
-          out(3, "Logger %s", logger);
         if (server.holdTime > 0)
           out(2, "Hold Time %d", server.holdTime);
         if (server.tableMap != null && server.tableMap.size() > 0) {
@@ -111,16 +108,12 @@ public class GetMasterStats {
             out(4, "Queued for Minor Compaction %d", info.minor == null ? 0 : info.minor.queued);
           }
         }
-      }
-    }
-    if (stats.recovery != null && stats.recovery.size() > 0) {
-      out(0, "Recovery");
-      for (RecoveryStatus r : stats.recovery) {
-        out(1, "Log Server %s", r.host);
-        out(1, "Log Name %s", r.name);
-        out(1, "Map Progress: %.2f%%", r.mapProgress * 100);
-        out(1, "Reduce Progress: %.2f%%", r.reduceProgress * 100);
-        out(1, "Time running: %s", r.runtime / 1000.);
+        out(2, "Recoveries %d", server.logSorts.size());
+        for (RecoveryStatus sort : server.logSorts) {
+          out(3, "File %s", sort.name);
+          out(3, "Progress %.2f%%", sort.progress * 100);
+          out(3, "Time running %s", sort.runtime / 1000.);
+        }
       }
     }
   }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java Tue Jun  5 13:18:22 2012
@@ -22,7 +22,6 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.accumulo.server.logger.IdentityReducer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -135,7 +134,7 @@ public class RunTests extends Configured
     job.setOutputValueClass(Text.class);
     
     // don't do anything with the results (yet) a summary would be nice
-    job.setReducerClass(IdentityReducer.class);
+    job.setNumReduceTasks(0);
     
     // submit the job
     log.info("Starting tests");

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java Tue Jun  5 13:18:22 2012
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.accumulo.cloudtrace.thrift.TInfo;
 import org.apache.accumulo.core.client.Instance;
@@ -164,9 +163,6 @@ public class NullTserver {
     public void unloadTablet(TInfo tinfo, AuthInfo credentials, String lock, TKeyExtent extent, boolean save) throws TException {}
     
     @Override
-    public void useLoggers(TInfo tinfo, AuthInfo credentials, Set<String> loggers) throws TException {}
-    
-    @Override
     public List<ActiveScan> getActiveScans(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException {
       return new ArrayList<ActiveScan>();
     }
@@ -188,6 +184,16 @@ public class NullTserver {
     public void flush(TInfo tinfo, AuthInfo credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
       
     }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.cloudtrace.thrift.TInfo,
+     * org.apache.accumulo.core.security.thrift.AuthInfo, java.util.List)
+     */
+    @Override
+    public void removeLogs(TInfo tinfo, AuthInfo credentials, List<String> filenames) throws TException {
+    }
   }
   
   public static void main(String[] args) throws Exception {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java Tue Jun  5 13:18:22 2012
@@ -19,11 +19,7 @@ package org.apache.accumulo.server.test.
 import java.net.InetAddress;
 import java.util.Properties;
 import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.TableExistsException;
@@ -31,16 +27,14 @@ import org.apache.accumulo.core.client.a
 import org.apache.accumulo.core.iterators.LongCombiner;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.server.test.randomwalk.State;
 import org.apache.accumulo.server.test.randomwalk.Test;
 import org.apache.hadoop.fs.FileSystem;
 
 public class Setup extends Test {
   
-  private static final int CORE_POOL_SIZE = 8;
-  private static final int MAX_POOL_SIZE = CORE_POOL_SIZE;
+  private static final int MAX_POOL_SIZE = 8;
   static String tableName = null;
   
   @Override
@@ -67,14 +61,7 @@ public class Setup extends Test {
     state.set("fs", FileSystem.get(CachedConfiguration.getInstance()));
     BulkPlusOne.counter.set(0l);
     
-    BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
-    ThreadFactory factory = new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        return new Daemon(new LoggingRunnable(log, r));
-      }
-    };
-    ThreadPoolExecutor e = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 1, TimeUnit.SECONDS, q, factory);
+    ThreadPoolExecutor e = new SimpleThreadPool(MAX_POOL_SIZE, "bulkImportPool");
     state.set("pool", e);
   }
   

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java Tue Jun  5 13:18:22 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.util.Progressab
 // If FileSystem was an interface, we could use a Proxy, but it's not, so we have to override everything manually
 
 public class TraceFileSystem extends FileSystem {
+
   @Override
   public void setConf(Configuration conf) {
     Span span = Trace.start("setConf");
@@ -667,6 +668,10 @@ public class TraceFileSystem extends Fil
     this.impl = impl;
   }
   
+  public FileSystem getImplementation() {
+    return impl;
+  }
+
   @Override
   public URI getUri() {
     Span span = Trace.start("getUri");

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java Tue Jun  5 13:18:22 2012
@@ -23,7 +23,7 @@ import org.apache.accumulo.server.conf.S
 
 public class AddressUtil {
   static public InetSocketAddress parseAddress(String address, Property portDefaultProperty) {
-    final int dfaultPort = ServerConfiguration.getDefaultConfiguration().getPort(Property.TSERV_CLIENTPORT);
+    final int dfaultPort = ServerConfiguration.getDefaultConfiguration().getPort(portDefaultProperty);
     return org.apache.accumulo.core.util.AddressUtil.parseAddress(address, dfaultPort);
   }
   

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java Tue Jun  5 13:18:22 2012
@@ -353,7 +353,6 @@ public class Initialize {
     zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, new byte[0], NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZROOT_TABLET, new byte[0], NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZROOT_TABLET_WALOGS, new byte[0], NodeExistsPolicy.FAIL);
-    zoo.putPersistentData(zkInstanceRoot + Constants.ZLOGGERS, new byte[0], NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, new byte[0], NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, new byte[0], NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, new byte[0], NodeExistsPolicy.FAIL);

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java Tue Jun  5 13:18:22 2012
@@ -35,7 +35,6 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
-import java.util.TreeSet;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -67,7 +66,6 @@ import org.apache.accumulo.core.util.Cac
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.StringUtil;
-import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -769,11 +767,7 @@ public class MetadataTable extends org.a
     return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZROOT_TABLET_WALOGS;
   }
   
-  public static void addLogEntries(AuthInfo credentials, List<LogEntry> entries, ZooLock zooLock) {
-    if (entries.size() == 0)
-      return;
-    // entries should be a complete log set, so we should only need to write the first entry
-    LogEntry entry = entries.get(0);
+  public static void addLogEntry(AuthInfo credentials, LogEntry entry, ZooLock zooLock) {
     if (entry.extent.isRootTablet()) {
       String root = getZookeeperLogLocation();
       while (true) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java Tue Jun  5 13:18:22 2012
@@ -25,18 +25,14 @@ import java.net.UnknownHostException;
 import java.nio.channels.ServerSocketChannel;
 import java.util.Random;
 import java.util.TimerTask;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.TBufferedSocket;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -205,7 +201,7 @@ public class TServerUtils {
     }
   }
   
-  public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, int numThreads,
+  public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads,
       long timeBetweenThreadChecks) throws TTransportException {
     TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
     THsHaServer.Args options = new THsHaServer.Args(transport);
@@ -214,21 +210,8 @@ public class TServerUtils {
     /*
      * Create our own very special thread pool.
      */
-    // 1. name the threads for client connections
-    ThreadFactory factory = new ThreadFactory() {
-      AtomicInteger threadId = new AtomicInteger();
-      
-      @Override
-      public Thread newThread(Runnable r) {
-        return new Thread(new LoggingRunnable(log, r), "ClientPool-" + threadId.getAndIncrement());
-      }
-    };
-    // 2. allow tasks to queue, potentially forever
-    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
-    // 3. keep the number of threads small
-    final int minimumThreadPoolSize = numThreads;
-    final ThreadPoolExecutor pool = new ThreadPoolExecutor(minimumThreadPoolSize, minimumThreadPoolSize, 10L, TimeUnit.SECONDS, queue, factory);
-    // 4. periodically adjust the number of threads we need by checking how busy our threads are
+    final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+    // periodically adjust the number of threads we need by checking how busy our threads are
     SimpleTimer.getInstance().schedule(new TimerTask() {
       @Override
       public void run() {
@@ -239,7 +222,7 @@ public class TServerUtils {
           pool.setCorePoolSize(larger);
         } else {
           if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
-            int smaller = Math.max(minimumThreadPoolSize, pool.getCorePoolSize() - 1);
+            int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
             if (smaller != pool.getCorePoolSize()) {
               // there is a race condition here... the active count could be higher by the time
               // we decrease the core pool size... so the active count could end up higher than

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooZap.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooZap.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooZap.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooZap.java Tue Jun  5 13:18:22 2012
@@ -41,7 +41,6 @@ public class ZooZap {
     
     boolean zapMaster = false;
     boolean zapTservers = false;
-    boolean zapLoggers = false;
     boolean zapTracers = false;
     
     if (args.length == 0) {
@@ -54,8 +53,6 @@ public class ZooZap {
         zapTservers = true;
       } else if (args[i].equals("-master")) {
         zapMaster = true;
-      } else if (args[i].equals("-loggers")) {
-        zapLoggers = true;
       } else if (args[i].equals("-tracers")) {
         zapTracers = true;
       } else if (args[i].equals("-verbose")) {
@@ -98,24 +95,19 @@ public class ZooZap {
       }
     }
     
-    if (zapLoggers) {
-      String loggersPath = Constants.ZROOT + "/" + iid + Constants.ZLOGGERS;
-      zapDirectory(zoo, loggersPath);
-    }
-    
     if (zapTracers) {
-      String loggersPath = Constants.ZROOT + "/" + iid + Constants.ZTRACERS;
-      zapDirectory(zoo, loggersPath);
+      String path = Constants.ZROOT + "/" + iid + Constants.ZTRACERS;
+      zapDirectory(zoo, path);
     }
     
   }
   
-  private static void zapDirectory(IZooReaderWriter zoo, String loggersPath) {
+  private static void zapDirectory(IZooReaderWriter zoo, String path) {
     try {
-      List<String> children = zoo.getChildren(loggersPath);
+      List<String> children = zoo.getChildren(path);
       for (String child : children) {
-        message("Deleting " + loggersPath + "/" + child + " from zookeeper");
-        zoo.recursiveDelete(loggersPath + "/" + child, NodeMissingPolicy.SKIP);
+        message("Deleting " + path + "/" + child + " from zookeeper");
+        zoo.recursiveDelete(path + "/" + child, NodeMissingPolicy.SKIP);
       }
     } catch (Exception e) {
       e.printStackTrace();

Modified: accumulo/trunk/start/src/main/java/org/apache/accumulo/start/Main.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/start/src/main/java/org/apache/accumulo/start/Main.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/start/src/main/java/org/apache/accumulo/start/Main.java (original)
+++ accumulo/trunk/start/src/main/java/org/apache/accumulo/start/Main.java Tue Jun  5 13:18:22 2012
@@ -52,8 +52,6 @@ public class Main {
         runTMP = AccumuloClassLoader.loadClass("org.apache.accumulo.server.gc.SimpleGarbageCollector");
       } else if (args[0].equals("monitor")) {
         runTMP = AccumuloClassLoader.loadClass("org.apache.accumulo.server.monitor.Monitor");
-      } else if (args[0].equals("logger")) {
-        runTMP = AccumuloClassLoader.loadClass("org.apache.accumulo.server.logger.LogService");
       } else if (args[0].equals("tracer")) {
         runTMP = AccumuloClassLoader.loadClass("org.apache.accumulo.server.trace.TraceServer");
       } else if (args[0].equals("classpath")) {

Modified: accumulo/trunk/test/system/continuous/agitator.pl
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/system/continuous/agitator.pl?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/test/system/continuous/agitator.pl (original)
+++ accumulo/trunk/test/system/continuous/agitator.pl Tue Jun  5 13:18:22 2012
@@ -74,27 +74,8 @@ while(1){
 		$t = strftime "%Y%m%d %H:%M:%S", localtime;
 	
 		$rn = rand(1);
-		$kill_tserver = 0;
-		$kill_logger = 0;
-		if($rn <.33){
-			$kill_tserver = 1;
-			$kill_logger = 1;
-		}elsif($rn < .66){
-			$kill_tserver = 1;
-			$kill_logger = 0;
-		}else{
-			$kill_tserver = 0;
-			$kill_logger = 1;
-		}
-	
-		print STDERR "$t Killing $server $kill_tserver $kill_logger\n";
-	        if($kill_tserver) {
-			system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" tserver KILL");
-		}
-
-	        if($kill_logger) {
-			system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" logger KILL");
-		}
+		print STDERR "$t Killing $server\n";
+		system("$ACCUMULO_HOME/bin/stop-server.sh $server \"accumulo-start.*.jar\" tserver KILL");
 	}
 
 	sleep($sleep2 * 60);



Mime
View raw message