Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3CBB610C33 for ; Mon, 17 Jun 2013 13:27:18 +0000 (UTC) Received: (qmail 75492 invoked by uid 500); 17 Jun 2013 13:27:16 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 75456 invoked by uid 500); 17 Jun 2013 13:27:16 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 75436 invoked by uid 99); 17 Jun 2013 13:27:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Jun 2013 13:27:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Jun 2013 13:27:05 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C690D2388A29; Mon, 17 Jun 2013 13:26:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1493756 [1/2] - in /accumulo/branches/ACCUMULO-118: core/src/main/java/org/apache/accumulo/core/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apac... Date: Mon, 17 Jun 2013 13:26:43 -0000 To: commits@accumulo.apache.org From: ecn@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130617132645.C690D2388A29@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ecn Date: Mon Jun 17 13:26:43 2013 New Revision: 1493756 URL: http://svn.apache.org/r1493756 Log: ACCUMULO-118 fixed log recovery, du Removed: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/Constants.java accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/Constants.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/Constants.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/Constants.java (original) +++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/Constants.java Mon Jun 17 13:26:43 2013 @@ -178,34 +178,9 @@ public class Constants { return conf.get(Property.INSTANCE_DFS_DIR); } - public static String getTablesDir(final AccumuloConfiguration conf) { - return getBaseDir(conf) + "/tables"; - } - - public static String getRecoveryDir(final AccumuloConfiguration conf) { - return getBaseDir(conf) + "/recovery"; - } - public static Path getDataVersionLocation(final AccumuloConfiguration conf) { return new Path(getBaseDir(conf) + "/version"); } - public static String getMetadataTableDir(final AccumuloConfiguration conf) { - return getTablesDir(conf) + "/" + METADATA_TABLE_ID; - } - - public static String getRootTabletDir(final AccumuloConfiguration conf) { - return getMetadataTableDir(conf) + ZROOT_TABLET; - } - - - /** - * @param conf - * @return The write-ahead log directory. - */ - public static String getWalDirectory(final AccumuloConfiguration conf) { - return getBaseDir(conf) + "/wal"; - } - - public static final String AUDITLOG = "Audit"; + public static final String AUDITLOG = "Audit"; } Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java (original) +++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java Mon Jun 17 13:26:43 2013 @@ -1274,7 +1274,6 @@ public class TableOperationsImpl extends try { FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration()); - ; Map props = getExportedProps(fs, new Path(importDir, Constants.EXPORT_FILE)); for (String propKey : props.keySet()) { Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java (original) +++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java Mon Jun 17 13:26:43 2013 @@ -229,14 +229,20 @@ class OfflineIterator implements Iterato if (currentExtent != null && !extent.isPreviousExtent(currentExtent)) throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent); - - String tablesDir = Constants.getTablesDir(instance.getConfiguration()); + + // TODO: ACCUMULO-118 needs fullpaths List absFiles = new ArrayList(); for (String relPath : relFiles) { - if (relPath.startsWith("..")) - absFiles.add(tablesDir + relPath.substring(2)); - else - absFiles.add(tablesDir + "/" + tableId + relPath); + if (relFiles.contains(":")) { + absFiles.add(relPath); + } else { + throw new RuntimeException("Unimplemented: offline scanner over relative paths"); +// if (relPath.startsWith("..")) { +// absFiles.add(fs.getFullPath(tablesDir + relPath.substring(2)); +// } else { +// absFiles.add(tablesDir + "/" + tableId + relPath); +// } + } } iter = createIterator(extent, absFiles); Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java (original) +++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java Mon Jun 17 13:26:43 2013 @@ -437,6 +437,8 @@ public class ThriftScanner { List results = new ArrayList(sr.results.size()); for (TKeyValue tkv : sr.results) results.add(new KeyValue(new Key(tkv.key), tkv.value)); + for (KeyValue r : results) + log.trace("Got result " + r); return results; Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java (original) +++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java Mon Jun 17 13:26:43 2013 @@ -306,11 +306,7 @@ public class MockTableOperations extends public List getDiskUsage(Set tables) throws AccumuloException, AccumuloSecurityException { List diskUsages = new ArrayList(); - for(String table : tables) { - TreeSet tree = new TreeSet(); - tree.add(table); - diskUsages.add(new DiskUsage(tree, 1l)); - } + diskUsages.add(new DiskUsage(new TreeSet(tables), 0l)); return diskUsages; } Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original) +++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java Mon Jun 17 13:26:43 2013 @@ -70,8 +70,7 @@ public enum Property { "A secret unique to a given instance that all servers must know in order to communicate with one another." + " Change it before initialization. To change it later use ./bin/accumulo accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd], " + " and then update conf/accumulo-site.xml everywhere."), - INSTANCE_NAMESPACES("instance.namespaces", "", PropertyType.STRING, - "A list of namespaces to use."), + INSTANCE_NAMESPACES("instance.namespaces", "", PropertyType.STRING, "A list of namespaces to use."), INSTANCE_SECURITY_AUTHENTICATOR("instance.security.authenticator", "org.apache.accumulo.server.security.handler.ZKAuthenticator", PropertyType.CLASSNAME, "The authenticator class that accumulo will use to determine if a user has privilege to perform an action"), INSTANCE_SECURITY_AUTHORIZOR("instance.security.authorizor", "org.apache.accumulo.server.security.handler.ZKAuthorizor", PropertyType.CLASSNAME, Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java (original) +++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java Mon Jun 17 13:26:43 2013 @@ -22,17 +22,13 @@ import java.util.SortedSet; import java.util.TreeSet; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.ConfigurationCopy; -import org.apache.accumulo.core.util.TableDiskUsage; -import org.apache.accumulo.core.util.TableDiskUsage.Printer; +import org.apache.accumulo.core.client.admin.DiskUsage; +import org.apache.accumulo.core.util.NumUtil; import org.apache.accumulo.core.util.shell.Shell; import org.apache.accumulo.core.util.shell.Shell.Command; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; public class DUCommand extends Command { @@ -51,22 +47,17 @@ public class DUCommand extends Command { } } } else { - shellState.checkTableState(); - tablesToFlush.add(shellState.getTableName()); + if (tablesToFlush.isEmpty()) { + shellState.checkTableState(); + tablesToFlush.add(shellState.getTableName()); + } } try { - final AccumuloConfiguration acuConf = new ConfigurationCopy(shellState.getConnector().instanceOperations().getSystemConfiguration()); - TableDiskUsage.printDiskUsage(acuConf, tablesToFlush, FileSystem.get(new Configuration()), shellState.getConnector(), new Printer() { - @Override - public void print(String line) { - try { - shellState.getReader().println(line); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - }, prettyPrint); + String valueFormat = prettyPrint ? "%9s" : "%,24d"; + for (DiskUsage usage : shellState.getConnector().tableOperations().getDiskUsage(tablesToFlush)) { + Object value = prettyPrint ? NumUtil.bigNumberForSize(usage.getUsage()) : usage.getUsage(); + shellState.getReader().println(String.format(valueFormat + " %s", value, usage.getTables())); + } } catch (Exception ex) { throw new RuntimeException(ex); } Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java Mon Jun 17 13:26:43 2013 @@ -16,23 +16,31 @@ */ package org.apache.accumulo.server; +import static org.apache.accumulo.core.Constants.METADATA_TABLE_ID; +import static org.apache.accumulo.core.Constants.ZROOT_TABLET; + import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import static org.apache.accumulo.core.Constants.*; - public class ServerConstants { + // these are functions to delay loading the Accumulo configuration unless we must public static String[] getBaseDirs() { String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR); String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_NAMESPACES); - if (ns == null) { - return new String[] { singleNamespace }; + if (ns == null || ns.isEmpty()) { + Configuration hadoopConfig = CachedConfiguration.getInstance(); + String fullPath = hadoopConfig.get("fs.default.name") + singleNamespace; + return new String[] { fullPath }; } String namespaces[] = ns.split(","); if (namespaces.length < 2) { - return new String[] { singleNamespace }; + Configuration hadoopConfig = CachedConfiguration.getInstance(); + String fullPath = hadoopConfig.get("fs.default.name") + singleNamespace; + return new String[] { fullPath }; } return prefix(namespaces, singleNamespace); } @@ -53,6 +61,14 @@ public class ServerConstants { return prefix(getBaseDirs(), "recovery"); } + public static String[] getWalDirs() { + return prefix(getBaseDirs(), "wal"); + } + + public static String[] getWalogArchives() { + return prefix(getBaseDirs(), "walogArchive"); + } + public static Path getInstanceIdLocation() { return new Path(ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR) + "/instance_id"); } @@ -68,5 +84,5 @@ public class ServerConstants { public static String getRootTabletDir() { return prefix(getMetadataTableDirs(), ZROOT_TABLET)[0]; } - + } Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java Mon Jun 17 13:26:43 2013 @@ -52,10 +52,10 @@ import org.apache.accumulo.core.security import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.core.util.TableDiskUsage; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.security.AuditedSecurityOperation; import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.util.TableDiskUsage; import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; import org.apache.accumulo.trace.thrift.TInfo; @@ -339,7 +339,7 @@ public class ClientServiceHandler implem FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), conf); // use the same set of tableIds that were validated above to avoid race conditions - Map,Long> diskUsage = TableDiskUsage.getDiskUsage(new ServerConfiguration(instance).getConfiguration(), tableIds, fs, conn, false); + Map,Long> diskUsage = TableDiskUsage.getDiskUsage(new ServerConfiguration(instance).getConfiguration(), tableIds, fs, conn); List retUsages = new ArrayList(); for (Map.Entry,Long> usageItem : diskUsage.entrySet()) { retUsages.add(new TDiskUsage(new ArrayList(usageItem.getKey()), usageItem.getValue())); Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java Mon Jun 17 13:26:43 2013 @@ -32,7 +32,7 @@ public class FileRef implements Comparab public FileRef(FileSystem fs, Key key) { metaReference = key.getColumnQualifier().toString(); - fullReference = new Path(fs.getFullPath(key)); + fullReference = fs.getFullPath(key); } public FileRef(String metaReference, Path fullReference) { Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java Mon Jun 17 13:26:43 2013 @@ -109,10 +109,10 @@ public interface FileSystem { FileStatus[] globStatus(Path path) throws IOException; // Convert a file or directory !METADATA reference into a path - String getFullPath(Key key); + Path getFullPath(Key key); // Given a filename, figure out the qualified path given multiple namespaces - String getFullPath(String paths[], String fileName) throws IOException; + Path getFullPath(String paths[], String fileName) throws IOException; // forward to the appropriate FileSystem object ContentSummary getContentSummary(String dir); Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java Mon Jun 17 13:26:43 2013 @@ -23,15 +23,17 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.commons.lang.NotImplementedException; @@ -307,9 +309,24 @@ public class FileSystemImpl implements o AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance()); return get(conf); } + + static private final String DEFAULT = ""; public static org.apache.accumulo.server.fs.FileSystem get(AccumuloConfiguration conf) throws IOException { - return new FileSystemImpl(Collections.singletonMap("", FileSystem.get(CachedConfiguration.getInstance())), "", conf); + Map fileSystems = new HashMap(); + Configuration hadoopConf = CachedConfiguration.getInstance(); + fileSystems.put(DEFAULT, FileSystem.get(hadoopConf)); + String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_NAMESPACES); + if (ns != null) { + for (String space : ns.split(",")) { + if (space.contains(":")) { + fileSystems.put(space, new Path(space).getFileSystem(hadoopConf)); + } else { + fileSystems.put(space, FileSystem.get(hadoopConf)); + } + } + } + return new FileSystemImpl(fileSystems, "", conf); } @Override @@ -364,11 +381,11 @@ public class FileSystemImpl implements o } @Override - public String getFullPath(Key key) { + public Path getFullPath(Key key) { String relPath = key.getColumnQualifierData().toString(); if (relPath.contains(":")) - return relPath; + return new Path(relPath); byte [] tableId = KeyExtent.tableOfMetadataRow(key.getRow()); @@ -376,10 +393,9 @@ public class FileSystemImpl implements o relPath = relPath.substring(2); else relPath = "/" + new String(tableId) + relPath; - String fullPath = Constants.getTablesDir(conf) + relPath; + String fullPath = ServerConstants.getTablesDirs()[0] + relPath; FileSystem ns = getFileSystemByPath(fullPath); - String result = ns.makeQualified(new Path(fullPath)).toString(); - return result; + return ns.makeQualified(new Path(fullPath)); } @Override @@ -404,20 +420,26 @@ public class FileSystemImpl implements o } @Override - public String getFullPath(String[] paths, String fileName) throws IOException { + public Path getFullPath(String[] paths, String fileName) throws IOException { if (fileName.contains(":")) - return fileName; + return new Path(fileName); + // TODO: ACCUMULO-118 + // How do we want it to work? Find it somewhere? or find it in the default file system? // old-style name, on one of many possible "root" paths: if (fileName.startsWith("../")) fileName = fileName.substring(2); for (String path : paths) { - String fullPath = path + fileName; + String fullPath; + if (path.endsWith("/") || fileName.startsWith("/")) + fullPath = path + fileName; + else + fullPath = path + "/" + fileName; FileSystem ns = getFileSystemByPath(fullPath); Path exists = new Path(fullPath); if (ns.exists(exists)) - return ns.makeQualified(exists).toString(); + return ns.makeQualified(exists); } - throw new RuntimeException("Could not find file " + fileName + " in " + Arrays.asList(paths)); + throw new IOException("Could not find file " + fileName + " in " + Arrays.asList(paths)); } @Override Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Mon Jun 17 13:26:43 2013 @@ -38,6 +38,7 @@ import org.apache.accumulo.core.tabletse import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.fs.FileSystem; import org.apache.accumulo.server.security.SecurityConstants; import org.apache.accumulo.server.util.AddressUtil; @@ -71,11 +72,11 @@ public class GarbageCollectWriteAheadLog Span span = Trace.start("scanServers"); try { - Set sortedWALogs = getSortedWALogs(); + Set sortedWALogs = getSortedWALogs(); status.currentLog.started = System.currentTimeMillis(); - Map fileToServerMap = new HashMap(); + Map fileToServerMap = new HashMap(); int count = scanServers(fileToServerMap); long fileScanStop = System.currentTimeMillis(); log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count, @@ -97,7 +98,7 @@ public class GarbageCollectWriteAheadLog log.info(String.format("%d log entries scanned in %.2f seconds", count, (logEntryScanStop - fileScanStop) / 1000.)); span = Trace.start("removeFiles"); - Map> serverToFileMap = mapServersToFiles(fileToServerMap); + Map> serverToFileMap = mapServersToFiles(fileToServerMap); count = removeFiles(serverToFileMap, sortedWALogs, status); @@ -127,39 +128,36 @@ public class GarbageCollectWriteAheadLog } } - private int removeFiles(Map> serverToFileMap, Set sortedWALogs, final GCStatus status) { + private int removeFiles(Map> serverToFileMap, Set sortedWALogs, final GCStatus status) { AccumuloConfiguration conf = instance.getConfiguration(); - for (Entry> entry : serverToFileMap.entrySet()) { - if (entry.getKey().length() == 0) { + for (Entry> entry : serverToFileMap.entrySet()) { + if (entry.getKey().isEmpty()) { // old-style log entry, just remove it - for (String filename : entry.getValue()) { - log.debug("Removing old-style WAL " + entry.getValue()); + for (Path path : entry.getValue()) { + log.debug("Removing old-style WAL " + path); try { - Path path = new Path(Constants.getWalDirectory(conf), filename); if (!useTrash || !fs.moveToTrash(path)) fs.deleteRecursively(path); status.currentLog.deleted++; } catch (FileNotFoundException ex) { // ignored } catch (IOException ex) { - log.error("Unable to delete wal " + filename + ": " + ex); + log.error("Unable to delete wal " + path + ": " + ex); } } } else { InetSocketAddress address = AddressUtil.parseAddress(entry.getKey()); if (!holdsLock(address)) { - Path serverPath = new Path(Constants.getWalDirectory(conf), entry.getKey()); - for (String filename : entry.getValue()) { - log.debug("Removing WAL for offline server " + filename); + for (Path path : entry.getValue()) { + log.debug("Removing WAL for offline server " + path); try { - Path path = new Path(serverPath, filename); if (!useTrash || !fs.moveToTrash(path)) fs.deleteRecursively(path); status.currentLog.deleted++; } catch (FileNotFoundException ex) { // ignored } catch (IOException ex) { - log.error("Unable to delete wal " + filename + ": " + ex); + log.error("Unable to delete wal " + path + ": " + ex); } } continue; @@ -167,7 +165,7 @@ public class GarbageCollectWriteAheadLog Client tserver = null; try { tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), entry.getValue()); + tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), paths2strings(entry.getValue())); log.debug("deleted " + entry.getValue() + " from " + entry.getKey()); status.currentLog.deleted += entry.getValue().size(); } catch (TException e) { @@ -180,11 +178,8 @@ public class GarbageCollectWriteAheadLog } } - Path recoveryDir = new Path(Constants.getRecoveryDir(conf)); - - for (String sortedWALog : sortedWALogs) { - log.debug("Removing sorted WAL " + sortedWALog); - Path swalog = new Path(recoveryDir, sortedWALog); + for (Path swalog : sortedWALogs) { + log.debug("Removing sorted WAL " + swalog); try { if (!useTrash || !fs.moveToTrash(swalog)) { fs.deleteRecursively(swalog); @@ -194,10 +189,10 @@ public class GarbageCollectWriteAheadLog } catch (IOException ioe) { try { if (fs.exists(swalog)) { - log.error("Unable to delete sorted walog " + sortedWALog + ": " + ioe); + log.error("Unable to delete sorted walog " + swalog + ": " + ioe); } } catch (IOException ex) { - log.error("Unable to check for the existence of " + sortedWALog, ex); + log.error("Unable to check for the existence of " + swalog, ex); } } } @@ -205,30 +200,42 @@ public class GarbageCollectWriteAheadLog return 0; } - private static Map> mapServersToFiles(Map fileToServerMap) { - Map> serverToFileMap = new HashMap>(); - for (Entry fileServer : fileToServerMap.entrySet()) { - ArrayList files = serverToFileMap.get(fileServer.getValue()); + private List paths2strings(ArrayList paths) { + List result = new ArrayList(paths.size()); + for (Path path : paths) + result.add(path.toString()); + return result; + } + + private static Map> mapServersToFiles(Map fileToServerMap) { + Map> result = new HashMap>(); + for (Entry fileServer : fileToServerMap.entrySet()) { + ArrayList files = result.get(fileServer.getValue()); if (files == null) { - files = new ArrayList(); - serverToFileMap.put(fileServer.getValue(), files); + files = new ArrayList(); + result.put(fileServer.getValue(), files); } files.add(fileServer.getKey()); } - return serverToFileMap; + return result; } - private static int removeMetadataEntries(Map fileToServerMap, Set sortedWALogs, GCStatus status) throws IOException, KeeperException, + private static int removeMetadataEntries(Map fileToServerMap, Set sortedWALogs, GCStatus status) throws IOException, KeeperException, InterruptedException { int count = 0; Iterator iterator = MetadataTable.getLogEntries(SecurityConstants.getSystemCredentials()); while (iterator.hasNext()) { for (String filename : iterator.next().logSet) { - filename = filename.split("/", 2)[1]; - if (fileToServerMap.remove(filename) != null) + Path path; + if (filename.contains(":")) + path = new Path(filename); + else + path = new Path(ServerConstants.getWalDirs()[0] + filename); + + if (fileToServerMap.remove(path) != null) status.currentLog.inUse++; - sortedWALogs.remove(filename); + sortedWALogs.remove(path); count++; } @@ -236,48 +243,52 @@ public class GarbageCollectWriteAheadLog return count; } - private int scanServers(Map fileToServerMap) throws Exception { - AccumuloConfiguration conf = instance.getConfiguration(); - Path walRoot = new Path(Constants.getWalDirectory(conf)); - for (FileStatus status : fs.listStatus(walRoot)) { - String name = status.getPath().getName(); - if (status.isDir()) { - for (FileStatus file : fs.listStatus(new Path(walRoot, name))) { - if (isUUID(file.getPath().getName())) - fileToServerMap.put(file.getPath().getName(), name); - else { - log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid"); + private int scanServers(Map fileToServerMap) throws Exception { + Set servers = new HashSet(); + for (String walDir : ServerConstants.getWalDirs()) { + Path walRoot = new Path(walDir); + FileStatus[] listing = fs.listStatus(walRoot); + if (listing == null) + continue; + for (FileStatus status : listing) { + String server = status.getPath().getName(); + servers.add(server); + if (status.isDir()) { + for (FileStatus file : fs.listStatus(new Path(walRoot, server))) { + if (isUUID(file.getPath().getName())) + fileToServerMap.put(file.getPath(), server); + else { + log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid"); + } } + } else if (isUUID(server)) { + // old-style WAL are not under a directory + fileToServerMap.put(status.getPath(), ""); + } else { + log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid"); } - } else if (isUUID(name)) { - // old-style WAL are not under a directory - fileToServerMap.put(name, ""); - } else { - log.info("Ignoring file " + name + " because it doesn't look like a uuid"); } } - - int count = 0; - return count; + return servers.size(); } - private Set getSortedWALogs() throws IOException { - AccumuloConfiguration conf = instance.getConfiguration(); - Path recoveryDir = new Path(Constants.getRecoveryDir(conf)); - - Set sortedWALogs = new HashSet(); + private Set getSortedWALogs() throws IOException { + Set result = new HashSet(); - if (fs.exists(recoveryDir)) { - for (FileStatus status : fs.listStatus(recoveryDir)) { - if (isUUID(status.getPath().getName())) { - sortedWALogs.add(status.getPath().getName()); - } else { - log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid"); + for (String dir : ServerConstants.getRecoveryDirs()) { + Path recoveryDir = new Path(dir); + + if (fs.exists(recoveryDir)) { + for (FileStatus status : fs.listStatus(recoveryDir)) { + if (isUUID(status.getPath().getName())) { + result.add(status.getPath()); + } else { + log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid"); + } } } } - - return sortedWALogs; + return result; } /** Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Mon Jun 17 13:26:43 2013 @@ -572,7 +572,10 @@ public class SimpleGarbageCollector impl delete = cf.substring(2); } else { String table = new String(KeyExtent.tableOfMetadataRow(entry.getKey().getRow())); - delete = "/" + table + cf; + if (cf.startsWith("/")) + delete = "/" + table + cf; + else + delete = "/" + table + "/" + cf; } // WARNING: This line is EXTREMELY IMPORTANT. // You MUST REMOVE candidates that are still in use @@ -662,25 +665,27 @@ public class SimpleGarbageCollector impl boolean removeFlag; try { - String fullPath = fs.getFullPath(ServerConstants.getTablesDirs(), delete); + Path fullPath; + if (delete.contains(":")) + fullPath = new Path(delete.split("/", 3)[2]); + else + fullPath = fs.getFullPath(ServerConstants.getTablesDirs(), delete); log.debug("Deleting " + fullPath); - Path p = new Path(fullPath); - - if (moveToTrash(p) || fs.deleteRecursively(p)) { + if (moveToTrash(fullPath) || fs.deleteRecursively(fullPath)) { // delete succeeded, still want to delete removeFlag = true; synchronized (SimpleGarbageCollector.this) { ++status.current.deleted; } - } else if (fs.exists(p)) { + } else if (fs.exists(fullPath)) { // leave the entry in the METADATA table; we'll try again // later removeFlag = false; synchronized (SimpleGarbageCollector.this) { ++status.current.errors; } - log.warn("File exists, but was not deleted for an unknown reason: " + p); + log.warn("File exists, but was not deleted for an unknown reason: " + fullPath); } else { // this failure, we still want to remove the METADATA table // entry @@ -697,7 +702,7 @@ public class SimpleGarbageCollector impl if (tableState != null && tableState != TableState.DELETING) { // clone directories don't always exist if (!tabletDir.startsWith("c-")) - log.warn("File doesn't exist: " + p); + log.warn("File doesn't exist: " + fullPath); } } else { log.warn("Very strange path name: " + delete); Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java Mon Jun 17 13:26:43 2013 @@ -113,7 +113,7 @@ public class LogReader { } } else { // read the log entries sorted in a map file - MultiReader input = new MultiReader(fs, file); + MultiReader input = new MultiReader(fs, path); while (input.next(key, value)) { printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations); } Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java Mon Jun 17 13:26:43 2013 @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -34,10 +35,10 @@ import org.apache.accumulo.core.conf.Pro import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.master.Master; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.ZooCache; -import org.apache.accumulo.server.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; @@ -66,38 +67,39 @@ public class RecoveryManager { } private class LogSortTask implements Runnable { - private String filename; - private String host; + private String source; + private String destination; + private String sortId; private LogCloser closer; - public LogSortTask(LogCloser closer, String host, String filename) { + public LogSortTask(LogCloser closer, String source, String destination, String sortId) { this.closer = closer; - this.host = host; - this.filename = filename; + this.source = source; + this.destination = destination; + this.sortId = sortId; } @Override public void run() { boolean rescheduled = false; try { - FileSystem localFs = master.getFileSystem(); - long time = closer.close(master, localFs, getSource(host, filename)); + long time = closer.close(master, master.getFileSystem(), new Path(source)); if (time > 0) { executor.schedule(this, time, TimeUnit.MILLISECONDS); rescheduled = true; } else { - initiateSort(host, filename); + initiateSort(sortId, source, destination); } } catch (FileNotFoundException e) { - log.debug("Unable to initate log sort for " + filename + ": " + e); + log.debug("Unable to initate log sort for " + source + ": " + e); } catch (Exception e) { - log.warn("Failed to initiate log sort " + filename, e); + log.warn("Failed to initiate log sort " + source, e); } finally { if (!rescheduled) { synchronized (RecoveryManager.this) { - closeTasksQueued.remove(filename); + closeTasksQueued.remove(sortId); } } } @@ -105,62 +107,62 @@ public class RecoveryManager { } - private void initiateSort(String host, final String file) throws KeeperException, InterruptedException { - String source = getSource(host, file).toString(); - new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(file, source.getBytes()); + private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException { + String work = source + "|" + destination; + new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes()); synchronized (this) { - sortsQueued.add(file); + sortsQueued.add(sortId); } - final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file; - log.info("Created zookeeper entry " + path + " with data " + source); + final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId; + log.info("Created zookeeper entry " + path + " with data " + work); } - private Path getSource(String server, String file) { - String source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + server + "/" + file; - if (server.contains(":")) { - // old-style logger log, copied from local file systems by tservers, unsorted into the wal base dir - source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + file; - } - return new Path(source); - } + Random random = new Random(); public boolean recoverLogs(KeyExtent extent, Collection> walogs) throws IOException { boolean recoveryNeeded = false; for (Collection logs : walogs) { for (String walog : logs) { - String parts[] = walog.split("/"); - String host = parts[0]; - String filename = parts[1]; + String hostFilename[] = walog.split("/", 2); + String host = hostFilename[0]; + String filename = hostFilename[1]; + String parts[] = filename.split("/"); + String sortId = parts[parts.length - 1]; + // TODO: ACCUMULO-118: choose recovery directory with extension + String[] dirs = ServerConstants.getRecoveryDirs(); + String recoveryDir = dirs[random.nextInt(dirs.length)]; + String dest = recoveryDir + "/" + sortId; + log.debug("Recovering " + filename + " to " + dest + " using sortId " + sortId); boolean sortQueued; synchronized (this) { - sortQueued = sortsQueued.contains(filename); + sortQueued = sortsQueued.contains(sortId); } - if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + filename) == null) { + if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId) == null) { synchronized (this) { - sortsQueued.remove(filename); + sortsQueued.remove(sortId); } } - if (master.getFileSystem().exists(new Path(Constants.getRecoveryDir(master.getSystemConfiguration()) + "/" + filename + "/finished"))) { + if (master.getFileSystem().exists(new Path(dest, "finished"))) { synchronized (this) { - closeTasksQueued.remove(filename); - recoveryDelay.remove(filename); - sortsQueued.remove(filename); + closeTasksQueued.remove(sortId); + recoveryDelay.remove(sortId); + sortsQueued.remove(sortId); } continue; } recoveryNeeded = true; synchronized (this) { - if (!closeTasksQueued.contains(filename) && !sortsQueued.contains(filename)) { + if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) { AccumuloConfiguration aconf = master.getConfiguration().getConfiguration(); LogCloser closer = Master.createInstanceFromPropertyName(aconf, Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class, new HadoopLogCloser()); - Long delay = recoveryDelay.get(filename); + Long delay = recoveryDelay.get(sortId); if (delay == null) { delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY); } else { @@ -169,9 +171,9 @@ public class RecoveryManager { log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference"); - executor.schedule(new LogSortTask(closer, host, filename), delay, TimeUnit.MILLISECONDS); - closeTasksQueued.add(filename); - recoveryDelay.put(filename, delay); + executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); + closeTasksQueued.add(sortId); + recoveryDelay.put(sortId, delay); } } } Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java Mon Jun 17 13:26:43 2013 @@ -218,7 +218,7 @@ class WriteExportFiles extends MasterRep entry.getValue().write(dataOut); if (entry.getKey().getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) { - String path = fs.getFullPath(entry.getKey()); + String path = fs.getFullPath(entry.getKey()).toString(); String tokens[] = path.split("/"); if (tokens.length < 1) { throw new RuntimeException("Illegal path " + path); Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java Mon Jun 17 13:26:43 2013 @@ -52,6 +52,7 @@ import org.apache.accumulo.fate.Repo; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.fs.FileSystem; import org.apache.accumulo.server.master.Master; import org.apache.accumulo.server.master.state.tables.TableManager; import org.apache.accumulo.server.security.AuditedSecurityOperation; @@ -61,7 +62,6 @@ import org.apache.accumulo.server.tablet import org.apache.accumulo.server.util.MetadataTable; import org.apache.accumulo.server.util.TablePropUtil; import org.apache.hadoop.fs.FileStatus; -import org.apache.accumulo.server.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; @@ -314,7 +314,7 @@ class MapImportFileNames extends MasterR FileSystem fs = environment.getFileSystem(); fs.mkdirs(new Path(tableInfo.importDir)); - + FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir)); UniqueNameAllocator namer = UniqueNameAllocator.getInstance(); @@ -323,7 +323,7 @@ class MapImportFileNames extends MasterR for (FileStatus fileStatus : files) { String fileName = fileStatus.getPath().getName(); - + log.info("filename " + fileStatus.getPath().toString()); String sa[] = fileName.split("\\."); String extension = ""; if (sa.length > 1) { Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java Mon Jun 17 13:26:43 2013 @@ -16,23 +16,27 @@ */ package org.apache.accumulo.server.master.tableOps; +import java.util.Map.Entry; + import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.master.Master; import org.apache.accumulo.server.master.state.MergeInfo; import org.apache.accumulo.server.master.state.MergeInfo.Operation; import org.apache.accumulo.server.master.state.MergeState; import org.apache.accumulo.server.util.MetadataTable; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; /** @@ -57,13 +61,14 @@ class MakeDeleteEntries extends MasterRe public Repo call(long tid, Master master) throws Exception { log.info("creating delete entries for merged metadata tablets"); Connector conn = master.getConnector(); + Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); + scanner.setRange(Constants.METADATA_ROOT_TABLET_KEYSPACE); + scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY); BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig()); - String tableDir = Constants.getMetadataTableDir(master.getConfiguration().getConfiguration()); - for (FileStatus fs : master.getFileSystem().listStatus(new Path(tableDir))) { + for (Entry entry : scanner) { // TODO: add the entries only if there are no !METADATA table references - ACCUMULO-1308 - if (fs.isDir() && fs.getPath().getName().matches("^" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + ".*")) { - bw.addMutation(MetadataTable.createDeleteMutation(Constants.METADATA_TABLE_ID, "/" + fs.getPath().getName())); - } + FileRef ref = new FileRef(master.getFileSystem(), entry.getKey()); + bw.addMutation(MetadataTable.createDeleteMutation(Constants.METADATA_TABLE_ID, ref.path().toString())); } bw.close(); return null; Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java Mon Jun 17 13:26:43 2013 @@ -51,6 +51,7 @@ import org.apache.accumulo.server.proble import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; @@ -305,10 +306,10 @@ public class FileManager { // open any files that need to be opened for (String file : filesToOpen) { try { - // log.debug("Opening "+file); - String path = fs.getFullPath(ServerConstants.getTablesDirs(), file); + Path path = fs.getFullPath(ServerConstants.getTablesDirs(), file); org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path); - FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()), + //log.debug("Opening "+file + " path " + path); + FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()), dataCache, indexCache); reservedFiles.add(reader); readersReserved.put(reader, file); Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Mon Jun 17 13:26:43 2013 @@ -771,7 +771,7 @@ public class Tablet { mergingMinorCompactionFile = null; } - void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) { + void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) throws IOException { IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); if (extent.isRootTablet()) { @@ -1137,7 +1137,7 @@ public class Tablet { break; } - FileRef ref = new FileRef(entry.getKey().getColumnQualifier().toString(), new Path(fs.getFullPath(entry.getKey()))); + FileRef ref = new FileRef(entry.getKey().getColumnQualifier().toString(), fs.getFullPath(entry.getKey())); datafiles.put(ref, new DataFileValue(entry.getValue().get())); } } @@ -1178,8 +1178,8 @@ public class Tablet { Key key = entry.getKey(); if (key.getRow().equals(row) && key.getColumnFamily().equals(Constants.METADATA_SCANFILE_COLUMN_FAMILY)) { String meta = key.getColumnQualifier().toString(); - String path = fs.getFullPath(ServerConstants.getTablesDirs(), meta); - scanFiles.add(new FileRef(meta, new Path(path))); + Path path = fs.getFullPath(ServerConstants.getTablesDirs(), meta); + scanFiles.add(new FileRef(meta, path)); } } @@ -1376,7 +1376,8 @@ public class Tablet { for (LogEntry logEntry : logEntries) { for (String log : logEntry.logSet) { String[] parts = log.split("/", 2); - currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, parts[1])); + Path file = fs.getFullPath(ServerConstants.getWalDirs(), parts[1]); + currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, file)); } } @@ -2061,9 +2062,9 @@ public class Tablet { commitSession, flushId); span.stop(); return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()); - } catch (RuntimeException E) { + } catch (Exception E) { failed = true; - throw E; + throw new RuntimeException(E); } catch (Error E) { // Weird errors like "OutOfMemoryError" when trying to create the thread for the compaction failed = true; Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Mon Jun 17 13:26:43 2013 @@ -2054,10 +2054,10 @@ public class TabletServer extends Abstra public void removeLogs(TInfo tinfo, TCredentials credentials, List filenames) throws TException { String myname = getClientAddressString(); myname = myname.replace(':', '+'); - Path logDir = new Path(Constants.getWalDirectory(acuConf), myname); Set loggers = new HashSet(); logger.getLoggers(loggers); nextFile: for (String filename : filenames) { + // skip any log we're currently using for (String logger : loggers) { if (logger.contains(filename)) continue nextFile; @@ -2074,28 +2074,30 @@ public class TabletServer extends Abstra } } } + try { - String source = logDir + "/" + filename; + Path source = new Path(filename); if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) { - String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive"; - fs.mkdirs(new Path(walogArchive)); - String dest = walogArchive + "/" + filename; + Path walogArchive = fs.matchingFileSystem(source, ServerConstants.getWalogArchives()); + fs.mkdirs(walogArchive); + Path dest = new Path(walogArchive, source.getName()); log.info("Archiving walog " + source + " to " + dest); - if (!fs.rename(new Path(source), new Path(dest))) + if (!fs.rename(source, dest)) log.error("rename is unsuccessful"); } else { log.info("Deleting walog " + filename); - Path sourcePath = new Path(source); + Path sourcePath = new Path(filename); if (!fs.moveToTrash(sourcePath) && !fs.deleteRecursively(sourcePath)) log.warn("Failed to delete walog " + source); - Path recoveryPath = new Path(Constants.getRecoveryDir(acuConf), filename); - try { - if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath)) - log.info("Deleted any recovery log " + filename); - } catch (FileNotFoundException ex) { - // ignore + for (String recovery : ServerConstants.getRecoveryDirs()) { + Path recoveryPath = new Path(recovery, source.getName()); + try { + if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath)) + log.info("Deleted any recovery log " + filename); + } catch (FileNotFoundException ex) { + // ignore + } } - } } catch (IOException e) { log.warn("Error attempting to delete write-ahead log " + filename + ": " + e); @@ -3243,7 +3245,7 @@ public class TabletServer extends Abstra } public void recover(FileSystem fs, Tablet tablet, List logEntries, Set tabletFiles, MutationReceiver mutationReceiver) throws IOException { - List recoveryLogs = new ArrayList(); + List recoveryLogs = new ArrayList(); List sorted = new ArrayList(logEntries); Collections.sort(sorted, new Comparator() { @Override @@ -3252,14 +3254,13 @@ public class TabletServer extends Abstra } }); for (LogEntry entry : sorted) { - String recovery = null; + Path recovery = null; for (String log : entry.logSet) { String[] parts = log.split("/"); // "host:port/filename" - log = fs.getFullPath(ServerConstants.getRecoveryDirs(), parts[1]); - Path finished = new Path(log + "/finished"); + Path finished = new Path(fs.getFullPath(ServerConstants.getRecoveryDirs(), parts[parts.length - 1]), "finished"); TabletServer.log.info("Looking for " + finished); if (fs.exists(finished)) { - recovery = log; + recovery = finished.getParent(); break; } } Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Mon Jun 17 13:26:43 2013 @@ -31,18 +31,19 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.StringUtil; +import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.fs.FileSystem; import org.apache.accumulo.server.logger.LogFileKey; import org.apache.accumulo.server.logger.LogFileValue; @@ -211,10 +212,10 @@ public class DfsLogger { this.conf = conf; } - public DfsLogger(ServerResources conf, String logger, String filename) throws IOException { + public DfsLogger(ServerResources conf, String logger, Path filename) throws IOException { this.conf = conf; this.logger = logger; - this.logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()), filename); + this.logPath = filename; } public static FSDataInputStream readHeader(FileSystem fs, Path path, Map opts) throws IOException { @@ -241,13 +242,17 @@ public class DfsLogger { } } + // TODO: ACCUMULO-118 + static final Random random = new Random(); + public synchronized void open(String address) throws IOException { String filename = UUID.randomUUID().toString(); logger = StringUtil.join(Arrays.asList(address.split(":")), "+"); log.debug("DfsLogger.open() begin"); + String[] wals = ServerConstants.getWalDirs(); - logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()) + "/" + logger + "/" + filename); + logPath = new Path(wals[random.nextInt(wals.length)] + "/" + logger + "/" + filename); try { FileSystem fs = conf.getFileSystem(); short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION); @@ -334,7 +339,7 @@ public class DfsLogger { } public String getFileName() { - return logPath.getName(); + return logPath.toString(); } public void close() throws IOException { Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Mon Jun 17 13:26:43 2013 @@ -74,21 +74,24 @@ public class LogSorter { @Override public void process(String child, byte[] data) { - String dest = Constants.getRecoveryDir(conf) + "/" + child; - String src = new String(data); - String name = new Path(src).getName(); + String work = new String(data); + String[] parts = work.split("\\|"); + String src = parts[0]; + String dest = parts[1]; + String sortId = new Path(src).getName(); + log.debug("Sorting " + src + " to " + dest + " using sortId " + sortId); synchronized (currentWork) { - if (currentWork.containsKey(name)) + if (currentWork.containsKey(sortId)) return; - currentWork.put(name, this); + currentWork.put(sortId, this); } try { log.info("Copying " + src + " to " + dest); - sort(name, new Path(src), dest); + sort(sortId, new Path(src), dest); } finally { - currentWork.remove(name); + currentWork.remove(sortId); } } Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java Mon Jun 17 13:26:43 2013 @@ -86,9 +86,9 @@ public class MultiReader { private PriorityBuffer heap = new PriorityBuffer(); - public MultiReader(FileSystem fs, String directory) throws IOException { + public MultiReader(FileSystem fs, Path directory) throws IOException { boolean foundFinish = false; - for (FileStatus child : fs.listStatus(new Path(directory))) { + for (FileStatus child : fs.listStatus(directory)) { if (child.getPath().getName().startsWith("_")) continue; if (child.getPath().getName().equals("finished")) { Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java Mon Jun 17 13:26:43 2013 @@ -32,6 +32,7 @@ import org.apache.accumulo.core.data.Mut import org.apache.accumulo.server.fs.FileSystem; import org.apache.accumulo.server.logger.LogFileKey; import org.apache.accumulo.server.logger.LogFileValue; +import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; /** @@ -88,11 +89,11 @@ public class SortedLogRecovery { } } - public void recover(KeyExtent extent, List recoveryLogs, Set tabletFiles, MutationReceiver mr) throws IOException { + public void recover(KeyExtent extent, List recoveryLogs, Set tabletFiles, MutationReceiver mr) throws IOException { int[] tids = new int[recoveryLogs.size()]; LastStartToFinish lastStartToFinish = new LastStartToFinish(); for (int i = 0; i < recoveryLogs.size(); i++) { - String logfile = recoveryLogs.get(i); + Path logfile = recoveryLogs.get(i); log.info("Looking at mutations from " + logfile + " for " + extent); MultiReader reader = new MultiReader(fs, logfile); try { @@ -119,7 +120,7 @@ public class SortedLogRecovery { throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) not followed by successful minor compaction"); for (int i = 0; i < recoveryLogs.size(); i++) { - String logfile = recoveryLogs.get(i); + Path logfile = recoveryLogs.get(i); MultiReader reader = new MultiReader(fs, logfile); try { playbackMutations(reader, tids[i], lastStartToFinish, mr); @@ -188,6 +189,7 @@ public class SortedLogRecovery { lastStartToFinish.update(fileno, key.seq); // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table. + log.error("filename in compaction start " + key.filename); if (tabletFiles.contains(key.filename)) lastStartToFinish.update(-1); } else if (key.event == COMPACTION_FINISH) { Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Mon Jun 17 13:26:43 2013 @@ -40,6 +40,7 @@ import org.apache.accumulo.server.tablet import org.apache.accumulo.server.tabletserver.TabletMutations; import org.apache.accumulo.server.tabletserver.TabletServer; import org.apache.accumulo.server.tabletserver.log.DfsLogger.LoggerOperation; +import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; /** @@ -414,7 +415,7 @@ public class TabletServerLogger { return seq; } - public void recover(FileSystem fs, Tablet tablet, List logs, Set tabletFiles, MutationReceiver mr) throws IOException { + public void recover(FileSystem fs, Tablet tablet, List logs, Set tabletFiles, MutationReceiver mr) throws IOException { if (!enabled(tablet)) return; try { Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java Mon Jun 17 13:26:43 2013 @@ -503,6 +503,7 @@ public class Initialize { SecurityUtil.serverLogin(); Configuration conf = CachedConfiguration.getInstance(); + @SuppressWarnings("deprecation") FileSystem fs = FileSystemImpl.get(SiteConfiguration.getSiteConfiguration()); if (opts.resetSecurity) { Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java Mon Jun 17 13:26:43 2013 @@ -61,7 +61,7 @@ public class LocalityCheck { files.clear(); } else if (key.compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) { - files.add(fs.getFullPath(key)); + files.add(fs.getFullPath(key).toString()); } } System.out.println(" Server %local total blocks"); Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java Mon Jun 17 13:26:43 2013 @@ -161,7 +161,7 @@ public class MetadataTable extends org.a boolean foundEntry = false; for (String entry : unusedWalLogs) { String[] parts = entry.split("/"); - String zpath = root + "/" + parts[1]; + String zpath = root + "/" + parts[parts.length - 1]; while (true) { try { if (zk.exists(zpath)) { @@ -427,12 +427,12 @@ public class MetadataTable extends org.a } public static void replaceDatafiles(KeyExtent extent, Set datafilesToDelete, Set scanFiles, FileRef path, Long compactionId, - DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock) { + DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock) throws IOException { replaceDatafiles(extent, datafilesToDelete, scanFiles, path, compactionId, size, credentials, address, lastLocation, zooLock, true); } public static void replaceDatafiles(KeyExtent extent, Set datafilesToDelete, Set scanFiles, FileRef path, Long compactionId, - DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock, boolean insertDeleteFlags) { + DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock, boolean insertDeleteFlags) throws IOException { if (insertDeleteFlags) { // add delete flags for those paths before the data file reference is removed @@ -464,30 +464,32 @@ public class MetadataTable extends org.a update(credentials, zooLock, m); } - public static void addDeleteEntries(KeyExtent extent, Set datafilesToDelete, TCredentials credentials) { + public static void addDeleteEntries(KeyExtent extent, Set datafilesToDelete, TCredentials credentials) throws IOException { String tableId = extent.getTableId().toString(); // TODO could use batch writer,would need to handle failure and retry like update does - ACCUMULO-1294 - for (FileRef pathToRemove : datafilesToDelete) - update(credentials, createDeleteMutation(tableId, pathToRemove.meta().toString())); + for (FileRef pathToRemove : datafilesToDelete) { + update(credentials, createDeleteMutation(tableId, pathToRemove.path().toString())); + } } - public static void addDeleteEntry(String tableId, String path) { + public static void addDeleteEntry(String tableId, String path) throws IOException { update(SecurityConstants.getSystemCredentials(), createDeleteMutation(tableId, path)); } - public static Mutation createDeleteMutation(String tableId, String pathToRemove) { - Mutation delFlag; + public static Mutation createDeleteMutation(String tableId, String pathToRemove) throws IOException { String prefix = Constants.METADATA_DELETE_FLAG_PREFIX; if (tableId.equals(Constants.METADATA_TABLE_ID)) prefix = Constants.METADATA_DELETE_FLAG_FOR_METADATA_PREFIX; if (pathToRemove.startsWith("../")) - delFlag = new Mutation(new Text(prefix + pathToRemove.substring(2))); + pathToRemove = pathToRemove.substring(2); else - delFlag = new Mutation(new Text(prefix + "/" + tableId + pathToRemove)); + pathToRemove = "/" + tableId + "/" + pathToRemove; + Path path = FileSystemImpl.get().getFullPath(ServerConstants.getTablesDirs(), pathToRemove); + Mutation delFlag = new Mutation(new Text(prefix + path.toString())); delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {})); return delFlag; } @@ -634,7 +636,7 @@ public class MetadataTable extends org.a return fixSplit(table, metadataEntry, metadataPrevEndRow, oper, splitRatio, tserver, credentials, time.toString(), initFlushID, initCompactID, lock); } - public static void deleteTable(String tableId, boolean insertDeletes, TCredentials credentials, ZooLock lock) throws AccumuloException { + public static void deleteTable(String tableId, boolean insertDeletes, TCredentials credentials, ZooLock lock) throws AccumuloException, IOException { Scanner ms = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS); Text tableIdText = new Text(tableId); BatchWriter bw = new BatchWriterImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID, new BatchWriterConfig().setMaxMemory(1000000) @@ -654,10 +656,8 @@ public class MetadataTable extends org.a Key key = cell.getKey(); if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) { - String relPath = key.getColumnQualifier().toString(); - // only insert deletes for files owned by this table - if (!relPath.startsWith("../")) - bw.addMutation(createDeleteMutation(tableId, relPath)); + FileRef ref = new FileRef(FileSystemImpl.get(), key); + bw.addMutation(createDeleteMutation(tableId, ref.meta().toString())); } if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) { @@ -716,7 +716,7 @@ public class MetadataTable extends org.a extent.write(out); out.writeLong(timestamp); out.writeUTF(server); - out.writeUTF(filename); + out.writeUTF(filename.toString()); out.write(tabletId); out.write(logSet.size()); for (String s : logSet) { @@ -753,8 +753,11 @@ public class MetadataTable extends org.a while (true) { try { IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - if (zoo.isLockHeld(zooLock.getLockID())) - zoo.putPersistentData(root + "/" + entry.filename, entry.toBytes(), NodeExistsPolicy.OVERWRITE); + if (zoo.isLockHeld(zooLock.getLockID())) { + String[] parts = entry.filename.split("/"); + String uniqueId = parts[parts.length - 1]; + zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE); + } break; } catch (KeeperException e) { log.error(e, e); @@ -776,7 +779,7 @@ public class MetadataTable extends org.a public static LogEntry entryFromKeyValue(Key key, Value value) { MetadataTable.LogEntry e = new MetadataTable.LogEntry(); e.extent = new KeyExtent(key.getRow(), EMPTY_TEXT); - String[] parts = key.getColumnQualifier().toString().split("/"); + String[] parts = key.getColumnQualifier().toString().split("/", 2); e.server = parts[0]; e.filename = parts[1]; parts = value.toString().split("\\|"); @@ -905,6 +908,8 @@ public class MetadataTable extends org.a try { Scanner scanner = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), CredentialHelper.extractToken(creds)) .createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); + log.info("Setting range to " + Constants.NON_ROOT_METADATA_KEYSPACE); + scanner.setRange(Constants.NON_ROOT_METADATA_KEYSPACE); scanner.fetchColumnFamily(Constants.METADATA_LOG_COLUMN_FAMILY); metadataEntries = scanner.iterator(); } catch (Exception ex) { @@ -923,6 +928,7 @@ public class MetadataTable extends org.a return rootTabletEntries.next(); } Entry entry = metadataEntries.next(); + log.info("entry " + entry + " in range " + Constants.NON_ROOT_METADATA_KEYSPACE.contains(entry.getKey())); return entryFromKeyValue(entry.getKey(), entry.getValue()); } Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java?rev=1493756&r1=1493755&r2=1493756&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java (original) +++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java Mon Jun 17 13:26:43 2013 @@ -151,7 +151,7 @@ public class OfflineMetadataScanner exte while (ssi.hasTop()) { if (ssi.getTopKey().compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) { - allFiles.add(fs.getFullPath(ssi.getTopKey())); + allFiles.add(fs.getFullPath(ssi.getTopKey()).toString()); } else { walogs++; }