accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch 1.9 updated: fixes #432 Made GC clean up recovery logs (#444)
Date Fri, 04 May 2018 14:35:52 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new 5bf5180  fixes #432 Made GC clean up recovery logs (#444)
5bf5180 is described below

commit 5bf5180063886603d3e7f85d6e779c8f8866b401
Author: Keith Turner <keith@deenlo.com>
AuthorDate: Fri May 4 10:35:49 2018 -0400

    fixes #432 Made GC clean up recovery logs (#444)
---
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java  | 91 +++++++++++++++++-----
 .../gc/GarbageCollectWriteAheadLogsTest.java       | 31 +++++++-
 2 files changed, 98 insertions(+), 24 deletions(-)

diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index d2acb58..405a7a0 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
@@ -59,6 +60,7 @@ import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.state.TabletLocationState;
 import org.apache.accumulo.server.master.state.TabletState;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
@@ -142,6 +144,8 @@ public class GarbageCollectWriteAheadLogs {
     try {
       status.currentLog.started = System.currentTimeMillis();
 
+      Map<UUID,Path> recoveryLogs = getSortedWALogs();
+
       Map<TServerInstance,Set<UUID>> logsByServer = new HashMap<>();
       Map<UUID,Pair<WalState,Path>> logsState = new HashMap<>();
       // Scan for log file info first: the order is important
@@ -164,7 +168,7 @@ public class GarbageCollectWriteAheadLogs {
       Map<UUID,TServerInstance> uuidToTServer;
       span = Trace.start("removeEntriesInUse");
       try {
-        uuidToTServer = removeEntriesInUse(logsByServer, currentServers, logsState);
+        uuidToTServer = removeEntriesInUse(logsByServer, currentServers, logsState, recoveryLogs);
         count = uuidToTServer.size();
       } catch (Exception ex) {
         log.error("Unable to scan metadata table", ex);
@@ -199,6 +203,9 @@ public class GarbageCollectWriteAheadLogs {
       long removeStop = System.currentTimeMillis();
       log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count,
           logsByServer.size(), (removeStop - logEntryScanStop) / 1000.));
+
+      count = removeFiles(recoveryLogs.values());
+      log.info("{} recovery logs removed", count);
       span.stop();
 
       span = Trace.start("removeMarkers");
@@ -244,36 +251,53 @@ public class GarbageCollectWriteAheadLogs {
     return result;
   }
 
+  private long removeFile(Path path) {
+    try {
+      if (!useTrash || !fs.moveToTrash(path)) {
+        fs.deleteRecursively(path);
+      }
+      return 1;
+    } catch (FileNotFoundException ex) {
+      log.debug("Attempted to delete WAL {} that did not exists : {}", path, ex.getMessage());
+    } catch (IOException ex) {
+      log.error("Unable to delete wal {}", path, ex);
+    }
+
+    return 0;
+  }
+
   private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) {
     for (Pair<WalState,Path> stateFile : collection) {
       Path path = stateFile.getSecond();
-      log.debug("Removing " + stateFile.getFirst() + " WAL " + path);
-      try {
-        if (!useTrash || !fs.moveToTrash(path)) {
-          fs.deleteRecursively(path);
-        }
-        status.currentLog.deleted++;
-      } catch (FileNotFoundException ex) {
-        // ignored
-      } catch (IOException ex) {
-        log.error("Unable to delete wal " + path + ": " + ex);
-      }
+      log.debug("Removing {} WAL {}", stateFile.getFirst(), path);
+      status.currentLog.deleted += removeFile(path);
     }
     return status.currentLog.deleted;
   }
 
+  private long removeFiles(Collection<Path> values) {
+    long count = 0;
+    for (Path path : values) {
+      log.debug("Removing recovery log {}", path);
+      count += removeFile(path);
+    }
+    return count;
+  }
+
   private UUID path2uuid(Path path) {
     return UUID.fromString(path.getName());
   }
 
   private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUID>> candidates,
-      Set<TServerInstance> liveServers, Map<UUID,Pair<WalState,Path>> logsState)
-      throws IOException, KeeperException, InterruptedException {
+      Set<TServerInstance> liveServers, Map<UUID,Pair<WalState,Path>> logsState,
+      Map<UUID,Path> recoveryLogs) throws IOException, KeeperException, InterruptedException {
 
     Map<UUID,TServerInstance> result = new HashMap<>();
     for (Entry<TServerInstance,Set<UUID>> entry : candidates.entrySet()) {
       for (UUID id : entry.getValue()) {
-        result.put(id, entry.getKey());
+        if (result.put(id, entry.getKey()) != null) {
+          throw new IllegalArgumentException("WAL " + id + " owned by multiple tservers");
+        }
       }
     }
 
@@ -287,9 +311,8 @@ public class GarbageCollectWriteAheadLogs {
       if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
         Set<UUID> idsToIgnore = candidates.remove(state.current);
         if (idsToIgnore != null) {
-          for (UUID id : idsToIgnore) {
-            result.remove(id);
-          }
+          result.keySet().removeAll(idsToIgnore);
+          recoveryLogs.keySet().removeAll(idsToIgnore);
         }
       }
       // Tablet is being recovered and has WAL references, remove all the WALs for the dead server
@@ -301,9 +324,8 @@ public class GarbageCollectWriteAheadLogs {
           // There's a reference to a log file, so skip that server's logs
           Set<UUID> idsToIgnore = candidates.remove(dead);
           if (idsToIgnore != null) {
-            for (UUID id : idsToIgnore) {
-              result.remove(id);
-            }
+            result.keySet().removeAll(idsToIgnore);
+            recoveryLogs.keySet().removeAll(idsToIgnore);
           }
         }
       }
@@ -320,6 +342,8 @@ public class GarbageCollectWriteAheadLogs {
             result.remove(id);
           }
         }
+
+        recoveryLogs.keySet().removeAll(idsForServer);
       }
     }
     return result;
@@ -384,4 +408,29 @@ public class GarbageCollectWriteAheadLogs {
     }
     return result;
   }
+
+  /**
+   * Looks for write-ahead logs in recovery directories.
+   *
+   * @return map of log uuids to paths
+   */
+  protected Map<UUID,Path> getSortedWALogs() throws IOException {
+    Map<UUID,Path> result = new HashMap<>();
+
+    for (String dir : ServerConstants.getRecoveryDirs()) {
+      Path recoveryDir = new Path(dir);
+      if (fs.exists(recoveryDir)) {
+        for (FileStatus status : fs.listStatus(recoveryDir)) {
+          try {
+            UUID logId = path2uuid(status.getPath());
+            result.put(logId, status.getPath());
+          } catch (IllegalArgumentException iae) {
+            log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
+          }
+
+        }
+      }
+    }
+    return result;
+  }
 }
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 556a336..58c9089 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -106,6 +106,11 @@ public class GarbageCollectWriteAheadLogsTest {
           throws IOException, KeeperException, InterruptedException {
         return 0;
       }
+
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() throws IOException {
+        return Collections.emptyMap();
+      }
     };
     gc.collect(status);
     EasyMock.verify(context, fs, marker, tserverSet);
@@ -131,6 +136,11 @@ public class GarbageCollectWriteAheadLogsTest {
           throws IOException, KeeperException, InterruptedException {
         return 0;
       }
+
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() throws IOException {
+        return Collections.emptyMap();
+      }
     };
     gc.collect(status);
     EasyMock.verify(context, marker, tserverSet, fs);
@@ -172,7 +182,12 @@ public class GarbageCollectWriteAheadLogsTest {
     EasyMock.expectLastCall().once();
     EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
     GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer1List);
+        tserverSet, marker, tabletOnServer1List) {
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() throws IOException {
+        return Collections.emptyMap();
+      }
+    };
     gc.collect(status);
     EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
   }
@@ -208,7 +223,12 @@ public class GarbageCollectWriteAheadLogsTest {
     EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
     EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
     GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer2List);
+        tserverSet, marker, tabletOnServer2List) {
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() throws IOException {
+        return Collections.emptyMap();
+      }
+    };
     gc.collect(status);
     EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
   }
@@ -250,7 +270,12 @@ public class GarbageCollectWriteAheadLogsTest {
     EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
     EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
     GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer1List);
+        tserverSet, marker, tabletOnServer1List) {
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() throws IOException {
+        return Collections.emptyMap();
+      }
+    };
     gc.collect(status);
     EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
   }

-- 
To stop receiving notification emails like this one, please contact
kturner@apache.org.

Mime
View raw message