accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1346380 [4/5] - in /accumulo/trunk: ./ bin/ core/src/main/java/org/apache/accumulo/core/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accum...
Date Tue, 05 Jun 2012 13:18:25 GMT
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java Tue Jun  5 13:18:22 2012
@@ -23,7 +23,9 @@ import org.apache.thrift.transport.TSock
 
 public class AddressUtil {
   static public InetSocketAddress parseAddress(String address, int defaultPort) throws NumberFormatException {
-    final String[] parts = address.split(":", 2);
+    String[] parts = address.split(":", 2);
+    if (address.contains("+"))
+      parts = address.split("\\+", 2);
     if (parts.length == 2) {
       if (parts[1].isEmpty())
         return new InetSocketAddress(parts[0], defaultPort);

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java?rev=1346380&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java Tue Jun  5 13:18:22 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.cloudtrace.instrument.TraceRunnable;
+import org.apache.log4j.Logger;
+
+/** ThreadFactory that creates Daemon threads, passing each one the label {@code name + " " + n} with n counting up from 1. */
+public class NamingThreadFactory implements ThreadFactory {
+  private static final Logger log = Logger.getLogger(NamingThreadFactory.class);
+  // Number appended to the next thread's label; read and incremented atomically in newThread().
+  private final AtomicInteger threadNum = new AtomicInteger(1);
+  private final String name;
+  
+  /** @param name prefix used in the label of every thread this factory creates */
+  public NamingThreadFactory(String name) {
+    this.name = name;
+  }
+  /** Wraps r in a TraceRunnable, then a LoggingRunnable bound to this class's logger, and returns a new Daemon for it. */
+  @Override
+  public Thread newThread(Runnable r) {
+    return new Daemon(new LoggingRunnable(log, new TraceRunnable(r)), name + " " + threadNum.getAndIncrement());
+  }
+}

Propchange: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java?rev=1346380&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java Tue Jun  5 13:18:22 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Executor that runs queued tasks on up to {@code max} threads built by a NamingThreadFactory; idle threads exit after one second.
+ */
+public class SimpleThreadPool extends ThreadPoolExecutor {
+  /** Core size is {@code max} (negative values are rejected): with an unbounded queue no threads start beyond the core size. */
+  public SimpleThreadPool(int max, final String name) {
+    super(max, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(name));
+    allowCoreThreadTimeOut(true); // keeps the original one-second idle timeout now that workers count as core threads
+  }
+}

Propchange: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java Tue Jun  5 13:18:22 2012
@@ -49,6 +49,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
 
+@SuppressWarnings("deprecation")
 public class SetIterCommand extends Command {
   
   private Option mincScopeOpt, majcScopeOpt, scanScopeOpt, nameOpt, priorityOpt;

Modified: accumulo/trunk/core/src/main/thrift/master.thrift
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/thrift/master.thrift?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/thrift/master.thrift (original)
+++ accumulo/trunk/core/src/main/thrift/master.thrift Tue Jun  5 13:18:22 2012
@@ -42,16 +42,9 @@ struct TableInfo {
 }
 
 struct RecoveryStatus {
-    1:string host
     2:string name
-    3:double mapProgress
-    4:double reduceProgress
     5:i32 runtime                   // in millis
-    6:double copyProgress
-}
-
-struct LoggerStatus {
-    1:string logger
+    6:double progress
 }
 
 struct TabletServerStatus {
@@ -61,11 +54,11 @@ struct TabletServerStatus {
     5:double osLoad
     7:i64 holdTime
     8:i64 lookups
-    9:set<string> loggers
     10:i64 indexCacheHits    
     11:i64 indexCacheRequest   
     12:i64 dataCacheHits   
-    13:i64 dataCacheRequest   
+    13:i64 dataCacheRequest
+    14:list<RecoveryStatus> logSorts
 }
 
 enum MasterState {
@@ -94,14 +87,11 @@ struct MasterMonitorInfo {
     1:map<string, TableInfo> tableMap
     2:list<TabletServerStatus> tServerInfo
     3:map<string, byte> badTServers
-    4:list<RecoveryStatus> recovery
-    5:list<LoggerStatus> loggers
     6:MasterState state
     8:MasterGoalState goalState
     7:i32 unassignedTablets
     9:set<string> serversShuttingDown
     10:list<DeadServer> deadTabletServers
-    11:list<DeadServer> deadLoggers
 }
 
 struct TabletSplit {

Modified: accumulo/trunk/core/src/main/thrift/tabletserver.thrift
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/thrift/tabletserver.thrift?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/thrift/tabletserver.thrift (original)
+++ accumulo/trunk/core/src/main/thrift/tabletserver.thrift Tue Jun  5 13:18:22 2012
@@ -148,8 +148,6 @@ service TabletClientService extends clie
   oneway void chop(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials, 3:string lock, 4:data.TKeyExtent extent),
   oneway void compact(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials, 3:string lock, 4:string tableId, 5:binary startRow, 6:binary endRow),
   
-  oneway void useLoggers(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:set<string> loggers),
-  
   master.TabletServerStatus getTabletServerStatus(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec)
   list<TabletStats> getTabletStats(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string tableId) throws (1:security.ThriftSecurityException sec)
   TabletStats getHistoricalStats(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec)
@@ -157,57 +155,13 @@ service TabletClientService extends clie
   oneway void fastHalt(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string lock);
   
   list<ActiveScan> getActiveScans(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec)
+  oneway void removeLogs(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials, 3:list<string> filenames)
 }
 
-// LogID should be cryptographically unguessable
-typedef i64 LogID
 typedef i32 TabletID
 
-exception NoSuchLogIDException {
-}
-
-exception LoggerClosedException {
-}
-
-struct LogFile {
-        1:string name,
-        2:LogID  id,
-}
-
 struct TabletMutations {
 	1:TabletID tabletID,
 	2:i64 seq,
 	3:list<data.TMutation> mutations
 }
-
-struct LogCopyInfo {
-	1:i64 fileSize,
-	2:string loggerZNode
-}
-
-service MutationLogger {
-    LogFile create(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string tserverSession) throws (1:security.ThriftSecurityException sec, 2:LoggerClosedException lce),
-
-    // Note that these methods correspond to org.apache.accumulo.server.tabletserver.log.TabletLog
-    void defineTablet(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:data.TKeyExtent tablet) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-    void log(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:data.TMutation mutation) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-    void logManyTablets(3:cloudtrace.TInfo tinfo, 1:LogID id, 2:list<TabletMutations> mutations) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-    void minorCompactionStarted(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:string fqfn) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-    void minorCompactionFinished(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:string fqfn) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-    void close(2:cloudtrace.TInfo tinfo, 1:LogID id) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-
-    // close a log file and initiate the push to HDFS
-    LogCopyInfo startCopy(4:cloudtrace.TInfo tinfo,
-                  1:security.AuthInfo credentials,
-                  2:string name, 
-                  3:string fullyQualifiedFileName,
-                  5:bool sort) throws (1:security.ThriftSecurityException sec),
-
-    // Support log garbage collection              
-    list<string> getClosedLogs(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec),
-    oneway void remove(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:list<string> files),
-    
-    // Shutdown
-    void beginShutdown(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec);
-    oneway void halt(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials);
-}

Modified: accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java (original)
+++ accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java Tue Jun  5 13:18:22 2012
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.client.m
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
-import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.log_args;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
 import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;

Modified: accumulo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/trunk/pom.xml?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/pom.xml (original)
+++ accumulo/trunk/pom.xml Tue Jun  5 13:18:22 2012
@@ -498,7 +498,7 @@
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-core</artifactId>
-        <version>0.20.203.0</version>
+        <version>0.20.205.0</version>
         <scope>provided</scope>
       </dependency>
       <dependency>

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Tue Jun  5 13:18:22 2012
@@ -57,6 +57,7 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.StopWatch;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -138,7 +139,7 @@ public class BulkImporter {
       final Map<Path,List<TabletLocation>> assignments = Collections.synchronizedSortedMap(new TreeMap<Path,List<TabletLocation>>());
       
       timer.start(Timers.EXAMINE_MAP_FILES);
-      ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+      ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping"));
       
       for (Path path : paths) {
         final Path mapFile = path;
@@ -376,7 +377,7 @@ public class BulkImporter {
     
     final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<Path,List<AssignmentInfo>>());
     
-    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes"));
     
     for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
       if (entry.getValue().size() == 1) {
@@ -586,12 +587,11 @@ public class BulkImporter {
       apt.put(entry.getKey(), entry.getValue());
     }
     
-    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit"));
     
     for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer.entrySet()) {
       String location = entry.getKey();
-      threadPool
-          .submit(new TraceRunnable(new LoggingRunnable(log, new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()))));
+      threadPool.submit(new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()));
     }
     
     threadPool.shutdown();

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Tue Jun  5 13:18:22 2012
@@ -17,53 +17,49 @@
 package org.apache.accumulo.server.gc;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.UUID;
 
 import org.apache.accumulo.cloudtrace.instrument.Span;
 import org.apache.accumulo.cloudtrace.instrument.Trace;
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
-import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
-import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.util.MetadataTable.LogEntry;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 
 
 public class GarbageCollectWriteAheadLogs {
   private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
   
-  private final AccumuloConfiguration conf;
+  private final Instance instance;
   private final FileSystem fs;
   
-  GarbageCollectWriteAheadLogs(FileSystem fs, AccumuloConfiguration conf) {
+  GarbageCollectWriteAheadLogs(Instance instance, FileSystem fs) {
+    this.instance = instance;
     this.fs = fs;
-    this.conf = conf;
   }
 
   public void collect(GCStatus status) {
@@ -111,59 +107,51 @@ public class GarbageCollectWriteAheadLog
     }
   }
   
+  /** True if the ZTSERVERS node for addr has any child; false if it is childless or missing; true if the lookup fails otherwise. */
+  boolean holdsLock(InetSocketAddress addr) {
+    try {
+      String serverNode = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + org.apache.accumulo.core.util.AddressUtil.toString(addr);
+      List<String> lockNodes = ZooReaderWriter.getInstance().getChildren(serverNode);
+      return lockNodes != null && !lockNodes.isEmpty();
+    } catch (KeeperException.NoNodeException noNode) {
+      return false;
+    } catch (Exception unexpected) {
+      log.debug(unexpected, unexpected);
+      return true;
+    }
+  }
   private int removeFiles(Map<String,ArrayList<String>> serverToFileMap, final GCStatus status) {
-    final AtomicInteger count = new AtomicInteger();
-    ExecutorService threadPool = java.util.concurrent.Executors.newCachedThreadPool();
-    
-    for (final Entry<String,ArrayList<String>> serverFiles : serverToFileMap.entrySet()) {
-      final String server = serverFiles.getKey();
-      final List<String> files = serverFiles.getValue();
-      threadPool.submit(new Runnable() {
-        @Override
-        public void run() {
+    AccumuloConfiguration conf = instance.getConfiguration();
+    for (Entry<String,ArrayList<String>> entry : serverToFileMap.entrySet()) {
+      if (entry.getKey().length() == 0) {
+        // old-style log entry, just remove it
+        for (String filename : entry.getValue()) {
+          log.debug("Removing old-style WAL " + entry.getValue());
           try {
-            Iface logger = ThriftUtil.getClient(new MutationLogger.Client.Factory(), server, Property.LOGGER_PORT, Property.TSERV_LOGGER_TIMEOUT, conf);
-            try {
-              count.addAndGet(files.size());
-              log.debug(String.format("removing %d files from %s", files.size(), server));
-              if (files.size() > 0) {
-                log.debug("deleting files on logger " + server);
-                for (String file : files) {
-                  log.debug("Deleting " + file);
-                }
-                logger.remove(null, SecurityConstants.getSystemCredentials(), files);
-                synchronized (status.currentLog) {
-                  status.currentLog.deleted += files.size();
-                }
-              }
-            } finally {
-              ThriftUtil.returnClient(logger);
-            }
-            log.info(String.format("Removed %d files from %s", files.size(), server));
-            for (String file : files) {
-              try {
-                for (FileStatus match : fs.globStatus(new Path(ServerConstants.getRecoveryDir(), file + "*"))) {
-                  fs.delete(match.getPath(), true);
-                }
-              } catch (IOException ex) {
-                log.warn("Error deleting recovery data: ", ex);
-              }
-            }
-          } catch (TTransportException err) {
-            log.info("Ignoring communication error talking to logger " + serverFiles.getKey() + " (probably a timeout)");
-          } catch (TException err) {
-            log.info("Ignoring exception talking to logger " + serverFiles.getKey() + "(" + err + ")");
+            fs.delete(new Path(Constants.getWalDirectory(conf), filename), true);
+          } catch (IOException ex) {
+            log.error("Unable to delete wal " + filename + ": " + ex);
           }
         }
-      });
-      
+      } else {
+        InetSocketAddress address = AddressUtil.parseAddress(entry.getKey(), Property.TSERV_CLIENTPORT);
+        if (!holdsLock(address))
+          continue;
+        Iface tserver = null;
+        try {
+          tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+          tserver.removeLogs(null, SecurityConstants.getSystemCredentials(), entry.getValue());
+          log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
+          status.currentLog.deleted += entry.getValue().size();
+        } catch (TException e) {
+          log.warn("Error talking to " + address + ": " + e);
+        } finally {
+          if (tserver != null)
+            ThriftUtil.returnClient(tserver);
+        }
+      }
     }
-    threadPool.shutdown();
-    while (!threadPool.isShutdown())
-      try {
-        threadPool.awaitTermination(1, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {}
-    return count.get();
+    return 0;
   }
   
   private static Map<String,ArrayList<String>> mapServersToFiles(Map<String,String> fileToServerMap) {
@@ -194,31 +182,41 @@ public class GarbageCollectWriteAheadLog
   }
   
   private int scanServers(Map<String,String> fileToServerMap) throws Exception {
-    int count = 0;
-    IZooReaderWriter zk = ZooReaderWriter.getInstance();
-    String loggersDir = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZLOGGERS;
-    List<String> servers = zk.getChildren(loggersDir, null);
-    Collections.shuffle(servers);
-    for (String server : servers) {
-      String address = "no-data";
-      count++;
-      try {
-        byte[] data = zk.getData(loggersDir + "/" + server, null);
-        address = new String(data);
-        Iface logger = ThriftUtil.getClient(new MutationLogger.Client.Factory(), address, Property.LOGGER_PORT, Property.TSERV_LOGGER_TIMEOUT, conf);
-        for (String log : logger.getClosedLogs(null, SecurityConstants.getSystemCredentials())) {
-          fileToServerMap.put(log, address);
+    AccumuloConfiguration conf = instance.getConfiguration();
+    Path walRoot = new Path(Constants.getWalDirectory(conf));
+    for (FileStatus status : fs.listStatus(walRoot)) {
+      String name = status.getPath().getName();
+      if (status.isDir()) {
+        for (FileStatus file : fs.listStatus(new Path(walRoot, name))) {
+          if (isUUID(file.getPath().getName()))
+            fileToServerMap.put(file.getPath().getName(), name);
+          else {
+            log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
+          }
         }
-        ThriftUtil.returnClient(logger);
-      } catch (TException err) {
-        log.warn("Ignoring exception talking to logger " + address);
-      }
-      if (SimpleGarbageCollector.almostOutOfMemory()) {
-        log.warn("Running out of memory collecting write-ahead log file names from loggers, continuing with a partial list");
-        break;
+      } else if (isUUID(name)) {
+        // old-style WAL are not under a directory
+        fileToServerMap.put(name, "");
+      } else {
+        log.info("Ignoring file " + name + " because it doesn't look like a uuid");
       }
     }
+
+    int count = 0;
     return count;
   }
   
+  /** Tests whether a file name parses as a UUID.
+   * @param name candidate file name, handed unchanged to {@link UUID#fromString(String)}
+   * @return true if the parse succeeds, false if it throws IllegalArgumentException
+   */
+  static private boolean isUUID(String name) {
+    try {
+      UUID.fromString(name);
+      return true;
+    } catch (IllegalArgumentException ex) {
+      return false;
+    }
+  }
+  
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Tue Jun  5 13:18:22 2012
@@ -64,6 +64,7 @@ import org.apache.accumulo.core.master.s
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -297,7 +298,7 @@ public class SimpleGarbageCollector impl
       
       // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
-      GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(fs, instance.getConfiguration());
+      GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs);
       try {
         log.info("Beginning garbage collection of write-ahead logs");
         walogCollector.collect(status);
@@ -585,7 +586,7 @@ public class SimpleGarbageCollector impl
     
     final BatchWriter finalWriter = writer;
     
-    ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads);
+    ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads, new NamingThreadFactory("deleting"));
     
     for (final String delete : confirmedDeletes) {
       

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java Tue Jun  5 13:18:22 2012
@@ -16,7 +16,7 @@
  */
 package org.apache.accumulo.server.logger;
 
-import java.io.FileNotFoundException;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
@@ -36,9 +36,9 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 
 public class LogReader {
@@ -108,28 +108,33 @@ public class LogReader {
       
       if (fs.isFile(path)) {
         // read log entries from a simple hdfs file
-        org.apache.hadoop.io.SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(file), conf);
-        while (reader.next(key, value)) {
+        FSDataInputStream f = fs.open(path);
+        while (true) {
+          try {
+            key.readFields(f);
+            value.readFields(f);
+          } catch (EOFException ex) {
+            break;
+          }
           printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
         }
       } else if (local.isFile(path)) {
         // read log entries from a simple file
-        org.apache.hadoop.io.SequenceFile.Reader reader = new SequenceFile.Reader(local, new Path(file), conf);
-        while (reader.next(key, value)) {
+        FSDataInputStream f = fs.open(path);
+        while (true) {
+          try {
+            key.readFields(f);
+            value.readFields(f);
+          } catch (EOFException ex) {
+            break;
+          }
           printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
         }
       } else {
-        try {
-          // read the log entries sorted in a map file
-          MultiReader input = new MultiReader(fs, conf, file);
-          while (input.next(key, value)) {
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
-          }
-        } catch (FileNotFoundException ex) {
-          SequenceFile.Reader input = new SequenceFile.Reader(local, new Path(file), conf);
-          while (input.next(key, value)) {
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
-          }
+        // read the log entries sorted in a map file
+        MultiReader input = new MultiReader(fs, conf, file);
+        while (input.next(key, value)) {
+          printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
         }
       }
     }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java Tue Jun  5 13:18:22 2012
@@ -133,15 +133,6 @@ public class LiveTServerSet implements W
       }
     }
     
-    public void useLoggers(Set<String> loggers) throws TException {
-      TabletClientService.Iface client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        client.useLoggers(null, SecurityConstants.getSystemCredentials(), loggers);
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
     public void chop(ZooLock lock, KeyExtent extent) throws TException {
       TabletClientService.Iface client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
       try {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java Tue Jun  5 13:18:22 2012
@@ -69,7 +69,6 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.master.thrift.LoggerStatus;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.master.thrift.MasterClientService.Processor;
 import org.apache.accumulo.core.master.thrift.MasterGoalState;
@@ -88,28 +87,20 @@ import org.apache.accumulo.core.util.Byt
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.Accumulo;
-import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fate.Fate;
 import org.apache.accumulo.server.fate.TStore.TStatus;
 import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
-import org.apache.accumulo.server.master.CoordinateRecoveryTask.JobComplete;
-import org.apache.accumulo.server.master.CoordinateRecoveryTask.LogFile;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
-import org.apache.accumulo.server.master.TabletServerLoggers.LoggerWatcher;
 import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
-import org.apache.accumulo.server.master.balancer.LoggerBalancer;
-import org.apache.accumulo.server.master.balancer.LoggerUser;
-import org.apache.accumulo.server.master.balancer.SimpleLoggerBalancer;
-import org.apache.accumulo.server.master.balancer.TServerUsesLoggers;
 import org.apache.accumulo.server.master.balancer.TabletBalancer;
+import org.apache.accumulo.server.master.recovery.RecoverLease;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.CurrentState;
 import org.apache.accumulo.server.master.state.DeadServerList;
@@ -147,7 +138,6 @@ import org.apache.accumulo.server.securi
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.apache.accumulo.server.security.ZKAuthenticator;
 import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
 import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.accumulo.server.util.DefaultMap;
@@ -177,17 +167,19 @@ import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 
 /**
- * The Master is responsible for assigning and balancing tablets and loggers to tablet servers.
+ * The Master is responsible for assigning and balancing tablets to tablet servers.
  * 
  * The master will also coordinate log recoveries and reports general status.
  */
-public class Master implements LiveTServerSet.Listener, LoggerWatcher, TableObserver, CurrentState, JobComplete {
+public class Master implements LiveTServerSet.Listener, TableObserver, CurrentState {
   
   final private static Logger log = Logger.getLogger(Master.class);
   
@@ -216,8 +208,6 @@ public class Master implements LiveTServ
   
   private ZooLock masterLock = null;
   private TServer clientService = null;
-  private TabletServerLoggers loggers = null;
-  private CoordinateRecoveryTask recovery = null;
   private TabletBalancer tabletBalancer;
   
   private MasterState state = MasterState.INITIAL;
@@ -227,8 +217,8 @@ public class Master implements LiveTServ
   volatile private SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections
       .unmodifiableSortedMap(new TreeMap<TServerInstance,TabletServerStatus>());
   
-  private LoggerBalancer loggerBalancer;
-  
+  private Set<String> recoveriesInProgress = Collections.synchronizedSet(new HashSet<String>());
+
   synchronized private MasterState getMasterState() {
     return state;
   }
@@ -549,7 +539,6 @@ public class Master implements LiveTServ
     tserverSet = new LiveTServerSet(instance, config.getConfiguration(), this);
     this.tabletBalancer = createInstanceFromPropertyName(aconf, Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer());
     this.tabletBalancer.init(serverConfig);
-    this.loggerBalancer = createInstanceFromPropertyName(aconf, Property.MASTER_LOGGER_BALANCER, LoggerBalancer.class, new SimpleLoggerBalancer());
   }
   
   public TServerConnection getConnection(TServerInstance server) {
@@ -709,11 +698,6 @@ public class Master implements LiveTServ
     @Override
     public MasterMonitorInfo getMasterStats(TInfo info, AuthInfo credentials) throws ThriftSecurityException, TException {
       final MasterMonitorInfo result = new MasterMonitorInfo();
-      result.loggers = new ArrayList<LoggerStatus>();
-      for (String logger : loggers.getLoggersFromZooKeeper().values()) {
-        result.loggers.add(new LoggerStatus(logger));
-      }
-      result.recovery = recovery.status();
       
       result.tServerInfo = new ArrayList<TabletServerStatus>();
       result.tableMap = new DefaultMap<String,TableInfo>(new TableInfo());
@@ -742,8 +726,6 @@ public class Master implements LiveTServ
       }
       DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS);
       result.deadTabletServers = obit.getList();
-      obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADLOGGERS);
-      result.deadLoggers = obit.getList();
       return result;
     }
     
@@ -858,10 +840,6 @@ public class Master implements LiveTServ
         balancer.init(serverConfig);
         tabletBalancer = balancer;
         log.info("tablet balancer changed to " + tabletBalancer.getClass().getName());
-      } else if (property.equals(Property.MASTER_LOGGER_BALANCER.getKey())) {
-        loggerBalancer = createInstanceFromPropertyName(instance.getConfiguration(), Property.MASTER_LOGGER_BALANCER, LoggerBalancer.class,
-            new SimpleLoggerBalancer());
-        log.info("log balancer changed to " + loggerBalancer.getClass().getName());
       }
     }
 
@@ -1089,6 +1067,7 @@ public class Master implements LiveTServ
       fate.delete(opid);
       
     }
+    
   }
   
   public MergeInfo getMergeInfo(Text tableId) {
@@ -1373,9 +1352,8 @@ public class Master implements LiveTServ
             
             if (goal == TabletGoalState.HOSTED) {
               if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
-                if (!recovery.recover(SecurityConstants.getSystemCredentials(), tls.extent, tls.walogs, Master.this)) {
+                if (recoverLogs(tls.extent, tls.walogs))
                   continue;
-                }
               }
               switch (state) {
                 case HOSTED:
@@ -1959,7 +1937,6 @@ public class Master implements LiveTServ
       } else if (getMasterGoalState() == MasterGoalState.CLEAN_STOP) {
         log.debug("not balancing because the master is attempting to stop cleanly");
       } else {
-        balanceLoggers();
         return balanceTablets();
       }
       return DEFAULT_WAIT_FOR_WATCHER;
@@ -1992,28 +1969,6 @@ public class Master implements LiveTServ
       }
     }
     
-    private void balanceLoggers() {
-      List<LoggerUser> logUsers = new ArrayList<LoggerUser>();
-      for (Entry<TServerInstance,TabletServerStatus> entry : tserverStatus.entrySet()) {
-        logUsers.add(new TServerUsesLoggers(entry.getKey(), entry.getValue()));
-      }
-      List<String> logNames = new ArrayList<String>(loggers.getLoggersFromZooKeeper().values());
-      Map<LoggerUser,List<String>> assignmentsOut = new HashMap<LoggerUser,List<String>>();
-      int loggersPerServer = getSystemConfiguration().getCount(Property.TSERV_LOGGER_COUNT);
-      loggerBalancer.balance(logUsers, logNames, assignmentsOut, loggersPerServer);
-      for (Entry<LoggerUser,List<String>> entry : assignmentsOut.entrySet()) {
-        TServerUsesLoggers tserver = (TServerUsesLoggers) entry.getKey();
-        try {
-          log.debug("Telling " + tserver.getInstance() + " to use loggers " + entry.getValue());
-          TServerConnection connection = tserverSet.getConnection(tserver.getInstance());
-          if (connection != null)
-            connection.useLoggers(new HashSet<String>(entry.getValue()));
-        } catch (Exception ex) {
-          log.warn("Unable to talk to " + tserver.getInstance(), ex);
-        }
-      }
-    }
-    
     private long balanceTablets() {
       List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
       Set<KeyExtent> migrationsCopy = new HashSet<KeyExtent>();
@@ -2048,7 +2003,7 @@ public class Master implements LiveTServ
         result.put(server, status);
         // TODO maybe remove from bad servers
       } catch (Exception ex) {
-        log.error("unable to get tablet server status " + server + " " + ex.getMessage());
+        log.error("unable to get tablet server status " + server + " " + ex.toString());
         log.debug("unable to get tablet server status " + server, ex);
         if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
           log.warn("attempting to stop " + server);
@@ -2073,6 +2028,31 @@ public class Master implements LiveTServ
     return result;
   }
   
+  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
+    // true while any referenced log lacks its "finished" flag; seeds one RecoverLease transaction per such log
+    boolean unfinished = false;
+    for (Collection<String> group : walogs) {
+      for (String walog : group) {
+        String[] hostAndFile = walog.split("/");
+        String host = hostAndFile[0], filename = hostAndFile[1];
+        if (fs.exists(new Path(Constants.getRecoveryDir(getSystemConfiguration()) + "/" + filename + "/finished"))) {
+          recoveriesInProgress.remove(filename);
+          continue;
+        }
+        unfinished = true;
+        synchronized (recoveriesInProgress) {
+          if (recoveriesInProgress.contains(filename))
+            continue;
+          Master.log.info("Starting recovery of " + filename + " created for " + host + ", tablet " + extent + " holds a reference");
+          long tid = fate.startTransaction();
+          fate.seedTransaction(tid, new RecoverLease(host, filename), true);
+          recoveriesInProgress.add(filename);
+        }
+      }
+    }
+    return unfinished;
+  }
+
   public void run() throws IOException, InterruptedException, KeeperException {
     final String zroot = ZooUtil.getRoot(instance);
     
@@ -2080,13 +2060,6 @@ public class Master implements LiveTServ
     
     TableManager.getInstance().addObserver(this);
     
-    recovery = new CoordinateRecoveryTask(fs, getSystemConfiguration());
-    Thread recoveryThread = new Daemon(new LoggingRunnable(log, recovery), "Recovery Status");
-    recoveryThread.start();
-    
-    loggers = new TabletServerLoggers(this, getSystemConfiguration());
-    loggers.scanZooKeeperForUpdates();
-    
     StatusThread statusThread = new StatusThread();
     statusThread.start();
     
@@ -2094,6 +2067,13 @@ public class Master implements LiveTServ
     migrationCleanupThread.start();
     
     tserverSet.startListeningForTabletServerChanges();
+
+    ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        nextEvent.event("Noticed recovery changes", event.getType());
+      }
+    });
     
     AuthInfo systemAuths = SecurityConstants.getSystemCredentials();
     final TabletStateStore stores[] = {new ZooTabletStateStore(new ZooStore(zroot)), new RootTabletStateStore(instance, systemAuths, this),
@@ -2129,9 +2109,6 @@ public class Master implements LiveTServ
     final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
     statusThread.join(remaining(deadline));
     
-    recovery.stop();
-    recoveryThread.join(remaining(deadline));
-    
     // quit, even if the tablet servers somehow jam up and the watchers
     // don't stop
     for (TabletGroupWatcher watcher : watchers) {
@@ -2199,42 +2176,9 @@ public class Master implements LiveTServ
     }
   }
   
-  @Override
-  public void newLogger(String address) {
-    try {
-      RemoteLogger remote = new RemoteLogger(address, getSystemConfiguration());
-      for (String onDisk : remote.getClosedLogs()) {
-        Path path = new Path(ServerConstants.getRecoveryDir(), onDisk + ".failed");
-        if (fs.exists(path)) {
-          fs.delete(path, true);
-        }
-      }
-    } catch (Exception ex) {
-      log.warn("Unexpected error clearing failed recovery markers for new logger", ex);
-    }
-    DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADLOGGERS);
-    obit.delete(address);
-    nextEvent.event("Added logger %s", address);
-  }
-  
   static final String I_DONT_KNOW_WHY = "unexpected failure";
   
   @Override
-  public void deadLogger(String address) {
-    DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADLOGGERS);
-    InetSocketAddress parseAddress = AddressUtil.parseAddress(address, Property.LOGGER_PORT);
-    String cause = I_DONT_KNOW_WHY;
-    for (TServerInstance server : serversToShutdown) {
-      if (server.getLocation().getHostName().equals(parseAddress.getHostName())) {
-        cause = "clean shutdown";
-        break;
-      }
-    }
-    obit.post(address, cause);
-    log.info("Noticed logger went away: " + address);
-  }
-  
-  @Override
   public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
     DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS);
     if (added.size() > 0) {
@@ -2309,10 +2253,6 @@ public class Master implements LiveTServ
     return tserverSet.getCurrentServers();
   }
   
-  public Map<String,String> getLoggers() {
-    return loggers.getLoggersFromZooKeeper();
-  }
-  
   @Override
   public Collection<MergeInfo> merges() {
     List<MergeInfo> result = new ArrayList<MergeInfo>();
@@ -2322,11 +2262,6 @@ public class Master implements LiveTServ
     return result;
   }
   
-  @Override
-  public void finished(LogFile entry) {
-    nextEvent.event("Log recovery %s complete ", entry);
-  }
-  
   public void killTServer(TServerInstance server) {
     nextEvent.event("Forcing server down %s", server);
     serversToShutdown.add(server);
@@ -2358,4 +2293,7 @@ public class Master implements LiveTServ
     return this.fs;
   }
 
+  public void updateRecoveryInProgress(String file) {
+    recoveriesInProgress.add(file); // record that a recovery of this log file is under way
+  }
 }

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java?rev=1346380&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java Tue Jun  5 13:18:22 2012
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.server.master.Master;
+import org.apache.accumulo.server.master.tableOps.MasterRepo;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+
+public class RecoverLease extends MasterRepo {
+  // FATE step that recovers the file system lease on a write-ahead log before the log is submitted for recovery.
+  private static final long serialVersionUID = 1L;
+  // server and file locate the log below the wal directory; start is the creation time of this step in milliseconds
+  private String server;
+  private String file;
+  private long start;
+
+  public RecoverLease(String server, String file) {
+    this.server = server;
+    this.file = file;
+    this.start = System.currentTimeMillis();
+  }
+  /** Builds the log path {@code walDir/server/file}, or {@code walDir/file} when the server name contains ':'. */
+  public static Path getSource(Master master, String server, String file) {
+    String source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + server + "/" + file;
+    if (server.contains(":")) {
+      // old-style logger log, copied from local file systems by tservers, unsorted into the wal base dir
+      source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + file;
+    }
+    return new Path(source);
+  }
+  /** Returns the path of the log this step was created for. */
+  public Path getSource(Master master) {
+    return getSource(master, server, file);
+  }
+  /** Returns 0 once the configured recovery delay has passed and the log exists; otherwise the milliseconds to wait. */
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    master.updateRecoveryInProgress(file);
+    long delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
+    long elapsed = System.currentTimeMillis() - start;
+    if (elapsed < delay)
+      return Math.min(delay - elapsed, delay); // remaining wait, capped in case the clock moved backwards
+    FileSystem fs = master.getFileSystem();
+    if (fs.exists(getSource(master)))
+      return 0;
+    log.warn("Unable to locate file " + file + " wal for server " + server);
+    return 1000;
+  }
+  /** Tries DistributedFileSystem.recoverLease and continues with SubmitFileForRecovery; otherwise tries append and retries. */
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    Path source = getSource(master);
+    FileSystem fs = master.getFileSystem();
+    if (fs instanceof TraceFileSystem)
+      fs = ((TraceFileSystem) fs).getImplementation();
+    try {
+      if (fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem) fs;
+        dfs.recoverLease(source);
+        log.info("Recovered lease on " + source.toString());
+        return new SubmitFileForRecovery(server, file);
+      }
+    } catch (IOException ex) {
+      log.error("error recovering lease ", ex);
+    }
+    try {
+      fs.append(source).close();
+      log.info("Recovered lease on " + source.toString() + " using append");
+      // NOTE(review): a successful append still falls through to the retry below - confirm that is intended
+    } catch (IOException ex) {
+      log.error("error recovering lease using append", ex);
+    }
+    // let's do this again: schedule a fresh attempt at recovering the lease
+    return new RecoverLease(server, file);
+  }
+}

Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java?rev=1346380&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java Tue Jun  5 13:18:22 2012
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.server.master.Master;
+import org.apache.accumulo.server.master.tableOps.MasterRepo;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * FATE step that publishes a write-ahead log under the ZooKeeper recovery node and logs the outcome once that entry is deleted.
+ */
+public class SubmitFileForRecovery extends MasterRepo implements Repo<Master> {
+  
+  private static final long serialVersionUID = 1L;
+  String server;
+  String file;
+  // server and file identify the write-ahead log; both are handed to RecoverLease.getSource to build its path
+  SubmitFileForRecovery(String server, String file) {
+    this.server = server;
+    this.file = file;
+  }
+  /** Writes the log's path to the ZRECOVERY entry for this file (NodeExistsPolicy.SKIP), watches the entry, returns null. */
+  @Override
+  public Repo<Master> call(long tid, final Master master) throws Exception {
+    master.updateRecoveryInProgress(file);
+    String source = RecoverLease.getSource(master, server, file).toString();
+    ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;
+    zoo.putPersistentData(path, source.getBytes(), NodeExistsPolicy.SKIP);
+    log.info("Created zookeeper entry " + path + " with data " + source);
+    zoo.exists(path, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        switch (event.getType()) {
+          case NodeDeleted: // the messages below report a removal, so react to the delete event
+            log.info("noticed recovery entry for " + file + " was removed");
+            FileSystem fs = master.getFileSystem();
+            // the flag lives in the per-file recovery directory, the same location Master.recoverLogs checks
+            Path finished = new Path(Constants.getRecoveryDir(master.getSystemConfiguration()) + "/" + file, "finished");
+            try {
+              if (fs.exists(finished))
+                log.info("log recovery for " + file + " successful");
+              else
+                log.error("zookeeper recovery entry " + path + " has been removed, but the finish flag file is missing");
+            } catch (IOException ex) {
+              log.error("Unable to check on the recovery status of " + file, ex);
+            }
+            break;
+        }
+      }
+    });
+    return null;
+  }
+  
+}

Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Tue Jun  5 13:18:22 2012
@@ -27,10 +27,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.cloudtrace.instrument.TraceExecutorService;
 import org.apache.accumulo.core.Constants;
@@ -49,8 +46,8 @@ import org.apache.accumulo.core.file.Fil
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -367,17 +364,8 @@ class LoadFiles extends MasterRepo {
   
   synchronized void initializeThreadPool(Master master) {
     if (threadPool == null) {
-      int THREAD_POOL_SIZE = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
-      ThreadFactory threadFactory = new ThreadFactory() {
-        int count = 0;
-        
-        @Override
-        public Thread newThread(Runnable r) {
-          return new Daemon(r, "bulk loader " + count++);
-        }
-      };
-      ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 1l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
-          threadFactory);
+      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
+      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
       pool.allowCoreThreadTimeOut(true);
       threadPool = new TraceExecutorService(pool);
     }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java Tue Jun  5 13:18:22 2012
@@ -22,9 +22,9 @@ import org.apache.accumulo.core.util.Add
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.fate.Repo;
-import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.EventCoordinator.Listener;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.tableOps.MasterRepo;
 import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
@@ -60,7 +60,7 @@ public class ShutdownTServer extends Mas
       path = ZooUtil.getRoot(m.getInstance()) + Constants.ZDEADTSERVERS + "/" + tserver;
       IZooReaderWriter zoo = ZooReaderWriter.getInstance();
       zoo.putPersistentData(path, "forced down".getBytes(), NodeExistsPolicy.OVERWRITE);
-      return new DisconnectLogger(server.getLocation().getAddress().getHostAddress());
+      return null;
     }
     
     // TODO move this to isReady() and drop while loop?
@@ -86,7 +86,7 @@ public class ShutdownTServer extends Mas
       listener.waitForEvents(1000);
     }
     
-    return new DisconnectLogger(server.getLocation().getAddress().getHostAddress());
+    return null;
   }
   
   @Override

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java Tue Jun  5 13:18:22 2012
@@ -56,7 +56,6 @@ import org.apache.accumulo.server.monito
 import org.apache.accumulo.server.monitor.servlets.GcStatusServlet;
 import org.apache.accumulo.server.monitor.servlets.JSONServlet;
 import org.apache.accumulo.server.monitor.servlets.LogServlet;
-import org.apache.accumulo.server.monitor.servlets.LoggersServlet;
 import org.apache.accumulo.server.monitor.servlets.MasterServlet;
 import org.apache.accumulo.server.monitor.servlets.OperationServlet;
 import org.apache.accumulo.server.monitor.servlets.ProblemServlet;
@@ -311,7 +310,6 @@ public class Monitor {
           UtilWaitThread.sleep(1000);
       }
       if (mmi != null) {
-        
         int majorCompactions = 0;
         int minorCompactions = 0;
         
@@ -320,7 +318,7 @@ public class Monitor {
         indexCacheRequestTracker.startingUpdates();
         dataCacheHitTracker.startingUpdates();
         dataCacheRequestTracker.startingUpdates();
-      
+
         for (TabletServerStatus server : mmi.tServerInfo) {
           TableInfo summary = Monitor.summarizeTableStats(server);
           totalIngestRate += summary.ingestRate;
@@ -368,7 +366,6 @@ public class Monitor {
         
         ingestRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestRate));
         ingestByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestByteRate));
-        recoveriesOverTime.add(new Pair<Long,Integer>(currentTime, mmi.recovery.size()));
         
         double totalLoad = 0.;
         for (TabletServerStatus status : mmi.tServerInfo) {
@@ -390,7 +387,6 @@ public class Monitor {
         calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker);
         calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker);
       }
-      
       try {
         Monitor.problemSummary = ProblemReports.getInstance().summarize();
         Monitor.problemException = null;
@@ -480,7 +476,6 @@ public class Monitor {
     server.addServlet(MasterServlet.class, "/master");
     server.addServlet(TablesServlet.class, "/tables");
     server.addServlet(TServersServlet.class, "/tservers");
-    server.addServlet(LoggersServlet.class, "/loggers");
     server.addServlet(ProblemServlet.class, "/problems");
     server.addServlet(GcStatusServlet.class, "/gc");
     server.addServlet(LogServlet.class, "/log");

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java Tue Jun  5 13:18:22 2012
@@ -176,7 +176,6 @@ abstract public class BasicServlet exten
     sb.append("<hr />\n");
     sb.append("<a href='/master'>Master&nbsp;Server</a><br />\n");
     sb.append("<a href='/tservers'>Tablet&nbsp;Servers</a><br />\n");
-    sb.append("<a href='/loggers'>Logger&nbsp;Servers</a><br />\n");
     sb.append("<a href='/vis'>Server Activity</a><br />\n");
     sb.append("<a href='/gc'>Garbage&nbsp;Collector</a><br />\n");
     sb.append("<a href='/tables'>Tables</a><br />\n");

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java Tue Jun  5 13:18:22 2012
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.master.thrift.MasterState;
 import org.apache.accumulo.core.master.thrift.RecoveryStatus;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.monitor.DedupedLogEvent;
@@ -43,9 +42,9 @@ import org.apache.accumulo.server.monito
 import org.apache.accumulo.server.monitor.util.Table;
 import org.apache.accumulo.server.monitor.util.TableRow;
 import org.apache.accumulo.server.monitor.util.celltypes.DurationType;
-import org.apache.accumulo.server.monitor.util.celltypes.LoggerLinkType;
 import org.apache.accumulo.server.monitor.util.celltypes.NumberType;
 import org.apache.accumulo.server.monitor.util.celltypes.ProgressChartType;
+import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.log4j.Level;
 
 public class MasterServlet extends BasicServlet {
@@ -57,7 +56,7 @@ public class MasterServlet extends Basic
   @Override
   protected String getTitle(HttpServletRequest req) {
     List<String> masters = Monitor.getInstance().getMasterLocations();
-    return "Master Server" + (masters.size() == 0 ? "" : ":" + AddressUtil.parseAddress(masters.get(0), 0).getHostName());
+    return "Master Server" + (masters.size() == 0 ? "" : ":" + AddressUtil.parseAddress(masters.get(0), Property.MASTER_CLIENTPORT).getHostName());
   }
   
   @Override
@@ -137,8 +136,6 @@ public class MasterServlet extends Basic
       masterStatus.addSortableColumn("#&nbsp;Online<br />Tablet&nbsp;Servers", new NumberType<Integer>((int) (slaves.size() * 0.8 + 1.0), slaves.size(),
           (int) (slaves.size() * 0.6 + 1.0), slaves.size()), "Number of tablet servers currently available");
       masterStatus.addSortableColumn("#&nbsp;Total<br />Tablet&nbsp;Servers", new NumberType<Integer>(), "The total number of tablet servers configured");
-      masterStatus.addSortableColumn("Loggers", new NumberType<Integer>((int) (slaves.size() * .8), Integer.MAX_VALUE, 1, Integer.MAX_VALUE),
-          "The number of write-ahead loggers.  This should be approximately the same as the number of tablet servers (and greater than zero).");
       masterStatus.addSortableColumn("Last&nbsp;GC", null, "The last time files were cleaned-up from HDFS.");
       masterStatus.addSortableColumn("#&nbsp;Tablets", new NumberType<Integer>(0, Integer.MAX_VALUE, 2, Integer.MAX_VALUE), null);
       masterStatus.addSortableColumn("#&nbsp;Unassigned<br />Tablets", new NumberType<Integer>(0, 0), null);
@@ -153,10 +150,9 @@ public class MasterServlet extends Basic
       masterStatus.addSortableColumn("OS&nbsp;Load", new NumberType<Double>(0., guessHighLoad * 1., 0., guessHighLoad * 3.),
           "The one-minute load average on the computer that runs the monitor web server.");
       TableRow row = masterStatus.prepareRow();
-      row.add(masters.size() == 0 ? "<div class='error'>Down</div>" : AddressUtil.parseAddress(masters.get(0), 0).getHostName());
+      row.add(masters.size() == 0 ? "<div class='error'>Down</div>" : AddressUtil.parseAddress(masters.get(0), Property.MASTER_CLIENTPORT).getHostName());
       row.add(Monitor.getMmi().tServerInfo.size());
       row.add(slaves.size());
-      row.add(Monitor.getMmi().loggers.size());
       row.add("<a href='/gc'>" + gcStatus + "</a>");
       row.add(Monitor.getTotalTabletCount());
       row.add(Monitor.getMmi().unassignedTablets);
@@ -177,25 +173,28 @@ public class MasterServlet extends Basic
   private void doRecoveryList(HttpServletRequest req, StringBuilder sb) {
     MasterMonitorInfo mmi = Monitor.getMmi();
     if (mmi != null) {
-      List<RecoveryStatus> jobs = mmi.recovery;
-      if (jobs != null && jobs.size() > 0) {
-        Table recoveryTable = new Table("logRecovery", "Log&nbsp;Recovery");
-        recoveryTable.setSubCaption("Some tablets were unloaded in an unsafe manner. Write-ahead logs are being recovered.");
-        recoveryTable.addSortableColumn("Server", new LoggerLinkType(), null);
-        recoveryTable.addSortableColumn("Log");
-        recoveryTable.addSortableColumn("Time", new DurationType(), null);
-        recoveryTable.addSortableColumn("Copy/Sort", new ProgressChartType(), null);
-        
-        for (RecoveryStatus recovery : jobs) {
-          TableRow row = recoveryTable.prepareRow();
-          row.add(recovery);
-          row.add(recovery.name);
-          row.add((long) recovery.runtime);
-          row.add(recovery.copyProgress);
-          recoveryTable.addRow(row);
+      Table recoveryTable = new Table("logRecovery", "Log&nbsp;Recovery");
+      recoveryTable.setSubCaption("Some tablets were unloaded in an unsafe manner. Write-ahead logs are being recovered.");
+      recoveryTable.addSortableColumn("Server");
+      recoveryTable.addSortableColumn("Log");
+      recoveryTable.addSortableColumn("Time", new DurationType(), null);
+      recoveryTable.addSortableColumn("Copy/Sort", new ProgressChartType(), null);
+      int rows = 0;
+      for (TabletServerStatus server : mmi.tServerInfo) {
+        if (server.logSorts != null) {
+          for (RecoveryStatus recovery : server.logSorts) {
+            TableRow row = recoveryTable.prepareRow();
+            row.add(AddressUtil.parseAddress(server.name, Property.TSERV_CLIENTPORT).getHostName());
+            row.add(recovery.name);
+            row.add((long) recovery.runtime);
+            row.add(recovery.progress);
+            recoveryTable.addRow(row);
+            rows++;
+          }
         }
-        recoveryTable.generate(req, sb);
       }
+      if (rows > 0)
+        recoveryTable.generate(req, sb);
     }
   }
   

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java Tue Jun  5 13:18:22 2012
@@ -167,8 +167,6 @@ public class OperationServlet extends Ba
       // a dead server should have a uniq address: a logger or tserver
       DeadServerList obit = new DeadServerList(ZooUtil.getRoot(inst) + Constants.ZDEADTSERVERS);
       obit.delete(server);
-      obit = new DeadServerList(ZooUtil.getRoot(inst) + Constants.ZDEADLOGGERS);
-      obit.delete(server);
     }
   }
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java Tue Jun  5 13:18:22 2012
@@ -27,7 +27,6 @@ import org.apache.accumulo.core.client.I
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.master.thrift.Compacting;
 import org.apache.accumulo.core.master.thrift.DeadServer;
-import org.apache.accumulo.core.master.thrift.LoggerStatus;
 import org.apache.accumulo.core.master.thrift.TableInfo;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -80,12 +79,6 @@ public class XMLServlet extends BasicSer
       sb.append("</compactions>\n");
       
       sb.append("<tablets>").append(summary.tablets).append("</tablets>\n");
-      if (status.loggers != null) {
-        sb.append("<loggers>");
-        for (String logger : status.loggers)
-          sb.append("<logger>" + logger + "</logger>");
-        sb.append("</loggers>");
-      }
       
       sb.append("<ingest>").append(summary.ingestRate).append("</ingest>\n");
       sb.append("<query>").append(summary.queryRate).append("</query>\n");
@@ -110,12 +103,6 @@ public class XMLServlet extends BasicSer
     }
     sb.append("\n</badTabletServers>\n");
     
-    sb.append("\n<loggers>\n");
-    for (LoggerStatus entry : Monitor.getMmi().loggers) {
-      sb.append(String.format("<logger id='%s'/>\n", entry.logger));
-    }
-    sb.append("\n</loggers>\n");
-    
     sb.append("\n<tabletServersShuttingDown>\n");
     for (String server : Monitor.getMmi().serversShuttingDown) {
       sb.append(String.format("<server id='%s'/>\n", server));

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java Tue Jun  5 13:18:22 2012
@@ -40,11 +40,11 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedKeyIterator;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.accumulo.server.util.NamingThreadFactory;
 import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.commons.collections.map.LRUMap;

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Tue Jun  5 13:18:22 2012
@@ -111,8 +111,8 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.tabletserver.TabletServer.TservConstraintEnv;
 import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
 import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
 import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
 import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage;
 import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics;
 import org.apache.accumulo.server.trace.TraceFileSystem;
@@ -209,7 +209,7 @@ public class Tablet {
       return Tablet.this;
     }
     
-    public boolean beginUpdatingLogsUsed(ArrayList<RemoteLogger> copy, boolean mincFinish) {
+    public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
       return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
     }
     
@@ -1178,7 +1178,6 @@ public class Tablet {
       for (int i = 0; i < files.length; i++) {
         paths[i] = files[i].getPath();
       }
-      log.debug("fs " + fs + " files: " + Arrays.toString(paths) + " location: " + location);
       Collection<String> goodPaths = cleanUpFiles(fs, files, location, true);
       for (String path : goodPaths) {
         String filename = new Path(path).getName();
@@ -1226,17 +1225,6 @@ public class Tablet {
     return datafiles;
   }
   
-  private static Set<RemoteLogger> getCurrentLoggers(List<LogEntry> entries) {
-    Set<RemoteLogger> result = new HashSet<RemoteLogger>();
-    for (LogEntry logEntry : entries) {
-      for (String log : logEntry.logSet) {
-        String[] parts = log.split("/", 2);
-        result.add(new RemoteLogger(parts[0], parts[1], null));
-      }
-    }
-    return result;
-  }
-  
   private static List<LogEntry> lookupLogEntries(KeyExtent ke, SortedMap<Key,Value> tabletsKeyValues) {
     List<LogEntry> logEntries = new ArrayList<LogEntry>();
     
@@ -1470,7 +1458,15 @@ public class Tablet {
           throw new RuntimeException(t);
         }
       }
-      currentLogs = getCurrentLoggers(logEntries);
+      // make some closed references that represent the recovered logs
+      currentLogs = new HashSet<DfsLogger>();
+      for (LogEntry logEntry : logEntries) {
+        for (String log : logEntry.logSet) {
+          String[] parts = log.split("/", 2);
+          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, parts[1]));
+        }
+      }
+      
       log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries()
           + " entries created)");
     }
@@ -2241,7 +2237,7 @@ public class Tablet {
   private synchronized MinorCompactionTask prepareForMinC(long flushId) {
     CommitSession oldCommitSession = tabletMemory.prepareForMinC();
     otherLogs = currentLogs;
-    currentLogs = new HashSet<RemoteLogger>();
+    currentLogs = new HashSet<DfsLogger>();
     
     String mergeFile = datafileManager.reserveMergingMinorCompactionFile();
     
@@ -3243,12 +3239,8 @@ public class Tablet {
           log.debug("Starting MajC " + extent + " (" + reason + ") " + datafileManager.abs2rel(datafileManager.string2path(copy.keySet())) + " --> "
               + datafileManager.abs2rel(new Path(compactTmpName)));
 
-          Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? propogateDeletes : true, // always
-                                                                                                                                          // propagate
-                                                                                                                                          // deletes,
-                                                                                                                                          // unless
-                                                                                                                                          // last
-                                                                                                                                          // batch
+          // always propagate deletes, unless last batch
+          Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? propogateDeletes : true,
               acuTableConf, extent, cenv, compactionIterators);
           
           CompactionStats mcs = compactor.call();
@@ -3650,8 +3642,18 @@ public class Tablet {
     }
   }
   
-  private Set<RemoteLogger> currentLogs = new HashSet<RemoteLogger>();
+  private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>();
   
+  public Set<String> getCurrentLogs() {
+    Set<String> result = new HashSet<String>();
+    synchronized (currentLogs) {
+      for (DfsLogger log : currentLogs) {
+        result.add(log.toString());
+      }
+    }
+    return result;
+  }
+
   private Set<String> beginClearingUnusedLogs() {
     Set<String> doomed = new HashSet<String>();
     
@@ -3665,12 +3667,12 @@ public class Tablet {
       if (removingLogs)
         throw new IllegalStateException("Attempted to clear logs when removal of logs in progress");
       
-      for (RemoteLogger logger : otherLogs) {
+      for (DfsLogger logger : otherLogs) {
         otherLogsCopy.add(logger.toString());
         doomed.add(logger.toString());
       }
       
-      for (RemoteLogger logger : currentLogs) {
+      for (DfsLogger logger : currentLogs) {
         currentLogsCopy.add(logger.toString());
         doomed.remove(logger.toString());
       }
@@ -3698,7 +3700,7 @@ public class Tablet {
     logLock.unlock();
   }
   
-  private Set<RemoteLogger> otherLogs = Collections.emptySet();
+  private Set<DfsLogger> otherLogs = Collections.emptySet();
   private boolean removingLogs = false;
   
   // this lock is basically used to synchronize writing of log info to !METADATA
@@ -3708,7 +3710,7 @@ public class Tablet {
     return currentLogs.size();
   }
   
-  private boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<RemoteLogger> more, boolean mincFinish) {
+  private boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> more, boolean mincFinish) {
     
     boolean releaseLock = true;
     
@@ -3745,7 +3747,7 @@ public class Tablet {
         
         int numAdded = 0;
         int numContained = 0;
-        for (RemoteLogger logger : more) {
+        for (DfsLogger logger : more) {
           if (addToOther) {
             if (otherLogs.add(logger))
               numAdded++;



Mime
View raw message