Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D0B32EB1A for ; Thu, 28 Feb 2013 22:08:20 +0000 (UTC) Received: (qmail 58925 invoked by uid 500); 28 Feb 2013 22:08:20 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 58892 invoked by uid 500); 28 Feb 2013 22:08:20 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 58884 invoked by uid 99); 28 Feb 2013 22:08:20 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Feb 2013 22:08:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Feb 2013 22:08:19 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 290C423888EA; Thu, 28 Feb 2013 22:08:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1451386 - in /accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server: Accumulo.java gc/GarbageCollectWriteAheadLogs.java Date: Thu, 28 Feb 2013 22:07:59 -0000 To: commits@accumulo.apache.org From: kturner@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130228220800.290C423888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kturner Date: Thu Feb 28 22:07:58 2013 New Revision: 1451386 URL: http://svn.apache.org/r1451386 Log: ACCUMULO-1126 Made AGC clean up unrefed sorted walogs. Also made it delete unrefed walogs of offline servers. Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java?rev=1451386&r1=1451385&r2=1451386&view=diff ============================================================================== --- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java (original) +++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/Accumulo.java Thu Feb 28 22:07:58 2013 @@ -165,7 +165,7 @@ public class Accumulo { log.error(t, t); } } - }, 1000, 10 * 1000); + }, 1000, 10 * 60 * 1000); } public static String getLocalAddress(String[] args) throws UnknownHostException { Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1451386&r1=1451385&r2=1451386&view=diff ============================================================================== --- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original) +++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Thu Feb 28 22:07:58 2013 @@ -20,10 +20,12 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.Constants; @@ -72,6 +74,9 @@ public class GarbageCollectWriteAheadLog Span span = Trace.start("scanServers"); try { + + Set sortedWALogs = getSortedWALogs(); + status.currentLog.started = System.currentTimeMillis(); Map fileToServerMap = new HashMap(); @@ -84,7 +89,7 @@ public class GarbageCollectWriteAheadLog span = Trace.start("removeMetadataEntries"); try { - count = removeMetadataEntries(fileToServerMap, status); + count = removeMetadataEntries(fileToServerMap, sortedWALogs, status); } catch (Exception ex) { log.error("Unable to scan metadata table", ex); return; @@ -98,7 +103,7 @@ public class GarbageCollectWriteAheadLog span = Trace.start("removeFiles"); Map> serverToFileMap = mapServersToFiles(fileToServerMap); - count = removeFiles(serverToFileMap, status); + count = removeFiles(serverToFileMap, sortedWALogs, status); long removeStop = System.currentTimeMillis(); log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.)); @@ -126,7 +131,7 @@ public class GarbageCollectWriteAheadLog } } - private int removeFiles(Map> serverToFileMap, final GCStatus status) { + private int removeFiles(Map> serverToFileMap, Set sortedWALogs, final GCStatus status) { AccumuloConfiguration conf = instance.getConfiguration(); for (Entry> entry : serverToFileMap.entrySet()) { if (entry.getKey().length() == 0) { @@ -143,22 +148,50 @@ public class GarbageCollectWriteAheadLog } } else { InetSocketAddress address = AddressUtil.parseAddress(entry.getKey(), Property.TSERV_CLIENTPORT); - if (!holdsLock(address)) + if (!holdsLock(address)) { + Path serverPath = new Path(Constants.getWalDirectory(conf), entry.getKey()); + for (String filename : entry.getValue()) { + log.debug("Removing WAL for offline server " + filename); + try { + Path path = new Path(serverPath, filename); + if (trash == null || !trash.moveToTrash(path)) + fs.delete(path, true); + } catch (IOException ex) { + log.error("Unable to delete wal " + filename + ": " + ex); + } + } continue; - Client tserver = null; - try { - tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), entry.getValue()); - log.debug("deleted " + entry.getValue() + " from " + entry.getKey()); - status.currentLog.deleted += entry.getValue().size(); - } catch (TException e) { - log.warn("Error talking to " + address + ": " + e); - } finally { - if (tserver != null) - ThriftUtil.returnClient(tserver); + } else { + Client tserver = null; + try { + tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), entry.getValue()); + log.debug("deleted " + entry.getValue() + " from " + entry.getKey()); + status.currentLog.deleted += entry.getValue().size(); + } catch (TException e) { + log.warn("Error talking to " + address + ": " + e); + } finally { + if (tserver != null) + ThriftUtil.returnClient(tserver); + } + } + } + } + + Path recoveryDir = new Path(Constants.getRecoveryDir(conf)); + + for (String sortedWALog : sortedWALogs) { + log.debug("Removing sorted WAL " + sortedWALog); + try { + Path swalog = new Path(recoveryDir, sortedWALog); + if (trash == null || !trash.moveToTrash(swalog)) { + fs.delete(swalog, true); } + } catch (IOException ioe) { + log.error("Unable to delete sorted walog " + sortedWALogs + ": " + ioe); } } + return 0; } @@ -175,7 +208,8 @@ public class GarbageCollectWriteAheadLog return serverToFileMap; } - private static int removeMetadataEntries(Map fileToServerMap, GCStatus status) throws IOException, KeeperException, InterruptedException { + private static int removeMetadataEntries(Map fileToServerMap, Set sortedWALogs, GCStatus status) throws IOException, KeeperException, + InterruptedException { int count = 0; Iterator iterator = MetadataTable.getLogEntries(SecurityConstants.getSystemCredentials()); while (iterator.hasNext()) { @@ -183,6 +217,9 @@ public class GarbageCollectWriteAheadLog filename = filename.split("/", 2)[1]; if (fileToServerMap.remove(filename) != null) status.currentLog.inUse++; + + sortedWALogs.remove(filename); + count++; } } @@ -214,6 +251,25 @@ public class GarbageCollectWriteAheadLog return count; } + private Set getSortedWALogs() throws IOException { + AccumuloConfiguration conf = instance.getConfiguration(); + Path recoveryDir = new Path(Constants.getRecoveryDir(conf)); + + Set sortedWALogs = new HashSet(); + + if (fs.exists(recoveryDir)) { + for (FileStatus status : fs.listStatus(recoveryDir)) { + if (isUUID(status.getPath().getName())) { + sortedWALogs.add(status.getPath().getName()); + } else { + log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid"); + } + } + } + + return sortedWALogs; + } + /** * @param name * @return