accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1451386 - in /accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server: Accumulo.java gc/GarbageCollectWriteAheadLogs.java
Date Thu, 28 Feb 2013 22:07:59 GMT
Author: kturner
Date: Thu Feb 28 22:07:58 2013
New Revision: 1451386

URL: http://svn.apache.org/r1451386
Log:
ACCUMULO-1126 Made AGC clean up unreferenced sorted walogs.  Also made it delete unreferenced
walogs of offline servers.

Modified:
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java

Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java?rev=1451386&r1=1451385&r2=1451386&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java Thu Feb 28 22:07:58 2013
@@ -165,7 +165,7 @@ public class Accumulo {
           log.error(t, t);
         }
       }
-    }, 1000, 10 * 1000);
+    }, 1000, 10 * 60 * 1000);
   }
   
   public static String getLocalAddress(String[] args) throws UnknownHostException {

Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1451386&r1=1451385&r2=1451386&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Thu Feb 28 22:07:58 2013
@@ -20,10 +20,12 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.accumulo.core.Constants;
@@ -72,6 +74,9 @@ public class GarbageCollectWriteAheadLog
     
     Span span = Trace.start("scanServers");
     try {
+      
+      Set<String> sortedWALogs = getSortedWALogs();
+
       status.currentLog.started = System.currentTimeMillis();
       
       Map<String,String> fileToServerMap = new HashMap<String,String>();
@@ -84,7 +89,7 @@ public class GarbageCollectWriteAheadLog
       
       span = Trace.start("removeMetadataEntries");
       try {
-        count = removeMetadataEntries(fileToServerMap, status);
+        count = removeMetadataEntries(fileToServerMap, sortedWALogs, status);
       } catch (Exception ex) {
         log.error("Unable to scan metadata table", ex);
         return;
@@ -98,7 +103,7 @@ public class GarbageCollectWriteAheadLog
       span = Trace.start("removeFiles");
       Map<String,ArrayList<String>> serverToFileMap = mapServersToFiles(fileToServerMap);
       
-      count = removeFiles(serverToFileMap, status);
+      count = removeFiles(serverToFileMap, sortedWALogs, status);
       
       long removeStop = System.currentTimeMillis();
       log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count,
serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.));
@@ -126,7 +131,7 @@ public class GarbageCollectWriteAheadLog
     }
   }
 
-  private int removeFiles(Map<String,ArrayList<String>> serverToFileMap, final
GCStatus status) {
+  private int removeFiles(Map<String,ArrayList<String>> serverToFileMap, Set<String>
sortedWALogs, final GCStatus status) {
     AccumuloConfiguration conf = instance.getConfiguration();
     for (Entry<String,ArrayList<String>> entry : serverToFileMap.entrySet())
{
       if (entry.getKey().length() == 0) {
@@ -143,22 +148,50 @@ public class GarbageCollectWriteAheadLog
         }
       } else {
         InetSocketAddress address = AddressUtil.parseAddress(entry.getKey(), Property.TSERV_CLIENTPORT);
-        if (!holdsLock(address))
+        if (!holdsLock(address)) {
+          Path serverPath = new Path(Constants.getWalDirectory(conf), entry.getKey());
+          for (String filename : entry.getValue()) {
+            log.debug("Removing WAL for offline server " + filename);
+            try {
+              Path path = new Path(serverPath, filename);
+              if (trash == null || !trash.moveToTrash(path))
+                fs.delete(path, true);
+            } catch (IOException ex) {
+              log.error("Unable to delete wal " + filename + ": " + ex);
+            }
+          }
           continue;
-        Client tserver = null;
-        try {
-          tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address,
conf);
-          tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(),
entry.getValue());
-          log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
-          status.currentLog.deleted += entry.getValue().size();
-        } catch (TException e) {
-          log.warn("Error talking to " + address + ": " + e);
-        } finally {
-          if (tserver != null)
-            ThriftUtil.returnClient(tserver);
+        } else {
+          Client tserver = null;
+          try {
+            tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address,
conf);
+            tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(),
entry.getValue());
+            log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
+            status.currentLog.deleted += entry.getValue().size();
+          } catch (TException e) {
+            log.warn("Error talking to " + address + ": " + e);
+          } finally {
+            if (tserver != null)
+              ThriftUtil.returnClient(tserver);
+          }
+        }
+      }
+    }
+    
+    Path recoveryDir = new Path(Constants.getRecoveryDir(conf));
+    
+    for (String sortedWALog : sortedWALogs) {
+      log.debug("Removing sorted WAL " + sortedWALog);
+      try {
+        Path swalog = new Path(recoveryDir, sortedWALog);
+        if (trash == null || !trash.moveToTrash(swalog)) {
+          fs.delete(swalog, true);
         }
+      } catch (IOException ioe) {
+        log.error("Unable to delete sorted walog " + sortedWALogs + ": " + ioe);
       }
     }
+
     return 0;
   }
   
@@ -175,7 +208,8 @@ public class GarbageCollectWriteAheadLog
     return serverToFileMap;
   }
   
-  private static int removeMetadataEntries(Map<String,String> fileToServerMap, GCStatus
status) throws IOException, KeeperException, InterruptedException {
+  private static int removeMetadataEntries(Map<String,String> fileToServerMap, Set<String>
sortedWALogs, GCStatus status) throws IOException, KeeperException,
+      InterruptedException {
     int count = 0;
     Iterator<LogEntry> iterator = MetadataTable.getLogEntries(SecurityConstants.getSystemCredentials());
     while (iterator.hasNext()) {
@@ -183,6 +217,9 @@ public class GarbageCollectWriteAheadLog
         filename = filename.split("/", 2)[1];
         if (fileToServerMap.remove(filename) != null)
           status.currentLog.inUse++;
+        
+        sortedWALogs.remove(filename);
+
         count++;
       }
     }
@@ -214,6 +251,25 @@ public class GarbageCollectWriteAheadLog
     return count;
   }
   
+  private Set<String> getSortedWALogs() throws IOException {
+    AccumuloConfiguration conf = instance.getConfiguration();
+    Path recoveryDir = new Path(Constants.getRecoveryDir(conf));
+    
+    Set<String> sortedWALogs = new HashSet<String>();
+
+    if (fs.exists(recoveryDir)) {
+      for (FileStatus status : fs.listStatus(recoveryDir)) {
+        if (isUUID(status.getPath().getName())) {
+          sortedWALogs.add(status.getPath().getName());
+        } else {
+          log.debug("Ignoring file " + status.getPath() + " because it doesn't look like
a uuid");
+        }
+      }
+    }
+    
+    return sortedWALogs;
+  }
+
   /**
    * @param name
    * @return



Mime
View raw message